Просмотр исходного кода

实现任务调度器和执行器,优化任务管理逻辑,添加任务取消和恢复功能,增强错误处理和日志记录,提升代码可读性和稳定性。同时,重构 TgClientService,简化连接管理,限制并发连接数,确保 Telegram 客户端的稳定性。

wuyi 4 недель назад
Родитель
Сommit
8f93bbfb40

+ 7 - 0
src/app.ts

@@ -8,6 +8,7 @@ import multipart from '@fastify/multipart'
 import fastifyEnv, { FastifyEnvOptions } from '@fastify/env'
 import fastifyEnv, { FastifyEnvOptions } from '@fastify/env'
 import { schema } from './config/env'
 import { schema } from './config/env'
 import { createDataSource } from './config/database'
 import { createDataSource } from './config/database'
+import { TaskScheduler } from './schedulers/task.scheduler'
 import userRoutes from './routes/user.routes'
 import userRoutes from './routes/user.routes'
 import recordsRoutes from './routes/records.routes'
 import recordsRoutes from './routes/records.routes'
 import fileRoutes from './routes/file.routes'
 import fileRoutes from './routes/file.routes'
@@ -100,6 +101,12 @@ export const createApp = async () => {
   await dataSource.initialize()
   await dataSource.initialize()
   app.decorate('dataSource', dataSource)
   app.decorate('dataSource', dataSource)
 
 
+  app.addHook('onReady', async () => {
+    const scheduler = TaskScheduler.getInstance(app)
+    scheduler.trigger()
+    app.log.info('TaskScheduler initialized')
+  })
+
   app.addHook('onClose', async () => {
   app.addHook('onClose', async () => {
     await dataSource.destroy()
     await dataSource.destroy()
     process.exit(0)
     process.exit(0)

+ 6 - 3
src/entities/task.entity.ts

@@ -32,9 +32,6 @@ export class Task {
   @Column({ default: 0 })
   @Column({ default: 0 })
   successCount: number
   successCount: number
 
 
-  @Column({ default: false })
-  delFlag: boolean
-
   @Column({
   @Column({
     type: 'enum',
     type: 'enum',
     enum: TaskStatus,
     enum: TaskStatus,
@@ -42,6 +39,9 @@ export class Task {
   })
   })
   status: TaskStatus
   status: TaskStatus
 
 
+  @Column({ default: false })
+  cancelRequested: boolean
+
   @Column({ default: 5 })
   @Column({ default: 5 })
   sendLimit: number
   sendLimit: number
 
 
@@ -54,6 +54,9 @@ export class Task {
   @Column({ default: 5 })
   @Column({ default: 5 })
   concurrentCount: number
   concurrentCount: number
 
 
+  @Column({ default: false })
+  delFlag: boolean
+
   @Column({ type: 'datetime', precision: 6, default: null })
   @Column({ type: 'datetime', precision: 6, default: null })
   startedAt: Date
   startedAt: Date
 
 

+ 456 - 0
src/executor/task.executor.ts

@@ -0,0 +1,456 @@
+import { FastifyInstance } from 'fastify'
+import { Repository } from 'typeorm'
+import { TelegramClient, Api } from 'telegram'
+import { Task, TaskStatus } from '../entities/task.entity'
+import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
+import { Sender } from '../entities/sender.entity'
+import { SenderService } from '../services/sender.service'
+import { TgClientService } from '../services/tgClient.service'
+import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
+
+export class TaskExecutor {
+  private taskRepo: Repository<Task>
+  private taskItemRepo: Repository<TaskItem>
+  private senderRepo: Repository<Sender>
+  private senderService: SenderService
+
+  private readonly defaultSenderSendLimit = 5
+  private currentSenderSendLimit = this.defaultSenderSendLimit
+  private senderUsageInBatch: Map<string, number> = new Map()
+  private senderCursor = 0
+  private senderCache: Sender[] = []
+
+  constructor(private app: FastifyInstance) {
+    const ds = app.dataSource
+    this.senderService = new SenderService(app)
+    this.taskRepo = ds.getRepository(Task)
+    this.taskItemRepo = ds.getRepository(TaskItem)
+    this.senderRepo = ds.getRepository(Sender)
+  }
+
+  /**
+   * TaskScheduler 唯一入口
+   */
+  async execute(task: Task): Promise<void> {
+    try {
+      await this.beforeExecute(task)
+
+      // 初始化 sender 配置
+      this.currentSenderSendLimit =
+        task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
+      this.senderUsageInBatch.clear()
+      this.senderCursor = 0
+      await this.refreshSenderCache()
+
+      await this.process(task)
+      await this.finalize(task.id)
+    } catch (err) {
+      this.app.log.error({ err, taskId: task.id }, 'TaskExecutor.execute failed')
+      throw err
+    }
+  }
+
+  /**
+   * 执行前校验 & 标记
+   */
+  private async beforeExecute(task: Task): Promise<void> {
+    if (task.status !== TaskStatus.SENDING) {
+      throw new Error(`Task ${task.id} status invalid: ${task.status}`)
+    }
+
+    if (task.cancelRequested) {
+      await this.taskRepo.update(task.id, {
+        status: TaskStatus.CANCELED
+      })
+      throw new Error(`Task ${task.id} canceled before execution`)
+    }
+  }
+
+  /**
+   * 核心发送逻辑(并发 worker)
+   */
+  private async process(task: Task): Promise<void> {
+    const concurrency = Math.max(1, task.concurrentCount)
+
+    const workers: Promise<void>[] = []
+
+    for (let i = 0; i < concurrency; i++) {
+      workers.push(this.workerLoop(task.id))
+    }
+
+    await Promise.all(workers)
+  }
+
+  /**
+   * 单 worker 循环
+   */
+  private async workerLoop(taskId: number): Promise<void> {
+    let sender: Sender | null = null
+    const workerTgClient = new TgClientService()
+    let senderSentInRound = 0
+    const sendIntervalMs = await this.getSendInterval(taskId)
+
+    try {
+      while (true) {
+        const task = await this.taskRepo.findOneBy({ id: taskId })
+        if (!task) return
+
+        if (task.cancelRequested) {
+          if (task.status === TaskStatus.PAUSED) {
+            throw new Error('TASK_PAUSED')
+          }
+          throw new Error('TASK_CANCELED')
+        }
+
+        if (task.status !== TaskStatus.SENDING) {
+          return
+        }
+
+        const taskItem = await this.pickNextTaskItem(taskId)
+        if (!taskItem) {
+          return
+        }
+
+        // Sender 轮换逻辑
+        if (!sender || senderSentInRound >= this.currentSenderSendLimit) {
+          await workerTgClient.disconnect()
+          sender = await this.pickSender()
+          const sessionString = await this.ensureSessionString(sender)
+
+          try {
+            await workerTgClient.connect(sessionString)
+          } catch (error) {
+            const msg = error instanceof Error ? error.message : String(error)
+            if (this.isSessionRevokedMessage(msg)) {
+              await this.handleSessionRevoked(sender)
+              sender = null
+              continue
+            }
+            throw error
+          }
+
+          senderSentInRound = 0
+
+          // 获取当前账号信息并延迟
+          const me = await workerTgClient
+            .getClient()
+            ?.getMe()
+            .catch(() => null)
+          const delaySeconds = this.getRandomDelaySeconds()
+          const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
+          this.app.log.info(
+            `当前登录账号: id: ${me?.id ?? sender.id}, name: ${
+              displayName || sender.id
+            },延迟 ${delaySeconds}s 后开始发送`
+          )
+          await this.sleep(delaySeconds * 1000)
+        }
+
+        try {
+          await this.processTaskItem(task, taskItem, sender, workerTgClient)
+        } catch (error) {
+          const msg = error instanceof Error ? error.message : '未知错误'
+          if (sender && this.isSessionRevokedMessage(msg)) {
+            await this.handleSessionRevoked(sender)
+            await workerTgClient.disconnect()
+            sender = null
+            senderSentInRound = 0
+          }
+        } finally {
+          senderSentInRound++
+          if (sender) {
+            const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
+            this.senderUsageInBatch.set(sender.id, used)
+          }
+        }
+
+        // 发送间隔
+        if (sendIntervalMs > 0) {
+          await this.sleep(sendIntervalMs)
+        }
+      }
+    } finally {
+      await workerTgClient.disconnect()
+    }
+  }
+
+  /**
+   * 拉取一个待发送的 TaskItem(DB 层保证并发安全)
+   */
+  private async pickNextTaskItem(taskId: number): Promise<TaskItem | null> {
+    const item = await this.taskItemRepo
+      .createQueryBuilder()
+      .where('taskId = :taskId', { taskId })
+      .andWhere('status = :status', { status: TaskItemStatus.PENDING })
+      .orderBy('id', 'ASC')
+      .setLock('pessimistic_write')
+      .getOne()
+
+    if (!item) return null
+
+    await this.taskItemRepo.update(item.id, {
+      status: TaskItemStatus.PENDING
+    })
+
+    return item
+  }
+
+  /**
+   * 真正发送一条消息
+   */
+  private async processTaskItem(
+    task: Task,
+    item: TaskItem,
+    sender: Sender,
+    workerTgClient: TgClientService
+  ): Promise<void> {
+    try {
+      const parsedTarget = this.parseTarget(item.target)
+      if (!parsedTarget) {
+        throw new Error('target 格式错误,请检查是否正确')
+      }
+
+      const targetPeer = await workerTgClient.getTargetPeer(parsedTarget)
+      if (!targetPeer) {
+        throw new Error('target 无效,无法获取目标信息')
+      }
+
+      const canSendMessage = await this.checkCanSendMessage(workerTgClient.getClient()!, targetPeer)
+      if (!canSendMessage) {
+        throw new Error('目标用户不允许接收消息或已被限制')
+      }
+
+      await workerTgClient.sendMessageToPeer(targetPeer, task.message)
+      await workerTgClient.clearConversation(targetPeer).catch(() => {})
+      await workerTgClient.deleteTempContact((targetPeer as any).id).catch(() => {})
+
+      await this.taskItemRepo.update(item.id, {
+        status: TaskItemStatus.SUCCESS,
+        sentAt: new Date(),
+        senderId: sender.id,
+        errorMsg: null
+      })
+
+      await this.senderService.incrementUsageCount(sender.id)
+      await this.taskRepo.increment({ id: task.id }, 'sent', 1)
+      await this.taskRepo.increment({ id: task.id }, 'successCount', 1)
+
+      this.app.log.info(`✅ 发送成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
+    } catch (error) {
+      const msg = error instanceof Error ? error.message : '未知错误'
+      await this.taskItemRepo.update(item.id, {
+        status: TaskItemStatus.FAILED,
+        sentAt: new Date(),
+        senderId: sender.id,
+        errorMsg: msg
+      })
+
+      await this.senderService.incrementUsageCount(sender.id)
+      await this.taskRepo.increment({ id: task.id }, 'sent', 1)
+
+      this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
+      throw error
+    }
+  }
+
+  /**
+   * 收尾逻辑
+   */
+  private async finalize(taskId: number): Promise<void> {
+    const remain = await this.taskItemRepo.count({
+      where: {
+        taskId,
+        status: TaskItemStatus.PENDING
+      }
+    })
+
+    if (remain > 0) {
+      return
+    }
+
+    const task = await this.taskRepo.findOneBy({ id: taskId })
+    if (!task) return
+
+    if (task.cancelRequested) {
+      await this.taskRepo.update(taskId, {
+        status: TaskStatus.CANCELED
+      })
+      return
+    }
+
+    await this.taskRepo.update(taskId, {
+      status: TaskStatus.COMPLETED
+    })
+  }
+
+  /**
+   * 获取任务发送间隔(毫秒)
+   */
+  private async getSendInterval(taskId: number): Promise<number> {
+    const task = await this.taskRepo.findOneBy({ id: taskId })
+    return Math.max(0, Number(task?.sendInterval ?? 0) * 1000)
+  }
+
+  /**
+   * 刷新发送者缓存
+   */
+  private async refreshSenderCache(): Promise<void> {
+    this.senderCache = await this.senderRepo.find({
+      where: { delFlag: false },
+      order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
+    })
+    this.senderCursor = 0
+  }
+
+  /**
+   * 选择可用的发送者
+   */
+  private async pickSender(): Promise<Sender> {
+    if (this.senderCache.length === 0) {
+      this.senderCache = await this.senderRepo.find({
+        where: { delFlag: false },
+        order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
+      })
+      this.senderCursor = 0
+    }
+
+    if (this.senderCache.length === 0) {
+      throw new Error('暂无可用 sender 账号')
+    }
+
+    const total = this.senderCache.length
+    for (let i = 0; i < total; i++) {
+      const index = (this.senderCursor + i) % total
+      const sender = this.senderCache[index]
+      const used = this.senderUsageInBatch.get(sender.id) ?? 0
+      if (used < this.currentSenderSendLimit) {
+        this.senderCursor = (index + 1) % total
+        return sender
+      }
+    }
+
+    this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
+    this.senderUsageInBatch.clear()
+    this.senderCursor = 0
+    return this.senderCache[0]
+  }
+
+  /**
+   * 确保发送者有有效的会话字符串
+   */
+  private async ensureSessionString(sender: Sender): Promise<string> {
+    if (sender.sessionStr) {
+      return sender.sessionStr
+    }
+
+    if (sender.dcId && sender.authKey) {
+      const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
+      await this.senderRepo.update(sender.id, { sessionStr: session })
+      return session
+    }
+
+    throw new Error(`sender=${sender.id} 缺少 session 信息`)
+  }
+
+  /**
+   * 处理会话被撤销的情况
+   */
+  private async handleSessionRevoked(sender: Sender): Promise<void> {
+    await this.senderRepo.update(sender.id, { delFlag: true })
+    this.senderCache = this.senderCache.filter(s => s.id !== sender.id)
+    this.senderCursor = 0
+    this.app.log.warn(`sender=${sender.id} session 失效,已删除`)
+  }
+
+  /**
+   * 检查是否可以发送消息给目标用户
+   */
+  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
+    try {
+      const fullUser = await client.invoke(
+        new Api.users.GetFullUser({
+          id: targetPeer
+        })
+      )
+
+      const fullUserData = fullUser.fullUser as any
+
+      if (fullUserData?.blocked) {
+        return false
+      }
+
+      if (targetPeer.bot && targetPeer.botChatHistory === false) {
+        return false
+      }
+
+      if (targetPeer.deleted) {
+        return false
+      }
+
+      if (targetPeer.fake || targetPeer.scam) {
+        return false
+      }
+
+      return true
+    } catch (error) {
+      const errorMessage = error instanceof Error ? error.message : '未知错误'
+
+      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
+        throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
+      }
+
+      if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
+        return false
+      }
+
+      return true
+    }
+  }
+
+  /**
+   * 解析目标标识符
+   */
+  private parseTarget(targetId: string): string | number | null {
+    const trimmed = targetId.trim()
+
+    // 用户名 手机号
+    if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
+      return trimmed
+    }
+
+    // 手机号 不带+号,使用正则
+    const phoneRegex = /^\d+$/
+    if (phoneRegex.test(trimmed)) {
+      return `+${trimmed}`
+    }
+
+    // 用户 id
+    const integerRegex = /^-?\d+$/
+    if (integerRegex.test(trimmed)) {
+      return Number(trimmed)
+    }
+
+    return null
+  }
+
+  /**
+   * 获取随机延迟秒数
+   */
+  private getRandomDelaySeconds(min: number = 10, max: number = 20): number {
+    return Math.floor(Math.random() * (max - min + 1)) + min
+  }
+
+  /**
+   * 检查错误消息是否表示会话被撤销
+   */
+  private isSessionRevokedMessage(msg: string): boolean {
+    return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
+  }
+
+  /**
+   * 延迟指定毫秒数
+   */
+  private async sleep(ms: number): Promise<void> {
+    return await new Promise(resolve => setTimeout(resolve, ms))
+  }
+}

+ 125 - 0
src/schedulers/task.scheduler.ts

@@ -0,0 +1,125 @@
+import { Repository } from 'typeorm'
+import { FastifyInstance } from 'fastify'
+import { Task, TaskStatus } from '../entities/task.entity'
+import { TaskExecutor } from '../executor/task.executor'
+
+export class TaskScheduler {
+  private static instance: TaskScheduler
+
+  private taskRepo: Repository<Task>
+  private taskExecutor: TaskExecutor
+  private running = false
+  private readonly taskExecutionTimeoutMs = 3 * 60 * 1000
+
+  private constructor(private app: FastifyInstance) {
+    this.taskRepo = app.dataSource.getRepository(Task)
+    this.taskExecutor = new TaskExecutor(app)
+  }
+
+  static getInstance(app: FastifyInstance): TaskScheduler {
+    if (!this.instance) {
+      this.instance = new TaskScheduler(app)
+    }
+    return this.instance
+  }
+
+  /**
+   * 对外触发
+   */
+  trigger(): void {
+    if (this.running) return
+
+    this.tryRunNext().catch(err => {
+      console.error('[TaskScheduler] trigger error:', err)
+    })
+  }
+
+  /**
+   * 调度入口
+   */
+  private async tryRunNext(): Promise<void> {
+    if (this.running) return
+    this.running = true
+
+    try {
+      // 1️⃣ 是否已有执行中的任务
+      const runningTask = await this.taskRepo.findOne({
+        where: { status: TaskStatus.SENDING }
+      })
+      if (runningTask) return
+
+      // 2️⃣ 找下一个任务
+      const nextTask = await this.taskRepo.findOne({
+        where: { status: TaskStatus.QUEUING },
+        order: { startedAt: 'ASC' }
+      })
+      if (!nextTask) return
+
+      // 3️⃣ 抢占执行权
+      await this.taskRepo.update(nextTask.id, {
+        status: TaskStatus.SENDING,
+        cancelRequested: false,
+        startedAt: nextTask.startedAt ?? new Date()
+      })
+
+      const task = await this.taskRepo.findOneBy({ id: nextTask.id })
+      if (!task) return
+
+      // 4️⃣ 执行(阻塞)
+      await this.executeTask(task)
+    } finally {
+      this.running = false
+      setImmediate(() => this.trigger())
+    }
+  }
+
+  /**
+   * 执行任务
+   */
+  private async executeTask(task: Task): Promise<void> {
+    const startTime = Date.now()
+
+    try {
+      await this.runWithTimeout(() => this.taskExecutor.execute(task))
+    } catch (err: any) {
+      await this.handleExecutionError(task.id, err)
+    } finally {
+      this.app.log.info(`[TaskScheduler] task ${task.id} finished in ${Date.now() - startTime}ms`)
+    }
+  }
+
+  /**
+   * 错误处理统一出口
+   */
+  private async handleExecutionError(taskId: number, err: Error): Promise<void> {
+    if (err.message === 'TASK_PAUSED') {
+      await this.taskRepo.update(taskId, { status: TaskStatus.PAUSED })
+      return
+    }
+
+    if (err.message === 'TASK_CANCELED') {
+      await this.taskRepo.update(taskId, { status: TaskStatus.CANCELED })
+      return
+    }
+
+    if (err.message === 'TASK_TIMEOUT') {
+      await this.taskRepo.update(taskId, { status: TaskStatus.CANCELED })
+      return
+    }
+
+    console.error('[TaskScheduler] task failed:', err)
+    await this.taskRepo.update(taskId, {
+      status: TaskStatus.CANCELED
+    })
+  }
+
+  /**
+   * 写死的超时包装
+   */
+  private async runWithTimeout(fn: () => Promise<void>): Promise<void> {
+    return Promise.race([
+      fn(),
+      new Promise((_, reject) => setTimeout(() => reject(new Error('TASK_TIMEOUT')), this.taskExecutionTimeoutMs))
+    ]) as Promise<void>
+  }
+}

+ 11 - 0
src/services/sender.service.ts

@@ -80,6 +80,17 @@ export class SenderService {
     return this.senderRepository.save(sender)
     return this.senderRepository.save(sender)
   }
   }
 
 
+  async pickAvailableSender(): Promise<Sender | null> {
+    const sender = await this.senderRepository.findOne({
+      where: { delFlag: false },
+      order: {
+        usageCount: 'ASC',
+        lastUsageTime: 'DESC'
+      }
+    })
+    return sender
+  }
+
   async importFromJson(
   async importFromJson(
     senders: Array<{ id: string; dcId?: number; authKey?: string; sessionStr?: string }>
     senders: Array<{ id: string; dcId?: number; authKey?: string; sessionStr?: string }>
   ): Promise<{ created: number; updated: number; failed: number; total: number }> {
   ): Promise<{ created: number; updated: number; failed: number; total: number }> {

+ 61 - 19
src/services/task.service.ts

@@ -3,6 +3,7 @@ import { FastifyInstance } from 'fastify'
 import { Task, TaskStatus } from '../entities/task.entity'
 import { Task, TaskStatus } from '../entities/task.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
 import { PaginationResponse } from '../dto/common.dto'
 import { PaginationResponse } from '../dto/common.dto'
+import { TaskScheduler } from '../schedulers/task.scheduler'
 
 
 export class TaskService {
 export class TaskService {
   private taskRepository: Repository<Task>
   private taskRepository: Repository<Task>
@@ -120,43 +121,84 @@ export class TaskService {
 
 
   async startTask(id: number): Promise<void> {
   async startTask(id: number): Promise<void> {
     const task = await this.findById(id)
     const task = await this.findById(id)
-    if (!task) {
-      throw new Error('任务不存在')
-    }
-    if (task.delFlag) {
-      throw new Error('任务已被删除')
-    }
+    if (!task) throw new Error('任务不存在')
+    if (task.delFlag) throw new Error('任务已被删除')
 
 
     if (task.status === TaskStatus.QUEUING || task.status === TaskStatus.SENDING) {
     if (task.status === TaskStatus.QUEUING || task.status === TaskStatus.SENDING) {
       return
       return
     }
     }
 
 
-    if (
-      task.status === TaskStatus.COMPLETED ||
-      task.status === TaskStatus.CANCELED ||
-      task.status === TaskStatus.PAUSED
-    ) {
+    if (task.status === TaskStatus.COMPLETED || task.status === TaskStatus.CANCELED) {
       throw new Error(`任务已结束: ${task.status},无法启动`)
       throw new Error(`任务已结束: ${task.status},无法启动`)
     }
     }
 
 
+    const running = await this.taskRepository.findOne({
+      where: { status: TaskStatus.SENDING }
+    })
+
     await this.taskRepository.update(id, {
     await this.taskRepository.update(id, {
-      status: TaskStatus.SENDING,
+      status: running ? TaskStatus.QUEUING : TaskStatus.SENDING,
+      cancelRequested: false,
       startedAt: task.startedAt ?? new Date()
       startedAt: task.startedAt ?? new Date()
     })
     })
+
+    TaskScheduler.getInstance(this.app).trigger()
   }
   }
 
 
   async pauseTask(id: number): Promise<void> {
   async pauseTask(id: number): Promise<void> {
     const task = await this.findById(id)
     const task = await this.findById(id)
-    if (!task) {
-      throw new Error('任务不存在')
-    }
-    if (task.delFlag) {
-      throw new Error('任务已被删除')
-    }
+    if (!task) throw new Error('任务不存在')
+    if (task.delFlag) throw new Error('任务已被删除')
+
     if (task.status !== TaskStatus.SENDING) {
     if (task.status !== TaskStatus.SENDING) {
       throw new Error('仅发送中的任务可暂停')
       throw new Error('仅发送中的任务可暂停')
     }
     }
 
 
-    await this.taskRepository.update(id, { status: TaskStatus.PAUSED })
+    await this.taskRepository.update(id, {
+      status: TaskStatus.PAUSED,
+      cancelRequested: true
+    })
+  }
+
+  async resumeTask(id: number): Promise<void> {
+    const task = await this.findById(id)
+    if (!task) throw new Error('任务不存在')
+
+    if (task.status !== TaskStatus.PAUSED) {
+      throw new Error('仅暂停中的任务可恢复')
+    }
+
+    const running = await this.taskRepository.findOne({
+      where: { status: TaskStatus.SENDING }
+    })
+
+    await this.taskRepository.update(id, {
+      status: running ? TaskStatus.QUEUING : TaskStatus.SENDING,
+      cancelRequested: false
+    })
+
+    TaskScheduler.getInstance(this.app).trigger()
+  }
+
+  async cancelTask(id: number): Promise<void> {
+    const task = await this.findById(id)
+    if (!task) throw new Error('任务不存在')
+    if (task.delFlag) throw new Error('任务已被删除')
+
+    if (task.status === TaskStatus.QUEUING) {
+      await this.taskRepository.update(id, {
+        status: TaskStatus.CANCELED
+      })
+      return
+    }
+
+    if (task.status === TaskStatus.SENDING) {
+      await this.taskRepository.update(id, {
+        cancelRequested: true
+      })
+      return
+    }
+
+    throw new Error(`任务状态 ${task.status} 不可取消`)
   }
   }
 }
 }

+ 40 - 49
src/services/test.service.ts

@@ -30,7 +30,7 @@ export class TestService {
     this.taskItemRepository = app.dataSource.getRepository(TaskItem)
     this.taskItemRepository = app.dataSource.getRepository(TaskItem)
     this.senderRepository = app.dataSource.getRepository(Sender)
     this.senderRepository = app.dataSource.getRepository(Sender)
     this.senderService = new SenderService(app)
     this.senderService = new SenderService(app)
-    this.tgClientService = TgClientService.getInstance()
+    this.tgClientService = new TgClientService()
     this.chatGroupService = new ChatGroupService(app)
     this.chatGroupService = new ChatGroupService(app)
   }
   }
 
 
@@ -113,27 +113,24 @@ export class TestService {
         const sender = await pickSender()
         const sender = await pickSender()
         const sessionString = await this.ensureSessionStringForSender(sender)
         const sessionString = await this.ensureSessionStringForSender(sender)
 
 
+        const senderTgClient = new TgClientService()
         const connectWithTimeout = async () => {
         const connectWithTimeout = async () => {
           const timeoutMs = 10_000
           const timeoutMs = 10_000
           return Promise.race([
           return Promise.race([
-            this.tgClientService.connect(sessionString),
+            senderTgClient.connect(sessionString),
             (async () => {
             (async () => {
               await delay(timeoutMs)
               await delay(timeoutMs)
               throw new Error(`连接超时(${timeoutMs / 1000}s)`)
               throw new Error(`连接超时(${timeoutMs / 1000}s)`)
             })()
             })()
-          ]) as Promise<TelegramClient>
+          ])
         }
         }
 
 
         this.app.log.info(`sender=${sender.id} 准备连接并发送,当前批次上限=${this.senderSendLimit}`)
         this.app.log.info(`sender=${sender.id} 准备连接并发送,当前批次上限=${this.senderSendLimit}`)
-        let client: TelegramClient
         try {
         try {
-          client = await connectWithTimeout()
+          await connectWithTimeout()
         } catch (error) {
         } catch (error) {
           const msg = error instanceof Error ? error.message : '未知错误'
           const msg = error instanceof Error ? error.message : '未知错误'
           this.app.log.warn(`sender=${sender.id} 连接失败,切换下一个 sender: ${msg}`)
           this.app.log.warn(`sender=${sender.id} 连接失败,切换下一个 sender: ${msg}`)
-          try {
-            await this.tgClientService.disconnect()
-          } catch {}
           continue
           continue
         }
         }
 
 
@@ -151,17 +148,17 @@ export class TestService {
               throw new Error('target 格式错误,请检查是否正确')
               throw new Error('target 格式错误,请检查是否正确')
             }
             }
 
 
-            const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
+            const targetPeer = await senderTgClient.getTargetPeer(parsedTarget)
             if (!targetPeer) {
             if (!targetPeer) {
               throw new Error('target 无效,无法获取目标信息')
               throw new Error('target 无效,无法获取目标信息')
             }
             }
 
 
-            const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
+            const canSendMessage = await this.checkCanSendMessage(senderTgClient.getClient()!, targetPeer)
             if (!canSendMessage) {
             if (!canSendMessage) {
               throw new Error('目标用户不允许接收消息或已被限制')
               throw new Error('目标用户不允许接收消息或已被限制')
             }
             }
 
 
-            await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
+            await senderTgClient.sendMessageToPeer(targetPeer, task.message)
 
 
             await this.taskItemRepository.update(item.id, {
             await this.taskItemRepository.update(item.id, {
               status: TaskItemStatus.SUCCESS,
               status: TaskItemStatus.SUCCESS,
@@ -205,7 +202,7 @@ export class TestService {
           }
           }
         }
         }
 
 
-        await this.tgClientService.disconnect()
+        await senderTgClient.disconnect()
       }
       }
 
 
       if (totalSent > 0) {
       if (totalSent > 0) {
@@ -393,13 +390,13 @@ export class TestService {
               message: 'Sender 缺少 dcId 或 authKey 信息'
               message: 'Sender 缺少 dcId 或 authKey 信息'
             }
             }
           }
           }
-          sessionString = buildStringSessionByDcIdAndAuthKey(sender!.dcId, sender!.authKey)
-          await this.senderRepository.update(sender!.id, { sessionStr: sessionString })
-        }
+        sessionString = buildStringSessionByDcIdAndAuthKey(sender!.dcId, sender!.authKey)
+        await this.senderRepository.update(sender!.id, { sessionStr: sessionString })
       }
       }
+    }
 
 
       this.app.log.info('正在连接 TelegramClient...')
       this.app.log.info('正在连接 TelegramClient...')
-      client = await this.tgClientService.connect(sessionString)
+      await this.tgClientService.connect(sessionString)
       this.app.log.info('TelegramClient 连接完成')
       this.app.log.info('TelegramClient 连接完成')
 
 
       const waitTime = Math.floor(Math.random() * 21) + 20
       const waitTime = Math.floor(Math.random() * 21) + 20
@@ -427,20 +424,20 @@ export class TestService {
             throw new Error('target 格式错误,请检查是否正确')
             throw new Error('target 格式错误,请检查是否正确')
           }
           }
 
 
-          const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
+          const targetPeer = await this.tgClientService.getTargetPeer(parsedTarget)
           if (!targetPeer) {
           if (!targetPeer) {
             throw new Error('target 无效,无法获取目标信息')
             throw new Error('target 无效,无法获取目标信息')
           }
           }
 
 
-          const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
+          const canSendMessage = await this.checkCanSendMessage(this.tgClientService.getClient()!, targetPeer)
           if (!canSendMessage) {
           if (!canSendMessage) {
             throw new Error('目标用户不允许接收消息或已被限制')
             throw new Error('目标用户不允许接收消息或已被限制')
           }
           }
 
 
-          await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
+          await this.tgClientService.sendMessageToPeer(targetPeer, task.message)
 
 
           try {
           try {
-            await this.tgClientService.clearConversation(client, targetPeer)
+            await this.tgClientService.clearConversation(targetPeer)
           } catch (clearError) {
           } catch (clearError) {
             this.app.log.warn(
             this.app.log.warn(
               `清除会话失败 [${taskItem.target}]: ${clearError instanceof Error ? clearError.message : '未知错误'}`
               `清除会话失败 [${taskItem.target}]: ${clearError instanceof Error ? clearError.message : '未知错误'}`
@@ -448,7 +445,7 @@ export class TestService {
           }
           }
 
 
           try {
           try {
-            await this.tgClientService.deleteTempContact(client, targetPeer.id)
+            await this.tgClientService.deleteTempContact(targetPeer.id)
           } catch (deleteError) {
           } catch (deleteError) {
             this.app.log.warn(
             this.app.log.warn(
               `删除临时联系人失败 [${taskItem.target}]: ${
               `删除临时联系人失败 [${taskItem.target}]: ${
@@ -546,15 +543,13 @@ export class TestService {
         error: errorMessage
         error: errorMessage
       }
       }
     } finally {
     } finally {
-      if (client) {
-        try {
-          this.app.log.info(`正在断开连接并销毁 client`)
-          await this.tgClientService.disconnect()
-          this.app.log.info(`连接已断开,client 已销毁`)
-        } catch (error) {
-          const errorMessage = error instanceof Error ? error.message : '未知错误'
-          this.app.log.error(`断开连接失败: ${errorMessage}`)
-        }
+      try {
+        this.app.log.info(`正在断开连接并销毁 client`)
+        await this.tgClientService.disconnect()
+        this.app.log.info(`连接已断开,client 已销毁`)
+      } catch (error) {
+        const errorMessage = error instanceof Error ? error.message : '未知错误'
+        this.app.log.error(`断开连接失败: ${errorMessage}`)
       }
       }
     }
     }
   }
   }
@@ -579,7 +574,7 @@ export class TestService {
       const { sessionString } = sessionResult
       const { sessionString } = sessionResult
 
 
       this.app.log.info('正在连接 TelegramClient...')
       this.app.log.info('正在连接 TelegramClient...')
-      client = await this.tgClientService.connect(sessionString)
+      await this.tgClientService.connect(sessionString)
       this.app.log.info('TelegramClient 连接完成')
       this.app.log.info('TelegramClient 连接完成')
 
 
       const waitTime = Math.floor(Math.random() * 21) + 20
       const waitTime = Math.floor(Math.random() * 21) + 20
@@ -735,14 +730,12 @@ export class TestService {
         message: `创建聊天群失败: ${errorMessage}`
         message: `创建聊天群失败: ${errorMessage}`
       }
       }
     } finally {
     } finally {
-      if (client) {
-        try {
-          await this.tgClientService.disconnect()
-          this.app.log.info('TelegramClient 已断开连接')
-        } catch (error) {
-          const errorMessage = error instanceof Error ? error.message : '未知错误'
-          this.app.log.error(`断开连接失败: ${errorMessage}`)
-        }
+      try {
+        await this.tgClientService.disconnect()
+        this.app.log.info('TelegramClient 已断开连接')
+      } catch (error) {
+        const errorMessage = error instanceof Error ? error.message : '未知错误'
+        this.app.log.error(`断开连接失败: ${errorMessage}`)
       }
       }
     }
     }
   }
   }
@@ -775,7 +768,7 @@ export class TestService {
       const { sessionString } = sessionResult
       const { sessionString } = sessionResult
 
 
       this.app.log.info('正在连接 TelegramClient...')
       this.app.log.info('正在连接 TelegramClient...')
-      client = await this.tgClientService.connect(sessionString)
+      await this.tgClientService.connect(sessionString)
       this.app.log.info('TelegramClient 连接完成')
       this.app.log.info('TelegramClient 连接完成')
 
 
       const waitTime = Math.floor(Math.random() * 21) + 20
       const waitTime = Math.floor(Math.random() * 21) + 20
@@ -881,7 +874,7 @@ export class TestService {
 
 
           let targetUser: any
           let targetUser: any
           try {
           try {
-            targetUser = await this.tgClientService.getTargetPeer(client, parsedTarget)
+            targetUser = await this.tgClientService.getTargetPeer(parsedTarget)
             if (!targetUser) {
             if (!targetUser) {
               throw new Error('无法获取用户信息')
               throw new Error('无法获取用户信息')
             }
             }
@@ -986,14 +979,12 @@ export class TestService {
       this.app.log.error(`邀请成员到群失败: ${errorMessage}`)
       this.app.log.error(`邀请成员到群失败: ${errorMessage}`)
       return { success: false, message: `邀请成员失败: ${errorMessage}` }
       return { success: false, message: `邀请成员失败: ${errorMessage}` }
     } finally {
     } finally {
-      if (client) {
-        try {
-          await this.tgClientService.disconnect()
-          this.app.log.info('TelegramClient 已断开连接')
-        } catch (error) {
-          const errorMessage = error instanceof Error ? error.message : '未知错误'
-          this.app.log.error(`断开连接失败: ${errorMessage}`)
-        }
+      try {
+        await this.tgClientService.disconnect()
+        this.app.log.info('TelegramClient 已断开连接')
+      } catch (error) {
+        const errorMessage = error instanceof Error ? error.message : '未知错误'
+        this.app.log.error(`断开连接失败: ${errorMessage}`)
       }
       }
     }
     }
   }
   }

+ 6 - 9
src/services/tg-group.service.ts

@@ -13,7 +13,7 @@ export class TgGroupService {
   private readonly senderRepository: Repository<Sender>
   private readonly senderRepository: Repository<Sender>
   constructor(app: FastifyInstance) {
   constructor(app: FastifyInstance) {
     this.app = app
     this.app = app
-    this.tgClientService = TgClientService.getInstance()
+    this.tgClientService = new TgClientService()
     this.chatGroupService = new ChatGroupService(app)
     this.chatGroupService = new ChatGroupService(app)
     this.senderRepository = app.dataSource.getRepository(Sender)
     this.senderRepository = app.dataSource.getRepository(Sender)
   }
   }
@@ -74,21 +74,20 @@ export class TgGroupService {
         }
         }
       }
       }
 
 
-      client = await this.tgClientService.createConnectedClient(sender.sessionStr)
+      await this.tgClientService.connect(sender.sessionStr)
 
 
       const groupInfo = await this.tgClientService.createChannelGroup(
       const groupInfo = await this.tgClientService.createChannelGroup(
-        client,
         groupName,
         groupName,
         groupDescription || '',
         groupDescription || '',
         groupType as 'megagroup' | 'channel'
         groupType as 'megagroup' | 'channel'
       )
       )
 
 
       const inputChannel = await this.tgClientService.getInputChannel(groupInfo.chatId, groupInfo.accessHash)
       const inputChannel = await this.tgClientService.getInputChannel(groupInfo.chatId, groupInfo.accessHash)
-      const inviteLink = await this.tgClientService.getInviteLink(client, inputChannel)
-      const publicLink = await this.tgClientService.getPublicLink(client, inputChannel, groupName)
+      const inviteLink = await this.tgClientService.getInviteLink(inputChannel)
+      const publicLink = await this.tgClientService.getPublicLink(inputChannel, groupName)
 
 
       if (initMsg && initMsg.trim().length > 0) {
       if (initMsg && initMsg.trim().length > 0) {
-        await this.tgClientService.sendMessageToChannelGroup(client, inputChannel, initMsg.trim())
+        await this.tgClientService.sendMessageToChannelGroup(inputChannel, initMsg.trim())
       }
       }
 
 
       await this.chatGroupService.upsertGroup({
       await this.chatGroupService.upsertGroup({
@@ -126,9 +125,7 @@ export class TgGroupService {
         message: `创建群组失败: ${errorMessage}`
         message: `创建群组失败: ${errorMessage}`
       }
       }
     } finally {
     } finally {
-      if (client) {
-        await this.tgClientService.disconnectClient(client)
-      }
+      await this.tgClientService.disconnect()
     }
     }
   }
   }
 
 

+ 8 - 533
src/services/tg-msg-send.service.ts

@@ -1,45 +1,14 @@
-import { Repository } from 'typeorm'
-import { TelegramClient } from 'telegram'
 import { FastifyInstance } from 'fastify'
 import { FastifyInstance } from 'fastify'
-import { Api } from 'telegram'
 import { SendMessageResult } from '../dto/tg-msg-send.dto'
 import { SendMessageResult } from '../dto/tg-msg-send.dto'
 import { TgClientService } from './tgClient.service'
 import { TgClientService } from './tgClient.service'
-import { Task, TaskStatus } from '../entities/task.entity'
-import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
-import { Sender } from '../entities/sender.entity'
-import { SenderService } from './sender.service'
-import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
 
 
 export class TgMsgSendService {
 export class TgMsgSendService {
   private app: FastifyInstance
   private app: FastifyInstance
   private tgClientService: TgClientService
   private tgClientService: TgClientService
-  private taskRepository: Repository<Task>
-  private taskItemRepository: Repository<TaskItem>
-  private senderRepository: Repository<Sender>
-  private senderService: SenderService
-  private processing = false
-  private static schedulerStarted = false
-  private readonly pollIntervalMs = 5000
-  private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`
-  private processingSince: number | null = null
-  private readonly maxProcessingMs = 2 * 60 * 1000
-  private readonly defaultSenderSendLimit = 5
-  private currentSenderSendLimit = this.defaultSenderSendLimit
-  private senderUsageInBatch: Map<string, number> = new Map()
-  private senderCursor = 0
-  private senderCache: Sender[] = []
-  private readonly taskBatchSize = 50
 
 
   constructor(app: FastifyInstance) {
   constructor(app: FastifyInstance) {
     this.app = app
     this.app = app
-    this.tgClientService = TgClientService.getInstance()
-    this.taskRepository = app.dataSource.getRepository(Task)
-    this.taskItemRepository = app.dataSource.getRepository(TaskItem)
-    this.senderRepository = app.dataSource.getRepository(Sender)
-    this.senderService = new SenderService(app)
-    this.app.addHook('onReady', async () => {
-      this.scheduleTaskSend()
-    })
+    this.tgClientService = new TgClientService()
   }
   }
 
 
   /**
   /**
@@ -55,33 +24,30 @@ export class TgMsgSendService {
       return { success: false, error: configError }
       return { success: false, error: configError }
     }
     }
 
 
-    let client: TelegramClient | null = null
-
     try {
     try {
-      client = await this.tgClientService.connect(sessionString)
+      await this.tgClientService.connect(sessionString)
 
 
       const parsedTarget = this.parseTarget(target)
       const parsedTarget = this.parseTarget(target)
       if (!parsedTarget) {
       if (!parsedTarget) {
         return { success: false, error: 'target 格式错误,请检查是否正确' }
         return { success: false, error: 'target 格式错误,请检查是否正确' }
       }
       }
 
 
-      const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
+      const targetPeer = await this.tgClientService.getTargetPeer(parsedTarget)
       if (!targetPeer) {
       if (!targetPeer) {
         return { success: false, error: 'target 无效,无法获取目标信息' }
         return { success: false, error: 'target 无效,无法获取目标信息' }
       }
       }
 
 
-      const result = await this.tgClientService.sendMessageToPeer(client, targetPeer, message)
+      const result = await this.tgClientService.sendMessageToPeer(targetPeer, message)
       this.app.log.info(`✅ 消息发送成功`, result.id)
       this.app.log.info(`✅ 消息发送成功`, result.id)
 
 
-      await this.tgClientService.clearConversation(client, targetPeer)
+      await this.tgClientService.clearConversation(targetPeer)
       this.app.log.info('会话清除成功')
       this.app.log.info('会话清除成功')
 
 
       if (target.startsWith('+')) {
       if (target.startsWith('+')) {
-        await this.tgClientService.deleteTempContact(client, targetPeer.id)
+        await this.tgClientService.deleteTempContact(targetPeer.id)
         this.app.log.info('临时联系人删除成功')
         this.app.log.info('临时联系人删除成功')
       }
       }
 
 
-      await this.tgClientService.disconnect()
       this.app.log.info('=============发送消息任务结束=============')
       this.app.log.info('=============发送消息任务结束=============')
 
 
       return {
       return {
@@ -95,6 +61,8 @@ export class TgMsgSendService {
         success: false,
         success: false,
         error: errorMessage
         error: errorMessage
       }
       }
+    } finally {
+      await this.tgClientService.disconnect()
     }
     }
   }
   }
 
 
@@ -155,497 +123,4 @@ export class TgMsgSendService {
     }
     }
     return '未知错误'
     return '未知错误'
   }
   }
-
-  /**
-   * 启动任务发送轮询调度器
-   * 使用静态变量确保只启动一个轮询实例
-   */
-  private scheduleTaskSend() {
-    if (TgMsgSendService.schedulerStarted) {
-      return
-    }
-    const interval = setInterval(() => void this.taskSendCycle(), this.pollIntervalMs)
-    TgMsgSendService.schedulerStarted = true
-
-    this.app.addHook('onClose', async () => {
-      clearInterval(interval)
-      TgMsgSendService.schedulerStarted = false
-    })
-
-    this.app.log.info(`任务发送轮询已启动,间隔=${this.pollIntervalMs}ms,实例=${this.instanceId}`)
-  }
-
-  /**
-   * 任务发送轮询循环
-   * 每轮询一次处理一个发送中的任务,包含防卡死机制
-   */
-  private async taskSendCycle() {
-    if (this.processing) {
-      const now = Date.now()
-      if (this.processingSince && now - this.processingSince > this.maxProcessingMs) {
-        this.app.log.warn('taskSendCycle 脱离卡死,重置 processing')
-        this.processing = false
-      } else {
-        return
-      }
-    }
-    this.processing = true
-    this.processingSince = Date.now()
-    try {
-      await this.processTaskSend()
-    } catch (error) {
-      const msg = error instanceof Error ? error.message : '未知错误'
-      const isDbConnectionError =
-        msg.includes('ECONNRESET') ||
-        msg.includes('EPIPE') ||
-        msg.includes('ETIMEDOUT') ||
-        msg.includes('ENOTFOUND') ||
-        msg.includes('Connection lost') ||
-        msg.includes('Connection closed')
-
-      if (isDbConnectionError) {
-        this.app.log.debug(`数据库连接异常,跳过本次轮询: ${msg}`)
-      } else {
-        const stack = error instanceof Error ? error.stack : undefined
-        this.app.log.error(`处理发送任务失败: ${msg}${stack ? `; stack=${stack}` : ''}`)
-      }
-    } finally {
-      this.processing = false
-      this.processingSince = null
-    }
-  }
-
-  /**
-   * 处理任务发送
-   * 查找一个发送中的任务,批量处理待发送的任务项,支持并发发送
-   */
-  private async processTaskSend() {
-    const task = await this.taskRepository.findOne({
-      where: { status: TaskStatus.SENDING, delFlag: false },
-      order: { startedAt: 'ASC' }
-    })
-
-    if (!task) {
-      return
-    }
-    this.app.log.info(`开始发送任务 id=${task.id}, startedAt=${task.startedAt?.toISOString}`)
-
-    const configuredSendLimit =
-      task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
-    const sendIntervalMs = Math.max(0, Number(task.sendInterval ?? 0) * 1000)
-    const batchSize =
-      task.sendBatchSize && Number(task.sendBatchSize) > 0 ? Number(task.sendBatchSize) : this.taskBatchSize
-    const concurrentCount = task.concurrentCount && Number(task.concurrentCount) > 0 ? Number(task.concurrentCount) : 1
-    const batchTotal = batchSize
-
-    this.currentSenderSendLimit = configuredSendLimit
-    this.senderUsageInBatch.clear()
-    this.senderCursor = 0
-    await this.refreshSenderCache()
-
-    const pendingItems = await this.taskItemRepository.find({
-      where: { taskId: task.id, status: TaskItemStatus.PENDING },
-      order: { id: 'ASC' },
-      take: batchTotal
-    })
-
-    if (pendingItems.length === 0) {
-      await this.finalizeTaskIfDone(task.id)
-      return
-    }
-
-    const queue = [...pendingItems]
-    const workerCount = Math.min(concurrentCount, queue.length)
-
-    const workerResults = await Promise.allSettled(
-      Array.from({ length: workerCount }, (_, index) => this.runSenderWorker(task, queue, sendIntervalMs, index))
-    )
-
-    let batchSent = 0
-    let batchSuccess = 0
-
-    workerResults.forEach((result, index) => {
-      if (result.status === 'fulfilled') {
-        batchSent += result.value.sent
-        batchSuccess += result.value.success
-      } else {
-        const msg =
-          result.reason instanceof Error
-            ? result.reason.message
-            : typeof result.reason === 'string'
-            ? result.reason
-            : '未知错误'
-        this.app.log.error(`worker=${index} 执行失败: ${msg}`)
-      }
-    })
-
-    if (batchSent > 0) {
-      await this.taskRepository.increment({ id: task.id }, 'sent', batchSent)
-    }
-    if (batchSuccess > 0) {
-      await this.taskRepository.increment({ id: task.id }, 'successCount', batchSuccess)
-    }
-
-    const latest = await this.taskRepository.findOne({ where: { id: task.id } })
-    if (!latest || latest.status !== TaskStatus.SENDING) {
-      this.app.log.info(`任务 ${task.id} 已暂停或停止,本轮结束`)
-      return
-    }
-
-    await this.finalizeTaskIfDone(task.id)
-  }
-
-  /**
-   * 如果任务已完成,则更新任务状态为已完成
-   * @param taskId 任务ID
-   */
-  private async finalizeTaskIfDone(taskId: number): Promise<void> {
-    const pendingCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.PENDING }
-    })
-    if (pendingCount > 0) {
-      return
-    }
-
-    const successCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.SUCCESS }
-    })
-    const failedCount = await this.taskItemRepository.count({
-      where: { taskId, status: TaskItemStatus.FAILED }
-    })
-
-    await this.taskRepository.update(taskId, {
-      status: TaskStatus.COMPLETED,
-      sent: successCount + failedCount,
-      successCount
-    })
-  }
-
-  /**
-   * 运行发送工作线程
-   * 从队列中取出任务项并发送,支持发送者轮换和会话管理
-   * @param task 任务对象
-   * @param queue 任务项队列
-   * @param sendIntervalMs 发送间隔(毫秒)
-   * @param workerIndex 工作线程索引
-   * @returns 发送统计信息(已发送、成功、失败数量)
-   */
-  private async runSenderWorker(
-    task: Task,
-    queue: TaskItem[],
-    sendIntervalMs: number,
-    workerIndex: number
-  ): Promise<{ sent: number; success: number; failed: number }> {
-    let sent = 0
-    let success = 0
-    let failed = 0
-    let sender: Sender | null = null
-    let client: TelegramClient | null = null
-    let senderSentInRound = 0
-
-    while (true) {
-      const taskItem = queue.shift()
-      if (!taskItem) {
-        break
-      }
-
-      const stillSending = await this.isTaskSending(task.id)
-      if (!stillSending) {
-        this.app.log.info(`任务 ${task.id} 已暂停/停止,worker=${workerIndex} 提前结束`)
-        break
-      }
-
-      if (!sender || senderSentInRound >= this.currentSenderSendLimit) {
-        await this.tgClientService.disconnectClient(client)
-        sender = await this.pickSender()
-        const sessionString = await this.ensureSessionString(sender)
-        try {
-          client = await this.tgClientService.createConnectedClient(sessionString)
-        } catch (error) {
-          const msg = error instanceof Error ? error.message : String(error)
-          if (this.isSessionRevokedMessage(msg)) {
-            await this.handleSessionRevoked(sender)
-            sender = null
-            client = null
-            continue
-          }
-          throw error
-        }
-        senderSentInRound = 0
-
-        const me = await client.getMe().catch(() => null)
-        const delaySeconds = this.getRandomDelaySeconds()
-        const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
-        this.app.log.info(
-          `worker=${workerIndex} ,当前登录账号: id: ${me?.id ?? sender.id} ,name: ${
-            displayName || sender.id
-          },延迟 ${delaySeconds}s 后开始发送`
-        )
-        await this.sleep(delaySeconds * 1000)
-      }
-
-      try {
-        await this.sendTaskItemWithClient(task, taskItem, sender!, client!, workerIndex)
-        success++
-      } catch (error) {
-        failed++
-        const msg = error instanceof Error ? error.message : '未知错误'
-        if (sender && this.isSessionRevokedMessage(msg)) {
-          await this.handleSessionRevoked(sender)
-          await this.tgClientService.disconnectClient(client)
-          sender = null
-          client = null
-          senderSentInRound = 0
-        }
-        this.app.log.warn(
-          `❌ 发送失败 taskId=${task.id}, item=${taskItem.id}, sender=${
-            sender?.id ?? '未知'
-          }, worker=${workerIndex}, error: ${msg}`
-        )
-      } finally {
-        sent++
-        senderSentInRound++
-        if (sender) {
-          const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
-          this.senderUsageInBatch.set(sender.id, used)
-        }
-      }
-
-      if (sendIntervalMs > 0) {
-        await this.sleep(sendIntervalMs)
-      }
-    }
-
-    await this.tgClientService.disconnectClient(client)
-
-    return { sent, success, failed }
-  }
-
-  /**
-   * 使用客户端发送任务项
-   * 发送消息、清除会话、删除临时联系人,并更新任务项状态
-   * @param task 任务对象
-   * @param taskItem 任务项对象
-   * @param sender 发送者对象
-   * @param client Telegram 客户端
-   * @param workerIndex 工作线程索引
-   */
-  private async sendTaskItemWithClient(
-    task: Task,
-    taskItem: TaskItem,
-    sender: Sender,
-    client: TelegramClient,
-    workerIndex: number
-  ): Promise<void> {
-    try {
-      const parsedTarget = this.parseTarget(taskItem.target)
-      if (!parsedTarget) {
-        throw new Error('target 格式错误,请检查是否正确')
-      }
-
-      const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
-      if (!targetPeer) {
-        throw new Error('target 无效,无法获取目标信息')
-      }
-
-      const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
-      if (!canSendMessage) {
-        throw new Error('目标用户不允许接收消息或已被限制')
-      }
-
-      await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
-      await this.tgClientService.clearConversation(client, targetPeer).catch(() => {})
-      await this.tgClientService.deleteTempContact(client, (targetPeer as any).id).catch(() => {})
-
-      await this.taskItemRepository.update(taskItem.id, {
-        status: TaskItemStatus.SUCCESS,
-        sentAt: new Date(),
-        senderId: sender.id,
-        errorMsg: null
-      })
-
-      await this.senderService.incrementUsageCount(sender.id)
-      this.app.log.info(
-        `✅ 发送成功 taskId=${task.id}, itemId=${taskItem.id}, sender=${sender.id}, worker=${workerIndex}`
-      )
-    } catch (error) {
-      const msg = error instanceof Error ? error.message : '未知错误'
-      await this.taskItemRepository.update(taskItem.id, {
-        status: TaskItemStatus.FAILED,
-        sentAt: new Date(),
-        senderId: sender.id,
-        errorMsg: msg
-      })
-      await this.senderService.incrementUsageCount(sender.id)
-      throw error
-    }
-  }
-
-  /**
-   * 刷新发送者缓存
-   * 从数据库重新加载所有未删除的发送者,按最后使用时间和使用次数排序
-   */
-  private async refreshSenderCache(): Promise<void> {
-    this.senderCache = await this.senderRepository.find({
-      where: { delFlag: false },
-      order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
-    })
-    this.senderCursor = 0
-  }
-
-  /**
-   * 检查任务是否正在发送中
-   * @param taskId 任务ID
-   * @returns 如果任务状态为发送中且未删除返回 true,否则返回 false
-   */
-  private async isTaskSending(taskId: number): Promise<boolean> {
-    const current = await this.taskRepository.findOne({ where: { id: taskId } })
-    return !!current && current.status === TaskStatus.SENDING && current.delFlag === false
-  }
-
-  /**
-   * 延迟指定毫秒数
-   * @param ms 延迟毫秒数
-   */
-  private async sleep(ms: number): Promise<void> {
-    return await new Promise(resolve => setTimeout(resolve, ms))
-  }
-
-  /**
-   * 选择可用的发送者
-   * 从缓存中选择使用次数最少的发送者,如果所有发送者都达到上限则重置计数
-   * @returns 选中的发送者对象
-   * @throws 如果没有可用发送者则抛出错误
-   */
-  private async pickSender(): Promise<Sender> {
-    if (this.senderCache.length === 0) {
-      this.senderCache = await this.senderRepository.find({
-        where: { delFlag: false },
-        order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
-      })
-      this.senderCursor = 0
-    }
-
-    if (this.senderCache.length === 0) {
-      throw new Error('暂无可用 sender 账号')
-    }
-
-    const total = this.senderCache.length
-    for (let i = 0; i < total; i++) {
-      const index = (this.senderCursor + i) % total
-      const sender = this.senderCache[index]
-      const used = this.senderUsageInBatch.get(sender.id) ?? 0
-      if (used < this.currentSenderSendLimit) {
-        this.senderCursor = (index + 1) % total
-        return sender
-      }
-    }
-
-    this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
-    this.senderUsageInBatch.clear()
-    this.senderCursor = 0
-    return this.senderCache[0]
-  }
-
-  /**
-   * 确保发送者有有效的会话字符串
-   * 如果发送者已有 sessionStr 则直接返回,否则根据 dcId 和 authKey 构建并保存
-   * @param sender 发送者对象
-   * @returns 会话字符串
-   * @throws 如果缺少必要的会话信息则抛出错误
-   */
-  private async ensureSessionString(sender: Sender): Promise<string> {
-    if (sender.sessionStr) {
-      return sender.sessionStr
-    }
-
-    if (sender.dcId && sender.authKey) {
-      const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
-      await this.senderRepository.update(sender.id, { sessionStr: session })
-      return session
-    }
-
-    throw new Error(`sender=${sender.id} 缺少 session 信息`)
-  }
-
-  /**
-   * 处理会话被撤销的情况
-   * 将发送者标记为已删除,并从缓存中移除
-   * @param sender 发送者对象
-   */
-  private async handleSessionRevoked(sender: Sender): Promise<void> {
-    await this.senderRepository.update(sender.id, { delFlag: true })
-    this.senderCache = this.senderCache.filter(s => s.id !== sender.id)
-    this.senderCursor = 0
-    this.app.log.warn(`sender=${sender.id} session 失效,已删除`)
-  }
-
-  /**
-   * 检查是否可以发送消息给目标用户
-   * 检查用户是否被屏蔽、是否为机器人、是否已删除、是否为虚假或诈骗账号
-   * @param client Telegram 客户端
-   * @param targetPeer 目标用户对象
-   * @returns 如果可以发送返回 true,否则返回 false
-   * @throws 如果认证密钥未注册则抛出错误
-   */
-  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
-    try {
-      const fullUser = await client.invoke(
-        new Api.users.GetFullUser({
-          id: targetPeer
-        })
-      )
-
-      const fullUserData = fullUser.fullUser as any
-
-      if (fullUserData?.blocked) {
-        return false
-      }
-
-      if (targetPeer.bot && targetPeer.botChatHistory === false) {
-        return false
-      }
-
-      if (targetPeer.deleted) {
-        return false
-      }
-
-      if (targetPeer.fake || targetPeer.scam) {
-        return false
-      }
-
-      return true
-    } catch (error) {
-      const errorMessage = error instanceof Error ? error.message : '未知错误'
-
-      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
-        throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
-      }
-
-      if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
-        return false
-      }
-
-      return true
-    }
-  }
-
-  /**
-   * 获取随机延迟秒数
-   * @param min 最小延迟秒数,默认 10
-   * @param max 最大延迟秒数,默认 20
-   * @returns 随机延迟秒数
-   */
-  private getRandomDelaySeconds(min: number = 10, max: number = 20): number {
-    return Math.floor(Math.random() * (max - min + 1)) + min
-  }
-
-  /**
-   * 检查错误消息是否表示会话被撤销
-   * @param msg 错误消息
-   * @returns 如果是会话被撤销相关的错误返回 true,否则返回 false
-   */
-  private isSessionRevokedMessage(msg: string): boolean {
-    return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
-  }
 }
 }

+ 68 - 126
src/services/tgClient.service.ts

@@ -4,18 +4,8 @@ import { createApp } from '../app'
 import bigInt from 'big-integer'
 import bigInt from 'big-integer'
 
 
 export class TgClientService {
 export class TgClientService {
-  private static _instance: TgClientService
-
-  public static getInstance(): TgClientService {
-    if (!this._instance) {
-      this._instance = new TgClientService()
-    }
-    return this._instance
-  }
-
   private app: any
   private app: any
   private client: TelegramClient | null = null
   private client: TelegramClient | null = null
-  private currentSession: string | null = null
   private apiId: number
   private apiId: number
   private apiHash: string
   private apiHash: string
   private initPromise: Promise<void> | null = null
   private initPromise: Promise<void> | null = null
@@ -30,9 +20,10 @@ export class TgClientService {
     { label: 'WSS', options: { useWSS: true } },
     { label: 'WSS', options: { useWSS: true } },
     { label: 'TCP', options: { useWSS: false } }
     { label: 'TCP', options: { useWSS: false } }
   ]
   ]
-  private activeClients: Set<TelegramClient> = new Set()
+  private static activeClientsCount = 0
+  private static readonly maxActiveClients = 20
 
 
-  private constructor() {}
+  constructor() {}
 
 
   private async initializeApp(): Promise<void> {
   private async initializeApp(): Promise<void> {
     if (this.initPromise) {
     if (this.initPromise) {
@@ -59,33 +50,15 @@ export class TgClientService {
     return this.initPromise
     return this.initPromise
   }
   }
 
 
-  async connect(sessionString: string): Promise<TelegramClient> {
-    await this.initializeApp()
-
-    if (this.client && this.client.connected && this.currentSession === sessionString) {
-      return this.client
+  async connect(sessionString: string): Promise<void> {
+    if (TgClientService.activeClientsCount >= TgClientService.maxActiveClients) {
+      throw new Error('TelegramClient 并发连接数已达上限')
     }
     }
 
 
     if (this.client) {
     if (this.client) {
       await this.disconnect()
       await this.disconnect()
     }
     }
 
 
-    this.client = await this.createConnectedClient(sessionString)
-    this.currentSession = sessionString
-    return this.client
-  }
-
-  async disconnect(): Promise<void> {
-    if (!this.client) {
-      return
-    }
-
-    await this.disposeClient(this.client, 'TelegramClient')
-    this.client = null
-    this.currentSession = null
-  }
-
-  async createConnectedClient(sessionString: string): Promise<TelegramClient> {
     await this.initializeApp()
     await this.initializeApp()
 
 
     const stringSession = new StringSession(sessionString)
     const stringSession = new StringSession(sessionString)
@@ -113,9 +86,10 @@ export class TgClientService {
           this.app.log.warn('无法获取账号信息')
           this.app.log.warn('无法获取账号信息')
         }
         }
 
 
-        this.activeClients.add(client)
+        this.client = client
+        TgClientService.activeClientsCount++
         this.logActiveClientCount()
         this.logActiveClientCount()
-        return client
+        return
       } catch (error) {
       } catch (error) {
         lastError = error
         lastError = error
         const errorMessage = this.extractErrorMessage(error)
         const errorMessage = this.extractErrorMessage(error)
@@ -125,7 +99,7 @@ export class TgClientService {
           useWSS: connectionOptions.useWSS
           useWSS: connectionOptions.useWSS
         })
         })
 
 
-        await this.disposeClient(client, `TelegramClient(${strategy.label})`)
+        await this.disposeClient(client, `TelegramClient(${strategy.label})`, false)
 
 
         if (this.isSessionRevokedError(error)) {
         if (this.isSessionRevokedError(error)) {
           throw new Error('Telegram Session 已失效或被吊销,请重新登录生成新的 session 字符串')
           throw new Error('Telegram Session 已失效或被吊销,请重新登录生成新的 session 字符串')
@@ -138,27 +112,37 @@ export class TgClientService {
     )
     )
   }
   }
 
 
-  async disconnectClient(client: TelegramClient | null): Promise<void> {
-    if (!client) {
+  async disconnect(): Promise<void> {
+    if (!this.client) {
       return
       return
     }
     }
 
 
-    await this.disposeClient(client, 'TelegramClient')
+    await this.disposeClient(this.client, 'TelegramClient', true)
+    this.client = null
+  }
+
+  getClient(): TelegramClient | null {
+    return this.client
+  }
+
+  isConnected(): boolean {
+    return this.client !== null && (this.client.connected ?? false)
   }
   }
 
 
   async createChannelGroup(
   async createChannelGroup(
-    client: TelegramClient,
     groupName: string,
     groupName: string,
     groupDescription: string,
     groupDescription: string,
     groupType: 'megagroup' | 'channel'
     groupType: 'megagroup' | 'channel'
-  ): Promise<{ chatId: string; accessHash: string; client: TelegramClient }> {
+  ): Promise<{ chatId: string; accessHash: string }> {
+    this.ensureConnected()
+
     const waitTime = Math.floor(Math.random() * 21) + 20
     const waitTime = Math.floor(Math.random() * 21) + 20
     this.app.log.info(`连接成功后等待 ${waitTime} 秒,避免新 Session 被限制`)
     this.app.log.info(`连接成功后等待 ${waitTime} 秒,避免新 Session 被限制`)
     await new Promise(resolve => setTimeout(resolve, waitTime * 1000))
     await new Promise(resolve => setTimeout(resolve, waitTime * 1000))
 
 
     this.app.log.info(`开始创建${groupType === 'channel' ? '频道' : '超级群组'}: ${groupName}`)
     this.app.log.info(`开始创建${groupType === 'channel' ? '频道' : '超级群组'}: ${groupName}`)
 
 
-    const result = await client.invoke(
+    const result = await this.client!.invoke(
       new Api.channels.CreateChannel({
       new Api.channels.CreateChannel({
         title: groupName,
         title: groupName,
         about: groupDescription || '',
         about: groupDescription || '',
@@ -184,8 +168,7 @@ export class TgClientService {
 
 
     return {
     return {
       chatId: String(chatId),
       chatId: String(chatId),
-      accessHash: String(accessHash),
-      client
+      accessHash: String(accessHash)
     }
     }
   }
   }
 
 
@@ -196,15 +179,12 @@ export class TgClientService {
     })
     })
   }
   }
 
 
-  async sendMessageToChannelGroup(
-    client: TelegramClient,
-    inputChannel: Api.InputChannel,
-    message: string
-  ): Promise<void> {
+  async sendMessageToChannelGroup(inputChannel: Api.InputChannel, message: string): Promise<void> {
+    this.ensureConnected()
     this.app.log.info('正在发送群组消息...')
     this.app.log.info('正在发送群组消息...')
 
 
     try {
     try {
-      await client.sendMessage(inputChannel, {
+      await this.client!.sendMessage(inputChannel, {
         message: message.trim()
         message: message.trim()
       })
       })
       this.app.log.info('已向群组发送消息')
       this.app.log.info('已向群组发送消息')
@@ -214,15 +194,12 @@ export class TgClientService {
     }
     }
   }
   }
 
 
-  async inviteMembersToChannelGroup(
-    client: TelegramClient,
-    inputChannel: Api.InputChannel,
-    inputUsers: Api.InputUser[]
-  ): Promise<void> {
+  async inviteMembersToChannelGroup(inputChannel: Api.InputChannel, inputUsers: Api.InputUser[]): Promise<void> {
+    this.ensureConnected()
     this.app.log.info('正在邀请成员到群组...')
     this.app.log.info('正在邀请成员到群组...')
 
 
     try {
     try {
-      await client.invoke(
+      await this.client!.invoke(
         new Api.channels.InviteToChannel({
         new Api.channels.InviteToChannel({
           channel: inputChannel,
           channel: inputChannel,
           users: inputUsers
           users: inputUsers
@@ -235,9 +212,11 @@ export class TgClientService {
     }
     }
   }
   }
 
 
-  async getInviteLink(client: TelegramClient, inputChannel: Api.InputChannel): Promise<string | null> {
+  async getInviteLink(inputChannel: Api.InputChannel): Promise<string | null> {
+    this.ensureConnected()
+
     try {
     try {
-      const inviteLink = await client.invoke(new Api.messages.ExportChatInvite({ peer: inputChannel }))
+      const inviteLink = await this.client!.invoke(new Api.messages.ExportChatInvite({ peer: inputChannel }))
       const invite = inviteLink as any
       const invite = inviteLink as any
       if (invite?.link) {
       if (invite?.link) {
         return invite.link
         return invite.link
@@ -249,14 +228,12 @@ export class TgClientService {
     return null
     return null
   }
   }
 
 
-  async getPublicLink(
-    client: TelegramClient,
-    inputChannel: Api.InputChannel,
-    groupName?: string
-  ): Promise<string | null> {
+  async getPublicLink(inputChannel: Api.InputChannel, groupName?: string): Promise<string | null> {
+    this.ensureConnected()
+
     try {
     try {
       const username = this.generateGroupUsername(groupName)
       const username = this.generateGroupUsername(groupName)
-      await client.invoke(
+      await this.client!.invoke(
         new Api.channels.UpdateUsername({
         new Api.channels.UpdateUsername({
           channel: inputChannel,
           channel: inputChannel,
           username
           username
@@ -270,7 +247,8 @@ export class TgClientService {
     return null
     return null
   }
   }
 
 
-  async getTargetPeer(client: TelegramClient, parsedTarget: string | number): Promise<any> {
+  async getTargetPeer(parsedTarget: string | number): Promise<any> {
+    this.ensureConnected()
     this.app.log.info('正在获取目标实体信息...')
     this.app.log.info('正在获取目标实体信息...')
 
 
     try {
     try {
@@ -278,7 +256,7 @@ export class TgClientService {
 
 
       if (typeof parsedTarget === 'string' && parsedTarget.startsWith('+')) {
       if (typeof parsedTarget === 'string' && parsedTarget.startsWith('+')) {
         this.app.log.info('手机号导入联系人...')
         this.app.log.info('手机号导入联系人...')
-        const result = await client.invoke(
+        const result = await this.client!.invoke(
           new Api.contacts.ImportContacts({
           new Api.contacts.ImportContacts({
             contacts: [
             contacts: [
               new Api.InputPhoneContact({
               new Api.InputPhoneContact({
@@ -296,40 +274,27 @@ export class TgClientService {
 
 
         targetPeer = user
         targetPeer = user
       } else {
       } else {
-        targetPeer = await client.getEntity(parsedTarget)
+        targetPeer = await this.client!.getEntity(parsedTarget)
       }
       }
 
 
-      // if (targetPeer) {
-      //   this.logTargetInfo(targetPeer)
-      // }
-
       return targetPeer
       return targetPeer
     } catch (error) {
     } catch (error) {
       const errorMessage = this.extractErrorMessage(error)
       const errorMessage = this.extractErrorMessage(error)
 
 
       if (typeof parsedTarget === 'number') {
       if (typeof parsedTarget === 'number') {
-        // this.app.log.error({
-        //   msg: '无法获取用户实体',
-        //   error: errorMessage,
-        //   targetId: parsedTarget.toString()
-        // })
         throw new Error(`无法获取用户实体: ${errorMessage}`)
         throw new Error(`无法获取用户实体: ${errorMessage}`)
       }
       }
 
 
-      // this.app.log.error({
-      //   msg: '无法获取目标信息',
-      //   error: errorMessage,
-      //   target: parsedTarget
-      // })
       throw new Error(`无法获取目标信息: ${errorMessage}`)
       throw new Error(`无法获取目标信息: ${errorMessage}`)
     }
     }
   }
   }
 
 
-  async sendMessageToPeer(client: TelegramClient, targetPeer: any, message: string): Promise<any> {
+  async sendMessageToPeer(targetPeer: any, message: string): Promise<any> {
+    this.ensureConnected()
     this.app.log.info('正在发送消息...')
     this.app.log.info('正在发送消息...')
 
 
     try {
     try {
-      const result = await client.sendMessage(targetPeer, {
+      const result = await this.client!.sendMessage(targetPeer, {
         message: message
         message: message
       })
       })
       return result
       return result
@@ -339,10 +304,12 @@ export class TgClientService {
     }
     }
   }
   }
 
 
-  async clearConversation(client: TelegramClient, targetPeer: any): Promise<void> {
+  async clearConversation(targetPeer: any): Promise<void> {
+    this.ensureConnected()
     this.app.log.info('正在清除会话...')
     this.app.log.info('正在清除会话...')
+
     try {
     try {
-      await client.invoke(
+      await this.client!.invoke(
         new Api.messages.DeleteHistory({
         new Api.messages.DeleteHistory({
           peer: targetPeer,
           peer: targetPeer,
           revoke: false
           revoke: false
@@ -354,10 +321,12 @@ export class TgClientService {
     }
     }
   }
   }
 
 
-  async deleteTempContact(client: TelegramClient, userId: number): Promise<void> {
+  async deleteTempContact(userId: number): Promise<void> {
+    this.ensureConnected()
     this.app.log.info('正在删除临时联系人...')
     this.app.log.info('正在删除临时联系人...')
+
     try {
     try {
-      await client.invoke(
+      await this.client!.invoke(
         new Api.contacts.DeleteContacts({
         new Api.contacts.DeleteContacts({
           id: [userId]
           id: [userId]
         })
         })
@@ -368,7 +337,13 @@ export class TgClientService {
     }
     }
   }
   }
 
 
-  private async disposeClient(client: TelegramClient | null, context: string): Promise<void> {
+  private ensureConnected(): void {
+    if (!this.client) {
+      throw new Error('TelegramClient 未连接,请先调用 connect() 方法')
+    }
+  }
+
+  private async disposeClient(client: TelegramClient | null, context: string, decrementCount: boolean): Promise<void> {
     if (!client) {
     if (!client) {
       return
       return
     }
     }
@@ -395,8 +370,10 @@ export class TgClientService {
       }
       }
     }
     }
 
 
-    this.activeClients.delete(client)
-    this.logActiveClientCount()
+    if (decrementCount) {
+      TgClientService.activeClientsCount = Math.max(0, TgClientService.activeClientsCount - 1)
+      this.logActiveClientCount()
+    }
   }
   }
 
 
   private extractErrorMessage(error: unknown): string {
   private extractErrorMessage(error: unknown): string {
@@ -500,42 +477,7 @@ export class TgClientService {
     }
     }
   }
   }
 
 
-  private logTargetInfo(targetPeer: any): void {
-    const entityInfo = targetPeer as any
-    const logData: any = {
-      className: entityInfo.className || '未知'
-    }
-
-    if (entityInfo.id !== undefined) {
-      logData.id = entityInfo.id.toString()
-    }
-
-    if (entityInfo.title) {
-      logData.title = entityInfo.title
-    } else if (entityInfo.firstName) {
-      logData.name = `${entityInfo.firstName} ${entityInfo.lastName || ''}`.trim()
-    }
-
-    if (entityInfo.username) {
-      logData.username = entityInfo.username
-    }
-
-    this.app.log.info(logData)
-  }
-
   private logActiveClientCount(): void {
   private logActiveClientCount(): void {
-    this.app?.log?.info?.(`当前活跃 Telegram 客户端数量: ${this.activeClients.size}`)
-  }
-
-  getClient(): TelegramClient | null {
-    return this.client
-  }
-
-  isConnected(): boolean {
-    return this.client !== null && (this.client.connected ?? false)
-  }
-
-  getCurrentSession(): string | null {
-    return this.currentSession
+    this.app?.log?.info?.(`当前活跃 Telegram 客户端数量: ${TgClientService.activeClientsCount}`)
   }
   }
 }
 }