Răsfoiți Sursa

更新 Sender 实体的 sessionStr 字段为可为 null,优化 TaskService 的错误处理和日志记录,增强连接稳定性和可调试性。

wuyi 1 lună în urmă
părinte
comite
ea7e8f59ba
3 a modificat fișierele cu 173 adăugiri și 82 ștergeri
  1. 1 1
      src/entities/sender.entity.ts
  2. 43 13
      src/services/task.service.ts
  3. 129 68
      src/services/tgClient.service.ts

+ 1 - 1
src/entities/sender.entity.ts

@@ -13,7 +13,7 @@ export class Sender {
   authKey: string
 
   @Column({ type: 'text', nullable: true })
-  sessionStr: string
+  sessionStr: string | null
 
   @Column({ default: 0 })
   usageCount: number

+ 43 - 13
src/services/task.service.ts

@@ -27,7 +27,7 @@ export class TaskService {
   private readonly taskBatchSize = 50
   private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`
   private processingSince: number | null = null
-  private readonly maxProcessingMs = 2 * 60 * 1000 // 防护:单轮处理超过 2 分钟则自愈
+  private readonly maxProcessingMs = 2 * 60 * 1000
 
   constructor(app: FastifyInstance) {
     this.app = app
@@ -160,7 +160,6 @@ export class TaskService {
       startedAt: task.startedAt ?? new Date()
     })
 
-    // 立即触发一次发送周期,避免依赖下次轮询
     void this.taskSendCycle()
   }
 
@@ -200,10 +199,9 @@ export class TaskService {
     if (this.processing) {
       const now = Date.now()
       if (this.processingSince && now - this.processingSince > this.maxProcessingMs) {
-        this.app.log.warn('taskSendCycle 卡死自愈,重置 processing 标记')
+        this.app.log.warn('taskSendCycle 脱离卡死,重置 processing')
         this.processing = false
       } else {
-        this.app.log.debug?.('taskSendCycle skipped: processing=true')
         return
       }
     }
@@ -223,16 +221,13 @@ export class TaskService {
   private async startTaskSend() {
     const task = await this.taskRepository.findOne({
       where: { status: TaskStatus.SENDING, delFlag: false },
-      order: { startedAt: 'ASC', id: 'ASC' }
+      order: { startedAt: 'ASC' }
     })
 
     if (!task) {
-      this.app.log.debug?.('taskSendCycle: 未发现发送中的任务')
       return
     }
-    this.app.log.info(
-      `taskSendCycle: 捕获发送任务 id=${task.id}, startedAt=${task.startedAt?.toISOString?.() ?? 'null'}`
-    )
+    this.app.log.info(`开始发送任务 id=${task.id}, startedAt=${task.startedAt?.toISOString}`)
 
     const configuredSendLimit =
       task.sendLimit && Number(task.sendLimit) > 0 ? Number(task.sendLimit) : this.defaultSenderSendLimit
@@ -350,14 +345,27 @@ export class TaskService {
         await this.tgClientService.disconnectClient(client)
         sender = await this.pickSender()
         const sessionString = await this.ensureSessionString(sender)
-        client = await this.tgClientService.createConnectedClient(sessionString)
+        try {
+          client = await this.tgClientService.createConnectedClient(sessionString)
+        } catch (error) {
+          const msg = error instanceof Error ? error.message : String(error)
+          if (this.isSessionRevokedMessage(msg)) {
+            await this.handleSessionRevoked(sender)
+            sender = null
+            client = null
+            continue
+          }
+          throw error
+        }
         senderSentInRound = 0
 
         const me = await client.getMe().catch(() => null)
         const delaySeconds = this.getRandomDelaySeconds()
         const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
         this.app.log.info(
-          `worker=${workerIndex} ,当前登录账号: id: ${me?.id ?? sender.id} ,name: ${displayName || sender.id},延迟 ${delaySeconds}s 后开始发送`
+          `worker=${workerIndex} ,当前登录账号: id: ${me?.id ?? sender.id} ,name: ${
+            displayName || sender.id
+          },延迟 ${delaySeconds}s 后开始发送`
         )
         await this.sleep(delaySeconds * 1000)
       }
@@ -368,8 +376,17 @@ export class TaskService {
       } catch (error) {
         failed++
         const msg = error instanceof Error ? error.message : '未知错误'
+        if (sender && this.isSessionRevokedMessage(msg)) {
+          await this.handleSessionRevoked(sender)
+          await this.tgClientService.disconnectClient(client)
+          sender = null
+          client = null
+          senderSentInRound = 0
+        }
         this.app.log.warn(
-          `❌ 发送失败 taskId=${task.id}, item=${taskItem.id}, sender=${sender?.id ?? '未知'}, worker=${workerIndex}, error: ${msg}`
+          `❌ 发送失败 taskId=${task.id}, item=${taskItem.id}, sender=${
+            sender?.id ?? '未知'
+          }, worker=${workerIndex}, error: ${msg}`
         )
       } finally {
         sent++
@@ -425,7 +442,9 @@ export class TaskService {
       })
 
       await this.senderService.incrementUsageCount(sender.id)
-      this.app.log.info(`✅ 发送成功 taskId=${task.id}, itemId=${taskItem.id}, sender=${sender.id}, worker=${workerIndex}`)
+      this.app.log.info(
+        `✅ 发送成功 taskId=${task.id}, itemId=${taskItem.id}, sender=${sender.id}, worker=${workerIndex}`
+      )
     } catch (error) {
       const msg = error instanceof Error ? error.message : '未知错误'
       await this.taskItemRepository.update(taskItem.id, {
@@ -504,6 +523,17 @@ export class TaskService {
     throw new Error(`sender=${sender.id} 缺少 session 信息`)
   }
 
+  private isSessionRevokedMessage(msg: string): boolean {
+    return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
+  }
+
+  private async handleSessionRevoked(sender: Sender): Promise<void> {
+    await this.senderRepository.update(sender.id, { delFlag: true })
+    this.senderCache = this.senderCache.filter(s => s.id !== sender.id)
+    this.senderCursor = 0
+    this.app.log.warn(`sender=${sender.id} session 失效,已删除`)
+  }
+
   private parseTarget(targetId: string): string | number | null {
     const trimmed = targetId.trim()
 

+ 129 - 68
src/services/tgClient.service.ts

@@ -20,11 +20,16 @@ export class TgClientService {
   private apiHash: string
   private initPromise: Promise<void> | null = null
   private readonly defaultConnectionOptions = {
-    connectionRetries: 5,
-    retryDelay: 2000,
-    requestRetries: 5,
+    connectionRetries: 2,
+    retryDelay: 1500,
+    requestRetries: 3,
     useWSS: true
   }
+  private readonly connectTimeoutMs = 15000
+  private readonly connectionStrategies = [
+    { label: 'WSS', options: { useWSS: true } },
+    { label: 'TCP', options: { useWSS: false } }
+  ]
   private activeClients: Set<TelegramClient> = new Set()
 
   private constructor() {}
@@ -61,7 +66,7 @@ export class TgClientService {
       return this.client
     }
 
-    if (this.client && this.client.connected) {
+    if (this.client) {
       await this.disconnect()
     }
 
@@ -75,67 +80,62 @@ export class TgClientService {
       return
     }
 
-    try {
-      if (this.client.connected) {
-        await this.client.disconnect()
-        this.app.log.info('TelegramClient 已断开连接')
-        await this.client.destroy()
-        this.app.log.info('TelegramClient 已销毁')
-      }
-      this.activeClients.delete(this.client)
-      this.logActiveClientCount()
-    } catch (error) {
-      const errorMessage = error instanceof Error ? error.message : String(error)
-      if (!errorMessage.includes('TIMEOUT')) {
-        this.app.log.error({ msg: '断开连接时发生错误', error: errorMessage })
-      }
-    } finally {
-      this.client = null
-      this.currentSession = null
-    }
+    await this.disposeClient(this.client, 'TelegramClient')
+    this.client = null
+    this.currentSession = null
   }
 
   async createConnectedClient(sessionString: string): Promise<TelegramClient> {
     await this.initializeApp()
 
     const stringSession = new StringSession(sessionString)
-    const connectionOptions = { ...this.defaultConnectionOptions }
+    let lastError: unknown = null
 
-    this.app.log.info(
-      `正在建立连接... useWSS=${connectionOptions.useWSS ? 'true' : 'false'} (443 优先), retries=${
-        connectionOptions.connectionRetries
-      }`
-    )
-
-    const client = new TelegramClient(stringSession, this.apiId, this.apiHash, connectionOptions)
+    for (const strategy of this.connectionStrategies) {
+      const connectionOptions = { ...this.defaultConnectionOptions, ...strategy.options }
+      this.app.log.info(
+        `正在建立连接[${strategy.label}]... useWSS=${connectionOptions.useWSS ? 'true' : 'false'} (443 优先), retries=${
+          connectionOptions.connectionRetries
+        }, retryDelay=${connectionOptions.retryDelay}ms`
+      )
 
-    try {
-      await client.connect()
-    } catch (err) {
-      const errorMessage = err instanceof Error ? err.message : String(err)
-      this.app.log.error({ msg: '连接 Telegram 失败', error: errorMessage, useWSS: connectionOptions.useWSS })
-      throw err
-    }
+      const client = new TelegramClient(stringSession, this.apiId, this.apiHash, connectionOptions)
+
+      try {
+        await this.connectWithTimeout(client)
+
+        const me = await this.ensureValidSession(client)
+        if (me) {
+          this.app.log.info(
+            `当前登录账号: id: ${me.id} ,name: ${`${me.firstName || ''} ${me.lastName || ''}`.trim()} ${me.username || ''}`.trim()
+          )
+        } else {
+          this.app.log.warn('无法获取账号信息')
+        }
+
+        this.activeClients.add(client)
+        this.logActiveClientCount()
+        return client
+      } catch (error) {
+        lastError = error
+        const errorMessage = this.extractErrorMessage(error)
+        this.app.log.error({
+          msg: `连接 Telegram 失败 (${strategy.label})`,
+          error: errorMessage,
+          useWSS: connectionOptions.useWSS
+        })
 
-    if (!client.connected) {
-      throw new Error('TelegramClient 连接失败,请检查网络或 Session 是否有效')
-    }
+        await this.disposeClient(client, `TelegramClient(${strategy.label})`)
 
-    this.app.log.info('TelegramClient 连接成功,正在获取账号信息...')
-    const me = await client.getMe()
-    if (me) {
-      this.app.log.info(
-        `当前登录账号: id: ${me.id} ,name: ${me.firstName || ''} ${me.lastName || ''} ${me.username || ''}`.trim()
-      )
-    } else {
-      this.app.log.warn('无法获取账号信息')
+        if (this.isSessionRevokedError(error)) {
+          throw new Error('Telegram Session 已失效或被吊销,请重新登录生成新的 session 字符串')
+        }
+      }
     }
-    this.app.log.info('账号信息获取完成')
 
-    this.activeClients.add(client)
-    this.logActiveClientCount()
-
-    return client
+    throw new Error(
+      `TelegramClient 连接失败,已尝试 WSS/TCP,最后错误: ${this.extractErrorMessage(lastError) || '未知'}`
+    )
   }
 
   async disconnectClient(client: TelegramClient | null): Promise<void> {
@@ -143,21 +143,7 @@ export class TgClientService {
       return
     }
 
-    try {
-      if (client.connected) {
-        await client.disconnect()
-        this.app.log.info('TelegramClient 已断开连接')
-      }
-      await client.destroy()
-      this.app.log.info('TelegramClient 已销毁')
-      this.activeClients.delete(client)
-      this.logActiveClientCount()
-    } catch (error) {
-      const errorMessage = error instanceof Error ? error.message : String(error)
-      if (!errorMessage.includes('TIMEOUT')) {
-        this.app.log.error({ msg: '断开连接时发生错误', error: errorMessage })
-      }
-    }
+    await this.disposeClient(client, 'TelegramClient')
   }
 
   getActiveClientCount(): number {
@@ -276,6 +262,81 @@ export class TgClientService {
     return '未知错误'
   }
 
+  private isSessionRevokedError(error: unknown): boolean {
+    const msg = this.extractErrorMessage(error)
+    return (
+      msg.includes('SESSION_REVOKED') ||
+      msg.includes('AUTH_KEY_UNREGISTERED') ||
+      msg.includes('AUTH_KEY_INVALID')
+    )
+  }
+
+  private async connectWithTimeout(client: TelegramClient): Promise<void> {
+    let timer: NodeJS.Timeout | null = null
+
+    try {
+      await Promise.race([
+        client.connect(),
+        new Promise((_, reject) => {
+          timer = setTimeout(() => reject(new Error(`连接超时(${this.connectTimeoutMs}ms)`)), this.connectTimeoutMs)
+        })
+      ])
+    } finally {
+      if (timer) {
+        clearTimeout(timer)
+      }
+    }
+
+    if (!client.connected) {
+      throw new Error('TelegramClient 连接失败,请检查网络或 Session 是否有效')
+    }
+  }
+
+  private async ensureValidSession(client: TelegramClient): Promise<any> {
+    try {
+      this.app.log.info('TelegramClient 连接成功,正在获取账号信息...')
+      return await client.getMe()
+    } catch (error) {
+      if (this.isSessionRevokedError(error)) {
+        throw new Error('Telegram Session 已失效或被吊销,需要重新登录获取新的 session')
+      }
+      throw error
+    } finally {
+      this.app.log.info('账号信息获取完成')
+    }
+  }
+
+  private async disposeClient(client: TelegramClient | null, context: string): Promise<void> {
+    if (!client) {
+      return
+    }
+
+    try {
+      if (client.connected) {
+        await client.disconnect()
+        this.app.log.info(`${context} 已断开连接`)
+      }
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      if (!errorMessage.includes('TIMEOUT')) {
+        this.app.log.error({ msg: `${context} 断开时发生错误`, error: errorMessage })
+      }
+    }
+
+    try {
+      await client.destroy()
+      this.app.log.info(`${context} 已销毁`)
+    } catch (error) {
+      const errorMessage = this.extractErrorMessage(error)
+      if (!errorMessage.includes('TIMEOUT')) {
+        this.app.log.error({ msg: `${context} 销毁时发生错误`, error: errorMessage })
+      }
+    }
+
+    this.activeClients.delete(client)
+    this.logActiveClientCount()
+  }
+
   private logTargetInfo(targetPeer: any): void {
     const entityInfo = targetPeer as any
     const logData: any = {