Просмотр исходного кода

添加启动任务并发送功能,更新 TestController 和 TestService,支持通过新接口启动任务并发送消息,同时增强错误处理和日志记录。

wuyi 1 месяц назад
Родитель
Сommit
6bca66a198
3 измененных файлов с 281 добавлено и 2 удалено
  1. 34 1
      src/controllers/test.controller.ts
  2. 6 1
      src/routes/test.routes.ts
  3. 241 0
      src/services/test.service.ts

+ 34 - 1
src/controllers/test.controller.ts

@@ -1,5 +1,5 @@
 import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'
-import { SendMessageBody, CreateChatGroupBody, InviteMembersBody } from '../dto/task.dto'
+import { SendMessageBody, CreateChatGroupBody, InviteMembersBody, StartTaskSendBody } from '../dto/task.dto'
 import { TestService } from '../services/test.service'
 
 export class TestController {
@@ -97,4 +97,37 @@ export class TestController {
       return reply.code(500).send(result)
     }
   }
+
+  async startTaskAndSend(request: FastifyRequest<{ Body: StartTaskSendBody }>, reply: FastifyReply) {
+    try {
+      const { taskId, senderSendLimit } = request.body
+
+      if (!taskId || isNaN(Number(taskId))) {
+        return reply.code(400).send({
+          success: false,
+          message: 'taskId 必须为有效数字'
+        })
+      }
+
+      if (senderSendLimit !== undefined && (isNaN(Number(senderSendLimit)) || Number(senderSendLimit) <= 0)) {
+        return reply.code(400).send({
+          success: false,
+          message: 'senderSendLimit 必须为大于 0 的数字'
+        })
+      }
+
+      const result = await this.testService.startTaskAndSend(
+        Number(taskId),
+        senderSendLimit ? Number(senderSendLimit) : undefined
+      )
+
+      return reply.code(result.success ? 200 : 500).send(result)
+    } catch (error) {
+      return reply.code(500).send({
+        success: false,
+        message: '启动并发送任务失败',
+        error: error instanceof Error ? error.message : '未知错误'
+      })
+    }
+  }
 }

+ 6 - 1
src/routes/test.routes.ts

@@ -1,6 +1,6 @@
 import { FastifyInstance } from 'fastify'
 import { TestController } from '../controllers/test.controller'
-import { SendMessageBody, CreateChatGroupBody, InviteMembersBody } from '../dto/task.dto'
+import { SendMessageBody, CreateChatGroupBody, InviteMembersBody, StartTaskSendBody } from '../dto/task.dto'
 
 export default async function testRoutes(fastify: FastifyInstance) {
   const testController = new TestController(fastify)
@@ -16,4 +16,9 @@ export default async function testRoutes(fastify: FastifyInstance) {
     '/task/invite-members',
     testController.testInviteMembersToChat.bind(testController)
   )
+
+  fastify.post<{ Body: StartTaskSendBody }>(
+    '/task/start-and-send',
+    testController.startTaskAndSend.bind(testController)
+  )
 }

+ 241 - 0
src/services/test.service.ts

@@ -1,6 +1,8 @@
 import { FastifyInstance } from 'fastify'
 import { Repository } from 'typeorm'
 import { Api, TelegramClient } from 'telegram'
+import { StringSession } from 'telegram/sessions'
+import bigInt from 'big-integer'
 import { Task, TaskStatus } from '../entities/task.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { Sender } from '../entities/sender.entity'
