Selaa lähdekoodia

增强任务执行器的账号管理逻辑,添加账号租约机制以避免并发 worker 复用同一 tgUser,优化账号获取和释放流程,提升代码可读性和可维护性。

wuyi 6 päivää sitten
vanhempi
commit
344dafb82f
1 muutettua tiedostoa jossa 83 lisäystä ja 16 poistoa
  1. 83 16
      src/executor/task.executor.ts

+ 83 - 16
src/executor/task.executor.ts

@@ -34,6 +34,9 @@ export class TaskExecutor {
   private accountCache: TgUser[] = []
   // 本批次临时排除的账号(连接/功能异常等),避免在同一轮里反复选中导致任务整体失败
   private accountExcludedInBatch: Set<string> = new Set()
+  // 并发 worker 的账号租约:同一时刻尽量保证一个 tgUser 只被一个 worker 使用
+  private accountLeaseByWorker: Map<number, string> = new Map()
+  private accountLeaseOwner: Map<string, number> = new Map()
 
   constructor(private app: FastifyInstance) {
     const ds = app.dataSource
@@ -56,6 +59,8 @@ export class TaskExecutor {
         task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultAccountLimit
       this.accountUsageInBatch.clear()
       this.accountCursor = 0
+      this.accountLeaseByWorker.clear()
+      this.accountLeaseOwner.clear()
       // 计算任务需要拉取的账号池大小
       const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
       this.currentAccountCacheTake = this.computeAccountCacheTake(task, concurrency)
@@ -120,7 +125,7 @@ export class TaskExecutor {
     const workers: Promise<void>[] = []
 
     for (let i = 0; i < concurrency; i++) {
-      workers.push(this.workerLoop(task.id))
+      workers.push(this.workerLoop(task.id, i))
     }
 
     await Promise.all(workers)
@@ -129,7 +134,7 @@ export class TaskExecutor {
   /**
    * 单 worker 循环
    */
-  private async workerLoop(taskId: number): Promise<void> {
+  private async workerLoop(taskId: number, workerId: number): Promise<void> {
     let tgUser: TgUser | null = null
     const workerTgClient = new TgClientManager()
     let accountUsageInRound = 0
@@ -162,7 +167,11 @@ export class TaskExecutor {
           // 换号前:若当前 tgUser 已加入群聊,则先退出群聊再断开
           await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
           await workerTgClient.disconnect()
-          tgUser = await this.pickAccount()
+          // 释放旧租约后再重新获取,避免一个 worker 永久占用账号
+          if (tgUser) {
+            this.releaseAccountLease(workerId, tgUser.id)
+          }
+          tgUser = await this.acquireAccount(workerId)
           const sessionString = await this.ensureSessionString(tgUser)
 
           try {
@@ -181,6 +190,7 @@ export class TaskExecutor {
               'TelegramClient connect failed, rotate account'
             )
             this.accountExcludedInBatch.add(tgUser.id)
+            this.releaseAccountLease(workerId, tgUser.id)
             tgUser = null
             accountUsageInRound = 0
             inviteGroupEntity = null
@@ -213,6 +223,7 @@ export class TaskExecutor {
             // 换号前:退出群聊(如果已加入)
             await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
             await workerTgClient.disconnect().catch(() => {})
+            this.releaseAccountLease(workerId, tgUser.id)
             tgUser = null
             accountUsageInRound = 0
             inviteGroupEntity = null
@@ -226,6 +237,9 @@ export class TaskExecutor {
           } else if (tgUser) {
             this.accountExcludedInBatch.add(tgUser.id)
           }
+          if (tgUser) {
+            this.releaseAccountLease(workerId, tgUser.id)
+          }
           await workerTgClient.disconnect().catch(() => {})
           tgUser = null
           accountUsageInRound = 0
@@ -248,6 +262,9 @@ export class TaskExecutor {
       // worker 结束前:尽量退出群聊,避免账号一直挂在群里
       await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
       await workerTgClient.disconnect()
+      if (tgUser) {
+        this.releaseAccountLease(workerId, tgUser.id)
+      }
     }
   }
 
@@ -381,6 +398,8 @@ export class TaskExecutor {
     workerTgClient: TgClientManager,
     inviteGroupEntity: any | null
   ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
+    // 注意:邀请失败时也需要能拿到已加入的群实体用于退群
+    let groupEntity: any | null = inviteGroupEntity
     try {
       const inviteLink = String(task.payload?.inviteLink ?? '').trim()
       if (!inviteLink) {
@@ -403,7 +422,6 @@ export class TaskExecutor {
       }
 
       // tgUser 加入群组,获取群组实体(每个账号缓存一次)
-      let groupEntity = inviteGroupEntity
       if (!groupEntity) {
         groupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
       }
@@ -466,8 +484,11 @@ export class TaskExecutor {
       await this.taskRepo.increment({ id: task.id }, 'processed', 1)
 
       this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
+      // 关键:邀请失败时也要尽量退出刚加入的群,避免账号一直挂在群里
+      await this.safeLeaveInviteGroup(workerTgClient, groupEntity).catch(() => {})
       // INVITE_TO_GROUP:按需求,失败就换号继续(避免单号被冻结/受限导致整体停滞)
-      return { rotateAccount: true, reason: msg }
+      // 同时清空缓存,避免后续误以为仍在群内
+      return { rotateAccount: true, reason: msg, inviteGroupEntity: null }
     }
   }
 
@@ -560,9 +581,9 @@ export class TaskExecutor {
   }
 
   /**
-   * 选择可用的tgUser 账号
+   * 获取账号(带 worker 租约,尽量避免并发 worker 复用同一 tgUser)
    */
-  private async pickAccount(): Promise<TgUser> {
+  private async acquireAccount(workerId: number): Promise<TgUser> {
     if (this.accountCache.length === 0) {
       this.accountCache = await this.senderRepo.find({
         where: { delFlag: false },
@@ -577,11 +598,15 @@ export class TaskExecutor {
     }
 
     const total = this.accountCache.length
-    const tryPick = (): TgUser | null => {
+    const tryPick = (respectLease: boolean): TgUser | null => {
       for (let i = 0; i < total; i++) {
         const index = (this.accountCursor + i) % total
         const account = this.accountCache[index]
         if (this.accountExcludedInBatch.has(account.id)) continue
+        if (respectLease) {
+          const leasedBy = this.accountLeaseOwner.get(account.id)
+          if (leasedBy !== undefined && leasedBy !== workerId) continue
+        }
         const used = this.accountUsageInBatch.get(account.id) ?? 0
         if (used < this.currentAccountLimit) {
           this.accountCursor = (index + 1) % total
@@ -591,26 +616,68 @@ export class TaskExecutor {
       return null
     }
 
-    const picked1 = tryPick()
-    if (picked1) return picked1
+    // 先按“租约”挑选:一个账号尽量只服务一个 worker
+    const picked1 = tryPick(true)
+    if (picked1) {
+      this.setAccountLease(workerId, picked1.id)
+      return picked1
+    }
 
     this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
     this.accountUsageInBatch.clear()
     this.accountCursor = 0
 
-    const picked2 = tryPick()
-    if (picked2) return picked2
+    const picked2 = tryPick(true)
+    if (picked2) {
+      this.setAccountLease(workerId, picked2.id)
+      return picked2
+    }
 
     // 如果全部被排除,说明这一批账号都异常/受限:清空排除集再尝试一次
     if (this.accountExcludedInBatch.size > 0) {
       this.app.log.warn('本批次所有 tgUser 均被排除,清空排除列表后重试')
       this.accountExcludedInBatch.clear()
-      const picked3 = tryPick()
-      if (picked3) return picked3
+      const picked3 = tryPick(true)
+      if (picked3) {
+        this.setAccountLease(workerId, picked3.id)
+        return picked3
+      }
     }
 
-    // 兜底:返回第一个,避免直接抛错导致任务中断(但大概率会在连接失败后再次被排除并轮换)
-    return this.accountCache[0]
+    // 兜底:若账号不足/都被占用,允许复用(不阻塞任务),但给出明显告警
+    this.app.log.warn(
+      { workerId, totalAccounts: total },
+      '没有可用的未占用 tgUser(账号可能不足或都被占用),worker 将复用账号'
+    )
+    const fallback = tryPick(false) ?? this.accountCache[0]
+    this.setAccountLease(workerId, fallback.id)
+    return fallback
+  }
+
+  private setAccountLease(workerId: number, accountId: string): void {
+    // 一个 worker 只持有一个租约:覆盖前先释放旧租约
+    const old = this.accountLeaseByWorker.get(workerId)
+    if (old && old !== accountId) {
+      this.releaseAccountLease(workerId, old)
+    }
+    const owner = this.accountLeaseOwner.get(accountId)
+    if (owner !== undefined && owner !== workerId) {
+      // 理论上 acquireAccount(respectLease=true) 不会走到这里;兜底覆盖,避免状态不一致
+      this.accountLeaseByWorker.delete(owner)
+    }
+    this.accountLeaseByWorker.set(workerId, accountId)
+    this.accountLeaseOwner.set(accountId, workerId)
+  }
+
+  private releaseAccountLease(workerId: number, accountId: string): void {
+    const owned = this.accountLeaseByWorker.get(workerId)
+    if (owned === accountId) {
+      this.accountLeaseByWorker.delete(workerId)
+    }
+    const owner = this.accountLeaseOwner.get(accountId)
+    if (owner === workerId) {
+      this.accountLeaseOwner.delete(accountId)
+    }
   }
 
   /**