Просмотр исходного кода

添加 sendLimit 字段到任务创建和更新功能,增强输入验证,确保 sendLimit 为大于 0 的数字,同时更新相关 DTO 和服务,优化任务处理逻辑。

wuyi 1 месяц назад
Родитель
Сommit
f12d355f1e

+ 19 - 3
src/controllers/task.controller.ts

@@ -21,6 +21,7 @@ export class TaskController {
 
       const nameField = data.fields['name']
       const messageField = data.fields['message']
+      const sendLimitField = data.fields['sendLimit']
 
       const name =
         nameField && !Array.isArray(nameField) && 'value' in nameField ? (nameField.value as string) : undefined
@@ -28,25 +29,35 @@ export class TaskController {
         messageField && !Array.isArray(messageField) && 'value' in messageField
           ? (messageField.value as string)
           : undefined
+      const sendLimit =
+        sendLimitField && !Array.isArray(sendLimitField) && 'value' in sendLimitField
+          ? Number(sendLimitField.value)
+          : undefined
 
       if (!name || !message) {
         return reply.code(400).send({ message: '任务名称和消息内容不能为空' })
       }
 
+      if (sendLimit !== undefined && (isNaN(sendLimit) || sendLimit <= 0)) {
+        return reply.code(400).send({ message: 'sendLimit 必须为大于 0 的数字' })
+      }
+
       const buffer = await data.toBuffer()
 
       const task = await this.taskService.create({
         name,
         message,
         userId,
-        buffer
+        buffer,
+        sendLimit
       })
 
       return reply.code(201).send({
         task: {
           id: task.id,
           name: task.name,
-          message: task.message
+          message: task.message,
+          sendLimit: task.sendLimit
         }
       })
     } catch (error) {
@@ -81,7 +92,7 @@ export class TaskController {
 
   async update(request: FastifyRequest<{ Body: UpdateTaskBody }>, reply: FastifyReply) {
     try {
-      const { id, name, message, total, sent, successCount, startedAt } = request.body
+      const { id, name, message, total, sent, successCount, startedAt, sendLimit } = request.body
 
       if (!id) {
         return reply.code(400).send({ message: '任务ID不能为空' })
@@ -92,6 +103,10 @@ export class TaskController {
         return reply.code(500).send({ message: '任务不存在' })
       }
 
+      if (sendLimit !== undefined && (isNaN(Number(sendLimit)) || Number(sendLimit) <= 0)) {
+        return reply.code(400).send({ message: 'sendLimit 必须为大于 0 的数字' })
+      }
+
       const updateData: Partial<Task> = {}
       if (name !== undefined) updateData.name = name
       if (message !== undefined) updateData.message = message
@@ -99,6 +114,7 @@ export class TaskController {
       if (sent !== undefined) updateData.sent = sent
       if (successCount !== undefined) updateData.successCount = successCount
       if (startedAt !== undefined) updateData.startedAt = startedAt
+      if (sendLimit !== undefined) updateData.sendLimit = Number(sendLimit)
 
       await this.taskService.update(id, updateData)
 

+ 2 - 0
src/dto/task.dto.ts

@@ -6,6 +6,7 @@ export interface CreateTaskBody {
   message: string
   file: any
   userId?: number
+  sendLimit?: number
 }
 
 export interface UpdateTaskBody {
@@ -16,6 +17,7 @@ export interface UpdateTaskBody {
   sent?: number
   successCount?: number
   startedAt?: Date
+  sendLimit?: number
 }
 
 export interface ListTaskQuery extends Pagination {

+ 6 - 0
src/entities/task-item.entity.ts

@@ -25,6 +25,12 @@ export class TaskItem {
   })
   status: TaskItemStatus
 
+  @Column({ type: 'bigint', nullable: true })
+  senderId: string | null
+
+  @Column({ type: 'text', nullable: true })
+  errorMsg: string | null
+
   @Column({ type: 'datetime', precision: 6, default: null })
   sentAt: Date
 

+ 3 - 0
src/entities/task.entity.ts

@@ -40,6 +40,9 @@ export class Task {
   })
   status: TaskStatus
 
+  @Column({ default: 5 })
+  sendLimit: number
+
   @Column({ type: 'datetime', precision: 6, default: null })
   startedAt: Date
 

+ 35 - 13
src/services/task.service.ts

@@ -18,7 +18,8 @@ export class TaskService {
   private senderRepository: Repository<Sender>
   private senderService: SenderService
   private tgClientService: TgClientService
-  private readonly senderSendLimit = 5
+  private readonly defaultSenderSendLimit = 5
+  private currentSenderSendLimit = this.defaultSenderSendLimit
   private senderUsageInBatch: Map<string, number> = new Map()
   private senderCursor = 0
   private senderCache: Sender[] = []
@@ -38,11 +39,18 @@ export class TaskService {
     })
   }
 
-  async create(data: { name: string; message: string; userId: number; buffer: Buffer }): Promise<Task> {
+  async create(data: {
+    name: string
+    message: string
+    userId: number
+    buffer: Buffer
+    sendLimit?: number
+  }): Promise<Task> {
     const task = this.taskRepository.create({
       name: data.name,
       message: data.message,
-      userId: data.userId
+      userId: data.userId,
+      sendLimit: data.sendLimit ?? this.defaultSenderSendLimit
     })
     const savedTask = await this.taskRepository.save(task)
     const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
@@ -208,6 +216,13 @@ export class TaskService {
       return
     }
 
+    const configuredSendLimit =
+      task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
+
+    this.currentSenderSendLimit = configuredSendLimit
+    this.senderUsageInBatch.clear()
+    this.senderCursor = 0
+
     const pendingItems = await this.taskItemRepository.find({
       where: { taskId: task.id, status: TaskItemStatus.PENDING },
       order: { id: 'ASC' },
@@ -235,10 +250,6 @@ export class TaskService {
         batchSuccess++
       } catch (error) {
         const msg = error instanceof Error ? error.message : '未知错误'
-        await this.taskItemRepository.update(item.id, {
-          status: TaskItemStatus.FAILED,
-          sentAt: new Date()
-        })
         batchFailed++
         this.app.log.warn(`发送失败 taskId=${task.id}, item=${item.id}: ${msg}`)
       }
@@ -300,7 +311,9 @@ export class TaskService {
 
       await this.taskItemRepository.update(taskItem.id, {
         status: TaskItemStatus.SUCCESS,
-        sentAt: new Date()
+        sentAt: new Date(),
+        senderId: sender.id,
+        errorMsg: null
       })
 
       await this.senderService.incrementUsageCount(sender.id)
@@ -308,19 +321,28 @@ export class TaskService {
       const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
       this.senderUsageInBatch.set(sender.id, used)
 
-      if (used >= this.senderSendLimit) {
-        this.app.log.info(`sender=${sender.id} 已达单次发送上限 ${this.senderSendLimit},切换下一个账号`)
+      if (used >= this.currentSenderSendLimit) {
+        this.app.log.info(`sender=${sender.id} 已达单次发送上限 ${this.currentSenderSendLimit},切换下一个账号`)
         await this.tgClientService.disconnect()
       }
     } catch (error) {
+      const msg = error instanceof Error ? error.message : '未知错误'
+      await this.taskItemRepository.update(taskItem.id, {
+        status: TaskItemStatus.FAILED,
+        sentAt: new Date(),
+        senderId: sender.id,
+        errorMsg: msg
+      })
+
       if (client) {
         try {
           await this.tgClientService.disconnect()
         } catch (disconnectError) {
-          const msg = disconnectError instanceof Error ? disconnectError.message : '未知错误'
-          this.app.log.warn(`断开连接失败: ${msg}`)
+          const disconnectMsg = disconnectError instanceof Error ? disconnectError.message : '未知错误'
+          this.app.log.warn(`断开连接失败: ${disconnectMsg}`)
         }
       }
+
       throw error
     }
   }
@@ -365,7 +387,7 @@ export class TaskService {
       const index = (this.senderCursor + i) % total
       const sender = this.senderCache[index]
       const used = this.senderUsageInBatch.get(sender.id) ?? 0
-      if (used < this.senderSendLimit) {
+      if (used < this.currentSenderSendLimit) {
         this.senderCursor = (index + 1) % total
         return sender
       }

+ 27 - 14
src/services/test.service.ts

@@ -44,16 +44,6 @@ export class TestService {
   ): Promise<{ success: boolean; message: string; error?: string }> {
     // 参考 testSendMessage 的登录+发送逻辑,单线程顺序发送:登录后等 5s,每条后等 5s,单账号发送满上限后切换
     try {
-      if (senderSendLimit !== undefined) {
-        if (isNaN(Number(senderSendLimit)) || Number(senderSendLimit) <= 0) {
-          return {
-            success: false,
-            message: 'senderSendLimit 必须为大于 0 的数字'
-          }
-        }
-        this.senderSendLimit = Number(senderSendLimit)
-      }
-
       const task = await this.findTaskById(taskId)
       if (!task) {
         return { success: false, message: '任务不存在' }
@@ -62,6 +52,21 @@ export class TestService {
         return { success: false, message: '任务已被删除' }
       }
 
+      const hasCustomLimit = senderSendLimit !== undefined
+      const validatedLimit =
+        hasCustomLimit && !isNaN(Number(senderSendLimit)) && Number(senderSendLimit) > 0
+          ? Number(senderSendLimit)
+          : undefined
+
+      if (hasCustomLimit && validatedLimit === undefined) {
+        return {
+          success: false,
+          message: 'senderSendLimit 必须为大于 0 的数字'
+        }
+      }
+
+      this.senderSendLimit = validatedLimit ?? task.sendLimit ?? this.senderSendLimit
+
       this.senderCache = []
       this.senderCursor = 0
       this.senderUsageInBatch.clear()
@@ -160,7 +165,9 @@ export class TestService {
 
             await this.taskItemRepository.update(item.id, {
               status: TaskItemStatus.SUCCESS,
-              sentAt: new Date()
+              sentAt: new Date(),
+              senderId: sender.id,
+              errorMsg: null
             })
             totalSuccess++
             this.app.log.info(`sender=${sender.id} item=${item.id} 发送成功`)
@@ -170,7 +177,9 @@ export class TestService {
             try {
               await this.taskItemRepository.update(item.id, {
                 status: TaskItemStatus.FAILED,
-                sentAt: new Date()
+                sentAt: new Date(),
+                senderId: sender.id,
+                errorMsg: msg
               })
             } catch (updateError) {
               const updateMsg = updateError instanceof Error ? updateError.message : '未知错误'
@@ -457,7 +466,9 @@ export class TestService {
               try {
                 await this.taskItemRepository.update(taskItem.id, {
                   status: TaskItemStatus.SUCCESS,
-                  sentAt: new Date()
+                    sentAt: new Date(),
+                    senderId: senderId,
+                    errorMsg: null
                 })
               } catch (updateError) {
                 const updateErrorMessage = updateError instanceof Error ? updateError.message : '未知错误'
@@ -477,7 +488,9 @@ export class TestService {
               try {
                 await this.taskItemRepository.update(taskItem.id, {
                   status: TaskItemStatus.FAILED,
-                  sentAt: new Date()
+                    sentAt: new Date(),
+                    senderId: senderId,
+                    errorMsg: errorMessage
                 })
               } catch (updateError) {
                 const updateErrorMessage = updateError instanceof Error ? updateError.message : '未知错误'