task.service.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. import { Repository } from 'typeorm'
  2. import { FastifyInstance } from 'fastify'
  3. import { Api, TelegramClient } from 'telegram'
  4. import { Task, TaskStatus } from '../entities/task.entity'
  5. import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
  6. import { PaginationResponse } from '../dto/common.dto'
  7. import { Sender } from '../entities/sender.entity'
  8. import { SenderService } from './sender.service'
  9. import { TgClientService } from './tgClient.service'
  10. import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
  11. export class TaskService {
  12. private taskRepository: Repository<Task>
  13. private taskItemRepository: Repository<TaskItem>
  14. private app: FastifyInstance
  15. private processing = false
  16. private static schedulerStarted = false
  17. private senderRepository: Repository<Sender>
  18. private senderService: SenderService
  19. private tgClientService: TgClientService
  20. private readonly senderSendLimit = 5
  21. private senderUsageInBatch: Map<string, number> = new Map()
  22. private senderCursor = 0
  23. private senderCache: Sender[] = []
  24. private readonly pollIntervalMs = 5000
  25. private readonly taskBatchSize = 50
  26. private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`
  27. constructor(app: FastifyInstance) {
  28. this.app = app
  29. this.taskRepository = app.dataSource.getRepository(Task)
  30. this.taskItemRepository = app.dataSource.getRepository(TaskItem)
  31. this.senderRepository = app.dataSource.getRepository(Sender)
  32. this.tgClientService = TgClientService.getInstance()
  33. this.senderService = new SenderService(app)
  34. this.app.addHook('onReady', async () => {
  35. this.scheduleTaskSend()
  36. })
  37. }
  38. async create(data: { name: string; message: string; userId: number; buffer: Buffer }): Promise<Task> {
  39. const task = this.taskRepository.create({
  40. name: data.name,
  41. message: data.message,
  42. userId: data.userId
  43. })
  44. const savedTask = await this.taskRepository.save(task)
  45. const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
  46. await this.taskRepository.update(savedTask.id, { total })
  47. return await this.taskRepository.findOneOrFail({ where: { id: savedTask.id } })
  48. }
  49. async findById(id: number): Promise<Task> {
  50. return await this.taskRepository.findOneOrFail({ where: { id, delFlag: false } })
  51. }
  52. async findAll(page: number = 0, size: number = 20, userId?: number): Promise<PaginationResponse<Task>> {
  53. const where = userId ? { userId } : {}
  54. const [tasks, total] = await this.taskRepository.findAndCount({
  55. where,
  56. skip: (Number(page) || 0) * (Number(size) || 20),
  57. take: Number(size) || 20,
  58. order: {
  59. createdAt: 'DESC'
  60. }
  61. })
  62. return {
  63. content: tasks,
  64. metadata: {
  65. total: Number(total),
  66. page: Number(page) || 0,
  67. size: Number(size) || 20
  68. }
  69. }
  70. }
  71. async update(id: number, data: Partial<Task>): Promise<void> {
  72. await this.taskRepository.update(id, data)
  73. }
  74. async delete(id: number): Promise<void> {
  75. await this.taskRepository.update(id, { delFlag: true })
  76. }
  77. async createTaskItemByBuffer(data: { taskId: number; buffer: Buffer }): Promise<number> {
  78. const content = data.buffer.toString('utf-8')
  79. const lines = content.split('\n').filter(line => line.trim())
  80. if (lines.length === 0) {
  81. return 0
  82. }
  83. const taskItems = lines.map(line =>
  84. this.taskItemRepository.create({
  85. taskId: data.taskId,
  86. target: line.trim(),
  87. status: TaskItemStatus.PENDING
  88. })
  89. )
  90. await this.taskItemRepository.save(taskItems)
  91. return taskItems.length
  92. }
  93. async findTaskItems(
  94. page: number = 0,
  95. size: number = 20,
  96. taskId?: number,
  97. status?: string
  98. ): Promise<PaginationResponse<TaskItem>> {
  99. const where: any = {}
  100. if (taskId) {
  101. where.taskId = taskId
  102. }
  103. if (status) {
  104. where.status = status
  105. }
  106. const [taskItems, total] = await this.taskItemRepository.findAndCount({
  107. where,
  108. skip: (Number(page) || 0) * (Number(size) || 20),
  109. take: Number(size) || 20,
  110. order: {
  111. createdAt: 'DESC'
  112. }
  113. })
  114. return {
  115. content: taskItems,
  116. metadata: {
  117. total: Number(total),
  118. page: Number(page) || 0,
  119. size: Number(size) || 20
  120. }
  121. }
  122. }
  123. async startTask(id: number): Promise<void> {
  124. const task = await this.findById(id)
  125. if (!task) {
  126. throw new Error('任务不存在')
  127. }
  128. if (task.delFlag) {
  129. throw new Error('任务已被删除')
  130. }
  131. if (![TaskStatus.PENDING, TaskStatus.PAUSED].includes(task.status as TaskStatus)) {
  132. throw new Error('当前状态不可启动')
  133. }
  134. await this.taskRepository.update(id, {
  135. status: TaskStatus.SENDING,
  136. startedAt: task.startedAt ?? new Date()
  137. })
  138. }
  139. async pauseTask(id: number): Promise<void> {
  140. const task = await this.findById(id)
  141. if (!task) {
  142. throw new Error('任务不存在')
  143. }
  144. if (task.delFlag) {
  145. throw new Error('任务已被删除')
  146. }
  147. if (task.status !== TaskStatus.SENDING) {
  148. throw new Error('仅发送中的任务可暂停')
  149. }
  150. await this.taskRepository.update(id, { status: TaskStatus.PAUSED })
  151. }
  152. private scheduleTaskSend() {
  153. if (TaskService.schedulerStarted) {
  154. return
  155. }
  156. const interval = setInterval(() => void this.taskSendCycle(), this.pollIntervalMs)
  157. TaskService.schedulerStarted = true
  158. this.app.addHook('onClose', async () => {
  159. clearInterval(interval)
  160. TaskService.schedulerStarted = false
  161. })
  162. this.app.log.info(
  163. `任务发送轮询已启动,间隔=${this.pollIntervalMs}ms,实例=${this.instanceId}, 批次=${this.taskBatchSize}`
  164. )
  165. }
  166. private async taskSendCycle() {
  167. if (this.processing) {
  168. return
  169. }
  170. this.processing = true
  171. try {
  172. await this.startTaskSend()
  173. } catch (error) {
  174. const msg = error instanceof Error ? `${error.message}; stack=${error.stack ?? 'no stack'}` : '未知错误'
  175. this.app.log.error(`处理发送任务失败: ${msg}`)
  176. } finally {
  177. this.processing = false
  178. }
  179. }
  180. private async startTaskSend() {
  181. const task = await this.taskRepository.findOne({
  182. where: { status: TaskStatus.SENDING, delFlag: false },
  183. order: { startedAt: 'ASC', id: 'ASC' }
  184. })
  185. if (!task) {
  186. return
  187. }
  188. const pendingItems = await this.taskItemRepository.find({
  189. where: { taskId: task.id, status: TaskItemStatus.PENDING },
  190. order: { id: 'ASC' },
  191. take: this.taskBatchSize
  192. })
  193. if (pendingItems.length === 0) {
  194. await this.finalizeTaskIfDone(task.id)
  195. return
  196. }
  197. let batchSent = 0
  198. let batchSuccess = 0
  199. let batchFailed = 0
  200. for (const item of pendingItems) {
  201. const current = await this.taskRepository.findOne({ where: { id: task.id } })
  202. if (!current || current.status !== TaskStatus.SENDING) {
  203. this.app.log.info(`任务 ${task.id} 已暂停或停止,终止本批次发送`)
  204. break
  205. }
  206. try {
  207. await this.sendTaskItem(task, item)
  208. batchSuccess++
  209. } catch (error) {
  210. const msg = error instanceof Error ? error.message : '未知错误'
  211. await this.taskItemRepository.update(item.id, {
  212. status: TaskItemStatus.FAILED,
  213. sentAt: new Date()
  214. })
  215. batchFailed++
  216. this.app.log.warn(`发送失败 taskId=${task.id}, item=${item.id}: ${msg}`)
  217. }
  218. batchSent++
  219. }
  220. if (batchSent > 0) {
  221. await this.taskRepository.increment({ id: task.id }, 'sent', batchSent)
  222. }
  223. if (batchSuccess > 0) {
  224. await this.taskRepository.increment({ id: task.id }, 'successCount', batchSuccess)
  225. }
  226. if (batchSent < pendingItems.length) {
  227. return
  228. }
  229. await this.finalizeTaskIfDone(task.id)
  230. }
  231. private async sendTaskItem(task: Task, taskItem: TaskItem): Promise<void> {
  232. const sender = await this.pickSender()
  233. const sessionString = await this.ensureSessionString(sender)
  234. let client: TelegramClient | null = null
  235. try {
  236. client = await this.tgClientService.connect(sessionString)
  237. const parsedTarget = this.parseTarget(taskItem.target)
  238. if (!parsedTarget) {
  239. throw new Error('target 格式错误,请检查是否正确')
  240. }
  241. const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
  242. if (!targetPeer) {
  243. throw new Error('target 无效,无法获取目标信息')
  244. }
  245. const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
  246. if (!canSendMessage) {
  247. throw new Error('目标用户不允许接收消息或已被限制')
  248. }
  249. await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
  250. try {
  251. await this.tgClientService.clearConversation(client, targetPeer)
  252. } catch (clearError) {
  253. const msg = clearError instanceof Error ? clearError.message : '未知错误'
  254. this.app.log.warn(`清除会话失败 [${taskItem.target}]: ${msg}`)
  255. }
  256. try {
  257. await this.tgClientService.deleteTempContact(client, (targetPeer as any).id)
  258. } catch (deleteError) {
  259. const msg = deleteError instanceof Error ? deleteError.message : '未知错误'
  260. this.app.log.warn(`删除临时联系人失败 [${taskItem.target}]: ${msg}`)
  261. }
  262. await this.taskItemRepository.update(taskItem.id, {
  263. status: TaskItemStatus.SUCCESS,
  264. sentAt: new Date()
  265. })
  266. await this.senderService.incrementUsageCount(sender.id)
  267. const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
  268. this.senderUsageInBatch.set(sender.id, used)
  269. if (used >= this.senderSendLimit) {
  270. this.app.log.info(`sender=${sender.id} 已达单次发送上限 ${this.senderSendLimit},切换下一个账号`)
  271. await this.tgClientService.disconnect()
  272. }
  273. } catch (error) {
  274. if (client) {
  275. try {
  276. await this.tgClientService.disconnect()
  277. } catch (disconnectError) {
  278. const msg = disconnectError instanceof Error ? disconnectError.message : '未知错误'
  279. this.app.log.warn(`断开连接失败: ${msg}`)
  280. }
  281. }
  282. throw error
  283. }
  284. }
  285. private async finalizeTaskIfDone(taskId: number): Promise<void> {
  286. const pendingCount = await this.taskItemRepository.count({
  287. where: { taskId, status: TaskItemStatus.PENDING }
  288. })
  289. if (pendingCount > 0) {
  290. return
  291. }
  292. const successCount = await this.taskItemRepository.count({
  293. where: { taskId, status: TaskItemStatus.SUCCESS }
  294. })
  295. const failedCount = await this.taskItemRepository.count({
  296. where: { taskId, status: TaskItemStatus.FAILED }
  297. })
  298. await this.taskRepository.update(taskId, {
  299. status: TaskStatus.COMPLETED,
  300. sent: successCount + failedCount,
  301. successCount
  302. })
  303. }
  304. private async pickSender(): Promise<Sender> {
  305. if (this.senderCache.length === 0) {
  306. this.senderCache = await this.senderRepository.find({
  307. where: { delFlag: false },
  308. order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
  309. })
  310. this.senderCursor = 0
  311. }
  312. if (this.senderCache.length === 0) {
  313. throw new Error('暂无可用 sender 账号')
  314. }
  315. const total = this.senderCache.length
  316. for (let i = 0; i < total; i++) {
  317. const index = (this.senderCursor + i) % total
  318. const sender = this.senderCache[index]
  319. const used = this.senderUsageInBatch.get(sender.id) ?? 0
  320. if (used < this.senderSendLimit) {
  321. this.senderCursor = (index + 1) % total
  322. return sender
  323. }
  324. }
  325. // 所有 sender 均已达到当前批次上限,重置计数重新分配
  326. this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
  327. this.senderUsageInBatch.clear()
  328. this.senderCursor = 0
  329. return this.senderCache[0]
  330. }
  331. private async ensureSessionString(sender: Sender): Promise<string> {
  332. if (sender.sessionStr) {
  333. return sender.sessionStr
  334. }
  335. if (sender.dcId && sender.authKey) {
  336. const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
  337. await this.senderRepository.update(sender.id, { sessionStr: session })
  338. return session
  339. }
  340. throw new Error(`sender=${sender.id} 缺少 session 信息`)
  341. }
  342. private parseTarget(targetId: string): string | number | null {
  343. const trimmed = targetId.trim()
  344. if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
  345. return trimmed
  346. }
  347. const phoneRegex = /^\d+$/
  348. if (phoneRegex.test(trimmed)) {
  349. return `+${trimmed}`
  350. }
  351. const integerRegex = /^-?\d+$/
  352. if (integerRegex.test(trimmed)) {
  353. return Number(trimmed)
  354. }
  355. return null
  356. }
  357. private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
  358. try {
  359. const fullUser = await client.invoke(
  360. new Api.users.GetFullUser({
  361. id: targetPeer
  362. })
  363. )
  364. const fullUserData = fullUser.fullUser as any
  365. if (fullUserData?.blocked) {
  366. return false
  367. }
  368. if (targetPeer.bot && targetPeer.botChatHistory === false) {
  369. return false
  370. }
  371. if (targetPeer.deleted) {
  372. return false
  373. }
  374. if (targetPeer.fake || targetPeer.scam) {
  375. return false
  376. }
  377. return true
  378. } catch (error) {
  379. const errorMessage = error instanceof Error ? error.message : '未知错误'
  380. if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
  381. throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
  382. }
  383. if (errorMessage.includes('PRIVACY') || errorMessage.includes('USER_PRIVACY_RESTRICTED')) {
  384. return false
  385. }
  386. return true
  387. }
  388. }
  389. }