| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456 |
- import { Repository } from 'typeorm'
- import { FastifyInstance } from 'fastify'
- import { Api, TelegramClient } from 'telegram'
- import { Task, TaskStatus } from '../entities/task.entity'
- import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
- import { PaginationResponse } from '../dto/common.dto'
- import { Sender } from '../entities/sender.entity'
- import { SenderService } from './sender.service'
- import { TgClientService } from './tgClient.service'
- import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
/**
 * Bulk-message task service.
 *
 * Provides CRUD for tasks and their items, and runs a polling scheduler that
 * picks the oldest task in SENDING state, sends a batch of its pending items
 * through rotating sender accounts, and marks the task COMPLETED once no
 * pending items remain.
 */
export class TaskService {
  private taskRepository: Repository<Task>
  private taskItemRepository: Repository<TaskItem>
  private app: FastifyInstance
  /** True while a send cycle is running on this instance; prevents overlapping cycles. */
  private processing = false
  /** Shared across instances so that only one polling timer is started per process. */
  private static schedulerStarted = false
  private senderRepository: Repository<Sender>
  private senderService: SenderService
  private tgClientService: TgClientService
  /** Number of successful sends after which a sender is rotated out and disconnected. */
  private readonly senderSendLimit = 5
  /** Successful sends per sender id since the last counter reset (see pickSender). */
  private senderUsageInBatch: Map<string, number> = new Map()
  /** Index in senderCache where the next round-robin scan starts. */
  private senderCursor = 0
  private senderCache: Sender[] = []
  private readonly pollIntervalMs = 5000
  /** Maximum number of pending items processed per polling cycle. */
  private readonly taskBatchSize = 50
  /** Rows per INSERT chunk when importing targets, to stay under driver parameter limits. */
  private readonly itemInsertChunkSize = 500
  private readonly instanceId = `${process.pid}-${Math.random().toString(36).slice(2, 8)}`

  /**
   * Wires repositories and collaborators from the Fastify instance and
   * registers the scheduler start on the `onReady` hook.
   */
  constructor(app: FastifyInstance) {
    this.app = app
    this.taskRepository = app.dataSource.getRepository(Task)
    this.taskItemRepository = app.dataSource.getRepository(TaskItem)
    this.senderRepository = app.dataSource.getRepository(Sender)
    this.tgClientService = TgClientService.getInstance()
    this.senderService = new SenderService(app)
    this.app.addHook('onReady', async () => {
      this.scheduleTaskSend()
    })
  }

  /**
   * Creates a task and one task item per non-empty line of `data.buffer`.
   *
   * @returns The persisted task, re-read after its `total` has been set.
   */
  async create(data: { name: string; message: string; userId: number; buffer: Buffer }): Promise<Task> {
    const task = this.taskRepository.create({
      name: data.name,
      message: data.message,
      userId: data.userId
    })
    const savedTask = await this.taskRepository.save(task)
    const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
    await this.taskRepository.update(savedTask.id, { total })
    return await this.taskRepository.findOneOrFail({ where: { id: savedTask.id } })
  }

  /**
   * Loads a non-deleted task by id.
   *
   * @throws Whatever `findOneOrFail` throws when no matching row exists.
   */
  async findById(id: number): Promise<Task> {
    return await this.taskRepository.findOneOrFail({ where: { id, delFlag: false } })
  }

  /**
   * Returns one page of non-deleted tasks, newest first.
   *
   * @param page Zero-based page index; non-numeric values fall back to 0.
   * @param size Page size; non-numeric or zero values fall back to 20.
   * @param userId When truthy, restricts the result to that user's tasks.
   */
  async findAll(page: number = 0, size: number = 20, userId?: number): Promise<PaginationResponse<Task>> {
    const paging = this.normalizePaging(page, size)
    // Soft-deleted tasks are hidden here just as they are in findById.
    const where = userId ? { userId, delFlag: false } : { delFlag: false }
    const [tasks, total] = await this.taskRepository.findAndCount({
      where,
      skip: paging.page * paging.size,
      take: paging.size,
      order: {
        createdAt: 'DESC'
      }
    })
    return {
      content: tasks,
      metadata: {
        total: Number(total),
        page: paging.page,
        size: paging.size
      }
    }
  }

