task.executor.ts 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724
  1. import { FastifyInstance } from 'fastify'
  2. import { DataSource } from 'typeorm'
  3. import { Repository } from 'typeorm'
  4. import { TelegramClient, Api } from 'telegram'
  5. import { Task, TaskStatus, TaskType } from '../entities/task.entity'
  6. import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
  7. import { TgUser } from '../entities/tg-user.entity'
  8. import { TgUserService } from '../services/tg-user.service'
  9. import { TgClientManager } from '../services/clients/tg-client.manager'
  10. import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
  11. export class TaskExecutor {
  12. private ds: DataSource
  13. private taskRepo: Repository<Task>
  14. private taskItemRepo: Repository<TaskItem>
  15. private senderRepo: Repository<TgUser>
  16. private senderService: TgUserService
  17. // 默认账号使用上限
  18. private readonly defaultAccountLimit = 5
  19. // 当前账号使用上限
  20. private currentAccountLimit = this.defaultAccountLimit
  21. // 账号缓存默认拉取上限
  22. private readonly defaultAccountCacheTake = 100
  23. // 账号缓存最大拉取上限
  24. private readonly maxAccountCacheTake = 1000
  25. // 当前任务的账号缓存拉取上限
  26. private currentAccountCacheTake = this.defaultAccountCacheTake
  27. // 账号使用批次
  28. private accountUsageInBatch: Map<string, number> = new Map()
  29. // 账号游标
  30. private accountCursor = 0
  31. // 账号缓存
  32. private accountCache: TgUser[] = []
  33. // 本批次临时排除的账号(连接/功能异常等),避免在同一轮里反复选中导致任务整体失败
  34. private accountExcludedInBatch: Set<string> = new Set()
  35. constructor(private app: FastifyInstance) {
  36. const ds = app.dataSource
  37. this.ds = ds
  38. this.senderService = new TgUserService(app)
  39. this.taskRepo = ds.getRepository(Task)
  40. this.taskItemRepo = ds.getRepository(TaskItem)
  41. this.senderRepo = ds.getRepository(TgUser)
  42. }
  43. /**
  44. * TaskScheduler 唯一入口
  45. */
  46. async execute(task: Task): Promise<void> {
  47. try {
  48. await this.beforeExecute(task)
  49. // 初始化 tgUser 配置
  50. this.currentAccountLimit =
  51. task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultAccountLimit
  52. this.accountUsageInBatch.clear()
  53. this.accountCursor = 0
  54. // 计算任务需要拉取的账号池大小
  55. const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
  56. this.currentAccountCacheTake = this.computeAccountCacheTake(task, concurrency)
  57. await this.refreshAccountCache()
  58. this.accountExcludedInBatch.clear()
  59. await this.process(task)
  60. await this.finalize(task.id)
  61. } catch (err) {
  62. this.app.log.error({ err, taskId: task.id }, 'TaskExecutor.execute failed')
  63. throw err
  64. }
  65. }
  66. /**
  67. * 计算本次任务需要拉取多少个 tgUser 账号进入缓存
  68. * - 优先支持 task.payload.accountPoolLimit/accountCacheTake 覆盖
  69. * - 否则使用 ceil(total / accountLimit),并至少不小于 threads(减少并发下账号共享)
  70. * - 最终受 maxAccountCacheTake 硬上限保护
  71. */
  72. private computeAccountCacheTake(task: Task, concurrency: number): number {
  73. const maxTake = this.maxAccountCacheTake
  74. const overrideRaw = task.payload?.accountPoolLimit ?? task.payload?.accountCacheTake
  75. if (overrideRaw !== undefined && overrideRaw !== null) {
  76. const override = Number(overrideRaw)
  77. if (!Number.isNaN(override) && override > 0) {
  78. return Math.min(maxTake, Math.max(1, Math.floor(override)))
  79. }
  80. }
  81. const total = Number(task.total ?? 0)
  82. const perAccount = Math.max(1, Number(this.currentAccountLimit || 1))
  83. const needByTotal = total > 0 ? Math.ceil(total / perAccount) : 0
  84. const desired = Math.max(concurrency, needByTotal || this.defaultAccountCacheTake)
  85. return Math.min(maxTake, desired)
  86. }
  87. /**
  88. * 执行前校验 & 标记
  89. */
  90. private async beforeExecute(task: Task): Promise<void> {
  91. if (task.status !== TaskStatus.SENDING) {
  92. throw new Error(`Task ${task.id} status invalid: ${task.status}`)
  93. }
  94. if (task.cancelRequested) {
  95. await this.taskRepo.update(task.id, {
  96. status: TaskStatus.CANCELED
  97. })
  98. throw new Error(`Task ${task.id} canceled before execution`)
  99. }
  100. }
  101. /**
  102. * 核心发送逻辑(并发 worker)
  103. */
  104. private async process(task: Task): Promise<void> {
  105. const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
  106. const workers: Promise<void>[] = []
  107. for (let i = 0; i < concurrency; i++) {
  108. workers.push(this.workerLoop(task.id))
  109. }
  110. await Promise.all(workers)
  111. }
  112. /**
  113. * 单 worker 循环
  114. */
  115. private async workerLoop(taskId: number): Promise<void> {
  116. let tgUser: TgUser | null = null
  117. const workerTgClient = new TgClientManager()
  118. let accountUsageInRound = 0
  119. // 仅用于 INVITE_TO_GROUP:缓存当前账号已加入的群实体,避免每条都重复解析/入群
  120. let inviteGroupEntity: any | null = null
  121. try {
  122. while (true) {
  123. const task = await this.taskRepo.findOneBy({ id: taskId })
  124. if (!task) return
  125. if (task.cancelRequested) {
  126. if (task.status === TaskStatus.PAUSED) {
  127. throw new Error('TASK_PAUSED')
  128. }
  129. throw new Error('TASK_CANCELED')
  130. }
  131. if (task.status !== TaskStatus.SENDING) {
  132. return
  133. }
  134. const taskItem = await this.pickNextTaskItem(taskId)
  135. if (!taskItem) {
  136. return
  137. }
  138. // tgUser 轮换逻辑
  139. if (!tgUser || accountUsageInRound >= this.currentAccountLimit) {
  140. // 换号前:若当前 tgUser 已加入群聊,则先退出群聊再断开
  141. await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
  142. await workerTgClient.disconnect()
  143. tgUser = await this.pickAccount()
  144. const sessionString = await this.ensureSessionString(tgUser)
  145. try {
  146. await workerTgClient.connect(sessionString)
  147. } catch (error) {
  148. const msg = error instanceof Error ? error.message : String(error)
  149. // 会话失效:直接移除该账号
  150. if (this.isSessionRevokedMessage(msg)) {
  151. await this.handleSessionRevoked(tgUser)
  152. tgUser = null
  153. continue
  154. }
  155. // 连接失败:不让错误冒泡导致任务被调度器判死;先排除此账号,换号继续
  156. this.app.log.warn(
  157. { taskId, sender: tgUser.id, err: msg },
  158. 'TelegramClient connect failed, rotate account'
  159. )
  160. this.accountExcludedInBatch.add(tgUser.id)
  161. tgUser = null
  162. accountUsageInRound = 0
  163. inviteGroupEntity = null
  164. await workerTgClient.disconnect().catch(() => {})
  165. continue
  166. }
  167. accountUsageInRound = 0
  168. inviteGroupEntity = null
  169. }
  170. // 延迟
  171. const delaySeconds = this.getRandomDelaySeconds()
  172. this.app.log.info(`延迟 ${delaySeconds}s 后开始处理任务`)
  173. await this.sleep(delaySeconds * 1000)
  174. try {
  175. const result = await this.processTaskItem(task, taskItem, tgUser, workerTgClient, inviteGroupEntity)
  176. // 更新缓存(仅 INVITE_TO_GROUP 会返回)
  177. if (result?.inviteGroupEntity !== undefined) {
  178. inviteGroupEntity = result.inviteGroupEntity
  179. }
  180. // 邀请失败:按你的预期,立即换一个 tgUser 继续流程
  181. if (result?.rotateAccount) {
  182. this.app.log.info(
  183. { taskId, itemId: taskItem.id, sender: tgUser.id, reason: result.reason ?? 'rotate' },
  184. 'rotate account due to task item failure'
  185. )
  186. this.accountExcludedInBatch.add(tgUser.id)
  187. // 换号前:退出群聊(如果已加入)
  188. await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
  189. await workerTgClient.disconnect().catch(() => {})
  190. tgUser = null
  191. accountUsageInRound = 0
  192. inviteGroupEntity = null
  193. }
  194. } catch (error) {
  195. // 兜底:理论上 processTaskItem 不应抛错;如果抛错,也不要影响整任务
  196. const msg = error instanceof Error ? error.message : '未知错误'
  197. this.app.log.error({ taskId, itemId: taskItem.id, sender: tgUser?.id, err: msg }, 'processTaskItem crashed')
  198. if (tgUser && this.isSessionRevokedMessage(msg)) {
  199. await this.handleSessionRevoked(tgUser)
  200. } else if (tgUser) {
  201. this.accountExcludedInBatch.add(tgUser.id)
  202. }
  203. await workerTgClient.disconnect().catch(() => {})
  204. tgUser = null
  205. accountUsageInRound = 0
  206. inviteGroupEntity = null
  207. } finally {
  208. accountUsageInRound++
  209. if (tgUser) {
  210. const used = (this.accountUsageInBatch.get(tgUser.id) ?? 0) + 1
  211. this.accountUsageInBatch.set(tgUser.id, used)
  212. }
  213. }
  214. // 处理间隔(每条随机)
  215. const intervalMs = this.pickIntervalMs(task.intervalTime)
  216. if (intervalMs > 0) {
  217. await this.sleep(intervalMs)
  218. }
  219. }
  220. } finally {
  221. // worker 结束前:尽量退出群聊,避免账号一直挂在群里
  222. await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
  223. await workerTgClient.disconnect()
  224. }
  225. }
  226. /**
  227. * 拉取一个待发送的 TaskItem(DB 层保证并发安全)
  228. */
  229. private async pickNextTaskItem(taskId: number): Promise<TaskItem | null> {
  230. const queryRunner = this.ds.createQueryRunner()
  231. await queryRunner.connect()
  232. await queryRunner.startTransaction()
  233. try {
  234. const repo = queryRunner.manager.getRepository(TaskItem)
  235. const item = await repo
  236. .createQueryBuilder('item')
  237. .setLock('pessimistic_write')
  238. .where('item.taskId = :taskId', { taskId })
  239. .andWhere('item.status = :status', { status: TaskItemStatus.PENDING })
  240. .orderBy('item.id', 'ASC')
  241. .getOne()
  242. if (!item) {
  243. await queryRunner.commitTransaction()
  244. return null
  245. }
  246. await repo.update(item.id, {
  247. status: TaskItemStatus.PROCESSING,
  248. operatingAt: new Date(),
  249. errorMsg: null
  250. })
  251. await queryRunner.commitTransaction()
  252. return { ...item, status: TaskItemStatus.PROCESSING }
  253. } catch (err) {
  254. await queryRunner.rollbackTransaction()
  255. throw err
  256. } finally {
  257. await queryRunner.release()
  258. }
  259. }
  260. /**
  261. * 处理单个 TaskItem(按 Task.type 分发)
  262. */
  263. private async processTaskItem(
  264. task: Task,
  265. item: TaskItem,
  266. sender: TgUser,
  267. workerTgClient: TgClientManager,
  268. inviteGroupEntity: any | null
  269. ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
  270. if (task.type === TaskType.INVITE_TO_GROUP) {
  271. return await this.processInviteToGroup(task, item, sender, workerTgClient, inviteGroupEntity)
  272. }
  273. return await this.processSendMessage(task, item, sender, workerTgClient)
  274. }
  275. private async processSendMessage(
  276. task: Task,
  277. item: TaskItem,
  278. sender: TgUser,
  279. workerTgClient: TgClientManager
  280. ): Promise<{ rotateAccount?: boolean; reason?: string } | void> {
  281. const message = String(task.payload?.message ?? '').trim()
  282. if (!message) {
  283. await this.taskItemRepo.update(item.id, {
  284. status: TaskItemStatus.FAILED,
  285. operatingAt: new Date(),
  286. operationId: sender.id,
  287. errorMsg: '消息内容为空'
  288. })
  289. await this.taskRepo.increment({ id: task.id }, 'processed', 1)
  290. return
  291. }
  292. try {
  293. const parsedTarget = this.parseTarget(item.target)
  294. if (!parsedTarget) {
  295. throw new Error('target 格式错误,请检查是否正确')
  296. }
  297. const targetPeer = await workerTgClient.getTargetPeer(parsedTarget)
  298. if (!targetPeer) {
  299. throw new Error('target 无效,无法获取目标信息')
  300. }
  301. const canSendMessage = await this.checkCanSendMessage(workerTgClient.getClient()!, targetPeer)
  302. if (!canSendMessage) {
  303. throw new Error('目标用户不允许接收消息或已被限制')
  304. }
  305. await workerTgClient.sendMessageToPeer(targetPeer, message)
  306. await workerTgClient.clearConversation(targetPeer).catch(() => {})
  307. await workerTgClient.deleteTempContact((targetPeer as any).id).catch(() => {})
  308. await this.taskItemRepo.update(item.id, {
  309. status: TaskItemStatus.SUCCESS,
  310. operatingAt: new Date(),
  311. operationId: sender.id,
  312. errorMsg: null
  313. })
  314. await this.senderService.incrementUsageCount(sender.id)
  315. await this.taskRepo.increment({ id: task.id }, 'processed', 1)
  316. await this.taskRepo.increment({ id: task.id }, 'success', 1)
  317. this.app.log.info(`✅ 发送成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
  318. } catch (error) {
  319. const msg = error instanceof Error ? error.message : '未知错误'
  320. await this.taskItemRepo.update(item.id, {
  321. status: TaskItemStatus.FAILED,
  322. operatingAt: new Date(),
  323. operationId: sender.id,
  324. errorMsg: msg
  325. })
  326. await this.senderService.incrementUsageCount(sender.id)
  327. await this.taskRepo.increment({ id: task.id }, 'processed', 1)
  328. this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
  329. // SEND_MESSAGE 默认不强制换号(避免快速耗尽账号池);如需也换号,可在此处返回 rotateAccount: true
  330. return { rotateAccount: false, reason: msg }
  331. }
  332. }
  333. private async processInviteToGroup(
  334. task: Task,
  335. item: TaskItem,
  336. sender: TgUser,
  337. workerTgClient: TgClientManager,
  338. inviteGroupEntity: any | null
  339. ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
  340. try {
  341. const inviteLink = String(task.payload?.inviteLink ?? '').trim()
  342. if (!inviteLink) {
  343. throw new Error('群拉人任务:邀请链接为空,请检查任务配置信息')
  344. }
  345. const parsedTarget = this.parseTarget(item.target)
  346. if (!parsedTarget) {
  347. throw new Error('target 格式错误,请检查是否正确')
  348. }
  349. const targetUser = await workerTgClient.getTargetPeer(parsedTarget)
  350. if (!targetUser) {
  351. throw new Error('target 无效,无法获取目标信息')
  352. }
  353. const client = workerTgClient.getClient()
  354. if (!client) {
  355. throw new Error('TelegramClient 未连接')
  356. }
  357. // tgUser 加入群组,获取群组实体(每个账号缓存一次)
  358. let groupEntity = inviteGroupEntity
  359. if (!groupEntity) {
  360. groupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
  361. }
  362. if (!groupEntity) {
  363. throw new Error('群拉人任务:未获取到群组实体(inviteGroupEntity 为空)')
  364. }
  365. const chatId = groupEntity.chatId ?? groupEntity.id
  366. const accessHash = groupEntity.accessHash
  367. if (chatId === undefined || chatId === null || accessHash === undefined || accessHash === null) {
  368. throw new Error('群拉人任务:群组实体缺少 id/chatId/accessHash(请检查 resolveGroupEntityByInviteLink 返回值)')
  369. }
  370. const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
  371. await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
  372. await this.taskItemRepo.update(item.id, {
  373. status: TaskItemStatus.SUCCESS,
  374. operatingAt: new Date(),
  375. operationId: sender.id,
  376. errorMsg: null
  377. })
  378. await this.senderService.incrementUsageCount(sender.id)
  379. await this.taskRepo.increment({ id: task.id }, 'processed', 1)
  380. await this.taskRepo.increment({ id: task.id }, 'success', 1)
  381. this.app.log.info(`✅ 邀请成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
  382. return { rotateAccount: false, inviteGroupEntity: groupEntity }
  383. } catch (error) {
  384. const msg = error instanceof Error ? error.message : '未知错误'
  385. // 已在群内:计为成功
  386. if (msg.includes('USER_ALREADY_PARTICIPANT')) {
  387. await this.taskItemRepo.update(item.id, {
  388. status: TaskItemStatus.SUCCESS,
  389. operatingAt: new Date(),
  390. operationId: sender.id,
  391. errorMsg: null
  392. })
  393. await this.taskRepo.increment({ id: task.id }, 'processed', 1)
  394. await this.taskRepo.increment({ id: task.id }, 'success', 1)
  395. this.app.log.info(`ℹ️ 成员已在群组中 taskId=${task.id}, itemId=${item.id}, target=${item.target}`)
  396. return { rotateAccount: false }
  397. }
  398. await this.taskItemRepo.update(item.id, {
  399. status: TaskItemStatus.FAILED,
  400. operatingAt: new Date(),
  401. operationId: sender.id,
  402. errorMsg: msg
  403. })
  404. await this.senderService.incrementUsageCount(sender.id)
  405. await this.taskRepo.increment({ id: task.id }, 'processed', 1)
  406. this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
  407. // INVITE_TO_GROUP:按需求,失败就换号继续(避免单号被冻结/受限导致整体停滞)
  408. return { rotateAccount: true, reason: msg }
  409. }
  410. }
  411. /**
  412. * 换号/断开前:让当前账号退出已加入的群聊
  413. * - 仅对 INVITE_TO_GROUP 有意义;其它任务 inviteGroupEntity 为空会直接跳过
  414. * - 不抛错:避免退群失败影响整体任务流程
  415. */
  416. private async safeLeaveInviteGroup(workerTgClient: TgClientManager, inviteGroupEntity: any | null): Promise<void> {
  417. if (!inviteGroupEntity) return
  418. const chatId = inviteGroupEntity.chatId ?? inviteGroupEntity.id
  419. const accessHash = inviteGroupEntity.accessHash
  420. if (chatId === undefined || chatId === null || accessHash === undefined || accessHash === null) return
  421. try {
  422. const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
  423. await workerTgClient.leaveGroup(inputChannel)
  424. } catch {
  425. // 忽略退群异常(可能已不在群、权限问题等)
  426. }
  427. }
  428. /**
  429. * 收尾逻辑
  430. */
  431. private async finalize(taskId: number): Promise<void> {
  432. const remain = await this.taskItemRepo
  433. .createQueryBuilder('item')
  434. .where('item.taskId = :taskId', { taskId })
  435. .andWhere('item.status IN (:...statuses)', { statuses: [TaskItemStatus.PENDING, TaskItemStatus.PROCESSING] })
  436. .getCount()
  437. if (remain > 0) {
  438. return
  439. }
  440. const task = await this.taskRepo.findOneBy({ id: taskId })
  441. if (!task) return
  442. if (task.cancelRequested) {
  443. await this.taskRepo.update(taskId, {
  444. status: TaskStatus.CANCELED
  445. })
  446. return
  447. }
  448. await this.taskRepo.update(taskId, {
  449. status: TaskStatus.COMPLETED
  450. })
  451. }
  452. private pickIntervalMs(intervalTime?: string | null): number {
  453. const raw = String(intervalTime ?? '').trim()
  454. if (!raw) return 0
  455. const normalized = raw.replace('~', '-').replace(',', '-').replace(/\s+/g, '-')
  456. const parts = normalized.split('-').filter(Boolean)
  457. let minSec: number
  458. let maxSec: number
  459. if (parts.length === 1) {
  460. minSec = Number(parts[0])
  461. maxSec = Number(parts[0])
  462. } else {
  463. minSec = Number(parts[0])
  464. maxSec = Number(parts[1])
  465. }
  466. if (Number.isNaN(minSec) || Number.isNaN(maxSec)) return 0
  467. if (minSec < 0 || maxSec < 0) return 0
  468. if (maxSec < minSec) [minSec, maxSec] = [maxSec, minSec]
  469. const sec = minSec === maxSec ? minSec : Math.floor(Math.random() * (maxSec - minSec + 1)) + minSec
  470. return sec * 1000
  471. }
  472. /**
  473. * 刷新 tgUser 账号缓存
  474. */
  475. private async refreshAccountCache(): Promise<void> {
  476. this.accountCache = await this.senderRepo.find({
  477. where: { delFlag: false },
  478. order: { lastUsageTime: 'ASC', usageCount: 'ASC' },
  479. take: this.currentAccountCacheTake
  480. })
  481. this.accountCursor = 0
  482. }
  483. /**
  484. * 选择可用的tgUser 账号
  485. */
  486. private async pickAccount(): Promise<TgUser> {
  487. if (this.accountCache.length === 0) {
  488. this.accountCache = await this.senderRepo.find({
  489. where: { delFlag: false },
  490. order: { lastUsageTime: 'ASC', usageCount: 'ASC' },
  491. take: this.currentAccountCacheTake
  492. })
  493. this.accountCursor = 0
  494. }
  495. if (this.accountCache.length === 0) {
  496. throw new Error('暂无可用 tgUser 账号')
  497. }
  498. const total = this.accountCache.length
  499. const tryPick = (): TgUser | null => {
  500. for (let i = 0; i < total; i++) {
  501. const index = (this.accountCursor + i) % total
  502. const account = this.accountCache[index]
  503. if (this.accountExcludedInBatch.has(account.id)) continue
  504. const used = this.accountUsageInBatch.get(account.id) ?? 0
  505. if (used < this.currentAccountLimit) {
  506. this.accountCursor = (index + 1) % total
  507. return account
  508. }
  509. }
  510. return null
  511. }
  512. const picked1 = tryPick()
  513. if (picked1) return picked1
  514. this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
  515. this.accountUsageInBatch.clear()
  516. this.accountCursor = 0
  517. const picked2 = tryPick()
  518. if (picked2) return picked2
  519. // 如果全部被排除,说明这一批账号都异常/受限:清空排除集再尝试一次
  520. if (this.accountExcludedInBatch.size > 0) {
  521. this.app.log.warn('本批次所有 tgUser 均被排除,清空排除列表后重试')
  522. this.accountExcludedInBatch.clear()
  523. const picked3 = tryPick()
  524. if (picked3) return picked3
  525. }
  526. // 兜底:返回第一个,避免直接抛错导致任务中断(但大概率会在连接失败后再次被排除并轮换)
  527. return this.accountCache[0]
  528. }
  529. /**
  530. * 确保发送者有有效的会话字符串
  531. */
  532. private async ensureSessionString(tgUser: TgUser): Promise<string> {
  533. if (tgUser.sessionStr) {
  534. return tgUser.sessionStr
  535. }
  536. if (tgUser.dcId && tgUser.authKey) {
  537. const session = buildStringSessionByDcIdAndAuthKey(tgUser.dcId, tgUser.authKey)
  538. await this.senderRepo.update(tgUser.id, { sessionStr: session })
  539. return session
  540. }
  541. throw new Error(`tgUser=${tgUser.id} 缺少 session 信息`)
  542. }
  543. /**
  544. * 处理会话被撤销的情况
  545. */
  546. private async handleSessionRevoked(tgUser: TgUser): Promise<void> {
  547. await this.senderRepo.update(tgUser.id, { delFlag: true })
  548. this.accountCache = this.accountCache.filter(a => a.id !== tgUser.id)
  549. this.accountCursor = 0
  550. this.app.log.warn(`tgUser=${tgUser.id} session 失效,已删除`)
  551. }
  552. /**
  553. * 检查是否可以发送消息给目标用户
  554. */
  555. private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
  556. try {
  557. const fullUser = await client.invoke(
  558. new Api.users.GetFullUser({
  559. id: targetPeer
  560. })
  561. )
  562. const fullUserData = fullUser.fullUser as any
  563. if (fullUserData?.blocked) {
  564. return false
  565. }
  566. if (targetPeer.bot && targetPeer.botChatHistory === false) {
  567. return false
  568. }
  569. if (targetPeer.deleted) {
  570. return false
  571. }
  572. if (targetPeer.fake || targetPeer.scam) {
  573. return false
  574. }
  575. return true
  576. } catch (error) {
  577. const errorMessage = error instanceof Error ? error.message : '未知错误'
  578. if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
  579. throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
  580. }
  581. if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
  582. return false
  583. }
  584. return true
  585. }
  586. }
  587. /**
  588. * 解析目标标识符
  589. */
  590. private parseTarget(targetId: string): string | number | null {
  591. const trimmed = targetId.trim()
  592. // 用户名 手机号
  593. if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
  594. return trimmed
  595. }
  596. // 手机号 不带+号,使用正则
  597. const phoneRegex = /^\d+$/
  598. if (phoneRegex.test(trimmed)) {
  599. return `+${trimmed}`
  600. }
  601. // 用户 id
  602. const integerRegex = /^-?\d+$/
  603. if (integerRegex.test(trimmed)) {
  604. return Number(trimmed)
  605. }
  606. return null
  607. }
  608. /**
  609. * 获取随机延迟秒数
  610. */
  611. private getRandomDelaySeconds(min: number = 5, max: number = 10): number {
  612. return Math.floor(Math.random() * (max - min + 1)) + min
  613. }
  614. /**
  615. * 检查错误消息是否表示会话被撤销
  616. */
  617. private isSessionRevokedMessage(msg: string): boolean {
  618. return msg.includes('SESSION_REVOKED') || msg.includes('AUTH_KEY_UNREGISTERED') || msg.includes('AUTH_KEY_INVALID')
  619. }
  620. /**
  621. * 延迟指定毫秒数
  622. */
  623. private async sleep(ms: number): Promise<void> {
  624. return await new Promise(resolve => setTimeout(resolve, ms))
  625. }
  626. }