@@ -17,6 +19,10 @@ export class TestService {
   private readonly senderService: SenderService
   private readonly tgClientService: TgClientService
   private readonly chatGroupService: ChatGroupService
+  private senderSendLimit = 5
+  private senderUsageInBatch: Map<string, number> = new Map()
+  private senderCursor = 0
+  private senderCache: Sender[] = []
 
   constructor(app: FastifyInstance) {
     this.app = app
@@ -32,6 +38,197 @@ export class TestService {
     return this.taskRepository.findOne({ where: { id, delFlag: false } })
   }
 
+  async startTaskAndSend(
+    taskId: number,
+    senderSendLimit?: number
+  ): Promise<{ success: boolean; message: string; error?: string }> {
+    // 参考 testSendMessage 的登录+发送逻辑,单线程顺序发送:登录后等 5s,每条后等 5s,单账号发送满上限后切换
+    try {
+      if (senderSendLimit !== undefined) {
+        if (isNaN(Number(senderSendLimit)) || Number(senderSendLimit) <= 0) {
+          return {
+            success: false,
+            message: 'senderSendLimit 必须为大于 0 的数字'
+          }
+        }
+        this.senderSendLimit = Number(senderSendLimit)
+      }
+
+      const task = await this.findTaskById(taskId)
+      if (!task) {
+        return { success: false, message: '任务不存在' }
+      }
+      if (task.delFlag) {
+        return { success: false, message: '任务已被删除' }
+      }
+
+      this.senderCache = []
+      this.senderCursor = 0
+      this.senderUsageInBatch.clear()
+
+      await this.taskRepository.update(taskId, {
+        status: TaskStatus.SENDING,
+        startedAt: task.startedAt ?? new Date()
+      })
+
+      const pendingItems = await this.taskItemRepository.find({
+        where: { taskId, status: TaskItemStatus.PENDING },
+        order: { id: 'ASC' }
+      })
+
+      if (pendingItems.length === 0) {
+        await this.taskRepository.update(taskId, { status: TaskStatus.COMPLETED })
+        return { success: true, message: '该任务没有待发送的目标账户' }
+      }
+
+      const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))
+      let totalSent = 0
+      let totalSuccess = 0
+      let totalFailed = 0
+
+      const pickSender = async (): Promise<Sender> => {
+        if (this.senderCache.length === 0) {
+          this.senderCache = await this.senderRepository.find({
+            where: { delFlag: false },
+            order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
+          })
+          this.senderCursor = 0
+        }
+        if (this.senderCache.length === 0) {
+          throw new Error('暂无可用 sender 账号')
+        }
+        const sender = this.senderCache[this.senderCursor]
+        this.senderCursor = (this.senderCursor + 1) % this.senderCache.length
+        return sender
+      }
+
+      const queue = [...pendingItems]
+
+      while (queue.length > 0) {
+        const sender = await pickSender()
+        const sessionString = await this.ensureSessionStringForSender(sender)
+
+        const connectWithTimeout = async () => {
+          const timeoutMs = 10_000
+          return Promise.race([
+            this.tgClientService.connect(sessionString),
+            (async () => {
+              await delay(timeoutMs)
+              throw new Error(`连接超时(${timeoutMs / 1000}s)`)
+            })()
+          ]) as Promise<TelegramClient>
+        }
+
+        this.app.log.info(`sender=${sender.id} 准备连接并发送,当前批次上限=${this.senderSendLimit}`)
+        let client: TelegramClient
+        try {
+          client = await connectWithTimeout()
+        } catch (error) {
+          const msg = error instanceof Error ? error.message : '未知错误'
+          this.app.log.warn(`sender=${sender.id} 连接失败,切换下一个 sender: ${msg}`)
+          try {
+            await this.tgClientService.disconnect()
+          } catch {}
+          continue
+        }
+
+        const loginDelaySeconds = 5
+        this.app.log.info(`sender=${sender.id} 登录完成,等待 ${loginDelaySeconds} 秒后开始发送`)
+        await delay(loginDelaySeconds * 1000)
+
+        let messagesWithCurrentSender = 0
+
+        while (messagesWithCurrentSender < this.senderSendLimit && queue.length > 0) {
+          const item = queue.shift()!
+          try {
+            const parsedTarget = this.parseTarget(item.target)
+            if (!parsedTarget) {
+              throw new Error('target 格式错误,请检查是否正确')
+            }
+
+            const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
+            if (!targetPeer) {
+              throw new Error('target 无效,无法获取目标信息')
+            }
+
+            const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
+            if (!canSendMessage) {
+              throw new Error('目标用户不允许接收消息或已被限制')
+            }
+
+            await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
+
+            await this.taskItemRepository.update(item.id, {
+              status: TaskItemStatus.SUCCESS,
+              sentAt: new Date()
+            })
+            totalSuccess++
+            this.app.log.info(`sender=${sender.id} item=${item.id} 发送成功`)
+          } catch (error) {
+            const msg = error instanceof Error ? error.message : '未知错误'
+            this.app.log.warn(`item=${item.id} 发送失败: ${msg}`)
+            try {
+              await this.taskItemRepository.update(item.id, {
+                status: TaskItemStatus.FAILED,
+                sentAt: new Date()
+              })
+            } catch (updateError) {
+              const updateMsg = updateError instanceof Error ? updateError.message : '未知错误'
+              this.app.log.warn(`更新 taskItem 状态失败 [${item.id}]: ${updateMsg}`)
+            }
+            totalFailed++
+          } finally {
+            totalSent++
+            const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
+            this.senderUsageInBatch.set(sender.id, used)
+            try {
+              await this.senderService.incrementUsageCount(sender.id)
+            } catch (error) {
+              const msg = error instanceof Error ? error.message : '未知错误'
+              this.app.log.warn(`更新 sender usageCount 失败 [${sender.id}]: ${msg}`)
+            }
+
+            messagesWithCurrentSender++
+
+            const delaySeconds = 5
+            this.app.log.info(`等待 ${delaySeconds} 秒后继续发送下一条`)
+            await delay(delaySeconds * 1000)
+          }
+        }
+
+        await this.tgClientService.disconnect()
+      }
+
+      if (totalSent > 0) {
+        await this.taskRepository.increment({ id: taskId }, 'sent', totalSent)
+      }
+      if (totalSuccess > 0) {
+        await this.taskRepository.increment({ id: taskId }, 'successCount', totalSuccess)
+      }
+
+      const pendingLeft = await this.taskItemRepository.count({
+        where: { taskId, status: TaskItemStatus.PENDING }
+      })
+
+      if (pendingLeft === 0) {
+        await this.taskRepository.update(taskId, { status: TaskStatus.COMPLETED })
+      }
+
+      return {
+        success: true,
+        message: `任务发送完成,总计=${totalSent},成功=${totalSuccess},失败=${totalFailed}`
+      }
+    } catch (error) {
+      const msg = error instanceof Error ? error.message : '未知错误'
+      this.app.log.error(`启动并发送任务失败: ${msg}`)
+      return {
+        success: false,
+        message: '启动并发送任务失败',
+        error: msg
+      }
+    }
+  }
+
   async testSendMessage(
     senderId: string,
     taskId: number,
@@ -936,6 +1133,50 @@ export class TestService {
     return null
   }
 
+  private async pickSenderForTask(): Promise<Sender> {
+    if (this.senderCache.length === 0) {
+      this.senderCache = await this.senderRepository.find({
+        where: { delFlag: false },
+        order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
+      })
+      this.senderCursor = 0
+    }
+
+    if (this.senderCache.length === 0) {
+      throw new Error('暂无可用 sender 账号')
+    }
+
+    const total = this.senderCache.length
+    for (let i = 0; i < total; i++) {
+      const index = (this.senderCursor + i) % total
+      const sender = this.senderCache[index]
+      const used = this.senderUsageInBatch.get(sender.id) ?? 0
+      if (used < this.senderSendLimit) {
+        this.senderCursor = (index + 1) % total
+        return sender
+      }
+    }
+
+    this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
+    this.senderUsageInBatch.clear()
+    this.senderCursor = 0
+    return this.senderCache[0]
+  }
+
+  private async ensureSessionStringForSender(sender: Sender): Promise<string> {
+    if (sender.sessionStr) {
+      return sender.sessionStr
+    }
+
+    if (sender.dcId && sender.authKey) {
+      const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
+      await this.senderRepository.update(sender.id, { sessionStr: session })
+      return session
+    }
+
+    throw new Error(`sender=${sender.id} 缺少 session 信息`)
+  }
+
   private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
     try {
       const fullUser = await client.invoke(