Răsfoiți Sursa

添加任务状态 QUEUING 和 CANCELED,优化 TaskService 的任务启动逻辑,增强错误处理,确保任务状态管理更加严谨。同时,重构 TgMsgSendService,添加任务发送轮询和处理机制,提升代码可读性和稳定性。

wuyi 4 săptămâni în urmă
părinte
comite
60bc1416b5
3 a modificat fișierele cu 561 adăugiri și 462 ștergeri
  1. 2 0
      src/entities/task.entity.ts
  2. 11 460
      src/services/task.service.ts
  3. 548 2
      src/services/tg-msg-send.service.ts

+ 2 - 0
src/entities/task.entity.ts

@@ -3,7 +3,9 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, Update
 export enum TaskStatus {
   PENDING = 'pending',
   SENDING = 'sending',
+  QUEUING = 'queuing',
   PAUSED = 'paused',
+  CANCELED = 'canceled',
   COMPLETED = 'completed'
 }
 

+ 11 - 460
src/services/task.service.ts

@@ -1,44 +1,19 @@
 import { Repository } from 'typeorm'
 import { FastifyInstance } from 'fastify'
-import { Api, TelegramClient } from 'telegram'
 import { Task, TaskStatus } from '../entities/task.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { PaginationResponse } from '../dto/common.dto'