  /** Applies a partial update to the task with the given id. */
  async update(id: number, data: Partial<Task>): Promise<void> {
    await this.taskRepository.update(id, data)
  }

  /** Soft-deletes a task by setting its `delFlag`. */
  async delete(id: number): Promise<void> {
    await this.taskRepository.update(id, { delFlag: true })
  }

  /**
   * Creates one PENDING task item per non-blank line of a UTF-8 buffer.
   *
   * @returns The number of items created (0 when the buffer has no non-blank lines).
   */
  async createTaskItemByBuffer(data: { taskId: number; buffer: Buffer }): Promise<number> {
    const targets = data.buffer
      .toString('utf-8')
      .split('\n')
      .map(line => line.trim())
      .filter(line => line.length > 0)
    if (targets.length === 0) {
      return 0
    }
    const taskItems = targets.map(target =>
      this.taskItemRepository.create({
        taskId: data.taskId,
        target,
        status: TaskItemStatus.PENDING
      })
    )
    // Chunked save keeps large target lists below the driver's parameter limit.
    await this.taskItemRepository.save(taskItems, { chunk: this.itemInsertChunkSize })
    return taskItems.length
  }

  /**
   * Returns one page of task items, newest first, optionally filtered.
   *
   * @param page Zero-based page index; non-numeric values fall back to 0.
   * @param size Page size; non-numeric or zero values fall back to 20.
   * @param taskId When truthy, restricts the result to that task.
   * @param status When truthy, restricts the result to that item status.
   */
  async findTaskItems(
    page: number = 0,
    size: number = 20,
    taskId?: number,
    status?: string
  ): Promise<PaginationResponse<TaskItem>> {
    const paging = this.normalizePaging(page, size)
    // Built dynamically from optional filters, hence the loose type.
    const where: any = {}
    if (taskId) {
      where.taskId = taskId
    }
    if (status) {
      where.status = status
    }
    const [taskItems, total] = await this.taskItemRepository.findAndCount({
      where,
      skip: paging.page * paging.size,
      take: paging.size,
      order: {
        createdAt: 'DESC'
      }
    })
    return {
      content: taskItems,
      metadata: {
        total: Number(total),
        page: paging.page,
        size: paging.size
      }
    }
  }

  /**
   * Moves a PENDING or PAUSED task to SENDING so the scheduler picks it up.
   * `startedAt` is only set the first time the task is started.
   *
   * @throws Error when the task is in any other status.
   */
  async startTask(id: number): Promise<void> {
    const task = await this.findById(id)
    // findById already rejects missing/deleted tasks; these checks are kept as a defensive guard.
    if (!task) {
      throw new Error('任务不存在')
    }
    if (task.delFlag) {
      throw new Error('任务已被删除')
    }
    if (![TaskStatus.PENDING, TaskStatus.PAUSED].includes(task.status as TaskStatus)) {
      throw new Error('当前状态不可启动')
    }
    await this.taskRepository.update(id, {
      status: TaskStatus.SENDING,
      startedAt: task.startedAt ?? new Date()
    })
  }

  /**
   * Moves a SENDING task to PAUSED; the running batch stops before its next item.
   *
   * @throws Error when the task is not currently SENDING.
   */
  async pauseTask(id: number): Promise<void> {
    const task = await this.findById(id)
    // findById already rejects missing/deleted tasks; these checks are kept as a defensive guard.
    if (!task) {
      throw new Error('任务不存在')
    }
    if (task.delFlag) {
      throw new Error('任务已被删除')
    }
    if (task.status !== TaskStatus.SENDING) {
      throw new Error('仅发送中的任务可暂停')
    }
    await this.taskRepository.update(id, { status: TaskStatus.PAUSED })
  }

