Ver Fonte

优化任务执行器,添加账号排除机制以处理连接失败,增强错误处理逻辑,更新群组邀请和账号管理流程,提升代码可读性和可维护性。

wuyi há 6 dias atrás
pai
commit
01017e4a07

+ 118 - 25
src/executor/task.executor.ts

@@ -32,6 +32,8 @@ export class TaskExecutor {
   private accountCursor = 0
   // 账号缓存
   private accountCache: TgUser[] = []
+  // 本批次临时排除的账号(连接/功能异常等),避免在同一轮里反复选中导致任务整体失败
+  private accountExcludedInBatch: Set<string> = new Set()
 
   constructor(private app: FastifyInstance) {
     const ds = app.dataSource
@@ -58,6 +60,7 @@ export class TaskExecutor {
       const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
       this.currentAccountCacheTake = this.computeAccountCacheTake(task, concurrency)
       await this.refreshAccountCache()
+      this.accountExcludedInBatch.clear()
 
       await this.process(task)
       await this.finalize(task.id)
@@ -130,6 +133,7 @@ export class TaskExecutor {
     let tgUser: TgUser | null = null
     const workerTgClient = new TgClientManager()
     let accountUsageInRound = 0
+    // 仅用于 INVITE_TO_GROUP:缓存当前账号已加入的群实体,避免每条都重复解析/入群
     let inviteGroupEntity: any | null = null
 
     try {
@@ -155,6 +159,8 @@ export class TaskExecutor {
 
         // tgUser 轮换逻辑
         if (!tgUser || accountUsageInRound >= this.currentAccountLimit) {
+          // 换号前:若当前 tgUser 已加入群聊,则先退出群聊再断开
+          await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
           await workerTgClient.disconnect()
           tgUser = await this.pickAccount()
           const sessionString = await this.ensureSessionString(tgUser)
@@ -163,12 +169,23 @@ export class TaskExecutor {
             await workerTgClient.connect(sessionString)
           } catch (error) {
             const msg = error instanceof Error ? error.message : String(error)
+            // 会话失效:直接移除该账号
             if (this.isSessionRevokedMessage(msg)) {
               await this.handleSessionRevoked(tgUser)
               tgUser = null
               continue
             }
-            throw error
+            // 连接失败:不让错误冒泡导致任务被调度器判死;先排除此账号,换号继续
+            this.app.log.warn(
+              { taskId, sender: tgUser.id, err: msg },
+              'TelegramClient connect failed, rotate account'
+            )
+            this.accountExcludedInBatch.add(tgUser.id)
+            tgUser = null
+            accountUsageInRound = 0
+            inviteGroupEntity = null
+            await workerTgClient.disconnect().catch(() => {})
+            continue
           }
 
           accountUsageInRound = 0
@@ -181,15 +198,38 @@ export class TaskExecutor {
         await this.sleep(delaySeconds * 1000)
 
         try {
-          await this.processTaskItem(task, taskItem, tgUser, workerTgClient)
+          const result = await this.processTaskItem(task, taskItem, tgUser, workerTgClient, inviteGroupEntity)
+          // 更新缓存(仅 INVITE_TO_GROUP 会返回)
+          if (result?.inviteGroupEntity !== undefined) {
+            inviteGroupEntity = result.inviteGroupEntity
+          }
+          // 邀请失败:按你的预期,立即换一个 tgUser 继续流程
+          if (result?.rotateAccount) {
+            this.app.log.info(
+              { taskId, itemId: taskItem.id, sender: tgUser.id, reason: result.reason ?? 'rotate' },
+              'rotate account due to task item failure'
+            )
+            this.accountExcludedInBatch.add(tgUser.id)
+            // 换号前:退出群聊(如果已加入)
+            await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
+            await workerTgClient.disconnect().catch(() => {})
+            tgUser = null
+            accountUsageInRound = 0
+            inviteGroupEntity = null
+          }
         } catch (error) {
+          // 兜底:理论上 processTaskItem 不应抛错;如果抛错,也不要影响整任务
           const msg = error instanceof Error ? error.message : '未知错误'
+          this.app.log.error({ taskId, itemId: taskItem.id, sender: tgUser?.id, err: msg }, 'processTaskItem crashed')
           if (tgUser && this.isSessionRevokedMessage(msg)) {
             await this.handleSessionRevoked(tgUser)
-            await workerTgClient.disconnect()
-            tgUser = null
-            accountUsageInRound = 0
+          } else if (tgUser) {
+            this.accountExcludedInBatch.add(tgUser.id)
           }
+          await workerTgClient.disconnect().catch(() => {})
+          tgUser = null
+          accountUsageInRound = 0
+          inviteGroupEntity = null
         } finally {
           accountUsageInRound++
           if (tgUser) {
@@ -205,6 +245,8 @@ export class TaskExecutor {
         }
       }
     } finally {
+      // worker 结束前:尽量退出群聊,避免账号一直挂在群里
+      await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
       await workerTgClient.disconnect()
     }
   }
@@ -254,10 +296,11 @@ export class TaskExecutor {
     task: Task,
     item: TaskItem,
     sender: TgUser,
-    workerTgClient: TgClientManager
-  ): Promise<void> {
+    workerTgClient: TgClientManager,
+    inviteGroupEntity: any | null
+  ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
     if (task.type === TaskType.INVITE_TO_GROUP) {
-      return await this.processInviteToGroup(task, item, sender, workerTgClient)
+      return await this.processInviteToGroup(task, item, sender, workerTgClient, inviteGroupEntity)
     }
 
     return await this.processSendMessage(task, item, sender, workerTgClient)
@@ -268,7 +311,7 @@ export class TaskExecutor {
     item: TaskItem,
     sender: TgUser,
     workerTgClient: TgClientManager
-  ): Promise<void> {
+  ): Promise<{ rotateAccount?: boolean; reason?: string } | void> {
     const message = String(task.payload?.message ?? '').trim()
     if (!message) {
       await this.taskItemRepo.update(item.id, {
@@ -326,7 +369,8 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'processed', 1)
 
       this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
-      throw error
+      // SEND_MESSAGE 默认不强制换号(避免快速耗尽账号池);如需也换号,可在此处返回 rotateAccount: true
+      return { rotateAccount: false, reason: msg }
     }
   }
 
@@ -334,8 +378,9 @@ export class TaskExecutor {
     task: Task,
     item: TaskItem,
     sender: TgUser,
-    workerTgClient: TgClientManager
-  ): Promise<void> {
+    workerTgClient: TgClientManager,
+    inviteGroupEntity: any | null
+  ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
     try {
       const inviteLink = String(task.payload?.inviteLink ?? '').trim()
       if (!inviteLink) {
@@ -357,13 +402,21 @@ export class TaskExecutor {
         throw new Error('TelegramClient 未连接')
       }
 
-      // tgUser 加入群组,获取群组实体
-      const inviteGroupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
-      if (!inviteGroupEntity) {
+      // tgUser 加入群组,获取群组实体(每个账号缓存一次)
+      let groupEntity = inviteGroupEntity
+      if (!groupEntity) {
+        groupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
+      }
+      if (!groupEntity) {
         throw new Error('群拉人任务:未获取到群组实体(inviteGroupEntity 为空)')
       }
+      const chatId = groupEntity.chatId ?? groupEntity.id
+      const accessHash = groupEntity.accessHash
+      if (chatId === undefined || chatId === null || accessHash === undefined || accessHash === null) {
+        throw new Error('群拉人任务:群组实体缺少 id/chatId/accessHash(请检查 resolveGroupEntityByInviteLink 返回值)')
+      }
 
-      const inputChannel = await workerTgClient.getInputChannel(inviteGroupEntity.chatId, inviteGroupEntity.accessHash)
+      const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
       await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
 
       await this.taskItemRepo.update(item.id, {
@@ -378,6 +431,7 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'success', 1)
 
       this.app.log.info(`✅ 邀请成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
+      return { rotateAccount: false, inviteGroupEntity: groupEntity }
     } catch (error) {
       const msg = error instanceof Error ? error.message : '未知错误'
 
@@ -392,7 +446,7 @@ export class TaskExecutor {
         await this.taskRepo.increment({ id: task.id }, 'processed', 1)
         await this.taskRepo.increment({ id: task.id }, 'success', 1)
         this.app.log.info(`ℹ️ 成员已在群组中 taskId=${task.id}, itemId=${item.id}, target=${item.target}`)
-        return
+        return { rotateAccount: false }
       }
 
       await this.taskItemRepo.update(item.id, {
@@ -406,7 +460,26 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'processed', 1)
 
       this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
-      throw error
+      // INVITE_TO_GROUP:按需求,失败就换号继续(避免单号被冻结/受限导致整体停滞)
+      return { rotateAccount: true, reason: msg }
+    }
+  }
+
+  /**
+   * 换号/断开前:让当前账号退出已加入的群聊
+   * - 仅对 INVITE_TO_GROUP 有意义;其它任务 inviteGroupEntity 为空会直接跳过
+   * - 不抛错:避免退群失败影响整体任务流程
+   */
+  private async safeLeaveInviteGroup(workerTgClient: TgClientManager, inviteGroupEntity: any | null): Promise<void> {
+    if (!inviteGroupEntity) return
+    const chatId = inviteGroupEntity.chatId ?? inviteGroupEntity.id
+    const accessHash = inviteGroupEntity.accessHash
+    if (chatId === undefined || chatId === null || accessHash === undefined || accessHash === null) return
+    try {
+      const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
+      await workerTgClient.leaveGroup(inputChannel)
+    } catch {
+      // 忽略退群异常(可能已不在群、权限问题等)
     }
   }
 
@@ -494,19 +567,39 @@ export class TaskExecutor {
     }
 
     const total = this.accountCache.length
-    for (let i = 0; i < total; i++) {
-      const index = (this.accountCursor + i) % total
-      const account = this.accountCache[index]
-      const used = this.accountUsageInBatch.get(account.id) ?? 0
-      if (used < this.currentAccountLimit) {
-        this.accountCursor = (index + 1) % total
-        return account
+    const tryPick = (): TgUser | null => {
+      for (let i = 0; i < total; i++) {
+        const index = (this.accountCursor + i) % total
+        const account = this.accountCache[index]
+        if (this.accountExcludedInBatch.has(account.id)) continue
+        const used = this.accountUsageInBatch.get(account.id) ?? 0
+        if (used < this.currentAccountLimit) {
+          this.accountCursor = (index + 1) % total
+          return account
+        }
       }
+      return null
     }
 
+    const picked1 = tryPick()
+    if (picked1) return picked1
+
     this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
     this.accountUsageInBatch.clear()
     this.accountCursor = 0
+
+    const picked2 = tryPick()
+    if (picked2) return picked2
+
+    // 如果全部被排除,说明这一批账号都异常/受限:清空排除集再尝试一次
+    if (this.accountExcludedInBatch.size > 0) {
+      this.app.log.warn('本批次所有 tgUser 均被排除,清空排除列表后重试')
+      this.accountExcludedInBatch.clear()
+      const picked3 = tryPick()
+      if (picked3) return picked3
+    }
+
+    // 兜底:返回第一个,避免直接抛错导致任务中断(但大概率会在连接失败后再次被排除并轮换)
     return this.accountCache[0]
   }
 

+ 25 - 7
src/schedulers/task.scheduler.ts

@@ -9,11 +9,18 @@ export class TaskScheduler {
   private taskRepo: Repository<Task>
   private taskExecutor: TaskExecutor
   private running = false
-  private readonly taskExecutionTimeoutMs = 3 * 60 * 1000
+  /**
+   * 任务执行超时(ms)
+   * - 默认 0:不启用超时(群拉人/批量任务通常远超 3 分钟)
+   * - 可通过环境变量覆盖:TASK_EXEC_TIMEOUT_MS=3600000
+   */
+  private readonly taskExecutionTimeoutMs: number
 
   private constructor(private app: FastifyInstance) {
     this.taskRepo = app.dataSource.getRepository(Task)
     this.taskExecutor = new TaskExecutor(app)
+    const raw = Number(process.env.TASK_EXEC_TIMEOUT_MS ?? 0)
+    this.taskExecutionTimeoutMs = !Number.isNaN(raw) && raw > 0 ? raw : 0
   }
 
   static getInstance(app: FastifyInstance): TaskScheduler {
@@ -42,13 +49,18 @@ export class TaskScheduler {
     this.running = true
 
     try {
-      // 1️⃣ 是否已有执行中的任务
+      // 1️⃣ 优先执行已处于 SENDING 状态的任务(处理应用重启后任务状态未重置的情况)
       const runningTask = await this.taskRepo.findOne({
-        where: { status: TaskStatus.SENDING, delFlag: false }
+        where: { status: TaskStatus.SENDING, delFlag: false },
+        order: { startedAt: 'ASC' }
       })
-      if (runningTask) return
+      if (runningTask) {
+        // 直接执行已处于 SENDING 状态的任务
+        await this.executeTask(runningTask)
+        return
+      }
 
-      // 2️⃣ 找下一个任务
+      // 2️⃣ 找下一个 QUEUING 状态的任务
       const nextTask = await this.taskRepo.findOne({
         where: { status: TaskStatus.QUEUING, delFlag: false },
         order: { startedAt: 'ASC' }
@@ -103,13 +115,15 @@ export class TaskScheduler {
     }
 
     if (err.message === 'TASK_TIMEOUT') {
-      await this.taskRepo.update(taskId, { status: TaskStatus.CANCELED })
+      // 超时本质属于调度器保护机制,默认改为 PAUSED(避免误杀任务)
+      await this.taskRepo.update(taskId, { status: TaskStatus.PAUSED })
       return
     }
 
     console.error('[TaskScheduler] task failed:', err)
     await this.taskRepo.update(taskId, {
-      status: TaskStatus.CANCELED
+      // 未知错误默认暂停(更安全,支持用户手动 resume)
+      status: TaskStatus.PAUSED
     })
   }
 
@@ -117,6 +131,10 @@ export class TaskScheduler {
    * 写死的超时包装
    */
   private async runWithTimeout(fn: () => Promise<void>): Promise<void> {
+    if (!this.taskExecutionTimeoutMs || this.taskExecutionTimeoutMs <= 0) {
+      await fn()
+      return
+    }
     return Promise.race([
       fn(),
       new Promise((_, reject) => setTimeout(() => reject(new Error('TASK_TIMEOUT')), this.taskExecutionTimeoutMs))

+ 72 - 0
src/services/clients/tg-client.manager.ts

@@ -353,6 +353,60 @@ export class TgClientManager {
     }
   }
 
+  /**
+   * 退出普通群(Chat 类型,没有 accessHash)
+   * @param chatId - 普通群的 chatId(int)
+   */
+  async leaveBasicChat(chatId: string | number): Promise<void> {
+    this.ensureConnected()
+    const cid = Number(chatId)
+    if (Number.isNaN(cid)) {
+      throw new Error(`退出普通群失败: chatId 无效 ${chatId}`)
+    }
+
+    try {
+      await this.client!.invoke(
+        new Api.messages.DeleteChatUser({
+          chatId: bigInt(String(cid)),
+          userId: new Api.InputUserSelf()
+        })
+      )
+      this.app.log.info(`成功退出普通群: ${cid}`)
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      throw new Error(`退出普通群失败: ${errorMessage}`)
+    }
+  }
+
+  /**
+   * 邀请用户加入普通群(Chat 类型)
+   * @param chatId - 普通群的 chatId(int)
+   * @param userEntity - 用户实体(Api.User 等),需要包含 id/accessHash
+   */
+  async inviteUserToBasicChat(chatId: string | number, userEntity: any): Promise<void> {
+    this.ensureConnected()
+    const cid = Number(chatId)
+    if (Number.isNaN(cid)) {
+      throw new Error(`邀请成员到普通群失败: chatId 无效 ${chatId}`)
+    }
+
+    const inputUser = this.toInputUser(userEntity)
+
+    try {
+      await this.client!.invoke(
+        new Api.messages.AddChatUser({
+          chatId: bigInt(String(cid)),
+          userId: inputUser,
+          fwdLimit: 0
+        })
+      )
+      this.app.log.info(`已邀请成员到普通群: ${cid}`)
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      throw new Error(`邀请成员到普通群失败: ${errorMessage}`)
+    }
+  }
+
   /**
    * 通过邀请链接进群后获取群组实体
    * - 用于“拉人进群”任务:需要拿到 chat/channel 实体,进而拿到 accessHash
@@ -653,6 +707,24 @@ export class TgClientManager {
     return '未知错误'
   }
 
+  /**
+   * 将用户实体转换为 InputUser(用于 AddChatUser / InviteToChannel)
+   */
+  private toInputUser(user: any): Api.InputUser {
+    if (user instanceof Api.InputUser) {
+      return user
+    }
+    const id = user?.id
+    const accessHash = user?.accessHash
+    if (id === undefined || id === null || accessHash === undefined || accessHash === null) {
+      throw new Error('无法构造 InputUser:缺少 id/accessHash')
+    }
+    return new Api.InputUser({
+      userId: bigInt(String(id)),
+      accessHash: bigInt(String(accessHash))
+    })
+  }
+
   /**
    * 从邀请链接中提取 hash 值
    * @private