Просмотр исходного кода

更新 Sender 实体的索引,添加 sendInterval、sendBatchSize 和 concurrentCount 字段到 Task 实体,优化 TaskService 的任务处理逻辑,增强错误处理和日志记录,提升代码可读性和稳定性。

wuyi 1 месяц назад
Родитель
Сommit
d3074f1b0d

+ 1 - 1
src/entities/sender.entity.ts

@@ -1,7 +1,7 @@
 import { Column, Entity, Index, PrimaryColumn } from 'typeorm'
 
 @Entity()
-@Index(['delFlag', 'lastUsageTime'])
+@Index('idx_sender_del_usage_last', ['delFlag', 'usageCount', 'lastUsageTime'])
 export class Sender {
   @PrimaryColumn({ type: 'bigint' })
   id: string

+ 9 - 0
src/entities/task.entity.ts

@@ -43,6 +43,15 @@ export class Task {
   @Column({ default: 5 })
   sendLimit: number
 
+  @Column({ default: 5 })
+  sendInterval: number
+
+  @Column({ default: 50 })
+  sendBatchSize: number
+
+  @Column({ default: 5 })
+  concurrentCount: number
+
   @Column({ type: 'datetime', precision: 6, default: null })
   startedAt: Date
 

+ 165 - 75
src/services/task.service.ts

@@ -26,6 +26,8 @@ export class TaskService {
   private readonly pollIntervalMs = 5000
   private readonly taskBatchSize = 50
   private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`
+  private processingSince: number | null = null
+  private readonly maxProcessingMs = 2 * 60 * 1000 // 防护:单轮处理超过 2 分钟则自愈
 
   constructor(app: FastifyInstance) {
     this.app = app
@@ -157,6 +159,9 @@ export class TaskService {
       status: TaskStatus.SENDING,
       startedAt: task.startedAt ?? new Date()
     })
+
+    // 立即触发一次发送周期,避免依赖下次轮询
+    void this.taskSendCycle()
   }
 
   async pauseTask(id: number): Promise<void> {
@@ -193,9 +198,17 @@ export class TaskService {
 
   private async taskSendCycle() {
     if (this.processing) {
-      return
+      const now = Date.now()
+      if (this.processingSince && now - this.processingSince > this.maxProcessingMs) {
+        this.app.log.warn('taskSendCycle 卡死自愈,重置 processing 标记')
+        this.processing = false
+      } else {
+        this.app.log.debug?.('taskSendCycle skipped: processing=true')
+        return
+      }
     }
     this.processing = true
+    this.processingSince = Date.now()
     try {
       await this.startTaskSend()
     } catch (error) {
@@ -203,6 +216,7 @@ export class TaskService {
       this.app.log.error(`处理发送任务失败: ${msg}`)
     } finally {
       this.processing = false
+      this.processingSince = null
     }
   }
 
@@ -213,20 +227,30 @@ export class TaskService {
     })
 
     if (!task) {
+      this.app.log.debug?.('taskSendCycle: 未发现发送中的任务')
       return
     }
+    this.app.log.info(
+      `taskSendCycle: 捕获发送任务 id=${task.id}, startedAt=${task.startedAt?.toISOString?.() ?? 'null'}`
+    )
 
     const configuredSendLimit =
       task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
+    const sendIntervalMs = Math.max(0, Number(task.sendInterval ?? 0) * 1000)
+    const batchSize =
+      task.sendBatchSize && Number(task.sendBatchSize) > 0 ? Number(task.sendBatchSize) : this.taskBatchSize
+    const concurrentCount = task.concurrentCount && Number(task.concurrentCount) > 0 ? Number(task.concurrentCount) : 1
+    const batchTotal = batchSize
 
     this.currentSenderSendLimit = configuredSendLimit
     this.senderUsageInBatch.clear()
     this.senderCursor = 0
+    await this.refreshSenderCache()
 
     const pendingItems = await this.taskItemRepository.find({
       where: { taskId: task.id, status: TaskItemStatus.PENDING },
       order: { id: 'ASC' },
-      take: this.taskBatchSize
+      take: batchTotal
     })
 
     if (pendingItems.length === 0) {
@@ -234,27 +258,30 @@ export class TaskService {
       return
     }
 
+    const queue = [...pendingItems]
+    const workerCount = Math.min(concurrentCount, queue.length)
+
+    const workerResults = await Promise.allSettled(
+      Array.from({ length: workerCount }, (_, index) => this.runSenderWorker(task, queue, sendIntervalMs, index))
+    )
+
     let batchSent = 0
     let batchSuccess = 0
-    let batchFailed = 0
-
-    for (const item of pendingItems) {
-      const current = await this.taskRepository.findOne({ where: { id: task.id } })
-      if (!current || current.status !== TaskStatus.SENDING) {
-        this.app.log.info(`任务 ${task.id} 已暂停或停止,终止本批次发送`)
-        break
-      }
 
-      try {
-        await this.sendTaskItem(task, item)
-        batchSuccess++
-      } catch (error) {
-        const msg = error instanceof Error ? error.message : '未知错误'
-        batchFailed++
-        this.app.log.warn(`发送失败 taskId=${task.id}, item=${item.id}: ${msg}`)
+    workerResults.forEach((result, index) => {
+      if (result.status === 'fulfilled') {
+        batchSent += result.value.sent
+        batchSuccess += result.value.success
+      } else {
+        const msg =
+          result.reason instanceof Error
+            ? result.reason.message
+            : typeof result.reason === 'string'
+            ? result.reason
+            : '未知错误'
+        this.app.log.error(`worker=${index} 执行失败: ${msg}`)
       }
-      batchSent++
-    }
+    })
 
     if (batchSent > 0) {
       await this.taskRepository.increment({ id: task.id }, 'sent', batchSent)
@@ -263,21 +290,114 @@ export class TaskService {
       await this.taskRepository.increment({ id: task.id }, 'successCount', batchSuccess)
     }
 
-    if (batchSent < pendingItems.length) {
+    const latest = await this.taskRepository.findOne({ where: { id: task.id } })
+    if (!latest || latest.status !== TaskStatus.SENDING) {
+      this.app.log.info(`任务 ${task.id} 已暂停或停止,本轮结束`)
       return
     }
 
     await this.finalizeTaskIfDone(task.id)
   }
 
-  private async sendTaskItem(task: Task, taskItem: TaskItem): Promise<void> {
-    const sender = await this.pickSender()
-    const sessionString = await this.ensureSessionString(sender)
+  private async finalizeTaskIfDone(taskId: number): Promise<void> {
+    const pendingCount = await this.taskItemRepository.count({
+      where: { taskId, status: TaskItemStatus.PENDING }
+    })
+    if (pendingCount > 0) {
+      return
+    }
+
+    const successCount = await this.taskItemRepository.count({
+      where: { taskId, status: TaskItemStatus.SUCCESS }
+    })
+    const failedCount = await this.taskItemRepository.count({
+      where: { taskId, status: TaskItemStatus.FAILED }
+    })
+
+    await this.taskRepository.update(taskId, {
+      status: TaskStatus.COMPLETED,
+      sent: successCount + failedCount,
+      successCount
+    })
+  }
 
+  private async runSenderWorker(
+    task: Task,
+    queue: TaskItem[],
+    sendIntervalMs: number,
+    workerIndex: number
+  ): Promise<{ sent: number; success: number; failed: number }> {
+    let sent = 0
+    let success = 0
+    let failed = 0
+    let sender: Sender | null = null
     let client: TelegramClient | null = null
-    try {
-      client = await this.tgClientService.connect(sessionString)
+    let senderSentInRound = 0
+
+    while (true) {
+      const taskItem = queue.shift()
+      if (!taskItem) {
+        break
+      }
+
+      const stillSending = await this.isTaskSending(task.id)
+      if (!stillSending) {
+        this.app.log.info(`任务 ${task.id} 已暂停/停止,worker=${workerIndex} 提前结束`)
+        break
+      }
+
+      if (!sender || senderSentInRound >= this.currentSenderSendLimit) {
+        await this.tgClientService.disconnectClient(client)
+        sender = await this.pickSender()
+        const sessionString = await this.ensureSessionString(sender)
+        client = await this.tgClientService.createConnectedClient(sessionString)
+        senderSentInRound = 0
+
+        const me = await client.getMe().catch(() => null)
+        const delaySeconds = this.getRandomDelaySeconds()
+        const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
+        this.app.log.info(
+          `worker=${workerIndex} ,当前登录账号: id: ${me?.id ?? sender.id} ,name: ${displayName || sender.id},延迟 ${delaySeconds}s 后开始发送`
+        )
+        await this.sleep(delaySeconds * 1000)
+      }
 
+      try {
+        await this.sendTaskItemWithClient(task, taskItem, sender!, client!, workerIndex)
+        success++
+      } catch (error) {
+        failed++
+        const msg = error instanceof Error ? error.message : '未知错误'
+        this.app.log.warn(
+          `❌ 发送失败 taskId=${task.id}, item=${taskItem.id}, sender=${sender?.id ?? '未知'}, worker=${workerIndex}, error: ${msg}`
+        )
+      } finally {
+        sent++
+        senderSentInRound++
+        if (sender) {
+          const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
+          this.senderUsageInBatch.set(sender.id, used)
+        }
+      }
+
+      if (sendIntervalMs > 0) {
+        await this.sleep(sendIntervalMs)
+      }
+    }
+
+    await this.tgClientService.disconnectClient(client)
+
+    return { sent, success, failed }
+  }
+
+  private async sendTaskItemWithClient(
+    task: Task,
+    taskItem: TaskItem,
+    sender: Sender,
+    client: TelegramClient,
+    workerIndex: number
+  ): Promise<void> {
+    try {
       const parsedTarget = this.parseTarget(taskItem.target)
       if (!parsedTarget) {
         throw new Error('target 格式错误,请检查是否正确')
@@ -294,20 +414,8 @@ export class TaskService {
       }
 
       await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
-
-      try {
-        await this.tgClientService.clearConversation(client, targetPeer)
-      } catch (clearError) {
-        const msg = clearError instanceof Error ? clearError.message : '未知错误'
-        this.app.log.warn(`清除会话失败 [${taskItem.target}]: ${msg}`)
-      }
-
-      try {
-        await this.tgClientService.deleteTempContact(client, (targetPeer as any).id)
-      } catch (deleteError) {
-        const msg = deleteError instanceof Error ? deleteError.message : '未知错误'
-        this.app.log.warn(`删除临时联系人失败 [${taskItem.target}]: ${msg}`)
-      }
+      await this.tgClientService.clearConversation(client, targetPeer).catch(() => {})
+      await this.tgClientService.deleteTempContact(client, (targetPeer as any).id).catch(() => {})
 
       await this.taskItemRepository.update(taskItem.id, {
         status: TaskItemStatus.SUCCESS,
@@ -317,14 +425,7 @@ export class TaskService {
       })
 
       await this.senderService.incrementUsageCount(sender.id)
-
-      const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
-      this.senderUsageInBatch.set(sender.id, used)
-
-      if (used >= this.currentSenderSendLimit) {
-        this.app.log.info(`sender=${sender.id} 已达单次发送上限 ${this.currentSenderSendLimit},切换下一个账号`)
-        await this.tgClientService.disconnect()
-      }
+      this.app.log.info(`✅ 发送成功 taskId=${task.id}, itemId=${taskItem.id}, sender=${sender.id}, worker=${workerIndex}`)
     } catch (error) {
       const msg = error instanceof Error ? error.message : '未知错误'
       await this.taskItemRepository.update(taskItem.id, {
@@ -333,40 +434,30 @@ export class TaskService {
         senderId: sender.id,
         errorMsg: msg
       })
-
-      if (client) {
-        try {
-          await this.tgClientService.disconnect()
-        } catch (disconnectError) {
-          const disconnectMsg = disconnectError instanceof Error ? disconnectError.message : '未知错误'
-          this.app.log.warn(`断开连接失败: ${disconnectMsg}`)
-        }
-      }
-
+      await this.senderService.incrementUsageCount(sender.id)
       throw error
     }
   }
 
-  private async finalizeTaskIfDone(taskId: number): Promise<void> {
-    const pendingCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.PENDING }
+  private async refreshSenderCache(): Promise<void> {
+    this.senderCache = await this.senderRepository.find({
+      where: { delFlag: false },
+      order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
     })
-    if (pendingCount > 0) {
-      return
-    }
+    this.senderCursor = 0
+  }
 
-    const successCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.SUCCESS }
-    })
-    const failedCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.FAILED }
-    })
+  private async isTaskSending(taskId: number): Promise<boolean> {
+    const current = await this.taskRepository.findOne({ where: { id: taskId } })
+    return !!current && current.status === TaskStatus.SENDING && current.delFlag === false
+  }
 
-    await this.taskRepository.update(taskId, {
-      status: TaskStatus.COMPLETED,
-      sent: successCount + failedCount,
-      successCount
-    })
+  private async sleep(ms: number): Promise<void> {
+    return await new Promise(resolve => setTimeout(resolve, ms))
+  }
+
+  private getRandomDelaySeconds(min: number = 10, max: number = 20): number {
+    return Math.floor(Math.random() * (max - min + 1)) + min
   }
 
   private async pickSender(): Promise<Sender> {
@@ -393,7 +484,6 @@ export class TaskService {
       }
     }
 
-    // 所有 sender 均已达到当前批次上限,重置计数重新分配
     this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
     this.senderUsageInBatch.clear()
     this.senderCursor = 0

+ 6 - 6
src/services/test.service.ts

@@ -170,10 +170,10 @@ export class TestService {
               errorMsg: null
             })
             totalSuccess++
-            this.app.log.info(`sender=${sender.id} item=${item.id} 发送成功`)
+            this.app.log.info(`sender=${sender.id} item=${item.id} 发送成功`)
           } catch (error) {
             const msg = error instanceof Error ? error.message : '未知错误'
-            this.app.log.warn(`item=${item.id} 发送失败: ${msg}`)
+            this.app.log.warn(`item=${item.id} 发送失败: ${msg}`)
             try {
               await this.taskItemRepository.update(item.id, {
                 status: TaskItemStatus.FAILED,
@@ -460,7 +460,7 @@ export class TestService {
           const isVerifyAccount = (taskItem as any).isVerifyAccount === true
 
           if (isVerifyAccount) {
-            this.app.log.info(`[验证账户] 发送成功: ${taskItem.target}`)
+            this.app.log.info(`[验证账户] 发送成功: ${taskItem.target}`)
           } else {
             if (taskItem.id) {
               try {
@@ -482,7 +482,7 @@ export class TestService {
           const errorMessage = error instanceof Error ? error.message : '未知错误'
 
           if (isVerifyAccount) {
-            this.app.log.warn(`[验证账户] 发送失败 [${taskItem.target}]: ${errorMessage}`)
+            this.app.log.warn(`[验证账户] 发送失败 [${taskItem.target}]: ${errorMessage}`)
           } else {
             if (taskItem.id) {
               try {
@@ -497,7 +497,7 @@ export class TestService {
                 this.app.log.warn(`更新 taskItem 失败状态失败 [${taskItem.target}]: ${updateErrorMessage}`)
               }
             }
-            this.app.log.warn(`发送失败 [${taskItem.target}]: ${errorMessage}`)
+            this.app.log.warn(`发送失败 [${taskItem.target}]: ${errorMessage}`)
           }
           totalFailedCount++
         }
@@ -539,7 +539,7 @@ export class TestService {
       }
     } catch (error) {
       const errorMessage = error instanceof Error ? error.message : '未知错误'
-      this.app.log.error(`测试发送失败: ${errorMessage}`)
+      this.app.log.error(`测试发送失败: ${errorMessage}`)
       return {
         success: false,
         message: '测试发送消息时发生错误',

+ 2 - 2
src/services/tg-msg-send.service.ts

@@ -33,7 +33,7 @@ export class TgMsgSendService {
       }
 
       const result = await this.tgClientService.sendMessageToPeer(client, targetPeer, message)
-      this.app.log.info('消息发送成功', result.id)
+      this.app.log.info(`✅ 消息发送成功`, result.id)
 
       await this.tgClientService.clearConversation(client, targetPeer)
       this.app.log.info('会话清除成功')
@@ -52,7 +52,7 @@ export class TgMsgSendService {
       }
     } catch (error) {
       const errorMessage = this.extractErrorMessage(error)
-      this.app.log.error({ msg: '发送消息失败', error: errorMessage })
+      this.app.log.error({ msg: '发送消息失败', error: errorMessage })
       return {
         success: false,
         error: errorMessage

+ 81 - 37
src/services/tgClient.service.ts

@@ -19,6 +19,13 @@ export class TgClientService {
   private apiId: number
   private apiHash: string
   private initPromise: Promise<void> | null = null
+  private readonly defaultConnectionOptions = {
+    connectionRetries: 5,
+    retryDelay: 2000,
+    requestRetries: 5,
+    useWSS: true
+  }
+  private activeClients: Set<TelegramClient> = new Set()
 
   private constructor() {}
 
@@ -58,37 +65,64 @@ export class TgClientService {
       await this.disconnect()
     }
 
-    const stringSession = new StringSession(sessionString)
-    const connectionOptions = {
-      connectionRetries: 5,
-      retryDelay: 2000,
-      requestRetries: 5,
-      useWSS: true
+    this.client = await this.createConnectedClient(sessionString)
+    this.currentSession = sessionString
+    return this.client
+  }
+
+  async disconnect(): Promise<void> {
+    if (!this.client) {
+      return
+    }
+
+    try {
+      if (this.client.connected) {
+        await this.client.disconnect()
+        this.app.log.info('TelegramClient 已断开连接')
+        await this.client.destroy()
+        this.app.log.info('TelegramClient 已销毁')
+      }
+      this.activeClients.delete(this.client)
+      this.logActiveClientCount()
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : String(error)
+      if (!errorMessage.includes('TIMEOUT')) {
+        this.app.log.error({ msg: '断开连接时发生错误', error: errorMessage })
+      }
+    } finally {
+      this.client = null
+      this.currentSession = null
     }
+  }
+
+  async createConnectedClient(sessionString: string): Promise<TelegramClient> {
+    await this.initializeApp()
 
-    this.client = new TelegramClient(stringSession, this.apiId, this.apiHash, connectionOptions)
+    const stringSession = new StringSession(sessionString)
+    const connectionOptions = { ...this.defaultConnectionOptions }
 
     this.app.log.info(
       `正在建立连接... useWSS=${connectionOptions.useWSS ? 'true' : 'false'} (443 优先), retries=${
         connectionOptions.connectionRetries
       }`
     )
+
+    const client = new TelegramClient(stringSession, this.apiId, this.apiHash, connectionOptions)
+
     try {
-      await this.client.connect()
+      await client.connect()
     } catch (err) {
       const errorMessage = err instanceof Error ? err.message : String(err)
       this.app.log.error({ msg: '连接 Telegram 失败', error: errorMessage, useWSS: connectionOptions.useWSS })
       throw err
     }
 
-    if (!this.client.connected) {
+    if (!client.connected) {
       throw new Error('TelegramClient 连接失败,请检查网络或 Session 是否有效')
     }
 
-    this.currentSession = sessionString
     this.app.log.info('TelegramClient 连接成功,正在获取账号信息...')
-
-    const me = await this.client.getMe()
+    const me = await client.getMe()
     if (me) {
       this.app.log.info(
         `当前登录账号: id: ${me.id} ,name: ${me.firstName || ''} ${me.lastName || ''} ${me.username || ''}`.trim()
@@ -98,32 +132,42 @@ export class TgClientService {
     }
     this.app.log.info('账号信息获取完成')
 
-    return this.client
+    this.activeClients.add(client)
+    this.logActiveClientCount()
+
+    return client
   }
 
-  async disconnect(): Promise<void> {
-    if (!this.client) {
+  async disconnectClient(client: TelegramClient | null): Promise<void> {
+    if (!client) {
       return
     }
 
     try {
-      if (this.client.connected) {
-        await this.client.disconnect()
+      if (client.connected) {
+        await client.disconnect()
         this.app.log.info('TelegramClient 已断开连接')
-        await this.client.destroy()
-        this.app.log.info('TelegramClient 已销毁')
       }
+      await client.destroy()
+      this.app.log.info('TelegramClient 已销毁')
+      this.activeClients.delete(client)
+      this.logActiveClientCount()
     } catch (error) {
       const errorMessage = error instanceof Error ? error.message : String(error)
       if (!errorMessage.includes('TIMEOUT')) {
         this.app.log.error({ msg: '断开连接时发生错误', error: errorMessage })
       }
-    } finally {
-      this.client = null
-      this.currentSession = null
     }
   }
 
+  getActiveClientCount(): number {
+    return this.activeClients.size
+  }
+
+  private logActiveClientCount(): void {
+    this.app?.log?.info?.(`当前活跃 Telegram 客户端数量: ${this.activeClients.size}`)
+  }
+
   async getTargetPeer(client: TelegramClient, parsedTarget: string | number): Promise<any> {
     this.app.log.info('正在获取目标实体信息...')
 
@@ -153,29 +197,29 @@ export class TgClientService {
         targetPeer = await client.getEntity(parsedTarget)
       }
 
-      if (targetPeer) {
-        this.logTargetInfo(targetPeer)
-      }
+      // if (targetPeer) {
+      //   this.logTargetInfo(targetPeer)
+      // }
 
       return targetPeer
     } catch (error) {
       const errorMessage = this.extractErrorMessage(error)
 
       if (typeof parsedTarget === 'number') {
-        this.app.log.error({
-          msg: '无法获取用户实体',
-          error: errorMessage,
-          targetId: parsedTarget.toString()
-        })
-        throw new Error('target 无效,不在用户消息列表中')
+        // this.app.log.error({
+        //   msg: '无法获取用户实体',
+        //   error: errorMessage,
+        //   targetId: parsedTarget.toString()
+        // })
+        throw new Error(`无法获取用户实体: ${errorMessage}`)
       }
 
-      this.app.log.error({
-        msg: '无法获取目标信息',
-        error: errorMessage,
-        target: parsedTarget
-      })
-      throw new Error('target 无效,请检查 target 是否正确')
+      // this.app.log.error({
+      //   msg: '无法获取目标信息',
+      //   error: errorMessage,
+      //   target: parsedTarget
+      // })
+      throw new Error(`无法获取目标信息: ${errorMessage}`)
     }
   }