Browse Source

添加测试发送消息功能,更新 TaskController、TaskService 和相关 DTO,支持通过新接口发送消息并处理验证账户,增强错误处理和日志记录。

wuyi 1 month ago
parent
commit
e34a2dadac

+ 35 - 1
src/controllers/task.controller.ts

@@ -1,6 +1,6 @@
 import { FastifyRequest, FastifyReply, FastifyInstance } from 'fastify'
 import { FastifyRequest, FastifyReply, FastifyInstance } from 'fastify'
 import { TaskService } from '../services/task.service'
 import { TaskService } from '../services/task.service'
-import { UpdateTaskBody, ListTaskQuery, CreateTaskItemBody, ListTaskItemQuery } from '../dto/task.dto'
+import { UpdateTaskBody, ListTaskQuery, ListTaskItemQuery, SendMessageBody } from '../dto/task.dto'
 import { Task } from '../entities/task.entity'
 import { Task } from '../entities/task.entity'
 
 
 export class TaskController {
 export class TaskController {
@@ -161,4 +161,38 @@ export class TaskController {
       })
       })
     }
     }
   }
   }
+
+  async testSendMessage(request: FastifyRequest<{ Body: SendMessageBody }>, reply: FastifyReply) {
+    try {
+      const { senderId, taskId, delay, count, session, dcId, authKey, sendToVerifyAccounts, verifyAccounts } =
+        request.body
+
+      if (!senderId || !taskId) {
+        return reply.code(400).send({
+          success: false,
+          message: 'senderId 和 taskId 不能为空'
+        })
+      }
+
+      const result = await this.taskService.testSendMessage(
+        senderId,
+        taskId,
+        delay,
+        count,
+        session,
+        dcId,
+        authKey,
+        sendToVerifyAccounts,
+        verifyAccounts
+      )
+
+      return reply.code(200).send(result)
+    } catch (error) {
+      return reply.code(500).send({
+        success: false,
+        message: '测试发送消息时发生错误',
+        error: error instanceof Error ? error.message : '未知错误'
+      })
+    }
+  }
 }
 }

+ 12 - 0
src/dto/task.dto.ts

@@ -32,3 +32,15 @@ export interface ListTaskItemQuery extends Pagination {
   status?: string
   status?: string
 }
 }
 
 
+export interface SendMessageBody {
+  senderId: string
+  taskId: number
+  delay?: number
+  count?: number
+  session?: string
+  dcId?: number
+  authKey?: string
+  sendToVerifyAccounts?: boolean
+  verifyAccounts?: string[]
+}
+

+ 6 - 1
src/routes/task.routes.ts

@@ -1,7 +1,7 @@
 import { FastifyInstance } from 'fastify'
 import { FastifyInstance } from 'fastify'
 import { TaskController } from '../controllers/task.controller'
 import { TaskController } from '../controllers/task.controller'
 import { hasRole } from '../middlewares/auth.middleware'
 import { hasRole } from '../middlewares/auth.middleware'
-import { UpdateTaskBody, ListTaskQuery, CreateTaskItemBody, ListTaskItemQuery } from '../dto/task.dto'
+import { UpdateTaskBody, ListTaskQuery, ListTaskItemQuery, SendMessageBody } from '../dto/task.dto'
 import { UserRole } from '../entities/user.entity'
 import { UserRole } from '../entities/user.entity'
 
 
 export default async function taskRoutes(fastify: FastifyInstance) {
 export default async function taskRoutes(fastify: FastifyInstance) {
@@ -38,4 +38,9 @@ export default async function taskRoutes(fastify: FastifyInstance) {
     { onRequest: [hasRole(UserRole.ADMIN)] },
     { onRequest: [hasRole(UserRole.ADMIN)] },
     taskController.listTaskItems.bind(taskController)
     taskController.listTaskItems.bind(taskController)
   )
   )
+
+  fastify.post<{ Body: SendMessageBody }>(
+    '/test/send',
+    taskController.testSendMessage.bind(taskController)
+  )
 }
 }

+ 432 - 0
src/services/task.service.ts

@@ -3,14 +3,27 @@ import { FastifyInstance } from 'fastify'
 import { Task } from '../entities/task.entity'
 import { Task } from '../entities/task.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { PaginationResponse } from '../dto/common.dto'
 import { PaginationResponse } from '../dto/common.dto'
