1
0

3 Коммиты 30fb17c705 ... 344dafb82f

Автор SHA1 Сообщение Дата
  wuyi 344dafb82f 增强任务执行器的账号管理逻辑,添加账号租约机制以避免并发 worker 复用同一 tgUser,优化账号获取和释放流程,提升代码可读性和可维护性。 6 дней назад
  wuyi 0c741b38b9 优化任务执行器的群组邀请和退群逻辑,区分处理普通群和超级群的邀请方式,增强错误处理,提升代码可读性和可维护性。 6 дней назад
  wuyi 01017e4a07 优化任务执行器,添加账号排除机制以处理连接失败,增强错误处理逻辑,更新群组邀请和账号管理流程,提升代码可读性和可维护性。 6 дней назад

+ 202 - 32
src/executor/task.executor.ts

@@ -32,6 +32,11 @@ export class TaskExecutor {
   private accountCursor = 0
   // 账号缓存
   private accountCache: TgUser[] = []
+  // 本批次临时排除的账号(连接/功能异常等),避免在同一轮里反复选中导致任务整体失败
+  private accountExcludedInBatch: Set<string> = new Set()
+  // 并发 worker 的账号租约:同一时刻尽量保证一个 tgUser 只被一个 worker 使用
+  private accountLeaseByWorker: Map<number, string> = new Map()
+  private accountLeaseOwner: Map<string, number> = new Map()
 
   constructor(private app: FastifyInstance) {
     const ds = app.dataSource
@@ -54,10 +59,13 @@ export class TaskExecutor {
         task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultAccountLimit
       this.accountUsageInBatch.clear()
       this.accountCursor = 0
+      this.accountLeaseByWorker.clear()
+      this.accountLeaseOwner.clear()
       // 计算任务需要拉取的账号池大小
       const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
       this.currentAccountCacheTake = this.computeAccountCacheTake(task, concurrency)
       await this.refreshAccountCache()
+      this.accountExcludedInBatch.clear()
 
       await this.process(task)
       await this.finalize(task.id)
@@ -117,7 +125,7 @@ export class TaskExecutor {
     const workers: Promise<void>[] = []
 
     for (let i = 0; i < concurrency; i++) {
-      workers.push(this.workerLoop(task.id))
+      workers.push(this.workerLoop(task.id, i))
     }
 
     await Promise.all(workers)
@@ -126,10 +134,11 @@ export class TaskExecutor {
   /**
    * 单 worker 循环
    */
-  private async workerLoop(taskId: number): Promise<void> {
+  private async workerLoop(taskId: number, workerId: number): Promise<void> {
     let tgUser: TgUser | null = null
     const workerTgClient = new TgClientManager()
     let accountUsageInRound = 0
+    // 仅用于 INVITE_TO_GROUP:缓存当前账号已加入的群实体,避免每条都重复解析/入群
     let inviteGroupEntity: any | null = null
 
     try {
@@ -155,20 +164,38 @@ export class TaskExecutor {
 
         // tgUser 轮换逻辑
         if (!tgUser || accountUsageInRound >= this.currentAccountLimit) {
+          // 换号前:若当前 tgUser 已加入群聊,则先退出群聊再断开
+          await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
           await workerTgClient.disconnect()
-          tgUser = await this.pickAccount()
+          // 释放旧租约后再重新获取,避免一个 worker 永久占用账号
+          if (tgUser) {
+            this.releaseAccountLease(workerId, tgUser.id)
+          }
+          tgUser = await this.acquireAccount(workerId)
           const sessionString = await this.ensureSessionString(tgUser)
 
           try {
             await workerTgClient.connect(sessionString)
           } catch (error) {
             const msg = error instanceof Error ? error.message : String(error)
+            // 会话失效:直接移除该账号
             if (this.isSessionRevokedMessage(msg)) {
               await this.handleSessionRevoked(tgUser)
               tgUser = null
               continue
             }
-            throw error
+            // 连接失败:不让错误冒泡导致任务被调度器判死;先排除此账号,换号继续
+            this.app.log.warn(
+              { taskId, sender: tgUser.id, err: msg },
+              'TelegramClient connect failed, rotate account'
+            )
+            this.accountExcludedInBatch.add(tgUser.id)
+            this.releaseAccountLease(workerId, tgUser.id)
+            tgUser = null
+            accountUsageInRound = 0
+            inviteGroupEntity = null
+            await workerTgClient.disconnect().catch(() => {})
+            continue
           }
 
           accountUsageInRound = 0
@@ -181,15 +208,42 @@ export class TaskExecutor {
         await this.sleep(delaySeconds * 1000)
 
         try {
-          await this.processTaskItem(task, taskItem, tgUser, workerTgClient)
+          const result = await this.processTaskItem(task, taskItem, tgUser, workerTgClient, inviteGroupEntity)
+          // 更新缓存(仅 INVITE_TO_GROUP 会返回)
+          if (result?.inviteGroupEntity !== undefined) {
+            inviteGroupEntity = result.inviteGroupEntity
+          }
+          // 邀请失败:按你的预期,立即换一个 tgUser 继续流程
+          if (result?.rotateAccount) {
+            this.app.log.info(
+              { taskId, itemId: taskItem.id, sender: tgUser.id, reason: result.reason ?? 'rotate' },
+              'rotate account due to task item failure'
+            )
+            this.accountExcludedInBatch.add(tgUser.id)
+            // 换号前:退出群聊(如果已加入)
+            await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
+            await workerTgClient.disconnect().catch(() => {})
+            this.releaseAccountLease(workerId, tgUser.id)
+            tgUser = null
+            accountUsageInRound = 0
+            inviteGroupEntity = null
+          }
         } catch (error) {
+          // 兜底:理论上 processTaskItem 不应抛错;如果抛错,也不要影响整任务
           const msg = error instanceof Error ? error.message : '未知错误'
+          this.app.log.error({ taskId, itemId: taskItem.id, sender: tgUser?.id, err: msg }, 'processTaskItem crashed')
           if (tgUser && this.isSessionRevokedMessage(msg)) {
             await this.handleSessionRevoked(tgUser)
-            await workerTgClient.disconnect()
-            tgUser = null
-            accountUsageInRound = 0
+          } else if (tgUser) {
+            this.accountExcludedInBatch.add(tgUser.id)
+          }
+          if (tgUser) {
+            this.releaseAccountLease(workerId, tgUser.id)
           }
+          await workerTgClient.disconnect().catch(() => {})
+          tgUser = null
+          accountUsageInRound = 0
+          inviteGroupEntity = null
         } finally {
           accountUsageInRound++
           if (tgUser) {
@@ -205,7 +259,12 @@ export class TaskExecutor {
         }
       }
     } finally {
+      // worker 结束前:尽量退出群聊,避免账号一直挂在群里
+      await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
       await workerTgClient.disconnect()
+      if (tgUser) {
+        this.releaseAccountLease(workerId, tgUser.id)
+      }
     }
   }
 
@@ -254,10 +313,11 @@ export class TaskExecutor {
     task: Task,
     item: TaskItem,
     sender: TgUser,
-    workerTgClient: TgClientManager
-  ): Promise<void> {
+    workerTgClient: TgClientManager,
+    inviteGroupEntity: any | null
+  ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
     if (task.type === TaskType.INVITE_TO_GROUP) {
-      return await this.processInviteToGroup(task, item, sender, workerTgClient)
+      return await this.processInviteToGroup(task, item, sender, workerTgClient, inviteGroupEntity)
     }
 
     return await this.processSendMessage(task, item, sender, workerTgClient)
@@ -268,7 +328,7 @@ export class TaskExecutor {
     item: TaskItem,
     sender: TgUser,
     workerTgClient: TgClientManager
-  ): Promise<void> {
+  ): Promise<{ rotateAccount?: boolean; reason?: string } | void> {
     const message = String(task.payload?.message ?? '').trim()
     if (!message) {
       await this.taskItemRepo.update(item.id, {
@@ -326,7 +386,8 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'processed', 1)
 
       this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
-      throw error
+      // SEND_MESSAGE 默认不强制换号(避免快速耗尽账号池);如需也换号,可在此处返回 rotateAccount: true
+      return { rotateAccount: false, reason: msg }
     }
   }
 
@@ -334,8 +395,11 @@ export class TaskExecutor {
     task: Task,
     item: TaskItem,
     sender: TgUser,
-    workerTgClient: TgClientManager
-  ): Promise<void> {
+    workerTgClient: TgClientManager,
+    inviteGroupEntity: any | null
+  ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
+    // 注意:邀请失败时也需要能拿到已加入的群实体用于退群
+    let groupEntity: any | null = inviteGroupEntity
     try {
       const inviteLink = String(task.payload?.inviteLink ?? '').trim()
       if (!inviteLink) {
@@ -357,14 +421,27 @@ export class TaskExecutor {
         throw new Error('TelegramClient 未连接')
       }
 
-      // tgUser 加入群组,获取群组实体
-      const inviteGroupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
-      if (!inviteGroupEntity) {
+      // tgUser 加入群组,获取群组实体(每个账号缓存一次)
+      if (!groupEntity) {
+        groupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
+      }
+      if (!groupEntity) {
         throw new Error('群拉人任务:未获取到群组实体(inviteGroupEntity 为空)')
       }
+      const chatId = groupEntity.chatId ?? groupEntity.id
+      const accessHash = groupEntity.accessHash
+      if (chatId === undefined || chatId === null) {
+        throw new Error('群拉人任务:群组实体缺少 id/chatId(请检查 resolveGroupEntityByInviteLink 返回值)')
+      }
 
-      const inputChannel = await workerTgClient.getInputChannel(inviteGroupEntity.chatId, inviteGroupEntity.accessHash)
-      await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
+      // 超级群/频道:有 accessHash,走 channels.InviteToChannel
+      if (accessHash !== undefined && accessHash !== null) {
+        const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
+        await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
+      } else {
+        // 普通群(Chat):没有 accessHash,走 messages.AddChatUser
+        await workerTgClient.inviteUserToBasicChat(chatId, targetUser)
+      }
 
       await this.taskItemRepo.update(item.id, {
         status: TaskItemStatus.SUCCESS,
@@ -378,6 +455,7 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'success', 1)
 
       this.app.log.info(`✅ 邀请成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
+      return { rotateAccount: false, inviteGroupEntity: groupEntity }
     } catch (error) {
       const msg = error instanceof Error ? error.message : '未知错误'
 
@@ -392,7 +470,7 @@ export class TaskExecutor {
         await this.taskRepo.increment({ id: task.id }, 'processed', 1)
         await this.taskRepo.increment({ id: task.id }, 'success', 1)
         this.app.log.info(`ℹ️ 成员已在群组中 taskId=${task.id}, itemId=${item.id}, target=${item.target}`)
-        return
+        return { rotateAccount: false }
       }
 
       await this.taskItemRepo.update(item.id, {
@@ -406,7 +484,33 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'processed', 1)
 
       this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
-      throw error
+      // 关键:邀请失败时也要尽量退出刚加入的群,避免账号一直挂在群里
+      await this.safeLeaveInviteGroup(workerTgClient, groupEntity).catch(() => {})
+      // INVITE_TO_GROUP:按需求,失败就换号继续(避免单号被冻结/受限导致整体停滞)
+      // 同时清空缓存,避免后续误以为仍在群内
+      return { rotateAccount: true, reason: msg, inviteGroupEntity: null }
+    }
+  }
+
+  /**
+   * 换号/断开前:让当前账号退出已加入的群聊
+   * - 仅对 INVITE_TO_GROUP 有意义;其它任务 inviteGroupEntity 为空会直接跳过
+   * - 不抛错:避免退群失败影响整体任务流程
+   */
+  private async safeLeaveInviteGroup(workerTgClient: TgClientManager, inviteGroupEntity: any | null): Promise<void> {
+    if (!inviteGroupEntity) return
+    const chatId = inviteGroupEntity.chatId ?? inviteGroupEntity.id
+    const accessHash = inviteGroupEntity.accessHash
+    try {
+      if (chatId === undefined || chatId === null) return
+      if (accessHash !== undefined && accessHash !== null) {
+        const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
+        await workerTgClient.leaveGroup(inputChannel)
+      } else {
+        await workerTgClient.leaveBasicChat(chatId)
+      }
+    } catch {
+      // 忽略退群异常(可能已不在群、权限问题等)
     }
   }
 
@@ -477,9 +581,9 @@ export class TaskExecutor {
   }
 
   /**
-   * 选择可用的tgUser 账号
+   * 获取账号(带 worker 租约,尽量避免并发 worker 复用同一 tgUser)
    */
-  private async pickAccount(): Promise<TgUser> {
+  private async acquireAccount(workerId: number): Promise<TgUser> {
     if (this.accountCache.length === 0) {
       this.accountCache = await this.senderRepo.find({
         where: { delFlag: false },
@@ -494,20 +598,86 @@ export class TaskExecutor {
     }
 
     const total = this.accountCache.length
-    for (let i = 0; i < total; i++) {
-      const index = (this.accountCursor + i) % total
-      const account = this.accountCache[index]
-      const used = this.accountUsageInBatch.get(account.id) ?? 0
-      if (used < this.currentAccountLimit) {
-        this.accountCursor = (index + 1) % total
-        return account
+    const tryPick = (respectLease: boolean): TgUser | null => {
+      for (let i = 0; i < total; i++) {
+        const index = (this.accountCursor + i) % total
+        const account = this.accountCache[index]
+        if (this.accountExcludedInBatch.has(account.id)) continue
+        if (respectLease) {
+          const leasedBy = this.accountLeaseOwner.get(account.id)
+          if (leasedBy !== undefined && leasedBy !== workerId) continue
+        }
+        const used = this.accountUsageInBatch.get(account.id) ?? 0
+        if (used < this.currentAccountLimit) {
+          this.accountCursor = (index + 1) % total
+          return account
+        }
       }
+      return null
+    }
+
+    // 先按“租约”挑选:一个账号尽量只服务一个 worker
+    const picked1 = tryPick(true)
+    if (picked1) {
+      this.setAccountLease(workerId, picked1.id)
+      return picked1
     }
 
     this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
     this.accountUsageInBatch.clear()
     this.accountCursor = 0
-    return this.accountCache[0]
+
+    const picked2 = tryPick(true)
+    if (picked2) {
+      this.setAccountLease(workerId, picked2.id)
+      return picked2
+    }
+
+    // 如果全部被排除,说明这一批账号都异常/受限:清空排除集再尝试一次
+    if (this.accountExcludedInBatch.size > 0) {
+      this.app.log.warn('本批次所有 tgUser 均被排除,清空排除列表后重试')
+      this.accountExcludedInBatch.clear()
+      const picked3 = tryPick(true)
+      if (picked3) {
+        this.setAccountLease(workerId, picked3.id)
+        return picked3
+      }
+    }
+
+    // 兜底:若账号不足/都被占用,允许复用(不阻塞任务),但给出明显告警
+    this.app.log.warn(
+      { workerId, totalAccounts: total },
+      '没有可用的未占用 tgUser(账号可能不足或都被占用),worker 将复用账号'
+    )
+    const fallback = tryPick(false) ?? this.accountCache[0]
+    this.setAccountLease(workerId, fallback.id)
+    return fallback
+  }
+
+  private setAccountLease(workerId: number, accountId: string): void {
+    // 一个 worker 只持有一个租约:覆盖前先释放旧租约
+    const old = this.accountLeaseByWorker.get(workerId)
+    if (old && old !== accountId) {
+      this.releaseAccountLease(workerId, old)
+    }
+    const owner = this.accountLeaseOwner.get(accountId)
+    if (owner !== undefined && owner !== workerId) {
+      // 理论上 acquireAccount(respectLease=true) 不会走到这里;兜底覆盖,避免状态不一致
+      this.accountLeaseByWorker.delete(owner)
+    }
+    this.accountLeaseByWorker.set(workerId, accountId)
+    this.accountLeaseOwner.set(accountId, workerId)
+  }
+
+  private releaseAccountLease(workerId: number, accountId: string): void {
+    const owned = this.accountLeaseByWorker.get(workerId)
+    if (owned === accountId) {
+      this.accountLeaseByWorker.delete(workerId)
+    }
+    const owner = this.accountLeaseOwner.get(accountId)
+    if (owner === workerId) {
+      this.accountLeaseOwner.delete(accountId)
+    }
   }
 
   /**

+ 25 - 7
src/schedulers/task.scheduler.ts

@@ -9,11 +9,18 @@ export class TaskScheduler {
   private taskRepo: Repository<Task>
   private taskExecutor: TaskExecutor
   private running = false
-  private readonly taskExecutionTimeoutMs = 3 * 60 * 1000
+  /**
+   * 任务执行超时(ms)
+   * - 默认 0:不启用超时(群拉人/批量任务通常远超 3 分钟)
+   * - 可通过环境变量覆盖:TASK_EXEC_TIMEOUT_MS=3600000
+   */
+  private readonly taskExecutionTimeoutMs: number
 
   private constructor(private app: FastifyInstance) {
     this.taskRepo = app.dataSource.getRepository(Task)
     this.taskExecutor = new TaskExecutor(app)
+    const raw = Number(process.env.TASK_EXEC_TIMEOUT_MS ?? 0)
+    this.taskExecutionTimeoutMs = !Number.isNaN(raw) && raw > 0 ? raw : 0
   }
 
   static getInstance(app: FastifyInstance): TaskScheduler {
@@ -42,13 +49,18 @@ export class TaskScheduler {
     this.running = true
 
     try {
-      // 1️⃣ 是否已有执行中的任务
+      // 1️⃣ 优先执行已处于 SENDING 状态的任务(处理应用重启后任务状态未重置的情况)
       const runningTask = await this.taskRepo.findOne({
-        where: { status: TaskStatus.SENDING, delFlag: false }
+        where: { status: TaskStatus.SENDING, delFlag: false },
+        order: { startedAt: 'ASC' }
       })
-      if (runningTask) return
+      if (runningTask) {
+        // 直接执行已处于 SENDING 状态的任务
+        await this.executeTask(runningTask)
+        return
+      }
 
-      // 2️⃣ 找下一个任务
+      // 2️⃣ 找下一个 QUEUING 状态的任务
       const nextTask = await this.taskRepo.findOne({
         where: { status: TaskStatus.QUEUING, delFlag: false },
         order: { startedAt: 'ASC' }
@@ -103,13 +115,15 @@ export class TaskScheduler {
     }
 
     if (err.message === 'TASK_TIMEOUT') {
-      await this.taskRepo.update(taskId, { status: TaskStatus.CANCELED })
+      // 超时本质属于调度器保护机制,默认改为 PAUSED(避免误杀任务)
+      await this.taskRepo.update(taskId, { status: TaskStatus.PAUSED })
       return
     }
 
     console.error('[TaskScheduler] task failed:', err)
     await this.taskRepo.update(taskId, {
-      status: TaskStatus.CANCELED
+      // 未知错误默认暂停(更安全,支持用户手动 resume)
+      status: TaskStatus.PAUSED
     })
   }
 
@@ -117,6 +131,10 @@ export class TaskScheduler {
    * 写死的超时包装
    */
   private async runWithTimeout(fn: () => Promise<void>): Promise<void> {
+    if (!this.taskExecutionTimeoutMs || this.taskExecutionTimeoutMs <= 0) {
+      await fn()
+      return
+    }
     return Promise.race([
       fn(),
       new Promise((_, reject) => setTimeout(() => reject(new Error('TASK_TIMEOUT')), this.taskExecutionTimeoutMs))

+ 72 - 0
src/services/clients/tg-client.manager.ts

@@ -353,6 +353,60 @@ export class TgClientManager {
     }
   }
 
+  /**
+   * 退出普通群(Chat 类型,没有 accessHash)
+   * @param chatId - 普通群的 chatId(int)
+   */
+  async leaveBasicChat(chatId: string | number): Promise<void> {
+    this.ensureConnected()
+    const cid = Number(chatId)
+    if (Number.isNaN(cid)) {
+      throw new Error(`退出普通群失败: chatId 无效 ${chatId}`)
+    }
+
+    try {
+      await this.client!.invoke(
+        new Api.messages.DeleteChatUser({
+          chatId: bigInt(String(cid)),
+          userId: new Api.InputUserSelf()
+        })
+      )
+      this.app.log.info(`成功退出普通群: ${cid}`)
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      throw new Error(`退出普通群失败: ${errorMessage}`)
+    }
+  }
+
+  /**
+   * 邀请用户加入普通群(Chat 类型)
+   * @param chatId - 普通群的 chatId(int)
+   * @param userEntity - 用户实体(Api.User 等),需要包含 id/accessHash
+   */
+  async inviteUserToBasicChat(chatId: string | number, userEntity: any): Promise<void> {
+    this.ensureConnected()
+    const cid = Number(chatId)
+    if (Number.isNaN(cid)) {
+      throw new Error(`邀请成员到普通群失败: chatId 无效 ${chatId}`)
+    }
+
+    const inputUser = this.toInputUser(userEntity)
+
+    try {
+      await this.client!.invoke(
+        new Api.messages.AddChatUser({
+          chatId: bigInt(String(cid)),
+          userId: inputUser,
+          fwdLimit: 0
+        })
+      )
+      this.app.log.info(`已邀请成员到普通群: ${cid}`)
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      throw new Error(`邀请成员到普通群失败: ${errorMessage}`)
+    }
+  }
+
   /**
    * 通过邀请链接进群后获取群组实体
    * - 用于“拉人进群”任务:需要拿到 chat/channel 实体,进而拿到 accessHash
@@ -653,6 +707,24 @@ export class TgClientManager {
     return '未知错误'
   }
 
+  /**
+   * 将用户实体转换为 InputUser(用于 AddChatUser / InviteToChannel)
+   */
+  private toInputUser(user: any): Api.InputUser {
+    if (user instanceof Api.InputUser) {
+      return user
+    }
+    const id = user?.id
+    const accessHash = user?.accessHash
+    if (id === undefined || id === null || accessHash === undefined || accessHash === null) {
+      throw new Error('无法构造 InputUser:缺少 id/accessHash')
+    }
+    return new Api.InputUser({
+      userId: bigInt(String(id)),
+      accessHash: bigInt(String(accessHash))
+    })
+  }
+
   /**
    * 从邀请链接中提取 hash 值
    * @private