  /** Normalizes pagination input the same way for every listing endpoint. */
  private normalizePaging(page: number, size: number): { page: number; size: number } {
    return { page: Number(page) || 0, size: Number(size) || 20 }
  }

  /** Extracts a loggable message from an arbitrary thrown value. */
  private errorMessage(error: unknown): string {
    return error instanceof Error ? error.message : '未知错误'
  }

  /**
   * Starts the polling timer once per process and registers its cleanup on
   * the `onClose` hook.
   */
  private scheduleTaskSend() {
    if (TaskService.schedulerStarted) {
      return
    }
    const interval = setInterval(() => void this.taskSendCycle(), this.pollIntervalMs)
    TaskService.schedulerStarted = true
    this.app.addHook('onClose', async () => {
      clearInterval(interval)
      TaskService.schedulerStarted = false
    })
    this.app.log.info(
      `任务发送轮询已启动,间隔=${this.pollIntervalMs}ms,实例=${this.instanceId}, 批次=${this.taskBatchSize}`
    )
  }

  /**
   * One timer tick: runs a send batch unless the previous one is still in
   * progress. Errors are logged and never propagate to the timer.
   */
  private async taskSendCycle() {
    if (this.processing) {
      return
    }
    this.processing = true
    try {
      await this.startTaskSend()
    } catch (error) {
      const msg = error instanceof Error ? `${error.message}; stack=${error.stack ?? 'no stack'}` : '未知错误'
      this.app.log.error(`处理发送任务失败: ${msg}`)
    } finally {
      this.processing = false
    }
  }

  /**
   * Processes one batch of pending items for the oldest SENDING task.
   *
   * Items that fail to send are marked FAILED; the task's `sent` and
   * `successCount` counters are incremented by the batch totals, and the
   * task is finalized when the whole batch was processed.
   */
  private async startTaskSend() {
    const task = await this.taskRepository.findOne({
      where: { status: TaskStatus.SENDING, delFlag: false },
      order: { startedAt: 'ASC', id: 'ASC' }
    })
    if (!task) {
      return
    }
    const pendingItems = await this.taskItemRepository.find({
      where: { taskId: task.id, status: TaskItemStatus.PENDING },
      order: { id: 'ASC' },
      take: this.taskBatchSize
    })
    if (pendingItems.length === 0) {
      await this.finalizeTaskIfDone(task.id)
      return
    }

    // Reload senders once per batch so deleted accounts drop out and new ones join.
    this.senderCache = await this.loadActiveSenders()
    this.senderCursor = 0
    if (this.senderCache.length === 0) {
      // Leave the items pending instead of failing the whole task for a missing sender.
      this.app.log.warn(`任务 ${task.id} 暂无可用 sender 账号,跳过本批次`)
      return
    }

    let batchSent = 0
    let batchSuccess = 0
    for (const item of pendingItems) {
      // Re-read the task before every item so pause/stop/delete takes effect mid-batch.
      const current = await this.taskRepository.findOne({ where: { id: task.id } })
      if (!current || current.delFlag || current.status !== TaskStatus.SENDING) {
        this.app.log.info(`任务 ${task.id} 已暂停或停止,终止本批次发送`)
        break
      }
      try {
        await this.sendTaskItem(task, item)
        batchSuccess++
      } catch (error) {
        const msg = this.errorMessage(error)
        await this.taskItemRepository.update(item.id, {
          status: TaskItemStatus.FAILED,
          sentAt: new Date()
        })
        this.app.log.warn(`发送失败 taskId=${task.id}, item=${item.id}: ${msg}`)
      }
      batchSent++
    }
    if (batchSent > 0) {
      await this.taskRepository.increment({ id: task.id }, 'sent', batchSent)
    }
    if (batchSuccess > 0) {
      await this.taskRepository.increment({ id: task.id }, 'successCount', batchSuccess)
    }
    if (batchSent < pendingItems.length) {
      // The batch was interrupted; the task is not done.
      return
    }
    await this.finalizeTaskIfDone(task.id)
  }

