Просмотр исходного кода

添加 Telegram 群组管理功能,包括创建群组的控制器、服务和路由,更新相关 DTO,增强输入验证和错误处理,提升代码可读性和可维护性。

wuyi 1 месяц назад
Родитель
Сommit
40d452031b

+ 2 - 0
src/app.ts

@@ -19,6 +19,7 @@ import taskRoutes from './routes/task.routes'
 import testRoutes from './routes/test.routes'
 import senderRoutes from './routes/sender.routes'
 import chatGroupRoutes from './routes/chat-group.routes'
+import tgGroupRoutes from './routes/tg-group.routes'
 
 const options: FastifyEnvOptions = {
   schema: schema,
@@ -93,6 +94,7 @@ export const createApp = async () => {
   app.register(taskRoutes, { prefix: '/api/tasks' })
   app.register(senderRoutes, { prefix: '/api/senders' })
   app.register(chatGroupRoutes, { prefix: '/api/chat-groups' })
+  app.register(tgGroupRoutes, { prefix: '/api/tg-groups' })
   app.register(testRoutes, { prefix: '/api/test' })
   const dataSource = createDataSource(app)
   await dataSource.initialize()

+ 52 - 0
src/controllers/tg-group.controller.ts

@@ -0,0 +1,52 @@
+import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'
+import { TgGroupService } from '../services/tg-group.service'
+import { CreateTgGroupBody } from '../dto/tg-group.dto'
+
+export class TgGroupController {
+  private readonly tgGroupService: TgGroupService
+  private readonly app: FastifyInstance
+
+  constructor(app: FastifyInstance) {
+    this.app = app
+    this.tgGroupService = new TgGroupService(app)
+  }
+
+  async createGroup(request: FastifyRequest<{ Body: CreateTgGroupBody }>, reply: FastifyReply) {
+    try {
+      const { groupName, groupDescription, groupType, initMsg, senderId } = request.body
+
+      if (!groupName || !groupType) {
+        return reply.code(400).send({
+          success: false,
+          message: 'groupName 和 groupType 为必填参数'
+        })
+      }
+
+      const validGroupTypes = ['megagroup', 'channel']
+      if (!validGroupTypes.includes(groupType)) {
+        return reply.code(400).send({
+          success: false,
+          message: `groupType 必须是以下之一: ${validGroupTypes.join(', ')}`
+        })
+      }
+
+      const result = await this.tgGroupService.createTgGroup(
+        groupName,
+        groupDescription || '',
+        groupType,
+        initMsg,
+        senderId
+      )
+
+      return reply.code(result.success ? 200 : 500).send(result)
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : '未知错误'
+      this.app.log.error(`创建群组接口异常: ${errorMessage}`)
+      return reply.code(500).send({
+        success: false,
+        message: '创建群组失败',
+        error: errorMessage
+      })
+    }
+  }
+}

+ 7 - 0
src/dto/tg-group.dto.ts

@@ -0,0 +1,7 @@
+export interface CreateTgGroupBody {
+  groupName: string
+  groupDescription?: string
+  groupType: 'megagroup' | 'channel'
+  initMsg?: string
+  senderId?: string
+}

+ 9 - 0
src/routes/tg-group.routes.ts

@@ -0,0 +1,9 @@
+import { FastifyInstance } from 'fastify'
+import { TgGroupController } from '../controllers/tg-group.controller'
+import { CreateTgGroupBody } from '../dto/tg-group.dto'
+
+export default async function tgGroupRoutes(fastify: FastifyInstance) {
+  const controller = new TgGroupController(fastify)
+
+  fastify.post<{ Body: CreateTgGroupBody }>('/create', controller.createGroup.bind(controller))
+}

+ 19 - 8
src/services/chat-group.service.ts

@@ -15,14 +15,24 @@ export class ChatGroupService {
 
   async upsertGroup(payload: CreateChatGroupRecordBody): Promise<ChatGroup> {
     const existing = await this.chatGroupRepository.findOne({ where: { chatId: payload.chatId } })
-    const data = {
-      ...existing,
-      ...payload,
-      chatId: payload.chatId,
-      accessHash: payload.accessHash
+    
+    if (existing) {
+      // 更新现有记录
+      Object.assign(existing, {
+        ...payload,
+        chatId: String(payload.chatId),
+        accessHash: String(payload.accessHash)
+      })
+      return await this.chatGroupRepository.save(existing)
+    } else {
+      // 创建新记录
+      const entity = this.chatGroupRepository.create({
+        ...payload,
+        chatId: String(payload.chatId),
+        accessHash: String(payload.accessHash)
+      })
+      return await this.chatGroupRepository.save(entity)
     }
-    const entity = this.chatGroupRepository.create(data)
-    return await this.chatGroupRepository.save(entity)
   }
 
   async updateGroup(payload: UpdateChatGroupRecordBody): Promise<ChatGroup | null> {
@@ -30,7 +40,8 @@ export class ChatGroupService {
     if (!existing) {
       return null
     }
-    await this.chatGroupRepository.update(payload.chatId, payload)
+    const { chatId, ...updateData } = payload
+    await this.chatGroupRepository.update({ chatId }, updateData)
     return await this.chatGroupRepository.findOne({ where: { chatId: payload.chatId } })
   }
 

+ 15 - 2
src/services/task.service.ts

@@ -210,8 +210,21 @@ export class TaskService {
     try {
       await this.startTaskSend()
     } catch (error) {
-      const msg = error instanceof Error ? `${error.message}; stack=${error.stack ?? 'no stack'}` : '未知错误'
-      this.app.log.error(`处理发送任务失败: ${msg}`)
+      const msg = error instanceof Error ? error.message : '未知错误'
+      const isDbConnectionError =
+        msg.includes('ECONNRESET') ||
+        msg.includes('EPIPE') ||
+        msg.includes('ETIMEDOUT') ||
+        msg.includes('ENOTFOUND') ||
+        msg.includes('Connection lost') ||
+        msg.includes('Connection closed')
+
+      if (isDbConnectionError) {
+        this.app.log.debug(`数据库连接异常,跳过本次轮询: ${msg}`)
+      } else {
+        const stack = error instanceof Error ? error.stack : undefined
+        this.app.log.error(`处理发送任务失败: ${msg}${stack ? `; stack=${stack}` : ''}`)
+      }
     } finally {
       this.processing = false
       this.processingSince = null

+ 7 - 7
src/services/test.service.ts

@@ -466,9 +466,9 @@ export class TestService {
               try {
                 await this.taskItemRepository.update(taskItem.id, {
                   status: TaskItemStatus.SUCCESS,
-                    sentAt: new Date(),
-                    senderId: senderId,
-                    errorMsg: null
+                  sentAt: new Date(),
+                  senderId: senderId,
+                  errorMsg: null
                 })
               } catch (updateError) {
                 const updateErrorMessage = updateError instanceof Error ? updateError.message : '未知错误'
@@ -488,9 +488,9 @@ export class TestService {
               try {
                 await this.taskItemRepository.update(taskItem.id, {
                   status: TaskItemStatus.FAILED,
-                    sentAt: new Date(),
-                    senderId: senderId,
-                    errorMsg: errorMessage
+                  sentAt: new Date(),
+                  senderId: senderId,
+                  errorMsg: errorMessage
                 })
               } catch (updateError) {
                 const updateErrorMessage = updateError instanceof Error ? updateError.message : '未知错误'
@@ -1075,7 +1075,7 @@ export class TestService {
         }
       }
     } else {
-      sender = await this.senderRepository.findOne({ where: { id: senderId, delFlag: false } })
+      sender = await this.senderRepository.findOne({ where: { id: senderId } })
       if (!sender) {
         return {
           error: {

+ 134 - 0
src/services/tg-group.service.ts

@@ -0,0 +1,134 @@
+import { FastifyInstance } from 'fastify'
+import { Repository } from 'typeorm'
+import { Sender } from '../entities/sender.entity'
+import { TgClientService } from './tgClient.service'
+import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
+import { TelegramClient } from 'telegram'
+import { ChatGroupService } from './chat-group.service'
+
+export class TgGroupService {
+  private readonly app: FastifyInstance
+  private readonly tgClientService: TgClientService
+  private readonly chatGroupService: ChatGroupService
+  private readonly senderRepository: Repository<Sender>
+  constructor(app: FastifyInstance) {
+    this.app = app
+    this.tgClientService = TgClientService.getInstance()
+    this.chatGroupService = new ChatGroupService(app)
+    this.senderRepository = app.dataSource.getRepository(Sender)
+  }
+
+  async createTgGroup(
+    groupName: string,
+    groupDescription: string,
+    groupType: string,
+    initMsg?: string,
+    senderId?: string
+  ): Promise<any> {
+    let sender: Sender | null = null
+    let client: TelegramClient | null = null
+
+    try {
+      if (senderId) {
+        sender = await this.senderRepository.findOne({
+          where: { id: senderId }
+        })
+
+        if (!sender) {
+          return {
+            success: false,
+            message: `指定的发送账号不存在或已被删除: ${senderId}`
+          }
+        }
+      } else {
+        sender = await this.senderRepository.findOne({
+          where: { delFlag: false },
+          order: { lastUsageTime: 'DESC' }
+        })
+
+        if (!sender) {
+          return {
+            success: false,
+            message: '没有可用的发送账号,请先添加 sender'
+          }
+        }
+      }
+
+      if (!sender.sessionStr) {
+        if (sender.dcId === undefined || sender.dcId === null || !sender.authKey) {
+          return {
+            success: false,
+            message: `发送账号 ${sender.id} 缺少 session,且缺少 dcId/authKey`
+          }
+        }
+
+        const sessionStr = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
+        await this.senderRepository.update(sender.id, { sessionStr })
+        sender.sessionStr = sessionStr
+      }
+
+      if (groupType !== 'channel' && groupType !== 'megagroup') {
+        return {
+          success: false,
+          message: `不支持的群组类型: ${groupType},仅支持: megagroup(超级群组), channel(频道)`
+        }
+      }
+
+      client = await this.tgClientService.createConnectedClient(sender.sessionStr)
+
+      const groupInfo = await this.tgClientService.createChannelGroup(
+        client,
+        groupName,
+        groupDescription || '',
+        groupType as 'megagroup' | 'channel'
+      )
+
+      const inputChannel = await this.tgClientService.getInputChannel(groupInfo.chatId, groupInfo.accessHash)
+      const inviteLink = await this.tgClientService.getInviteLink(client, inputChannel)
+      const publicLink = await this.tgClientService.getPublicLink(client, inputChannel, groupName)
+
+      if (initMsg && initMsg.trim().length > 0) {
+        await this.tgClientService.sendMessageToChannelGroup(client, inputChannel, initMsg.trim())
+      }
+
+      await this.chatGroupService.upsertGroup({
+        chatId: groupInfo.chatId,
+        accessHash: groupInfo.accessHash,
+        name: groupName,
+        groupType,
+        publicLink: publicLink || undefined,
+        inviteLink: inviteLink || undefined,
+        senderId: sender.id
+      })
+
+      await this.senderRepository.update(sender.id, {
+        usageCount: sender.usageCount + 1,
+        lastUsageTime: new Date()
+      })
+
+      return {
+        success: true,
+        message: '群组创建成功',
+        data: {
+          chatId: groupInfo.chatId,
+          accessHash: groupInfo.accessHash,
+          name: groupName,
+          groupType,
+          senderId: sender.id
+        }
+      }
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : '未知错误'
+      this.app.log.error(`创建群组失败: ${errorMessage}`)
+
+      return {
+        success: false,
+        message: `创建群组失败: ${errorMessage}`
+      }
+    } finally {
+      if (client) {
+        await this.tgClientService.disconnectClient(client)
+      }
+    }
+  }
+}

+ 192 - 46
src/services/tgClient.service.ts

@@ -94,9 +94,7 @@ export class TgClientService {
     for (const strategy of this.connectionStrategies) {
       const connectionOptions = { ...this.defaultConnectionOptions, ...strategy.options }
       this.app.log.info(
-        `正在建立连接[${strategy.label}]... useWSS=${connectionOptions.useWSS ? 'true' : 'false'} (443 优先), retries=${
-          connectionOptions.connectionRetries
-        }, retryDelay=${connectionOptions.retryDelay}ms`
+        `正在建立连接[${strategy.label}]... retries=${connectionOptions.connectionRetries}, retryDelay=${connectionOptions.retryDelay}ms`
       )
 
       const client = new TelegramClient(stringSession, this.apiId, this.apiHash, connectionOptions)
@@ -107,7 +105,9 @@ export class TgClientService {
         const me = await this.ensureValidSession(client)
         if (me) {
           this.app.log.info(
-            `当前登录账号: id: ${me.id} ,name: ${`${me.firstName || ''} ${me.lastName || ''}`.trim()} ${me.username || ''}`.trim()
+            `当前登录账号: id: ${me.id} ,name: ${`${me.firstName || ''} ${me.lastName || ''}`.trim()} ${
+              me.username || ''
+            }`.trim()
           )
         } else {
           this.app.log.warn('无法获取账号信息')
@@ -146,12 +146,107 @@ export class TgClientService {
     await this.disposeClient(client, 'TelegramClient')
   }
 
-  getActiveClientCount(): number {
-    return this.activeClients.size
+  async createChannelGroup(
+    client: TelegramClient,
+    groupName: string,
+    groupDescription: string,
+    groupType: 'megagroup' | 'channel'
+  ): Promise<{ chatId: string; accessHash: string; client: TelegramClient }> {
+    const waitTime = Math.floor(Math.random() * 21) + 20
+    this.app.log.info(`连接成功后等待 ${waitTime} 秒,避免新 Session 被限制`)
+    await new Promise(resolve => setTimeout(resolve, waitTime * 1000))
+
+    this.app.log.info(`开始创建${groupType === 'channel' ? '频道' : '超级群组'}: ${groupName}`)
+
+    const result = await client.invoke(
+      new Api.channels.CreateChannel({
+        title: groupName,
+        about: groupDescription || '',
+        megagroup: groupType === 'megagroup'
+      })
+    )
+
+    const updates = result as any
+    const createdChat = updates.chats?.[0]
+
+    if (!createdChat) {
+      throw new Error('创建群组失败,未返回群组信息')
+    }
+
+    const chatId = createdChat.id?.toString() || createdChat.id
+    const accessHash = createdChat.accessHash?.toString() || createdChat.accessHash
+
+    if (!chatId || accessHash === undefined || accessHash === null) {
+      throw new Error('创建群组失败,缺少 chatId 或 accessHash')
+    }
+
+    this.app.log.info(`群组创建成功,ID: ${chatId}, accessHash: ${accessHash}`)
+
+    return {
+      chatId: String(chatId),
+      accessHash: String(accessHash),
+      client
+    }
   }
 
-  private logActiveClientCount(): void {
-    this.app?.log?.info?.(`当前活跃 Telegram 客户端数量: ${this.activeClients.size}`)
+  async getInputChannel(chatId: string | number, accessHash: string | number): Promise<Api.InputChannel> {
+    return new Api.InputChannel({
+      channelId: bigInt(chatId.toString()),
+      accessHash: bigInt(accessHash.toString())
+    })
+  }
+
+  async sendMessageToChannelGroup(
+    client: TelegramClient,
+    inputChannel: Api.InputChannel,
+    message: string
+  ): Promise<void> {
+    this.app.log.info('正在发送群组消息...')
+
+    try {
+      await client.sendMessage(inputChannel, {
+        message: message.trim()
+      })
+      this.app.log.info('已向群组发送消息')
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      throw new Error(`发送群组消息失败: ${errorMessage}`)
+    }
+  }
+
+  async getInviteLink(client: TelegramClient, inputChannel: Api.InputChannel): Promise<string | null> {
+    try {
+      const inviteLink = await client.invoke(new Api.messages.ExportChatInvite({ peer: inputChannel }))
+      const invite = inviteLink as any
+      if (invite?.link) {
+        return invite.link
+      }
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      this.app.log.error(`获取群组邀请链接失败: ${errorMessage}`)
+    }
+    return null
+  }
+
+  async getPublicLink(
+    client: TelegramClient,
+    inputChannel: Api.InputChannel,
+    groupName?: string
+  ): Promise<string | null> {
+    try {
+      const username = this.generateGroupUsername(groupName)
+      await client.invoke(
+        new Api.channels.UpdateUsername({
+          channel: inputChannel,
+          username
+        })
+      )
+      return `https://t.me/${username}`
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      this.app.log.error(`获取群组公开链接失败: ${errorMessage}`)
+    }
+    return null
   }
 
   async getTargetPeer(client: TelegramClient, parsedTarget: string | number): Promise<any> {
@@ -252,6 +347,37 @@ export class TgClientService {
     }
   }
 
+  private async disposeClient(client: TelegramClient | null, context: string): Promise<void> {
+    if (!client) {
+      return
+    }
+
+    try {
+      if (client.connected) {
+        await client.disconnect()
+        this.app.log.info(`${context} 已断开连接`)
+      }
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      if (!errorMessage.includes('TIMEOUT')) {
+        this.app.log.error({ msg: `${context} 断开时发生错误`, error: errorMessage })
+      }
+    }
+
+    try {
+      await client.destroy()
+      this.app.log.info(`${context} 已销毁`)
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      if (!errorMessage.includes('TIMEOUT')) {
+        this.app.log.error({ msg: `${context} 销毁时发生错误`, error: errorMessage })
+      }
+    }
+
+    this.activeClients.delete(client)
+    this.logActiveClientCount()
+  }
+
   private extractErrorMessage(error: unknown): string {
     if (error instanceof Error) {
       return error.message
@@ -262,13 +388,62 @@ export class TgClientService {
     return '未知错误'
   }
 
+  private generateGroupUsername(groupName?: string): string {
+    const random = Math.random().toString(36).slice(2, 8)
+    const maxLength = 32
+    const baseName = this.buildEnglishName(groupName)
+    const availableLength = Math.max(1, maxLength - 'group_'.length - random.length - 1)
+    const safeName = baseName.slice(0, availableLength)
+    return `group_${safeName}_${random}`
+  }
+
+  private buildEnglishName(name?: string): string {
+    if (name && this.isPureEnglish(name)) {
+      const sanitized = this.sanitizeName(name)
+      if (sanitized.length > 0) {
+        return sanitized
+      }
+    }
+    return this.getRandomEnglishWord()
+  }
+
+  private isPureEnglish(value: string): boolean {
+    return /^[A-Za-z0-9 _-]+$/.test(value)
+  }
+
+  private sanitizeName(name: string): string {
+    return name
+      .trim()
+      .toLowerCase()
+      .replace(/[^a-z0-9]+/g, '_')
+      .replace(/^_+|_+$/g, '')
+      .replace(/_+/g, '_')
+  }
+
+  private getRandomEnglishWord(): string {
+    const words = [
+      'apple',
+      'bridge',
+      'cloud',
+      'forest',
+      'garden',
+      'lamp',
+      'ocean',
+      'paper',
+      'planet',
+      'rocket',
+      'stone',
+      'stream',
+      'sunrise',
+      'valley',
+      'wind'
+    ]
+    return words[Math.floor(Math.random() * words.length)]
+  }
+
   private isSessionRevokedError(error: unknown): boolean {
     const msg = this.extractErrorMessage(error)
-    return (
-      msg.includes('SESSION_REVOKED') ||
-      msg.includes('AUTH_KEY_UNREGISTERED') ||
-      msg.includes('AUTH_KEY_INVALID')
-    )
+    return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
   }
 
   private async connectWithTimeout(client: TelegramClient): Promise<void> {
@@ -301,40 +476,7 @@ export class TgClientService {
         throw new Error('Telegram Session 已失效或被吊销,需要重新登录获取新的 session')
       }
       throw error
-    } finally {
-      this.app.log.info('账号信息获取完成')
-    }
-  }
-
-  private async disposeClient(client: TelegramClient | null, context: string): Promise<void> {
-    if (!client) {
-      return
     }
-
-    try {
-      if (client.connected) {
-        await client.disconnect()
-        this.app.log.info(`${context} 已断开连接`)
-      }
-    } catch (error) {
-      const errorMessage = this.extractErrorMessage(error)
-      if (!errorMessage.includes('TIMEOUT')) {
-        this.app.log.error({ msg: `${context} 断开时发生错误`, error: errorMessage })
-      }
-    }
-
-    try {
-      await client.destroy()
-      this.app.log.info(`${context} 已销毁`)
-    } catch (error) {
-      const errorMessage = this.extractErrorMessage(error)
-      if (!errorMessage.includes('TIMEOUT')) {
-        this.app.log.error({ msg: `${context} 销毁时发生错误`, error: errorMessage })
-      }
-    }
-
-    this.activeClients.delete(client)
-    this.logActiveClientCount()
   }
 
   private logTargetInfo(targetPeer: any): void {
@@ -360,6 +502,10 @@ export class TgClientService {
     this.app.log.info(logData)
   }
 
+  private logActiveClientCount(): void {
+    this.app?.log?.info?.(`当前活跃 Telegram 客户端数量: ${this.activeClients.size}`)
+  }
+
   getClient(): TelegramClient | null {
     return this.client
   }