+import { Sender } from '../entities/sender.entity'
+import { TgClientService } from './tgClient.service'
+import { SenderService } from './sender.service'
+import { buildStringSession, buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
+import { Api, TelegramClient } from 'telegram'
 
 
 export class TaskService {
 export class TaskService {
   private taskRepository: Repository<Task>
   private taskRepository: Repository<Task>
   private taskItemRepository: Repository<TaskItem>
   private taskItemRepository: Repository<TaskItem>
+  private senderRepository: Repository<Sender>
+  private senderService: SenderService
+  private tgClientService: TgClientService
+  private app: FastifyInstance
 
 
   constructor(app: FastifyInstance) {
   constructor(app: FastifyInstance) {
+    this.app = app
     this.taskRepository = app.dataSource.getRepository(Task)
     this.taskRepository = app.dataSource.getRepository(Task)
     this.taskItemRepository = app.dataSource.getRepository(TaskItem)
     this.taskItemRepository = app.dataSource.getRepository(TaskItem)
+    this.senderRepository = app.dataSource.getRepository(Sender)
+    this.senderService = new SenderService(app)
+    this.tgClientService = TgClientService.getInstance()
   }
   }
 
 
   async create(data: { name: string; message: string; userId: number; buffer: Buffer }): Promise<Task> {
   async create(data: { name: string; message: string; userId: number; buffer: Buffer }): Promise<Task> {
@@ -117,4 +130,423 @@ export class TaskService {
 
 
     const taskItems = await this.taskItemRepository.findBy({ taskId: id, status: TaskItemStatus.PENDING })
     const taskItems = await this.taskItemRepository.findBy({ taskId: id, status: TaskItemStatus.PENDING })
   }
   }
+
+  async testSendMessage(
+    senderId: string,
+    taskId: number,
+    delay?: number,
+    count?: number,
+    session?: string,
+    dcId?: number,
+    authKey?: string,
+    sendToVerifyAccounts?: boolean,
+    verifyAccounts?: string[]
+  ): Promise<{
+    success: boolean
+    message: string
+    data?: {
+      sender: { id: string }
+      task: { id: number; name: string; message: string }
+      totalSent: number
+      successCount: number
+      failedCount: number
+    }
+    error?: string
+  }> {
+    let client: TelegramClient | null = null
+
+    try {
+      if ((dcId !== undefined && authKey === undefined) || (dcId === undefined && authKey !== undefined)) {
+        return {
+          success: false,
+          message: 'dcId 和 authKey 必须同时传参'
+        }
+      }
+      let sender: Sender | null = null
+      let sessionString: string | null = null
+
+      if (session) {
+        try {
+          sessionString = buildStringSession(session)
+          const existingSender = await this.senderRepository.findOne({ where: { id: senderId } })
+          if (existingSender) {
+            await this.senderRepository.update(senderId, { sessionStr: sessionString })
+            sender = await this.senderRepository.findOne({ where: { id: senderId } })
+          } else {
+            sender = await this.senderService.create(senderId, undefined, undefined, sessionString)
+          }
+          if (!sender) {
+            return {
+              success: false,
+              message: '创建或更新 sender 失败'
+            }
+          }
+        } catch (error) {
+          const errorMessage = error instanceof Error ? error.message : '未知错误'
+          return {
+            success: false,
+            message: `解析 session 失败: ${errorMessage}`
+          }
+        }
+      } else if (dcId !== undefined && authKey !== undefined) {
+        try {
+          sessionString = buildStringSessionByDcIdAndAuthKey(dcId, authKey)
+          const existingSender = await this.senderRepository.findOne({ where: { id: senderId } })
+          if (existingSender) {
+            await this.senderRepository.update(senderId, { dcId, authKey, sessionStr: sessionString })
+            sender = await this.senderRepository.findOne({ where: { id: senderId } })
+          } else {
+            sender = await this.senderService.create(senderId, dcId, authKey, sessionString)
+          }
+          if (!sender) {
+            return {
+              success: false,
+              message: '创建或更新 sender 失败'
+            }
+          }
+        } catch (error) {
+          const errorMessage = error instanceof Error ? error.message : '未知错误'
+          return {
+            success: false,
+            message: `解析 session 失败: ${errorMessage}`
+          }
+        }
+      } else {
+        sender = await this.senderRepository.findOne({ where: { id: senderId, delFlag: false } })
+        if (!sender) {
+          return {
+            success: false,
+            message: '发送账号不存在或已被删除'
+          }
+        }
+      }
+
+      // 2. 获取 task 信息
+      const task = await this.findById(taskId)
+      if (!task) {
+        return {
+          success: false,
+          message: '任务不存在'
+        }
+      }
+
+      // 3. 获取该 task 下的 taskItem
+      const queryOptions: any = {
+        where: { taskId, status: TaskItemStatus.PENDING },
+        order: { id: 'ASC' }
+      }
+
+      queryOptions.take = count && count > 0 ? count : 2
+
+      const allTaskItems = await this.taskItemRepository.find(queryOptions)
+
+      // 如果启用了向验证账户发送,将验证账户插入到 allTaskItems 的特定位置(作为普通对象,不存入数据库)
+      if (sendToVerifyAccounts && verifyAccounts && verifyAccounts.length > 0) {
+        const verifyTaskItems = verifyAccounts.map(
+          verifyAccount =>
+            ({
+              target: verifyAccount.trim(),
+              isVerifyAccount: true // 标记为验证账户
+            } as any)
+        )
+
+        // 将验证账户插入到第二个位置和倒数第二个位置
+        if (verifyTaskItems.length >= 1) {
+          // 第一个验证账户插入到第二个位置(索引 1)
+          const firstVerifyAccount = verifyTaskItems[0]
+          const insertIndex1 = Math.min(1, allTaskItems.length)
+          allTaskItems.splice(insertIndex1, 0, firstVerifyAccount)
+          this.app.log.info(`已将验证账户插入到第二个位置: ${firstVerifyAccount.target}`)
+        }
+
+        if (verifyTaskItems.length >= 2) {
+          // 第二个验证账户插入到倒数第二个位置(需要考虑已经插入的第一个验证账户)
+          const secondVerifyAccount = verifyTaskItems[1]
+          // 插入第一个验证账户后,数组长度已增加1,倒数第二个位置是 length - 2
+          const insertIndex2 = Math.max(0, allTaskItems.length - 2)
+          allTaskItems.splice(insertIndex2, 0, secondVerifyAccount)
+          this.app.log.info(`已将验证账户插入到倒数第二个位置: ${secondVerifyAccount.target}`)
+        }
+
+        // 如果还有更多验证账户,从第三个开始追加到末尾
+        if (verifyTaskItems.length > 2) {
+          const remainingVerifyAccounts = verifyTaskItems.slice(2)
+          allTaskItems.push(...remainingVerifyAccounts)
+          this.app.log.info(`已将 ${remainingVerifyAccounts.length} 个额外验证账户追加到末尾`)
+        }
+
+        this.app.log.info(`已将 ${verifyTaskItems.length} 个验证账户插入到任务列表中`)
+      }
+
+      if (!allTaskItems || allTaskItems.length === 0) {
+        return {
+          success: false,
+          message: '该任务没有目标账户'
+        }
+      }
+
+      // 4. 构建 session string(如果还没有构建)
+      if (!sessionString) {
+        sessionString = sender.sessionStr
+        if (!sessionString) {
+          if (!sender.dcId || !sender.authKey) {
+            return {
+              success: false,
+              message: 'Sender 缺少 dcId 或 authKey 信息'
+            }
+          }
+          sessionString = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
+          await this.senderRepository.update(sender.id, { sessionStr: sessionString })
+        }
+      }
+
+      // 5. 初始化 client
+      this.app.log.info('正在连接 TelegramClient...')
+      client = await this.tgClientService.connect(sessionString)
+      this.app.log.info('TelegramClient 连接完成')
+
+      // 6. 新 Session 登录后等待 20-40 秒
+      const waitTime = Math.floor(Math.random() * 21) + 20 // 20-40 秒随机
+      this.app.log.info(`连接成功后等待 ${waitTime} 秒,避免新 Session 被限制`)
+      await new Promise(resolve => setTimeout(resolve, waitTime * 1000))
+      this.app.log.info(`等待完成,开始发送消息`)
+
+      // 7. 发送消息
+      let totalSuccessCount = 0
+      let totalFailedCount = 0
+      let totalSentCount = 0
+
+      // 设置延迟时间:如果没有传入 delay,使用随机延迟(3-10秒)
+      const useRandomDelay = delay === undefined || delay <= 0
+      const fixedDelay = delay && delay > 0 ? delay : 0
+
+      this.app.log.info(
+        `开始测试发送: sender=${senderId}, task=${taskId}, 目标数=${allTaskItems.length}, 延迟=${
+          useRandomDelay ? '随机(3-10秒)' : `${fixedDelay}秒`
+        }`
+      )
+
+      // 发送所有消息
+      for (const taskItem of allTaskItems) {
+        try {
+          // 解析 target
+          const parsedTarget = this.parseTarget(taskItem.target)
+          if (!parsedTarget) {
+            throw new Error('target 格式错误,请检查是否正确')
+          }
+
+          // 获取目标实体
+          const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
+          if (!targetPeer) {
+            throw new Error('target 无效,无法获取目标信息')
+          }
+
+          // 检查目标是否允许接收消息
+          const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
+          if (!canSendMessage) {
+            throw new Error('目标用户不允许接收消息或已被限制')
+          }
+
+          // 发送消息
+          await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
+
+          // 清除会话
+          try {
+            await this.tgClientService.clearConversation(client, targetPeer)
+          } catch (clearError) {
+            // 清除会话失败不影响主流程,静默处理
+          }
+
+          // 如果是手机号,删除临时联系人
+          if (taskItem.target.startsWith('+')) {
+            try {
+              await this.tgClientService.deleteTempContact(client, targetPeer.id)
+            } catch (deleteError) {
+              // 删除临时联系人失败不影响主流程,静默处理
+            }
+          }
+
+          // 判断是否为验证账户
+          const isVerifyAccount = (taskItem as any).isVerifyAccount === true
+
+          if (isVerifyAccount) {
+            // 验证账户:只记录日志,不更新数据库
+            this.app.log.info(`[验证账户] 发送成功: ${taskItem.target}`)
+          } else {
+            // 普通任务项:更新数据库状态
+            if (taskItem.id) {
+              try {
+                await this.taskItemRepository.update(taskItem.id, {
+                  status: TaskItemStatus.SUCCESS,
+                  sentAt: new Date()
+                })
+              } catch (updateError) {
+                // 更新状态失败不影响主流程,记录日志但继续
+                const updateErrorMessage = updateError instanceof Error ? updateError.message : '未知错误'
+                this.app.log.warn(`更新 taskItem 状态失败 [${taskItem.target}]: ${updateErrorMessage}`)
+              }
+            }
+          }
+          totalSuccessCount++
+        } catch (error) {
+          // 判断是否为验证账户
+          const isVerifyAccount = (taskItem as any).isVerifyAccount === true
+          const errorMessage = error instanceof Error ? error.message : '未知错误'
+
+          if (isVerifyAccount) {
+            // 验证账户:只记录日志,不更新数据库
+            this.app.log.warn(`[验证账户] 发送失败 [${taskItem.target}]: ${errorMessage}`)
+          } else {
+            // 普通任务项:更新数据库状态
+            if (taskItem.id) {
+              try {
+                await this.taskItemRepository.update(taskItem.id, {
+                  status: TaskItemStatus.FAILED,
+                  sentAt: new Date()
+                })
+              } catch (updateError) {
+                // 更新状态失败不影响主流程,记录日志但继续
+                const updateErrorMessage = updateError instanceof Error ? updateError.message : '未知错误'
+                this.app.log.warn(`更新 taskItem 失败状态失败 [${taskItem.target}]: ${updateErrorMessage}`)
+              }
+            }
+            this.app.log.warn(`发送失败 [${taskItem.target}]: ${errorMessage}`)
+          }
+          totalFailedCount++
+        }
+
+        // 更新 sender 的 usageCount
+        try {
+          await this.senderService.incrementUsageCount(senderId)
+        } catch (error) {
+          // 更新 usageCount 失败不影响主流程,静默处理
+        }
+
+        totalSentCount++
+
+        // 在发送下一条消息前等待指定的延迟时间
+        if (totalSentCount < allTaskItems.length) {
+          let actualDelay: number
+          if (useRandomDelay) {
+            // 随机延迟 3-10 秒
+            actualDelay = Math.floor(Math.random() * 8) + 3
+          } else {
+            actualDelay = fixedDelay
+          }
+          this.app.log.info(`等待延迟 ${actualDelay} 秒后发送下一条消息`)
+          await new Promise(resolve => setTimeout(resolve, actualDelay * 1000))
+        }
+      }
+
+      this.app.log.info(`所有消息发送完成: 总计=${totalSentCount}, 成功=${totalSuccessCount}, 失败=${totalFailedCount}`)
+
+      return {
+        success: true,
+        message: `测试发送完成,共发送 ${totalSentCount} 条,成功 ${totalSuccessCount} 条,失败 ${totalFailedCount} 条`,
+        data: {
+          sender: { id: sender.id },
+          task: { id: task.id, name: task.name, message: task.message },
+          totalSent: totalSentCount,
+          successCount: totalSuccessCount,
+          failedCount: totalFailedCount
+        }
+      }
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : '未知错误'
+      this.app.log.error(`测试发送失败: ${errorMessage}`)
+      return {
+        success: false,
+        message: '测试发送消息时发生错误',
+        error: errorMessage
+      }
+    } finally {
+      // 8. 所有消息发送完成后,断开连接并销毁 client
+      if (client) {
+        try {
+          this.app.log.info(`正在断开连接并销毁 client`)
+          await this.tgClientService.disconnect()
+          this.app.log.info(`连接已断开,client 已销毁`)
+        } catch (error) {
+          const errorMessage = error instanceof Error ? error.message : '未知错误'
+          this.app.log.error(`断开连接失败: ${errorMessage}`)
+        }
+      }
+    }
+  }
+
+  private parseTarget(targetId: string): string | number | null {
+    const trimmed = targetId.trim()
+
+    // 用户名 手机号
+    if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
+      return trimmed
+    }
+
+    // 手机号 不带+号
+    const phoneRegex = /^\d+$/
+    if (phoneRegex.test(trimmed)) {
+      return `+${trimmed}`
+    }
+
+    // 用户 id
+    const integerRegex = /^-?\d+$/
+    if (integerRegex.test(trimmed)) {
+      return Number(trimmed)
+    }
+
+    return null
+  }
+
+  /**
+   * 检查是否可以向目标发送消息
+   */
+  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
+    try {
+      // 检查目标用户的完整信息
+      const fullUser = await client.invoke(
+        new Api.users.GetFullUser({
+          id: targetPeer
+        })
+      )
+
+      const fullUserData = fullUser.fullUser as any
+
+      // 检查是否被对方屏蔽
+      if (fullUserData?.blocked) {
+        return false
+      }
+
+      // 检查是否是机器人且不允许私聊
+      if (targetPeer.bot && targetPeer.botChatHistory === false) {
+        return false
+      }
+
+      // 检查用户是否已注销或被删除
+      if (targetPeer.deleted) {
+        return false
+      }
+
+      // 检查是否是假用户(Telegram 内部用户)
+      if (targetPeer.fake || targetPeer.scam) {
+        return false
+      }
+
+      return true
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : '未知错误'
+
+      // 如果是认证密钥未注册错误,抛出明确的错误
+      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
+        throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
+      }
+
+      // 如果是特定的权限错误,返回 false
+      if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
+        return false
+      }
+
+      // 其他错误,仍然尝试发送
+      return true
+    }
+  }
 }
 }