-import { Sender } from '../entities/sender.entity'
-import { SenderService } from './sender.service'
-import { TgClientService } from './tgClient.service'
-import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
 
 export class TaskService {
   private taskRepository: Repository<Task>
   private taskItemRepository: Repository<TaskItem>
   private app: FastifyInstance
-  private processing = false
-  private static schedulerStarted = false
-  private senderRepository: Repository<Sender>
-  private senderService: SenderService
-  private tgClientService: TgClientService
   private readonly defaultSenderSendLimit = 5
-  private currentSenderSendLimit = this.defaultSenderSendLimit
-  private senderUsageInBatch: Map<string, number> = new Map()
-  private senderCursor = 0
-  private senderCache: Sender[] = []
-  private readonly pollIntervalMs = 5000
-  private readonly taskBatchSize = 50
-  private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`
-  private processingSince: number | null = null
-  private readonly maxProcessingMs = 2 * 60 * 1000
 
   constructor(app: FastifyInstance) {
     this.app = app
     this.taskRepository = app.dataSource.getRepository(Task)
     this.taskItemRepository = app.dataSource.getRepository(TaskItem)
-    this.senderRepository = app.dataSource.getRepository(Sender)
-    this.tgClientService = TgClientService.getInstance()
-    this.senderService = new SenderService(app)
-    this.app.addHook('onReady', async () => {
-      this.scheduleTaskSend()
-    })
   }
 
   async create(data: {
@@ -151,16 +126,23 @@ export class TaskService {
     if (task.delFlag) {
       throw new Error('任务已被删除')
     }
-    if (![TaskStatus.PENDING, TaskStatus.PAUSED].includes(task.status as TaskStatus)) {
-      throw new Error('当前状态不可启动')
+
+    if (task.status === TaskStatus.QUEUING || task.status === TaskStatus.SENDING) {
+      return
+    }
+
+    if (
+      task.status === TaskStatus.COMPLETED ||
+      task.status === TaskStatus.CANCELED ||
+      task.status === TaskStatus.PAUSED
+    ) {
+      throw new Error(`任务已结束: ${task.status},无法启动`)
     }
 
     await this.taskRepository.update(id, {
       status: TaskStatus.SENDING,
       startedAt: task.startedAt ?? new Date()
     })
-
-    void this.taskSendCycle()
   }
 
   async pauseTask(id: number): Promise<void> {
@@ -177,435 +159,4 @@ export class TaskService {
 
     await this.taskRepository.update(id, { status: TaskStatus.PAUSED })
   }
-
-  private scheduleTaskSend() {
-    if (TaskService.schedulerStarted) {
-      return
-    }
-    const interval = setInterval(() => void this.taskSendCycle(), this.pollIntervalMs)
-    TaskService.schedulerStarted = true
-
-    this.app.addHook('onClose', async () => {
-      clearInterval(interval)
-      TaskService.schedulerStarted = false
-    })
-
-    this.app.log.info(
-      `任务发送轮询已启动,间隔=${this.pollIntervalMs}ms,实例=${this.instanceId}, 批次=${this.taskBatchSize}`
-    )
-  }
-
-  private async taskSendCycle() {
-    if (this.processing) {
-      const now = Date.now()
-      if (this.processingSince && now - this.processingSince > this.maxProcessingMs) {
-        this.app.log.warn('taskSendCycle 脱离卡死,重置 processing')
-        this.processing = false
-      } else {
-        return
-      }
-    }
-    this.processing = true
-    this.processingSince = Date.now()
-    try {
-      await this.startTaskSend()
-    } catch (error) {
-      const msg = error instanceof Error ? error.message : '未知错误'
-      const isDbConnectionError =
-        msg.includes('ECONNRESET') ||
-        msg.includes('EPIPE') ||
-        msg.includes('ETIMEDOUT') ||
-        msg.includes('ENOTFOUND') ||
-        msg.includes('Connection lost') ||
-        msg.includes('Connection closed')
-
-      if (isDbConnectionError) {
-        this.app.log.debug(`数据库连接异常,跳过本次轮询: ${msg}`)
-      } else {
-        const stack = error instanceof Error ? error.stack : undefined
-        this.app.log.error(`处理发送任务失败: ${msg}${stack ? `; stack=${stack}` : ''}`)
-      }
-    } finally {
-      this.processing = false
-      this.processingSince = null
-    }
-  }
-
-  private async startTaskSend() {
-    const task = await this.taskRepository.findOne({
-      where: { status: TaskStatus.SENDING, delFlag: false },
-      order: { startedAt: 'ASC' }
-    })
-
-    if (!task) {
-      return
-    }
-    this.app.log.info(`开始发送任务 id=${task.id}, startedAt=${task.startedAt?.toISOString}`)
-
-    const configuredSendLimit =
-      task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
-    const sendIntervalMs = Math.max(0, Number(task.sendInterval ?? 0) * 1000)
-    const batchSize =
-      task.sendBatchSize && Number(task.sendBatchSize) > 0 ? Number(task.sendBatchSize) : this.taskBatchSize
-    const concurrentCount = task.concurrentCount && Number(task.concurrentCount) > 0 ? Number(task.concurrentCount) : 1
-    const batchTotal = batchSize
-
-    this.currentSenderSendLimit = configuredSendLimit
-    this.senderUsageInBatch.clear()
-    this.senderCursor = 0
-    await this.refreshSenderCache()
-
-    const pendingItems = await this.taskItemRepository.find({
-      where: { taskId: task.id, status: TaskItemStatus.PENDING },
-      order: { id: 'ASC' },
-      take: batchTotal
-    })
-
-    if (pendingItems.length === 0) {
-      await this.finalizeTaskIfDone(task.id)
-      return
-    }
-
-    const queue = [...pendingItems]
-    const workerCount = Math.min(concurrentCount, queue.length)
-
-    const workerResults = await Promise.allSettled(
-      Array.from({ length: workerCount }, (_, index) => this.runSenderWorker(task, queue, sendIntervalMs, index))
-    )
-
-    let batchSent = 0
-    let batchSuccess = 0
-
-    workerResults.forEach((result, index) => {
-      if (result.status === 'fulfilled') {
-        batchSent += result.value.sent
-        batchSuccess += result.value.success
-      } else {
-        const msg =
-          result.reason instanceof Error
-            ? result.reason.message
-            : typeof result.reason === 'string'
-            ? result.reason
-            : '未知错误'
-        this.app.log.error(`worker=${index} 执行失败: ${msg}`)
-      }
-    })
-
-    if (batchSent > 0) {
-      await this.taskRepository.increment({ id: task.id }, 'sent', batchSent)
-    }
-    if (batchSuccess > 0) {
-      await this.taskRepository.increment({ id: task.id }, 'successCount', batchSuccess)
-    }
-
-    const latest = await this.taskRepository.findOne({ where: { id: task.id } })
-    if (!latest || latest.status !== TaskStatus.SENDING) {
-      this.app.log.info(`任务 ${task.id} 已暂停或停止,本轮结束`)
-      return
-    }
-
-    await this.finalizeTaskIfDone(task.id)
-  }
-
-  private async finalizeTaskIfDone(taskId: number): Promise<void> {
-    const pendingCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.PENDING }
-    })
-    if (pendingCount > 0) {
-      return
-    }
-
-    const successCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.SUCCESS }
-    })
-    const failedCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.FAILED }
-    })
-
-    await this.taskRepository.update(taskId, {
-      status: TaskStatus.COMPLETED,
-      sent: successCount + failedCount,
-      successCount
-    })
-  }
-
-  private async runSenderWorker(
-    task: Task,
-    queue: TaskItem[],
-    sendIntervalMs: number,
-    workerIndex: number
-  ): Promise<{ sent: number; success: number; failed: number }> {
-    let sent = 0
-    let success = 0
-    let failed = 0
-    let sender: Sender | null = null
-    let client: TelegramClient | null = null
-    let senderSentInRound = 0
-
-    while (true) {
-      const taskItem = queue.shift()
-      if (!taskItem) {
-        break
-      }
-
-      const stillSending = await this.isTaskSending(task.id)
-      if (!stillSending) {
-        this.app.log.info(`任务 ${task.id} 已暂停/停止,worker=${workerIndex} 提前结束`)
-        break
-      }
-
-      if (!sender || senderSentInRound >= this.currentSenderSendLimit) {
-        await this.tgClientService.disconnectClient(client)
-        sender = await this.pickSender()
-        const sessionString = await this.ensureSessionString(sender)
-        try {
-          client = await this.tgClientService.createConnectedClient(sessionString)
-        } catch (error) {
-          const msg = error instanceof Error ? error.message : String(error)
-          if (this.isSessionRevokedMessage(msg)) {
-            await this.handleSessionRevoked(sender)
-            sender = null
-            client = null
-            continue
-          }
-          throw error
-        }
-        senderSentInRound = 0
-
-        const me = await client.getMe().catch(() => null)
-        const delaySeconds = this.getRandomDelaySeconds()
-        const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
-        this.app.log.info(
-          `worker=${workerIndex} ,当前登录账号: id: ${me?.id ?? sender.id} ,name: ${
-            displayName || sender.id
-          },延迟 ${delaySeconds}s 后开始发送`
-        )
-        await this.sleep(delaySeconds * 1000)
-      }
-
-      try {
-        await this.sendTaskItemWithClient(task, taskItem, sender!, client!, workerIndex)
-        success++
-      } catch (error) {
-        failed++
-        const msg = error instanceof Error ? error.message : '未知错误'
-        if (sender && this.isSessionRevokedMessage(msg)) {
-          await this.handleSessionRevoked(sender)
-          await this.tgClientService.disconnectClient(client)
-          sender = null
-          client = null
-          senderSentInRound = 0
-        }
-        this.app.log.warn(
-          `❌ 发送失败 taskId=${task.id}, item=${taskItem.id}, sender=${
-            sender?.id ?? '未知'
-          }, worker=${workerIndex}, error: ${msg}`
-        )
-      } finally {
-        sent++
-        senderSentInRound++
-        if (sender) {
-          const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
-          this.senderUsageInBatch.set(sender.id, used)
-        }
-      }
-
-      if (sendIntervalMs > 0) {
-        await this.sleep(sendIntervalMs)
-      }
-    }
-
-    await this.tgClientService.disconnectClient(client)
-
-    return { sent, success, failed }
-  }
-
-  private async sendTaskItemWithClient(
-    task: Task,
-    taskItem: TaskItem,
-    sender: Sender,
-    client: TelegramClient,
-    workerIndex: number
-  ): Promise<void> {
-    try {
-      const parsedTarget = this.parseTarget(taskItem.target)
-      if (!parsedTarget) {
-        throw new Error('target 格式错误,请检查是否正确')
-      }
-
-      const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
-      if (!targetPeer) {
-        throw new Error('target 无效,无法获取目标信息')
-      }
-
-      const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
-      if (!canSendMessage) {
-        throw new Error('目标用户不允许接收消息或已被限制')
-      }
-
-      await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
-      await this.tgClientService.clearConversation(client, targetPeer).catch(() => {})
-      await this.tgClientService.deleteTempContact(client, (targetPeer as any).id).catch(() => {})
-
-      await this.taskItemRepository.update(taskItem.id, {
-        status: TaskItemStatus.SUCCESS,
-        sentAt: new Date(),
-        senderId: sender.id,
-        errorMsg: null
-      })
-
-      await this.senderService.incrementUsageCount(sender.id)
-      this.app.log.info(
-        `✅ 发送成功 taskId=${task.id}, itemId=${taskItem.id}, sender=${sender.id}, worker=${workerIndex}`
-      )
-    } catch (error) {
-      const msg = error instanceof Error ? error.message : '未知错误'
-      await this.taskItemRepository.update(taskItem.id, {
-        status: TaskItemStatus.FAILED,
-        sentAt: new Date(),
-        senderId: sender.id,
-        errorMsg: msg
-      })
-      await this.senderService.incrementUsageCount(sender.id)
-      throw error
-    }
-  }
-
-  private async refreshSenderCache(): Promise<void> {
-    this.senderCache = await this.senderRepository.find({
-      where: { delFlag: false },
-      order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
-    })
-    this.senderCursor = 0
-  }
-
-  private async isTaskSending(taskId: number): Promise<boolean> {
-    const current = await this.taskRepository.findOne({ where: { id: taskId } })
-    return !!current && current.status === TaskStatus.SENDING && current.delFlag === false
-  }
-
-  private async sleep(ms: number): Promise<void> {
-    return await new Promise(resolve => setTimeout(resolve, ms))
-  }
-
-  private getRandomDelaySeconds(min: number = 10, max: number = 20): number {
-    return Math.floor(Math.random() * (max - min + 1)) + min
-  }
-
-  private async pickSender(): Promise<Sender> {
-    if (this.senderCache.length === 0) {
-      this.senderCache = await this.senderRepository.find({
-        where: { delFlag: false },
-        order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
-      })
-      this.senderCursor = 0
-    }
-
-    if (this.senderCache.length === 0) {
-      throw new Error('暂无可用 sender 账号')
-    }
-
-    const total = this.senderCache.length
-    for (let i = 0; i < total; i++) {
-      const index = (this.senderCursor + i) % total
-      const sender = this.senderCache[index]
-      const used = this.senderUsageInBatch.get(sender.id) ?? 0
-      if (used < this.currentSenderSendLimit) {
-        this.senderCursor = (index + 1) % total
-        return sender
-      }
-    }
-
-    this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
-    this.senderUsageInBatch.clear()
-    this.senderCursor = 0
-    return this.senderCache[0]
-  }
-
-  private async ensureSessionString(sender: Sender): Promise<string> {
-    if (sender.sessionStr) {
-      return sender.sessionStr
-    }
-
-    if (sender.dcId && sender.authKey) {
-      const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
-      await this.senderRepository.update(sender.id, { sessionStr: session })
-      return session
-    }
-
-    throw new Error(`sender=${sender.id} 缺少 session 信息`)
-  }
-
-  private isSessionRevokedMessage(msg: string): boolean {
-    return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
-  }
-
-  private async handleSessionRevoked(sender: Sender): Promise<void> {
-    await this.senderRepository.update(sender.id, { delFlag: true })
-    this.senderCache = this.senderCache.filter(s => s.id !== sender.id)
-    this.senderCursor = 0
-    this.app.log.warn(`sender=${sender.id} session 失效,已删除`)
-  }
-
-  private parseTarget(targetId: string): string | number | null {
-    const trimmed = targetId.trim()
-
-    if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
-      return trimmed
-    }
-
-    const phoneRegex = /^\d+$/
-    if (phoneRegex.test(trimmed)) {
-      return `+${trimmed}`
-    }
-
-    const integerRegex = /^-?\d+$/
-    if (integerRegex.test(trimmed)) {
-      return Number(trimmed)
-    }
-
-    return null
-  }
-
-  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
-    try {
-      const fullUser = await client.invoke(
-        new Api.users.GetFullUser({
-          id: targetPeer
-        })
-      )
-
-      const fullUserData = fullUser.fullUser as any
-
-      if (fullUserData?.blocked) {
-        return false
-      }
-
-      if (targetPeer.bot && targetPeer.botChatHistory === false) {
-        return false
-      }
-
-      if (targetPeer.deleted) {
-        return false
-      }
-
-      if (targetPeer.fake || targetPeer.scam) {
-        return false
-      }
-
-      return true
-    } catch (error) {
-      const errorMessage = error instanceof Error ? error.message : '未知错误'
-
-      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
-        throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
-      }
-
-      if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
-        return false
-      }
-
-      return true
-    }
-  }
 }

+ 548 - 2
src/services/tg-msg-send.service.ts

@@ -1,16 +1,54 @@
+import { Repository } from 'typeorm'
 import { TelegramClient } from 'telegram'
+import { FastifyInstance } from 'fastify'
+import { Api } from 'telegram'
 import { SendMessageResult } from '../dto/tg-msg-send.dto'
 import { TgClientService } from './tgClient.service'
+import { Task, TaskStatus } from '../entities/task.entity'
+import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
+import { Sender } from '../entities/sender.entity'
+import { SenderService } from './sender.service'
+import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
 
 export class TgMsgSendService {
-  private app: any
+  private app: FastifyInstance
   private tgClientService: TgClientService
+  private taskRepository: Repository<Task>
+  private taskItemRepository: Repository<TaskItem>
+  private senderRepository: Repository<Sender>
+  private senderService: SenderService
+  private processing = false
+  private static schedulerStarted = false
+  private readonly pollIntervalMs = 5000
+  private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`
+  private processingSince: number | null = null
+  private readonly maxProcessingMs = 2 * 60 * 1000
+  private readonly defaultSenderSendLimit = 5
+  private currentSenderSendLimit = this.defaultSenderSendLimit
+  private senderUsageInBatch: Map<string, number> = new Map()
+  private senderCursor = 0
+  private senderCache: Sender[] = []
+  private readonly taskBatchSize = 50
 
-  constructor(app: any) {
+  constructor(app: FastifyInstance) {
     this.app = app
     this.tgClientService = TgClientService.getInstance()
+    this.taskRepository = app.dataSource.getRepository(Task)
+    this.taskItemRepository = app.dataSource.getRepository(TaskItem)
+    this.senderRepository = app.dataSource.getRepository(Sender)
+    this.senderService = new SenderService(app)
+    this.app.addHook('onReady', async () => {
+      this.scheduleTaskSend()
+    })
   }
 
+  /**
+   * 发送单条消息
+   * @param sessionString Telegram 会话字符串
+   * @param target 目标用户(用户名、手机号或用户ID)
+   * @param message 消息内容
+   * @returns 发送结果
+   */
   async sendMessage(sessionString: string, target: string, message: string): Promise<SendMessageResult> {
     const configError = this.validateConfig()
     if (configError) {
@@ -60,6 +98,10 @@ export class TgMsgSendService {
     }
   }
 
+  /**
+   * 验证配置(API_ID 和 API_HASH)
+   * @returns 如果配置无效返回错误信息,否则返回 null
+   */
   private validateConfig(): string | null {
     const apiId = parseInt(this.app.config.API_ID)
     const apiHash = this.app.config.API_HASH
@@ -70,6 +112,12 @@ export class TgMsgSendService {
     return null
   }
 
+  /**
+   * 解析目标标识符
+   * 支持格式:@username、+1234567890、1234567890(手机号)、-123456(用户ID)
+   * @param targetId 目标标识符字符串
+   * @returns 解析后的目标(字符串或数字),如果格式错误返回 null
+   */
   private parseTarget(targetId: string): string | number | null {
     const trimmed = targetId.trim()
 
@@ -93,6 +141,11 @@ export class TgMsgSendService {
     return null
   }
 
+  /**
+   * 提取错误信息
+   * @param error 错误对象(可能是 Error、字符串或其他类型)
+   * @returns 错误信息字符串
+   */
   private extractErrorMessage(error: unknown): string {
     if (error instanceof Error) {
       return error.message
@@ -102,4 +155,497 @@ export class TgMsgSendService {
     }
     return '未知错误'
   }
+
+  /**
+   * 启动任务发送轮询调度器
+   * 使用静态变量确保只启动一个轮询实例
+   */
+  private scheduleTaskSend() {
+    if (TgMsgSendService.schedulerStarted) {
+      return
+    }
+    const interval = setInterval(() => void this.taskSendCycle(), this.pollIntervalMs)
+    TgMsgSendService.schedulerStarted = true
+
+    this.app.addHook('onClose', async () => {
+      clearInterval(interval)
+      TgMsgSendService.schedulerStarted = false
+    })
+
+    this.app.log.info(`任务发送轮询已启动,间隔=${this.pollIntervalMs}ms,实例=${this.instanceId}`)
+  }
+
+  /**
+   * 任务发送轮询循环
+   * 每轮询一次处理一个发送中的任务,包含防卡死机制
+   */
+  private async taskSendCycle() {
+    if (this.processing) {
+      const now = Date.now()
+      if (this.processingSince && now - this.processingSince > this.maxProcessingMs) {
+        this.app.log.warn('taskSendCycle 脱离卡死,重置 processing')
+        this.processing = false
+      } else {
+        return
+      }
+    }
+    this.processing = true
+    this.processingSince = Date.now()
+    try {
+      await this.processTaskSend()
+    } catch (error) {
+      const msg = error instanceof Error ? error.message : '未知错误'
+      const isDbConnectionError =
+        msg.includes('ECONNRESET') ||
+        msg.includes('EPIPE') ||
+        msg.includes('ETIMEDOUT') ||
+        msg.includes('ENOTFOUND') ||
+        msg.includes('Connection lost') ||
+        msg.includes('Connection closed')
+
+      if (isDbConnectionError) {
+        this.app.log.debug(`数据库连接异常,跳过本次轮询: ${msg}`)
+      } else {
+        const stack = error instanceof Error ? error.stack : undefined
+        this.app.log.error(`处理发送任务失败: ${msg}${stack ? `; stack=${stack}` : ''}`)
+      }
+    } finally {
+      this.processing = false
+      this.processingSince = null
+    }
+  }
+
+  /**
+   * 处理任务发送
+   * 查找一个发送中的任务,批量处理待发送的任务项,支持并发发送
+   */
+  private async processTaskSend() {
+    const task = await this.taskRepository.findOne({
+      where: { status: TaskStatus.SENDING, delFlag: false },
+      order: { startedAt: 'ASC' }
+    })
+
+    if (!task) {
+      return
+    }
+    this.app.log.info(`开始发送任务 id=${task.id}, startedAt=${task.startedAt?.toISOString}`)
+
+    const configuredSendLimit =
+      task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
+    const sendIntervalMs = Math.max(0, Number(task.sendInterval ?? 0) * 1000)
+    const batchSize =
+      task.sendBatchSize && Number(task.sendBatchSize) > 0 ? Number(task.sendBatchSize) : this.taskBatchSize
+    const concurrentCount = task.concurrentCount && Number(task.concurrentCount) > 0 ? Number(task.concurrentCount) : 1
+    const batchTotal = batchSize
+
+    this.currentSenderSendLimit = configuredSendLimit
+    this.senderUsageInBatch.clear()
+    this.senderCursor = 0
+    await this.refreshSenderCache()
+
+    const pendingItems = await this.taskItemRepository.find({
+      where: { taskId: task.id, status: TaskItemStatus.PENDING },
+      order: { id: 'ASC' },
+      take: batchTotal
+    })
+
+    if (pendingItems.length === 0) {
+      await this.finalizeTaskIfDone(task.id)
+      return
+    }
+
+    const queue = [...pendingItems]
+    const workerCount = Math.min(concurrentCount, queue.length)
+
+    const workerResults = await Promise.allSettled(
+      Array.from({ length: workerCount }, (_, index) => this.runSenderWorker(task, queue, sendIntervalMs, index))
+    )
+
+    let batchSent = 0
+    let batchSuccess = 0
+
+    workerResults.forEach((result, index) => {
+      if (result.status === 'fulfilled') {
+        batchSent += result.value.sent
+        batchSuccess += result.value.success
+      } else {
+        const msg =
+          result.reason instanceof Error
+            ? result.reason.message
+            : typeof result.reason === 'string'
+            ? result.reason
+            : '未知错误'
+        this.app.log.error(`worker=${index} 执行失败: ${msg}`)
+      }
+    })
+
+    if (batchSent > 0) {
+      await this.taskRepository.increment({ id: task.id }, 'sent', batchSent)
+    }
+    if (batchSuccess > 0) {
+      await this.taskRepository.increment({ id: task.id }, 'successCount', batchSuccess)
+    }
+
+    const latest = await this.taskRepository.findOne({ where: { id: task.id } })
+    if (!latest || latest.status !== TaskStatus.SENDING) {
+      this.app.log.info(`任务 ${task.id} 已暂停或停止,本轮结束`)
+      return
+    }
+
+    await this.finalizeTaskIfDone(task.id)
+  }
+
+  /**
+   * 如果任务已完成,则更新任务状态为已完成
+   * @param taskId 任务ID
+   */
+  private async finalizeTaskIfDone(taskId: number): Promise<void> {
+    const pendingCount = await this.taskItemRepository.count({
+      where: { taskId, status: TaskItemStatus.PENDING }
+    })
+    if (pendingCount > 0) {
+      return
+    }
+
+    const successCount = await this.taskItemRepository.count({
+      where: { taskId, status: TaskItemStatus.SUCCESS }
+    })
+    const failedCount = await this.taskItemRepository.count({
+      where: { taskId, status: TaskItemStatus.FAILED }
+    })
+
+    await this.taskRepository.update(taskId, {
+      status: TaskStatus.COMPLETED,
+      sent: successCount + failedCount,
+      successCount
+    })
+  }
+
+  /**
+   * 运行发送工作线程
+   * 从队列中取出任务项并发送,支持发送者轮换和会话管理
+   * @param task 任务对象
+   * @param queue 任务项队列
+   * @param sendIntervalMs 发送间隔(毫秒)
+   * @param workerIndex 工作线程索引
+   * @returns 发送统计信息(已发送、成功、失败数量)
+   */
+  private async runSenderWorker(
+    task: Task,
+    queue: TaskItem[],
+    sendIntervalMs: number,
+    workerIndex: number
+  ): Promise<{ sent: number; success: number; failed: number }> {
+    let sent = 0
+    let success = 0
+    let failed = 0
+    let sender: Sender | null = null
+    let client: TelegramClient | null = null
+    let senderSentInRound = 0
+
+    while (true) {
+      const taskItem = queue.shift()
+      if (!taskItem) {
+        break
+      }
+
+      const stillSending = await this.isTaskSending(task.id)
+      if (!stillSending) {
+        this.app.log.info(`任务 ${task.id} 已暂停/停止,worker=${workerIndex} 提前结束`)
+        break
+      }
+
+      if (!sender || senderSentInRound >= this.currentSenderSendLimit) {
+        await this.tgClientService.disconnectClient(client)
+        sender = await this.pickSender()
+        const sessionString = await this.ensureSessionString(sender)
+        try {
+          client = await this.tgClientService.createConnectedClient(sessionString)
+        } catch (error) {
+          const msg = error instanceof Error ? error.message : String(error)
+          if (this.isSessionRevokedMessage(msg)) {
+            await this.handleSessionRevoked(sender)
+            sender = null
+            client = null
+            continue
+          }
+          throw error
+        }
+        senderSentInRound = 0
+
+        const me = await client.getMe().catch(() => null)
+        const delaySeconds = this.getRandomDelaySeconds()
+        const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
+        this.app.log.info(
+          `worker=${workerIndex} ,当前登录账号: id: ${me?.id ?? sender.id} ,name: ${
+            displayName || sender.id
+          },延迟 ${delaySeconds}s 后开始发送`
+        )
+        await this.sleep(delaySeconds * 1000)
+      }
+
+      try {
+        await this.sendTaskItemWithClient(task, taskItem, sender!, client!, workerIndex)
+        success++
+      } catch (error) {
+        failed++
+        const msg = error instanceof Error ? error.message : '未知错误'
+        if (sender && this.isSessionRevokedMessage(msg)) {
+          await this.handleSessionRevoked(sender)
+          await this.tgClientService.disconnectClient(client)
+          sender = null
+          client = null
+          senderSentInRound = 0
+        }
+        this.app.log.warn(
+          `❌ 发送失败 taskId=${task.id}, item=${taskItem.id}, sender=${
+            sender?.id ?? '未知'
+          }, worker=${workerIndex}, error: ${msg}`
+        )
+      } finally {
+        sent++
+        senderSentInRound++
+        if (sender) {
+          const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
+          this.senderUsageInBatch.set(sender.id, used)
+        }
+      }
+
+      if (sendIntervalMs > 0) {
+        await this.sleep(sendIntervalMs)
+      }
+    }
+
+    await this.tgClientService.disconnectClient(client)
+
+    return { sent, success, failed }
+  }
+
+  /**
+   * 使用客户端发送任务项
+   * 发送消息、清除会话、删除临时联系人,并更新任务项状态
+   * @param task 任务对象
+   * @param taskItem 任务项对象
+   * @param sender 发送者对象
+   * @param client Telegram 客户端
+   * @param workerIndex 工作线程索引
+   */
+  private async sendTaskItemWithClient(
+    task: Task,
+    taskItem: TaskItem,
+    sender: Sender,
+    client: TelegramClient,
+    workerIndex: number
+  ): Promise<void> {
+    try {
+      const parsedTarget = this.parseTarget(taskItem.target)
+      if (!parsedTarget) {
+        throw new Error('target 格式错误,请检查是否正确')
+      }
+
+      const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
+      if (!targetPeer) {
+        throw new Error('target 无效,无法获取目标信息')
+      }
+
+      const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
+      if (!canSendMessage) {
+        throw new Error('目标用户不允许接收消息或已被限制')
+      }
+
+      await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
+      await this.tgClientService.clearConversation(client, targetPeer).catch(() => {})
+      await this.tgClientService.deleteTempContact(client, (targetPeer as any).id).catch(() => {})
+
+      await this.taskItemRepository.update(taskItem.id, {
+        status: TaskItemStatus.SUCCESS,
+        sentAt: new Date(),
+        senderId: sender.id,
+        errorMsg: null
+      })
+
+      await this.senderService.incrementUsageCount(sender.id)
+      this.app.log.info(
+        `✅ 发送成功 taskId=${task.id}, itemId=${taskItem.id}, sender=${sender.id}, worker=${workerIndex}`
+      )
+    } catch (error) {
+      const msg = error instanceof Error ? error.message : '未知错误'
+      await this.taskItemRepository.update(taskItem.id, {
+        status: TaskItemStatus.FAILED,
+        sentAt: new Date(),
+        senderId: sender.id,
+        errorMsg: msg
+      })
+      await this.senderService.incrementUsageCount(sender.id)
+      throw error
+    }
+  }
+
+  /**
+   * 刷新发送者缓存
+   * 从数据库重新加载所有未删除的发送者,按最后使用时间和使用次数排序
+   */
+  private async refreshSenderCache(): Promise<void> {
+    this.senderCache = await this.senderRepository.find({
+      where: { delFlag: false },
+      order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
+    })
+    this.senderCursor = 0
+  }
+
+  /**
+   * 检查任务是否正在发送中
+   * @param taskId 任务ID
+   * @returns 如果任务状态为发送中且未删除返回 true,否则返回 false
+   */
+  private async isTaskSending(taskId: number): Promise<boolean> {
+    const current = await this.taskRepository.findOne({ where: { id: taskId } })
+    return !!current && current.status === TaskStatus.SENDING && current.delFlag === false
+  }
+
+  /**
+   * 延迟指定毫秒数
+   * @param ms 延迟毫秒数
+   */
+  private async sleep(ms: number): Promise<void> {
+    return await new Promise(resolve => setTimeout(resolve, ms))
+  }
+
+  /**
+   * 选择可用的发送者
+   * 从缓存中选择使用次数最少的发送者,如果所有发送者都达到上限则重置计数
+   * @returns 选中的发送者对象
+   * @throws 如果没有可用发送者则抛出错误
+   */
+  private async pickSender(): Promise<Sender> {
+    if (this.senderCache.length === 0) {
+      this.senderCache = await this.senderRepository.find({
+        where: { delFlag: false },
+        order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
+      })
+      this.senderCursor = 0
+    }
+
+    if (this.senderCache.length === 0) {
+      throw new Error('暂无可用 sender 账号')
+    }
+
+    const total = this.senderCache.length
+    for (let i = 0; i < total; i++) {
+      const index = (this.senderCursor + i) % total
+      const sender = this.senderCache[index]
+      const used = this.senderUsageInBatch.get(sender.id) ?? 0
+      if (used < this.currentSenderSendLimit) {
+        this.senderCursor = (index + 1) % total
+        return sender
+      }
+    }
+
+    this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
+    this.senderUsageInBatch.clear()
+    this.senderCursor = 0
+    return this.senderCache[0]
+  }
+
+  /**
+   * 确保发送者有有效的会话字符串
+   * 如果发送者已有 sessionStr 则直接返回,否则根据 dcId 和 authKey 构建并保存
+   * @param sender 发送者对象
+   * @returns 会话字符串
+   * @throws 如果缺少必要的会话信息则抛出错误
+   */
+  private async ensureSessionString(sender: Sender): Promise<string> {
+    if (sender.sessionStr) {
+      return sender.sessionStr
+    }
+
+    if (sender.dcId && sender.authKey) {
+      const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
+      await this.senderRepository.update(sender.id, { sessionStr: session })
+      return session
+    }
+
+    throw new Error(`sender=${sender.id} 缺少 session 信息`)
+  }
+
+  /**
+   * 处理会话被撤销的情况
+   * 将发送者标记为已删除,并从缓存中移除
+   * @param sender 发送者对象
+   */
+  private async handleSessionRevoked(sender: Sender): Promise<void> {
+    await this.senderRepository.update(sender.id, { delFlag: true })
+    this.senderCache = this.senderCache.filter(s => s.id !== sender.id)
+    this.senderCursor = 0
+    this.app.log.warn(`sender=${sender.id} session 失效,已删除`)
+  }
+
+  /**
+   * 检查是否可以发送消息给目标用户
+   * 检查用户是否被屏蔽、是否为机器人、是否已删除、是否为虚假或诈骗账号
+   * @param client Telegram 客户端
+   * @param targetPeer 目标用户对象
+   * @returns 如果可以发送返回 true,否则返回 false
+   * @throws 如果认证密钥未注册则抛出错误
+   */
+  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
+    try {
+      const fullUser = await client.invoke(
+        new Api.users.GetFullUser({
+          id: targetPeer
+        })
+      )
+
+      const fullUserData = fullUser.fullUser as any
+
+      if (fullUserData?.blocked) {
+        return false
+      }
+
+      if (targetPeer.bot && targetPeer.botChatHistory === false) {
+        return false
+      }
+
+      if (targetPeer.deleted) {
+        return false
+      }
+
+      if (targetPeer.fake || targetPeer.scam) {
+        return false
+      }
+
+      return true
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : '未知错误'
+
+      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
+        throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
+      }
+
+      if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
+        return false
+      }
+
+      return true
+    }
+  }
+
+  /**
+   * 获取随机延迟秒数
+   * @param min 最小延迟秒数,默认 10
+   * @param max 最大延迟秒数,默认 20
+   * @returns 随机延迟秒数
+   */
+  private getRandomDelaySeconds(min: number = 10, max: number = 20): number {
+    return Math.floor(Math.random() * (max - min + 1)) + min
+  }
+
+  /**
+   * 检查错误消息是否表示会话被撤销
+   * @param msg 错误消息
+   * @returns 如果是会话被撤销相关的错误返回 true,否则返回 false
+   */
+  private isSessionRevokedMessage(msg: string): boolean {
+    return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
+  }
 }