Просмотр исходного кода

重构任务管理功能,添加任务类型和相关配置,更新任务创建和更新逻辑以支持新的字段(如 accountLimit、intervalTime 和 threads),增强错误处理和数据验证,提升代码可读性和一致性。同时,移除不再使用的 GroupTask 和 GroupTaskItem 实体,优化任务调度和执行逻辑。

wuyi 3 недель назад
Родитель
Сommit
44640f3e1c

+ 122 - 20
src/controllers/task.controller.ts

@@ -1,7 +1,7 @@
 import { FastifyRequest, FastifyReply, FastifyInstance } from 'fastify'
 import { TaskService } from '../services/task.service'
 import { UpdateTaskBody, ListTaskQuery, ListTaskItemQuery } from '../dto/task.dto'
-import { Task } from '../entities/task.entity'
+import { Task, TaskType } from '../entities/task.entity'
 
 export class TaskController {
   private taskService: TaskService
@@ -10,6 +10,34 @@ export class TaskController {
     this.taskService = new TaskService(app)
   }
 
+  private parseIntervalTime(input: unknown): { ok: true; value: string } | { ok: false; message: string } {
+    if (input === undefined || input === null || input === '') {
+      return { ok: true, value: '5-5' }
+    }
+
+    const raw = String(input).trim()
+    // 允许: "10", "10-20", "10~20", "10,20", "10 20"
+    const normalized = raw.replace('~', '-').replace(',', '-').replace(/\s+/g, '-')
+    const parts = normalized.split('-').filter(Boolean)
+    if (parts.length === 1) {
+      const s = Number(parts[0])
+      if (Number.isNaN(s) || s < 0) return { ok: false, message: 'intervalTime 必须为大于等于 0 的秒数或区间(如 10-20)' }
+      return { ok: true, value: `${s}-${s}` }
+    }
+    if (parts.length === 2) {
+      const a = Number(parts[0])
+      const b = Number(parts[1])
+      if (Number.isNaN(a) || Number.isNaN(b) || a < 0 || b < 0) {
+        return { ok: false, message: 'intervalTime 必须为大于等于 0 的秒数区间(如 10-20)' }
+      }
+      const min = Math.min(a, b)
+      const max = Math.max(a, b)
+      return { ok: true, value: `${min}-${max}` }
+    }
+
+    return { ok: false, message: 'intervalTime 格式错误,示例:10 或 10-20' }
+  }
+
   async create(request: FastifyRequest, reply: FastifyReply) {
     try {
       const userId = request.user.id
@@ -20,44 +48,100 @@ export class TaskController {
       }
 
       const nameField = data.fields['name']
+      const typeField = data.fields['type']
       const messageField = data.fields['message']
-      const sendLimitField = data.fields['sendLimit']
+      const inviteLinkField = data.fields['inviteLink']
+      // 新字段
+      const accountLimitField = data.fields['accountLimit']
+      const intervalTimeField = data.fields['intervalTime']
+      const threadsField = data.fields['threads']
 
       const name =
         nameField && !Array.isArray(nameField) && 'value' in nameField ? (nameField.value as string) : undefined
+      const type =
+        typeField && !Array.isArray(typeField) && 'value' in typeField ? (typeField.value as string) : undefined
       const message =
         messageField && !Array.isArray(messageField) && 'value' in messageField
           ? (messageField.value as string)
           : undefined
-      const sendLimit =
-        sendLimitField && !Array.isArray(sendLimitField) && 'value' in sendLimitField
-          ? Number(sendLimitField.value)
+      const inviteLink =
+        inviteLinkField && !Array.isArray(inviteLinkField) && 'value' in inviteLinkField
+          ? (inviteLinkField.value as string)
+          : undefined
+      const accountLimit =
+        accountLimitField && !Array.isArray(accountLimitField) && 'value' in accountLimitField
+          ? Number(accountLimitField.value)
           : undefined
 
-      if (!name || !message) {
-        return reply.code(400).send({ message: '任务名称和消息内容不能为空' })
+      const intervalRaw =
+        intervalTimeField && !Array.isArray(intervalTimeField) && 'value' in intervalTimeField
+          ? intervalTimeField.value
+          : undefined
+      const intervalParsed = this.parseIntervalTime(intervalRaw)
+      if (!intervalParsed.ok) {
+        return reply.code(400).send({ message: intervalParsed.message })
       }
 
-      if (sendLimit !== undefined && (isNaN(sendLimit) || sendLimit <= 0)) {
-        return reply.code(400).send({ message: 'sendLimit 必须为大于 0 的数字' })
+      const threads =
+        threadsField && !Array.isArray(threadsField) && 'value' in threadsField ? Number(threadsField.value) : undefined
+
+      if (!name) {
+        return reply.code(400).send({ message: '任务名称不能为空' })
+      }
+
+      const taskType: TaskType =
+        type === TaskType.INVITE_TO_GROUP || type === TaskType.SEND_MESSAGE ? (type as TaskType) : TaskType.SEND_MESSAGE
+
+      if (accountLimit !== undefined && (isNaN(accountLimit) || accountLimit <= 0)) {
+        return reply.code(400).send({ message: 'accountLimit 必须为大于 0 的数字' })
+      }
+
+      if (threads !== undefined && (isNaN(threads) || threads <= 0)) {
+        return reply.code(400).send({ message: 'threads 必须为大于 0 的数字' })
+      }
+      if (threads !== undefined && threads > 10) {
+        return reply.code(400).send({ message: 'threads 最大值为 10' })
+      }
+
+      // payload 仅存任务特有配置
+      let payload: Record<string, any> = {}
+      if (taskType === TaskType.SEND_MESSAGE) {
+        if (!message) {
+          return reply.code(400).send({ message: 'SEND_MESSAGE 任务 message 不能为空' })
+        }
+        payload = { message }
+      } else if (taskType === TaskType.INVITE_TO_GROUP) {
+        if (!inviteLink || !inviteLink.trim()) {
+          return reply.code(400).send({ message: 'INVITE_TO_GROUP 任务 inviteLink 不能为空' })
+        }
+        payload = { inviteLink: inviteLink.trim() }
       }
 
       const buffer = await data.toBuffer()
 
       const task = await this.taskService.create({
         name,
-        message,
         userId,
         buffer,
-        sendLimit
+        type: taskType,
+        payload,
+        accountLimit,
+        intervalTime: intervalParsed.value,
+        threads
       })
 
       return reply.code(201).send({
         task: {
           id: task.id,
           name: task.name,
-          message: task.message,
-          sendLimit: task.sendLimit
+          type: task.type,
+          payload: task.payload,
+          accountLimit: task.accountLimit,
+          intervalTime: task.intervalTime,
+          threads: task.threads,
+          processed: task.processed,
+          success: task.success,
+          total: task.total
         }
       })
     } catch (error) {
@@ -92,7 +176,8 @@ export class TaskController {
 
   async update(request: FastifyRequest<{ Body: UpdateTaskBody }>, reply: FastifyReply) {
     try {
-      const { id, name, message, total, sent, successCount, startedAt, sendLimit } = request.body
+      const { id, name, payload, total, processed, success, startedAt, accountLimit, intervalTime, threads } =
+        request.body
 
       if (!id) {
         return reply.code(400).send({ message: '任务ID不能为空' })
@@ -103,18 +188,35 @@ export class TaskController {
         return reply.code(500).send({ message: '任务不存在' })
       }
 
-      if (sendLimit !== undefined && (isNaN(Number(sendLimit)) || Number(sendLimit) <= 0)) {
-        return reply.code(400).send({ message: 'sendLimit 必须为大于 0 的数字' })
+      if (accountLimit !== undefined && (isNaN(Number(accountLimit)) || Number(accountLimit) <= 0)) {
+        return reply.code(400).send({ message: 'accountLimit 必须为大于 0 的数字' })
+      }
+
+      if (intervalTime !== undefined) {
+        const parsed = this.parseIntervalTime(intervalTime)
+        if (!parsed.ok) return reply.code(400).send({ message: parsed.message })
+      }
+
+      if (threads !== undefined && (isNaN(Number(threads)) || Number(threads) <= 0)) {
+        return reply.code(400).send({ message: 'threads 必须为大于 0 的数字' })
+      }
+      if (threads !== undefined && Number(threads) > 10) {
+        return reply.code(400).send({ message: 'threads 最大值为 10' })
       }
 
       const updateData: Partial<Task> = {}
       if (name !== undefined) updateData.name = name
-      if (message !== undefined) updateData.message = message
+      if (payload !== undefined) updateData.payload = payload as any
       if (total !== undefined) updateData.total = total
-      if (sent !== undefined) updateData.sent = sent
-      if (successCount !== undefined) updateData.successCount = successCount
+      if (processed !== undefined) updateData.processed = processed
+      if (success !== undefined) updateData.success = success
       if (startedAt !== undefined) updateData.startedAt = startedAt
-      if (sendLimit !== undefined) updateData.sendLimit = Number(sendLimit)
+      if (accountLimit !== undefined) updateData.accountLimit = Number(accountLimit)
+      if (intervalTime !== undefined) {
+        const parsed = this.parseIntervalTime(intervalTime)
+        if (parsed.ok) updateData.intervalTime = parsed.value
+      }
+      if (threads !== undefined) updateData.threads = Math.min(10, Number(threads))
 
       await this.taskService.update(id, updateData)
 

+ 12 - 6
src/dto/task.dto.ts

@@ -3,21 +3,27 @@ import { Pagination } from './common.dto'
 
 export interface CreateTaskBody {
   name: string
-  message: string
+  type?: 'send_message' | 'invite_to_group'
+  message?: string
+  inviteLink?: string
   file: any
   userId?: number
-  sendLimit?: number
+  accountLimit?: number
+  intervalTime?: string
+  threads?: number
 }
 
 export interface UpdateTaskBody {
   id: number
   name?: string
-  message?: string
+  payload?: Record<string, any> | null
   total?: number
-  sent?: number
-  successCount?: number
+  processed?: number
+  success?: number
   startedAt?: Date
-  sendLimit?: number
+  accountLimit?: number
+  intervalTime?: string
+  threads?: number
 }
 
 export interface ListTaskQuery extends Pagination {

+ 0 - 40
src/entities/group-task-item.entity.ts

@@ -1,40 +0,0 @@
-import { Column, CreateDateColumn, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'
-
-export enum GroupTaskItemStatus {
-  PENDING = 'pending',
-  SUCCESS = 'success',
-  FAILED = 'failed'
-}
-
-export class GroupTaskItem {
-  @PrimaryGeneratedColumn()
-  id: number
-
-  @Column()
-  groupTaskId: number
-
-  @Column()
-  target: string
-
-  @Column({
-    type: 'enum',
-    enum: GroupTaskItemStatus,
-    default: GroupTaskItemStatus.PENDING
-  })
-  status: GroupTaskItemStatus
-
-  @Column({ type: 'bigint', nullable: true })
-  inviterId: string | null
-
-  @Column({ type: 'text', nullable: true })
-  errorMsg: string | null
-
-  @Column({ type: 'datetime', precision: 6, default: null })
-  invitedAt: Date
-
-  @CreateDateColumn()
-  createdAt: Date
-
-  @UpdateDateColumn()
-  updatedAt: Date
-}

+ 0 - 65
src/entities/group-task.entity.ts

@@ -1,65 +0,0 @@
-import { Entity, Column, CreateDateColumn, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'
-
-export enum GroupTaskStatus {
-  PENDING = 'pending',
-  SENDING = 'sending',
-  QUEUING = 'queuing',
-  PAUSED = 'paused',
-  CANCELED = 'canceled',
-  COMPLETED = 'completed'
-}
-
-@Entity()
-export class GroupTask {
-  @PrimaryGeneratedColumn()
-  id: number
-
-  @Column()
-  name: string
-
-  @Column()
-  userId: number
-
-  @Column({ default: 0 })
-  total: number
-
-  @Column({ default: 0 })
-  invited: number
-
-  @Column({ default: 0 })
-  successCount: number
-
-  @Column({
-    type: 'enum',
-    enum: GroupTaskStatus,
-    default: GroupTaskStatus.PENDING
-  })
-  status: GroupTaskStatus
-
-  @Column({ default: false })
-  cancelRequested: boolean
-
-  @Column({ nullable: false })
-  inviteLink: string
-
-  @Column({ default: 5 })
-  inviteLimit: number
-
-  @Column({ default: 5 })
-  inviteInterval: number
-
-  @Column({ default: 5 })
-  concurrentCount: number
-
-  @Column({ default: false })
-  delFlag: boolean
-
-  @Column({ type: 'datetime', precision: 6, default: null })
-  startedAt: Date
-
-  @CreateDateColumn()
-  createdAt: Date
-
-  @UpdateDateColumn()
-  updatedAt: Date
-}

+ 5 - 3
src/entities/task-item.entity.ts

@@ -2,12 +2,14 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, Update
 
 export enum TaskItemStatus {
   PENDING = 'pending',
+  PROCESSING = 'processing',
   SUCCESS = 'success',
   FAILED = 'failed'
 }
 
 @Entity()
-@Index(['taskId', 'status'])
+@Index('idx_task_item_taskId_status_id', ['taskId', 'status', 'id'])
+@Index('idx_task_item_taskId_createdAt', ['taskId', 'createdAt'])
 export class TaskItem {
   @PrimaryGeneratedColumn()
   id: number
@@ -26,13 +28,13 @@ export class TaskItem {
   status: TaskItemStatus
 
   @Column({ type: 'bigint', nullable: true })
-  senderId: string | null
+  operationId: string | null
 
   @Column({ type: 'text', nullable: true })
   errorMsg: string | null
 
   @Column({ type: 'datetime', precision: 6, default: null })
-  sentAt: Date
+  operatingAt: Date
 
   @CreateDateColumn()
   createdAt: Date

+ 28 - 12
src/entities/task.entity.ts

@@ -1,5 +1,10 @@
 import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'
 
+export enum TaskType {
+  SEND_MESSAGE = 'send_message',
+  INVITE_TO_GROUP = 'invite_to_group'
+}
+
 export enum TaskStatus {
   PENDING = 'pending',
   SENDING = 'sending',
@@ -10,6 +15,8 @@ export enum TaskStatus {
 }
 
 @Entity()
+@Index('idx_task_status_delFlag_startedAt', ['status', 'delFlag', 'startedAt'])
+@Index('idx_task_userId_delFlag_createdAt', ['userId', 'delFlag', 'createdAt'])
 export class Task {
   @PrimaryGeneratedColumn()
   id: number
@@ -20,17 +27,29 @@ export class Task {
   @Column()
   userId: number
 
-  @Column({ type: 'text', nullable: false })
-  message: string
+  @Column({
+    type: 'enum',
+    enum: TaskType,
+    default: TaskType.SEND_MESSAGE
+  })
+  type: TaskType
+
+  /**
+   * 任务配置
+   * - SEND_MESSAGE: { message: string }
+   * - INVITE_TO_GROUP: { inviteLink: string }
+   */
+  @Column({ type: 'simple-json', nullable: true })
+  payload: Record<string, any> | null
 
   @Column({ default: 0 })
-  total: number
+  processed: number
 
   @Column({ default: 0 })
-  sent: number
+  success: number
 
   @Column({ default: 0 })
-  successCount: number
+  total: number
 
   @Column({
     type: 'enum',
@@ -43,16 +62,13 @@ export class Task {
   cancelRequested: boolean
 
   @Column({ default: 5 })
-  sendLimit: number
-
-  @Column({ default: 5 })
-  sendInterval: number
+  accountLimit: number
 
-  @Column({ default: 50 })
-  sendBatchSize: number
+  @Column({ type: 'varchar', length: 32, default: '5-10' })
+  intervalTime: string
 
   @Column({ default: 5 })
-  concurrentCount: number
+  threads: number
 
   @Column({ default: false })
   delFlag: boolean

+ 222 - 43
src/executor/task.executor.ts

@@ -1,7 +1,8 @@
 import { FastifyInstance } from 'fastify'
+import { DataSource } from 'typeorm'
 import { Repository } from 'typeorm'
 import { TelegramClient, Api } from 'telegram'
-import { Task, TaskStatus } from '../entities/task.entity'
+import { Task, TaskStatus, TaskType } from '../entities/task.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { TgUser } from '../entities/tg-user.entity'
 import { TgUserService } from '../services/tg-user.service'
@@ -9,6 +10,7 @@ import { TgClientService } from '../services/tgClient.service'
 import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
 
 export class TaskExecutor {
+  private ds: DataSource
   private taskRepo: Repository<Task>
   private taskItemRepo: Repository<TaskItem>
   private senderRepo: Repository<TgUser>
@@ -22,6 +24,7 @@ export class TaskExecutor {
 
   constructor(private app: FastifyInstance) {
     const ds = app.dataSource
+    this.ds = ds
     this.senderService = new TgUserService(app)
     this.taskRepo = ds.getRepository(Task)
     this.taskItemRepo = ds.getRepository(TaskItem)
@@ -37,7 +40,7 @@ export class TaskExecutor {
 
       // 初始化 sender 配置
       this.currentSenderSendLimit =
-        task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
+        task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultSenderSendLimit
       this.senderUsageInBatch.clear()
       this.senderCursor = 0
       await this.refreshSenderCache()
@@ -70,7 +73,7 @@ export class TaskExecutor {
    * 核心发送逻辑(并发 worker)
    */
   private async process(task: Task): Promise<void> {
-    const concurrency = Math.max(1, task.concurrentCount)
+    const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
 
     const workers: Promise<void>[] = []
 
@@ -88,7 +91,7 @@ export class TaskExecutor {
     let sender: TgUser | null = null
     const workerTgClient = new TgClientService()
     let senderSentInRound = 0
-    const sendIntervalMs = await this.getSendInterval(taskId)
+    let inviteGroupEntity: any | null = null
 
     try {
       while (true) {
@@ -130,6 +133,7 @@ export class TaskExecutor {
           }
 
           senderSentInRound = 0
+          inviteGroupEntity = null
 
           // 获取当前账号信息并延迟
           const me = await workerTgClient
@@ -144,10 +148,19 @@ export class TaskExecutor {
             },延迟 ${delaySeconds}s 后开始发送`
           )
           await this.sleep(delaySeconds * 1000)
+
+          // 邀请任务:每次换号后,确保已加入目标群并拿到群实体
+          if (task.type === TaskType.INVITE_TO_GROUP) {
+            const inviteLink = String(task.payload?.inviteLink ?? '').trim()
+            if (!inviteLink) {
+              throw new Error('邀请链接为空,请检查 task.payload.inviteLink')
+            }
+            inviteGroupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
+          }
         }
 
         try {
-          await this.processTaskItem(task, taskItem, sender, workerTgClient)
+          await this.processTaskItem(task, taskItem, sender, workerTgClient, inviteGroupEntity)
         } catch (error) {
           const msg = error instanceof Error ? error.message : '未知错误'
           if (sender && this.isSessionRevokedMessage(msg)) {
@@ -164,9 +177,10 @@ export class TaskExecutor {
           }
         }
 
-        // 发送间隔
-        if (sendIntervalMs > 0) {
-          await this.sleep(sendIntervalMs)
+        // 处理间隔(每条随机)
+        const intervalMs = this.pickIntervalMs(task.intervalTime)
+        if (intervalMs > 0) {
+          await this.sleep(intervalMs)
         }
       }
     } finally {
@@ -178,32 +192,75 @@ export class TaskExecutor {
    * 拉取一个待发送的 TaskItem(DB 层保证并发安全)
    */
   private async pickNextTaskItem(taskId: number): Promise<TaskItem | null> {
-    const item = await this.taskItemRepo
-      .createQueryBuilder()
-      .where('taskId = :taskId', { taskId })
-      .andWhere('status = :status', { status: TaskItemStatus.PENDING })
-      .orderBy('id', 'ASC')
-      .setLock('pessimistic_write')
-      .getOne()
-
-    if (!item) return null
-
-    await this.taskItemRepo.update(item.id, {
-      status: TaskItemStatus.PENDING
-    })
+    const queryRunner = this.ds.createQueryRunner()
+    await queryRunner.connect()
+    await queryRunner.startTransaction()
+    try {
+      const repo = queryRunner.manager.getRepository(TaskItem)
+      const item = await repo
+        .createQueryBuilder('item')
+        .setLock('pessimistic_write')
+        .where('item.taskId = :taskId', { taskId })
+        .andWhere('item.status = :status', { status: TaskItemStatus.PENDING })
+        .orderBy('item.id', 'ASC')
+        .getOne()
+
+      if (!item) {
+        await queryRunner.commitTransaction()
+        return null
+      }
 
-    return item
+      await repo.update(item.id, {
+        status: TaskItemStatus.PROCESSING,
+        operatingAt: new Date(),
+        errorMsg: null
+      })
+
+      await queryRunner.commitTransaction()
+      return { ...item, status: TaskItemStatus.PROCESSING }
+    } catch (err) {
+      await queryRunner.rollbackTransaction()
+      throw err
+    } finally {
+      await queryRunner.release()
+    }
   }
 
   /**
-   * 真正发送一条消息
+   * 处理单个 TaskItem(按 Task.type 分发)
    */
   private async processTaskItem(
+    task: Task,
+    item: TaskItem,
+    sender: TgUser,
+    workerTgClient: TgClientService,
+    inviteGroupEntity: any | null
+  ): Promise<void> {
+    if (task.type === TaskType.INVITE_TO_GROUP) {
+      return await this.processInviteToGroup(task, item, sender, workerTgClient, inviteGroupEntity)
+    }
+
+    return await this.processSendMessage(task, item, sender, workerTgClient)
+  }
+
+  private async processSendMessage(
     task: Task,
     item: TaskItem,
     sender: TgUser,
     workerTgClient: TgClientService
   ): Promise<void> {
+    const message = String(task.payload?.message ?? '').trim()
+    if (!message) {
+      await this.taskItemRepo.update(item.id, {
+        status: TaskItemStatus.FAILED,
+        operatingAt: new Date(),
+        operationId: sender.id,
+        errorMsg: '消息内容为空'
+      })
+      await this.taskRepo.increment({ id: task.id }, 'processed', 1)
+      return
+    }
+
     try {
       const parsedTarget = this.parseTarget(item.target)
       if (!parsedTarget) {
@@ -220,49 +277,154 @@ export class TaskExecutor {
         throw new Error('目标用户不允许接收消息或已被限制')
       }
 
-      await workerTgClient.sendMessageToPeer(targetPeer, task.message)
+      await workerTgClient.sendMessageToPeer(targetPeer, message)
       await workerTgClient.clearConversation(targetPeer).catch(() => {})
       await workerTgClient.deleteTempContact((targetPeer as any).id).catch(() => {})
 
       await this.taskItemRepo.update(item.id, {
         status: TaskItemStatus.SUCCESS,
-        sentAt: new Date(),
-        senderId: sender.id,
+        operatingAt: new Date(),
+        operationId: sender.id,
         errorMsg: null
       })
 
       await this.senderService.incrementUsageCount(sender.id)
-      await this.taskRepo.increment({ id: task.id }, 'sent', 1)
-      await this.taskRepo.increment({ id: task.id }, 'successCount', 1)
+      await this.taskRepo.increment({ id: task.id }, 'processed', 1)
+      await this.taskRepo.increment({ id: task.id }, 'success', 1)
 
       this.app.log.info(`✅ 发送成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
     } catch (error) {
       const msg = error instanceof Error ? error.message : '未知错误'
       await this.taskItemRepo.update(item.id, {
         status: TaskItemStatus.FAILED,
-        sentAt: new Date(),
-        senderId: sender.id,
+        operatingAt: new Date(),
+        operationId: sender.id,
         errorMsg: msg
       })
 
       await this.senderService.incrementUsageCount(sender.id)
-      await this.taskRepo.increment({ id: task.id }, 'sent', 1)
+      await this.taskRepo.increment({ id: task.id }, 'processed', 1)
 
       this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
       throw error
     }
   }
 
+  private async processInviteToGroup(
+    task: Task,
+    item: TaskItem,
+    sender: TgUser,
+    workerTgClient: TgClientService,
+    inviteGroupEntity: any | null
+  ): Promise<void> {
+    try {
+      if (!inviteGroupEntity) {
+        throw new Error('未获取到群组实体(inviteGroupEntity 为空)')
+      }
+
+      const parsedTarget = this.parseTarget(item.target)
+      if (!parsedTarget) {
+        throw new Error('target 格式错误,请检查是否正确')
+      }
+
+      const targetUser = await workerTgClient.getTargetPeer(parsedTarget)
+      if (!targetUser) {
+        throw new Error('target 无效,无法获取目标信息')
+      }
+
+      const client = workerTgClient.getClient()
+      if (!client) {
+        throw new Error('TelegramClient 未连接')
+      }
+
+      const isChannel = inviteGroupEntity?.className === 'Channel'
+      const isChat = inviteGroupEntity?.className === 'Chat'
+      if (!isChannel && !isChat) {
+        throw new Error('目标并非群组或频道,无法邀请成员')
+      }
+
+      const inputUser = new Api.InputUser({
+        userId: targetUser.id,
+        accessHash: targetUser.accessHash || BigInt(0)
+      })
+
+      if (isChannel) {
+        if (!inviteGroupEntity?.accessHash) {
+          throw new Error('缺少 accessHash,无法邀请到频道/超级群组')
+        }
+        const inputChannel = new Api.InputChannel({
+          channelId: inviteGroupEntity.id,
+          accessHash: inviteGroupEntity.accessHash
+        })
+        await client.invoke(
+          new Api.channels.InviteToChannel({
+            channel: inputChannel,
+            users: [inputUser]
+          })
+        )
+      } else {
+        await client.invoke(
+          new Api.messages.AddChatUser({
+            chatId: inviteGroupEntity.id,
+            userId: inputUser,
+            fwdLimit: 0
+          })
+        )
+      }
+
+      await this.taskItemRepo.update(item.id, {
+        status: TaskItemStatus.SUCCESS,
+        operatingAt: new Date(),
+        operationId: sender.id,
+        errorMsg: null
+      })
+
+      await this.senderService.incrementUsageCount(sender.id)
+      await this.taskRepo.increment({ id: task.id }, 'processed', 1)
+      await this.taskRepo.increment({ id: task.id }, 'success', 1)
+
+      this.app.log.info(`✅ 邀请成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
+    } catch (error) {
+      const msg = error instanceof Error ? error.message : '未知错误'
+
+      // 已在群内:计为成功
+      if (msg.includes('USER_ALREADY_PARTICIPANT')) {
+        await this.taskItemRepo.update(item.id, {
+          status: TaskItemStatus.SUCCESS,
+          operatingAt: new Date(),
+          operationId: sender.id,
+          errorMsg: null
+        })
+        await this.taskRepo.increment({ id: task.id }, 'processed', 1)
+        await this.taskRepo.increment({ id: task.id }, 'success', 1)
+        this.app.log.info(`ℹ️ 成员已在群组中 taskId=${task.id}, itemId=${item.id}, target=${item.target}`)
+        return
+      }
+
+      await this.taskItemRepo.update(item.id, {
+        status: TaskItemStatus.FAILED,
+        operatingAt: new Date(),
+        operationId: sender.id,
+        errorMsg: msg
+      })
+
+      await this.senderService.incrementUsageCount(sender.id)
+      await this.taskRepo.increment({ id: task.id }, 'processed', 1)
+
+      this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
+      throw error
+    }
+  }
+
   /**
    * 收尾逻辑
    */
   private async finalize(taskId: number): Promise<void> {
-    const remain = await this.taskItemRepo.count({
-      where: {
-        taskId,
-        status: TaskItemStatus.PENDING
-      }
-    })
+    const remain = await this.taskItemRepo
+      .createQueryBuilder('item')
+      .where('item.taskId = :taskId', { taskId })
+      .andWhere('item.status IN (:...statuses)', { statuses: [TaskItemStatus.PENDING, TaskItemStatus.PROCESSING] })
+      .getCount()
 
     if (remain > 0) {
       return
@@ -283,12 +445,29 @@ export class TaskExecutor {
     })
   }
 
-  /**
-   * 获取任务发送间隔(毫秒)
-   */
-  private async getSendInterval(taskId: number): Promise<number> {
-    const task = await this.taskRepo.findOneBy({ id: taskId })
-    return Math.max(0, Number(task?.sendInterval ?? 0) * 1000)
+  private pickIntervalMs(intervalTime?: string | null): number {
+    const raw = String(intervalTime ?? '').trim()
+    if (!raw) return 0
+
+    const normalized = raw.replace('~', '-').replace(',', '-').replace(/\s+/g, '-')
+    const parts = normalized.split('-').filter(Boolean)
+
+    let minSec: number
+    let maxSec: number
+    if (parts.length === 1) {
+      minSec = Number(parts[0])
+      maxSec = Number(parts[0])
+    } else {
+      minSec = Number(parts[0])
+      maxSec = Number(parts[1])
+    }
+
+    if (Number.isNaN(minSec) || Number.isNaN(maxSec)) return 0
+    if (minSec < 0 || maxSec < 0) return 0
+    if (maxSec < minSec) [minSec, maxSec] = [maxSec, minSec]
+
+    const sec = minSec === maxSec ? minSec : Math.floor(Math.random() * (maxSec - minSec + 1)) + minSec
+    return sec * 1000
   }
 
   /**

+ 3 - 3
src/schedulers/task.scheduler.ts

@@ -44,13 +44,13 @@ export class TaskScheduler {
     try {
       // 1️⃣ 是否已有执行中的任务
       const runningTask = await this.taskRepo.findOne({
-        where: { status: TaskStatus.SENDING }
+        where: { status: TaskStatus.SENDING, delFlag: false }
       })
       if (runningTask) return
 
       // 2️⃣ 找下一个任务
       const nextTask = await this.taskRepo.findOne({
-        where: { status: TaskStatus.QUEUING },
+        where: { status: TaskStatus.QUEUING, delFlag: false },
         order: { startedAt: 'ASC' }
       })
       if (!nextTask) return
@@ -62,7 +62,7 @@ export class TaskScheduler {
         startedAt: nextTask.startedAt ?? new Date()
       })
 
-      const task = await this.taskRepo.findOneBy({ id: nextTask.id })
+      const task = await this.taskRepo.findOneBy({ id: nextTask.id, delFlag: false })
       if (!task) return
 
       // 4️⃣ 执行(阻塞)

+ 41 - 19
src/services/task.service.ts

@@ -1,6 +1,6 @@
 import { Repository } from 'typeorm'
 import { FastifyInstance } from 'fastify'
-import { Task, TaskStatus } from '../entities/task.entity'
+import { Task, TaskStatus, TaskType } from '../entities/task.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { PaginationResponse } from '../dto/common.dto'
 import { TaskScheduler } from '../schedulers/task.scheduler'
@@ -9,7 +9,11 @@ export class TaskService {
   private taskRepository: Repository<Task>
   private taskItemRepository: Repository<TaskItem>
   private app: FastifyInstance
-  private readonly defaultSenderSendLimit = 5
+  private readonly defaultLimit = 5
+  private readonly defaultIntervalTime = '5-5'
+  private readonly defaultThreads = 5
+  private readonly maxThreads = 10
+  private readonly taskItemInsertChunkSize = 1000
 
   constructor(app: FastifyInstance) {
     this.app = app
@@ -19,16 +23,27 @@ export class TaskService {
 
   async create(data: {
     name: string
-    message: string
     userId: number
     buffer: Buffer
-    sendLimit?: number
+    type: TaskType
+    payload: Record<string, any>
+    accountLimit?: number
+    intervalTime?: string
+    threads?: number
   }): Promise<Task> {
+    const threads = Math.min(this.maxThreads, Math.max(1, Number(data.threads ?? this.defaultThreads)))
+
     const task = this.taskRepository.create({
       name: data.name,
-      message: data.message,
       userId: data.userId,
-      sendLimit: data.sendLimit ?? this.defaultSenderSendLimit
+      type: data.type,
+      payload: data.payload,
+      accountLimit: data.accountLimit ?? this.defaultLimit,
+      intervalTime: (data.intervalTime ?? this.defaultIntervalTime).trim(),
+      threads,
+      processed: 0,
+      success: 0,
+      total: 0
     })
     const savedTask = await this.taskRepository.save(task)
     const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
@@ -41,7 +56,7 @@ export class TaskService {
   }
 
   async findAll(page: number = 0, size: number = 20, userId?: number): Promise<PaginationResponse<Task>> {
-    const where = userId ? { userId } : {}
+    const where = userId ? { userId, delFlag: false } : { delFlag: false }
     const [tasks, total] = await this.taskRepository.findAndCount({
       where,
       skip: (Number(page) || 0) * (Number(size) || 20),
@@ -71,19 +86,26 @@ export class TaskService {
 
   async createTaskItemByBuffer(data: { taskId: number; buffer: Buffer }): Promise<number> {
     const content = data.buffer.toString('utf-8')
-    const lines = content.split('\n').filter(line => line.trim())
+    const lines = content.split(/\r?\n/).filter(line => line.trim())
     if (lines.length === 0) {
       return 0
     }
-    const taskItems = lines.map(line =>
-      this.taskItemRepository.create({
-        taskId: data.taskId,
-        target: line.trim(),
-        status: TaskItemStatus.PENDING
-      })
-    )
-    await this.taskItemRepository.save(taskItems)
-    return taskItems.length
+
+    const values = lines.map(line => ({
+      taskId: data.taskId,
+      target: line.trim(),
+      status: TaskItemStatus.PENDING as TaskItemStatus
+    }))
+
+    await this.app.dataSource.transaction(async manager => {
+      const repo = manager.getRepository(TaskItem)
+      for (let i = 0; i < values.length; i += this.taskItemInsertChunkSize) {
+        const chunk = values.slice(i, i + this.taskItemInsertChunkSize)
+        await repo.insert(chunk)
+      }
+    })
+
+    return values.length
   }
 
   async findTaskItems(
@@ -133,7 +155,7 @@ export class TaskService {
     }
 
     const running = await this.taskRepository.findOne({
-      where: { status: TaskStatus.SENDING }
+      where: { status: TaskStatus.SENDING, delFlag: false }
     })
 
     await this.taskRepository.update(id, {
@@ -169,7 +191,7 @@ export class TaskService {
     }
 
     const running = await this.taskRepository.findOne({
-      where: { status: TaskStatus.SENDING }
+      where: { status: TaskStatus.SENDING, delFlag: false }
     })
 
     await this.taskRepository.update(id, {

+ 23 - 15
src/services/test.service.ts

@@ -65,7 +65,7 @@ export class TestService {
         }
       }
 
-      this.senderSendLimit = validatedLimit ?? task.sendLimit ?? this.senderSendLimit
+      this.senderSendLimit = validatedLimit ?? task.accountLimit ?? this.senderSendLimit
 
       this.senderCache = []
       this.senderCursor = 0
@@ -158,12 +158,16 @@ export class TestService {
               throw new Error('目标用户不允许接收消息或已被限制')
             }
 
-            await senderTgClient.sendMessageToPeer(targetPeer, task.message)
+            const message = String(task.payload?.message ?? '').trim()
+            if (!message) {
+              throw new Error('消息内容为空')
+            }
+            await senderTgClient.sendMessageToPeer(targetPeer, message)
 
             await this.taskItemRepository.update(item.id, {
               status: TaskItemStatus.SUCCESS,
-              sentAt: new Date(),
-              senderId: sender.id,
+              operatingAt: new Date(),
+              operationId: sender.id,
               errorMsg: null
             })
             totalSuccess++
@@ -174,8 +178,8 @@ export class TestService {
             try {
               await this.taskItemRepository.update(item.id, {
                 status: TaskItemStatus.FAILED,
-                sentAt: new Date(),
-                senderId: sender.id,
+                operatingAt: new Date(),
+                operationId: sender.id,
                 errorMsg: msg
               })
             } catch (updateError) {
@@ -206,10 +210,10 @@ export class TestService {
       }
 
       if (totalSent > 0) {
-        await this.taskRepository.increment({ id: taskId }, 'sent', totalSent)
+        await this.taskRepository.increment({ id: taskId }, 'processed', totalSent)
       }
       if (totalSuccess > 0) {
-        await this.taskRepository.increment({ id: taskId }, 'successCount', totalSuccess)
+        await this.taskRepository.increment({ id: taskId }, 'success', totalSuccess)
       }
 
       const pendingLeft = await this.taskItemRepository.count({
@@ -250,7 +254,7 @@ export class TestService {
     message: string
     data?: {
       sender: { id: string }
-      task: { id: number; name: string; message: string }
+      task: { id: number; name: string; payload: any }
       totalSent: number
       successCount: number
       failedCount: number
@@ -434,7 +438,11 @@ export class TestService {
             throw new Error('目标用户不允许接收消息或已被限制')
           }
 
-          await this.tgClientService.sendMessageToPeer(targetPeer, task.message)
+          const message = String(task.payload?.message ?? '').trim()
+          if (!message) {
+            throw new Error('消息内容为空')
+          }
+          await this.tgClientService.sendMessageToPeer(targetPeer, message)
 
           try {
             await this.tgClientService.clearConversation(targetPeer)
@@ -463,8 +471,8 @@ export class TestService {
               try {
                 await this.taskItemRepository.update(taskItem.id, {
                   status: TaskItemStatus.SUCCESS,
-                  sentAt: new Date(),
-                  senderId: senderId,
+                  operatingAt: new Date(),
+                  operationId: senderId,
                   errorMsg: null
                 })
               } catch (updateError) {
@@ -485,8 +493,8 @@ export class TestService {
               try {
                 await this.taskItemRepository.update(taskItem.id, {
                   status: TaskItemStatus.FAILED,
-                  sentAt: new Date(),
-                  senderId: senderId,
+                  operatingAt: new Date(),
+                  operationId: senderId,
                   errorMsg: errorMessage
                 })
               } catch (updateError) {
@@ -528,7 +536,7 @@ export class TestService {
         message: `测试发送完成,共发送 ${totalSentCount} 条,成功 ${totalSuccessCount} 条,失败 ${totalFailedCount} 条`,
         data: {
           sender: { id: sender!.id },
-          task: { id: task.id, name: task.name, message: task.message },
+          task: { id: task.id, name: task.name, payload: task.payload },
           totalSent: totalSentCount,
           successCount: totalSuccessCount,
           failedCount: totalFailedCount

+ 31 - 8
src/services/tg-group.service.ts

@@ -1,7 +1,7 @@
 import { FastifyInstance } from 'fastify'
 import { Repository } from 'typeorm'
 import { TgUser } from '../entities/tg-user.entity'
-import { TgGroup } from '../entities/tg-group.entity'
+import { TgGroup, TgGroupType } from '../entities/tg-group.entity'
 import { TgClientService } from './tgClient.service'
 import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
 import { TelegramClient } from 'telegram'
@@ -24,6 +24,13 @@ export class TgGroupService {
     this.senderRepository = app.dataSource.getRepository(TgUser)
   }
 
+  private normalizeGroupType(groupType?: string): TgGroupType | undefined {
+    if (!groupType) return undefined
+    if (groupType === TgGroupType.CHANNEL) return TgGroupType.CHANNEL
+    if (groupType === TgGroupType.MEGA_GROUP) return TgGroupType.MEGA_GROUP
+    return undefined
+  }
+
   async createTgGroup(
     groupName: string,
     groupDescription: string,
@@ -141,22 +148,32 @@ export class TgGroupService {
 
   async upsertGroup(payload: CreateTgGroupRecordBody): Promise<TgGroup> {
     const existing = await this.tgGroupRepository.findOne({ where: { chatId: payload.chatId } })
+    const normalizedType = this.normalizeGroupType(payload.groupType)
     
     if (existing) {
       // 更新现有记录
       Object.assign(existing, {
-        ...payload,
         chatId: String(payload.chatId),
-        accessHash: String(payload.accessHash)
+        accessHash: String(payload.accessHash),
+        name: payload.name,
+        publicLink: payload.publicLink,
+        inviteLink: payload.inviteLink,
+        ownerId: payload.senderId ? String(payload.senderId) : existing.ownerId,
+        groupType: normalizedType ?? existing.groupType
       })
       return await this.tgGroupRepository.save(existing)
     } else {
       // 创建新记录
       const entity = this.tgGroupRepository.create({
-        ...payload,
         chatId: String(payload.chatId),
-        accessHash: String(payload.accessHash)
-      })
+        accessHash: String(payload.accessHash),
+        name: payload.name,
+        publicLink: payload.publicLink,
+        inviteLink: payload.inviteLink,
+        ownerId: String(payload.senderId ?? ''),
+        groupType: normalizedType ?? TgGroupType.MEGA_GROUP,
+        delFlag: false
+      } as Partial<TgGroup>)
       return await this.tgGroupRepository.save(entity)
     }
   }
@@ -166,7 +183,13 @@ export class TgGroupService {
     if (!existing) {
       return null
     }
-    const { chatId, ...updateData } = payload
+    const { chatId, ...rest } = payload
+    const updateData: any = {
+      ...rest
+    }
+    if (payload.groupType !== undefined) {
+      updateData.groupType = this.normalizeGroupType(payload.groupType)
+    }
     await this.tgGroupRepository.update({ chatId }, updateData)
     return await this.tgGroupRepository.findOne({ where: { chatId: payload.chatId } })
   }
@@ -178,7 +201,7 @@ export class TgGroupService {
   async list(query: ListTgGroupQuery): Promise<PaginationResponse<TgGroup>> {
     const { page = 0, size = 20, senderId, delFlag } = query
     const where: any = {}
-    if (senderId !== undefined) where.senderId = senderId
+    if (senderId !== undefined) where.ownerId = senderId
     if (delFlag !== undefined) where.delFlag = delFlag
 
     const [content, total] = await this.tgGroupRepository.findAndCount({

+ 33 - 2
src/services/tgClient.service.ts

@@ -236,13 +236,23 @@ export class TgClientService {
         throw new Error('无效的邀请链接格式')
       }
 
-      // 检查邀请链接信息
-      await this.client!.invoke(
+      // 检查邀请链接信息(如果已在群内,直接返回 chat 信息)
+      const check = await this.client!.invoke(
         new Api.messages.CheckChatInvite({
           hash
         })
       )
 
+      const checkAny = check as any
+      // ChatInviteAlready: 已经是成员,会直接带 chat
+      if (checkAny?.className === 'ChatInviteAlready' && checkAny?.chat) {
+        const chat = checkAny.chat
+        const chatId = chat.id?.toString() || chat.id
+        const title = chat.title || '未知群组'
+        this.app.log.info(`已是群组成员: ${title} (ID: ${chatId})`)
+        return { chatId: String(chatId), title }
+      }
+
       // 使用邀请链接加入群组
       const result = await this.client!.invoke(
         new Api.messages.ImportChatInvite({
@@ -300,6 +310,27 @@ export class TgClientService {
     }
   }
 
+  /**
+   * 通过邀请链接获取群组实体(会自动加入或复用已加入状态)
+   * - 用于“拉人进群”任务:需要拿到 chat/channel 实体,进而拿到 accessHash
+   */
+  async resolveGroupEntityByInviteLink(inviteLink: string): Promise<any> {
+    this.ensureConnected()
+    const joined = await this.joinGroupByInviteLink(inviteLink)
+
+    // 通过 chatId 获取实体;部分场景需要 -100 前缀重试(频道/超级群)
+    try {
+      return await this.client!.getEntity(joined.chatId)
+    } catch (err) {
+      const chatIdStr = String(joined.chatId)
+      if (!chatIdStr.startsWith('-100')) {
+        const prefixed = `-100${chatIdStr}`
+        return await this.client!.getEntity(prefixed)
+      }
+      throw err
+    }
+  }
+
   /**
    * 退出指定的频道或群组
    * @param target - 群组标识,可以是 chatId、username 或 InputChannel 对象