+ 5 - 1
src/services/tgClient.service.ts

@@ -63,6 +63,7 @@ export class TgClientService {
       connectionRetries: 5
       connectionRetries: 5
     })
     })
 
 
+    this.app.log.info('正在建立连接...')
     await this.client.connect()
     await this.client.connect()
 
 
     if (!this.client.connected) {
     if (!this.client.connected) {
@@ -70,14 +71,17 @@ export class TgClientService {
     }
     }
 
 
     this.currentSession = sessionString
     this.currentSession = sessionString
-    this.app.log.info('TelegramClient 连接成功')
+    this.app.log.info('TelegramClient 连接成功,正在获取账号信息...')
 
 
     const me = await this.client.getMe()
     const me = await this.client.getMe()
     if (me) {
     if (me) {
       this.app.log.info(
       this.app.log.info(
         `当前登录账号: id: ${me.id} ,name: ${me.firstName || ''} ${me.lastName || ''} ${me.username || ''}`.trim()
         `当前登录账号: id: ${me.id} ,name: ${me.firstName || ''} ${me.lastName || ''} ${me.username || ''}`.trim()
       )
       )
+    } else {
+      this.app.log.warn('无法获取账号信息')
     }
     }
+    this.app.log.info('账号信息获取完成')
 
 
     return this.client
     return this.client
   }
   }