  /**
   * Sends the task message to a single target and marks the item SUCCESS.
   *
   * @throws Error when the target is malformed or unresolvable, the recipient
   *   cannot be messaged, or any send step fails. The caller marks the item FAILED.
   */
  private async sendTaskItem(task: Task, taskItem: TaskItem): Promise<void> {
    // Validate the target first so a malformed line costs neither a sender nor a connection.
    const parsedTarget = this.parseTarget(taskItem.target)
    if (!parsedTarget) {
      throw new Error('target 格式错误,请检查是否正确')
    }
    const sender = await this.pickSender()
    const sessionString = await this.ensureSessionString(sender)
    let client: TelegramClient | null = null
    try {
      client = await this.tgClientService.connect(sessionString)
      const targetPeer = await this.tgClientService.getTargetPeer(client, parsedTarget)
      if (!targetPeer) {
        throw new Error('target 无效,无法获取目标信息')
      }
      const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
      if (!canSendMessage) {
        throw new Error('目标用户不允许接收消息或已被限制')
      }
      await this.tgClientService.sendMessageToPeer(client, targetPeer, task.message)
      // Cleanup after delivery is best-effort: failures are logged, not raised.
      try {
        await this.tgClientService.clearConversation(client, targetPeer)
      } catch (clearError) {
        this.app.log.warn(`清除会话失败 [${taskItem.target}]: ${this.errorMessage(clearError)}`)
      }
      try {
        await this.tgClientService.deleteTempContact(client, (targetPeer as any).id)
      } catch (deleteError) {
        this.app.log.warn(`删除临时联系人失败 [${taskItem.target}]: ${this.errorMessage(deleteError)}`)
      }
      await this.taskItemRepository.update(taskItem.id, {
        status: TaskItemStatus.SUCCESS,
        sentAt: new Date()
      })
    } catch (error) {
      if (client) {
        try {
          await this.tgClientService.disconnect()
        } catch (disconnectError) {
          this.app.log.warn(`断开连接失败: ${this.errorMessage(disconnectError)}`)
        }
      }
      throw error
    }
    // Runs outside the try block: the item is already SUCCESS and must not be
    // flipped to FAILED because sender bookkeeping failed.
    await this.recordSenderUsage(sender)
  }

  /**
   * Records one successful send for `sender` and disconnects the client once
   * the sender reaches its send limit. Never throws; failures are logged.
   */
  private async recordSenderUsage(sender: Sender): Promise<void> {
    try {
      await this.senderService.incrementUsageCount(sender.id)
    } catch (usageError) {
      this.app.log.warn(`sender=${sender.id} 更新使用次数失败: ${this.errorMessage(usageError)}`)
    }
    const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
    this.senderUsageInBatch.set(sender.id, used)
    if (used < this.senderSendLimit) {
      return
    }
    this.app.log.info(`sender=${sender.id} 已达单次发送上限 ${this.senderSendLimit},切换下一个账号`)
    try {
      await this.tgClientService.disconnect()
    } catch (disconnectError) {
      this.app.log.warn(`断开连接失败: ${this.errorMessage(disconnectError)}`)
    }
  }

  /**
   * Marks the task COMPLETED when it has no pending items left, syncing its
   * `sent` and `successCount` counters with the actual item statuses.
   */
  private async finalizeTaskIfDone(taskId: number): Promise<void> {
    const pendingCount = await this.taskItemRepository.count({
      where: { taskId, status: TaskItemStatus.PENDING }
    })
    if (pendingCount > 0) {
      return
    }
    const [successCount, failedCount] = await Promise.all([
      this.taskItemRepository.count({ where: { taskId, status: TaskItemStatus.SUCCESS } }),
      this.taskItemRepository.count({ where: { taskId, status: TaskItemStatus.FAILED } })
    ])
    await this.taskRepository.update(taskId, {
      status: TaskStatus.COMPLETED,
      sent: successCount + failedCount,
      successCount
    })
  }

  /** Loads all non-deleted senders, least recently / least often used first. */
  private loadActiveSenders(): Promise<Sender[]> {
    return this.senderRepository.find({
      where: { delFlag: false },
      order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
    })
  }

  /**
   * Picks the next sender round-robin, skipping senders that have reached
   * `senderSendLimit`. When every sender is at the limit, the counters are
   * reset and rotation restarts from the first sender.
   *
   * @throws Error when no sender account is available.
   */
  private async pickSender(): Promise<Sender> {
    if (this.senderCache.length === 0) {
      this.senderCache = await this.loadActiveSenders()
      this.senderCursor = 0
    }
    const total = this.senderCache.length
    if (total === 0) {
      throw new Error('暂无可用 sender 账号')
    }
    for (let offset = 0; offset < total; offset++) {
      const index = (this.senderCursor + offset) % total
      const sender = this.senderCache[index]
      const used = this.senderUsageInBatch.get(sender.id) ?? 0
      if (used < this.senderSendLimit) {
        this.senderCursor = (index + 1) % total
        return sender
      }
    }
    // Every sender has reached the limit: reset the counters and start over.
    this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
    this.senderUsageInBatch.clear()
    // Sender 0 is handed out now, so the next scan must start after it.
    this.senderCursor = 1 % total
    return this.senderCache[0]
  }

  /**
   * Returns the sender's session string, building and persisting it from
   * `dcId` + `authKey` when it is not stored yet.
   *
   * @throws Error when the sender has neither a session string nor dcId/authKey.
   */
  private async ensureSessionString(sender: Sender): Promise<string> {
    if (sender.sessionStr) {
      return sender.sessionStr
    }
    if (sender.dcId && sender.authKey) {
      const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
      await this.senderRepository.update(sender.id, { sessionStr: session })
      // Keep the cached entity in sync so later sends skip the rebuild and DB write.
      sender.sessionStr = session
      return session
    }
    throw new Error(`sender=${sender.id} 缺少 session 信息`)
  }

  /**
   * Normalizes a raw target line.
   *
   * - `@name` / `+phone` are returned unchanged (after trimming).
   * - A string of digits only is treated as a phone number and gets a `+` prefix.
   * - A negative integer is returned as a number.
   * - Anything else yields `null`.
   */
  private parseTarget(targetId: string): string | number | null {
    const trimmed = targetId.trim()
    if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
      return trimmed
    }
    if (/^\d+$/.test(trimmed)) {
      return `+${trimmed}`
    }
    // Unsigned digits were handled above, so only negative integers reach this branch.
    if (/^-?\d+$/.test(trimmed)) {
      return Number(trimmed)
    }
    return null
  }

  /**
   * Checks whether the target may be messaged.
   *
   * Returns `false` for blocked, deleted, fake or scam peers, for bots with
   * `botChatHistory === false`, and when the lookup fails with a privacy
   * error. Other lookup errors are treated as "can send" (best effort).
   *
   * @throws Error when the lookup reports AUTH_KEY_UNREGISTERED.
   */
  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
    try {
      const fullUser = await client.invoke(
        new Api.users.GetFullUser({
          id: targetPeer
        })
      )
      const fullUserData = fullUser.fullUser as any
      if (fullUserData?.blocked) {
        return false
      }
      if (targetPeer.bot && targetPeer.botChatHistory === false) {
        return false
      }
      if (targetPeer.deleted || targetPeer.fake || targetPeer.scam) {
        return false
      }
      return true
    } catch (error) {
      const errorMessage = this.errorMessage(error)
      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
        throw new Error('认证密钥未注册,请检查 session 是否有效或需要重新授权')
      }
      // 'PRIVACY' also matches USER_PRIVACY_RESTRICTED.
      return !errorMessage.includes('PRIVACY')
    }
  }
}
|