| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724 |
- import { FastifyInstance } from 'fastify'
- import { DataSource } from 'typeorm'
- import { Repository } from 'typeorm'
- import { TelegramClient, Api } from 'telegram'
- import { Task, TaskStatus, TaskType } from '../entities/task.entity'
- import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
- import { TgUser } from '../entities/tg-user.entity'
- import { TgUserService } from '../services/tg-user.service'
- import { TgClientManager } from '../services/clients/tg-client.manager'
- import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
/**
 * Runs one Task to completion: claims its PENDING TaskItems one at a time and
 * processes them with a pool of concurrent workers. Each worker drives a single
 * Telegram account (tgUser) at a time and rotates to another account when the
 * per-account limit is reached or the account misbehaves.
 *
 * Account-rotation state (cache, cursor, per-batch usage, exclusions) lives on
 * the instance and is reset at the start of every `execute` call.
 * NOTE(review): because that state is per-instance, overlapping `execute`
 * calls on the same instance would share it — confirm callers serialize them.
 */
export class TaskExecutor {
  private ds: DataSource
  private taskRepo: Repository<Task>
  private taskItemRepo: Repository<TaskItem>
  private senderRepo: Repository<TgUser>
  private senderService: TgUserService

  // Default number of items one account may process before it is rotated out.
  private readonly defaultAccountLimit = 5
  // Per-account limit in effect for the task currently being executed.
  private currentAccountLimit = this.defaultAccountLimit
  // Default number of accounts loaded into the cache.
  private readonly defaultAccountCacheTake = 100
  // Hard upper bound on the number of accounts loaded into the cache.
  private readonly maxAccountCacheTake = 1000
  // Cache size in effect for the task currently being executed.
  private currentAccountCacheTake = this.defaultAccountCacheTake
  // Items processed per account id in the current batch.
  private accountUsageInBatch: Map<string, number> = new Map()
  // Round-robin position inside `accountCache`.
  private accountCursor = 0
  // Accounts available to the current task.
  private accountCache: TgUser[] = []
  // Accounts temporarily skipped in this batch (connect/feature failures), so
  // the same broken account is not selected over and over.
  private accountExcludedInBatch: Set<string> = new Set()

  constructor(private app: FastifyInstance) {
    const ds = app.dataSource
    this.ds = ds
    this.senderService = new TgUserService(app)
    this.taskRepo = ds.getRepository(Task)
    this.taskItemRepo = ds.getRepository(TaskItem)
    this.senderRepo = ds.getRepository(TgUser)
  }

  /**
   * Single entry point used by the task scheduler.
   *
   * Validates the task, resets the account-rotation state, loads the account
   * cache, runs the workers and finally settles the task status.
   *
   * @param task Task to execute; must be in SENDING status.
   * @throws Re-throws any error after logging it (invalid status, cancellation,
   *         no usable account, worker failure).
   */
  async execute(task: Task): Promise<void> {
    try {
      await this.beforeExecute(task)

      // Per-task account configuration.
      const requestedLimit = Number(task.accountLimit)
      this.currentAccountLimit = task.accountLimit && requestedLimit > 0 ? requestedLimit : this.defaultAccountLimit
      this.accountUsageInBatch.clear()
      this.accountCursor = 0

      // Size the account pool for this task, then load it.
      this.currentAccountCacheTake = this.computeAccountCacheTake(task, this.resolveConcurrency(task))
      await this.refreshAccountCache()
      this.accountExcludedInBatch.clear()

      await this.process(task)
      await this.finalize(task.id)
    } catch (err) {
      this.app.log.error({ err, taskId: task.id }, 'TaskExecutor.execute failed')
      throw err
    }
  }

  /**
   * Number of workers for a task: `task.threads` clamped to 1..10.
   * A missing or non-numeric value yields 1 (a NaN here would otherwise spawn
   * zero workers); fractional values are rounded up.
   */
  private resolveConcurrency(task: Task): number {
    const requested = Math.ceil(Number(task.threads ?? 1))
    if (Number.isNaN(requested)) return 1
    return Math.min(10, Math.max(1, requested))
  }

  /**
   * Decides how many tgUser accounts to load into the cache for this task.
   * - `task.payload.accountPoolLimit` / `accountCacheTake` overrides when positive.
   * - Otherwise ceil(total / accountLimit), falling back to the default take when
   *   the total is unknown, and never below the worker count.
   * - Always capped by `maxAccountCacheTake`.
   *
   * @param task        Task being executed.
   * @param concurrency Number of workers that will run.
   * @returns Number of accounts to fetch.
   */
  private computeAccountCacheTake(task: Task, concurrency: number): number {
    const ceiling = this.maxAccountCacheTake

    const overrideRaw = task.payload?.accountPoolLimit ?? task.payload?.accountCacheTake
    if (overrideRaw !== undefined && overrideRaw !== null) {
      const override = Number(overrideRaw)
      if (!Number.isNaN(override) && override > 0) {
        return Math.min(ceiling, Math.max(1, Math.floor(override)))
      }
    }

    const total = Number(task.total ?? 0)
    const perAccount = Math.max(1, Number(this.currentAccountLimit || 1))
    const needByTotal = total > 0 ? Math.ceil(total / perAccount) : 0
    const desired = Math.max(concurrency, needByTotal || this.defaultAccountCacheTake)
    return Math.min(ceiling, desired)
  }

  /**
   * Pre-flight validation.
   *
   * @throws If the task is not in SENDING status, or if cancellation was
   *         requested (in which case the task is first marked CANCELED).
   */
  private async beforeExecute(task: Task): Promise<void> {
    if (task.status !== TaskStatus.SENDING) {
      throw new Error(`Task ${task.id} status invalid: ${task.status}`)
    }
    if (task.cancelRequested) {
      await this.taskRepo.update(task.id, { status: TaskStatus.CANCELED })
      throw new Error(`Task ${task.id} canceled before execution`)
    }
  }

  /**
   * Starts the configured number of workers and waits for all of them.
   */
  private async process(task: Task): Promise<void> {
    const concurrency = this.resolveConcurrency(task)
    const workers = Array.from({ length: concurrency }, () => this.workerLoop(task.id))
    await Promise.all(workers)
  }

  /**
   * Loop of a single worker: re-read the task, claim an item, make sure an
   * account is connected, process the item, then wait the configured interval.
   *
   * A claimed item is held across account-connect retries instead of claiming a
   * new one each time, and is released back to PENDING if the worker exits
   * while still holding it, so no item is left stranded in PROCESSING.
   *
   * @throws 'TASK_PAUSED' / 'TASK_CANCELED' when cancellation was requested,
   *         when no account is available, or after too many consecutive
   *         connect failures.
   */
  private async workerLoop(taskId: number): Promise<void> {
    const workerTgClient = new TgClientManager()
    let tgUser: TgUser | null = null
    let accountUsageInRound = 0
    // INVITE_TO_GROUP only: group entity the current account resolved, cached so
    // it is not resolved again for every item.
    let inviteGroupEntity: any | null = null
    // Item already switched to PROCESSING but not yet handed to processTaskItem.
    let claimedItem: TaskItem | null = null
    let connectFailures = 0

    try {
      while (true) {
        const task = await this.taskRepo.findOneBy({ id: taskId })
        if (!task) return
        if (task.cancelRequested) {
          throw new Error(task.status === TaskStatus.PAUSED ? 'TASK_PAUSED' : 'TASK_CANCELED')
        }
        if (task.status !== TaskStatus.SENDING) return

        claimedItem = claimedItem ?? (await this.pickNextTaskItem(taskId))
        if (!claimedItem) return
        const taskItem: TaskItem = claimedItem

        // Account rotation.
        if (!tgUser || accountUsageInRound >= this.currentAccountLimit) {
          // Leave the invite group (if any) and disconnect before switching.
          await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity)
          await this.safeDisconnect(workerTgClient)
          tgUser = null
          accountUsageInRound = 0
          inviteGroupEntity = null

          const candidate = await this.pickAccount()
          try {
            const sessionString = await this.ensureSessionString(candidate)
            await workerTgClient.connect(sessionString)
          } catch (error) {
            const msg = error instanceof Error ? error.message : String(error)
            if (this.isSessionRevokedMessage(msg)) {
              // Dead session: remove the account for good.
              await this.handleSessionRevoked(candidate)
            } else {
              // Do not let a single bad account fail the task: exclude it and
              // try another one with the same claimed item.
              this.app.log.warn(
                { taskId, sender: candidate.id, err: msg },
                'TelegramClient connect failed, rotate account'
              )
              this.accountExcludedInBatch.add(candidate.id)
              connectFailures++
              // pickAccount recycles excluded accounts once all are excluded, so
              // bound the retries to avoid spinning forever on a dead pool.
              const maxConnectFailures = Math.max(1, this.accountCache.length) * 2
              if (connectFailures >= maxConnectFailures) {
                throw new Error(
                  `Task ${taskId}: giving up after ${connectFailures} consecutive account connect failures`
                )
              }
            }
            await this.safeDisconnect(workerTgClient)
            continue
          }
          tgUser = candidate
          connectFailures = 0
        }
        const sender: TgUser = tgUser

        // Random delay before each item.
        const delaySeconds = this.getRandomDelaySeconds()
        this.app.log.info(`延迟 ${delaySeconds}s 后开始处理任务`)
        await this.sleep(delaySeconds * 1000)

        // From here on processTaskItem owns the item's final status.
        claimedItem = null
        try {
          const result = await this.processTaskItem(task, taskItem, sender, workerTgClient, inviteGroupEntity)
          if (result?.inviteGroupEntity !== undefined) {
            inviteGroupEntity = result.inviteGroupEntity
          }
          if (result?.rotateAccount) {
            this.app.log.info(
              { taskId, itemId: taskItem.id, sender: sender.id, reason: result.reason ?? 'rotate' },
              'rotate account due to task item failure'
            )
            if (this.isSessionRevokedMessage(result.reason ?? '')) {
              await this.handleSessionRevoked(sender)
            } else {
              this.accountExcludedInBatch.add(sender.id)
            }
            await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity)
            await this.safeDisconnect(workerTgClient)
            tgUser = null
            accountUsageInRound = 0
            inviteGroupEntity = null
          } else {
            accountUsageInRound++
            this.accountUsageInBatch.set(sender.id, (this.accountUsageInBatch.get(sender.id) ?? 0) + 1)
          }
        } catch (error) {
          // Safety net: processTaskItem is not expected to throw, but if it does
          // the task as a whole must keep going.
          const msg = error instanceof Error ? error.message : '未知错误'
          this.app.log.error({ taskId, itemId: taskItem.id, sender: sender.id, err: msg }, 'processTaskItem crashed')
          await this.failItemIfStillProcessing(task.id, taskItem, sender, msg)
          if (this.isSessionRevokedMessage(msg)) {
            await this.handleSessionRevoked(sender)
          } else {
            this.accountExcludedInBatch.add(sender.id)
          }
          await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity)
          await this.safeDisconnect(workerTgClient)
          tgUser = null
          accountUsageInRound = 0
          inviteGroupEntity = null
        }

        // Randomized pause between items.
        const intervalMs = this.pickIntervalMs(task.intervalTime)
        if (intervalMs > 0) {
          await this.sleep(intervalMs)
        }
      }
    } finally {
      // Give back an item that was claimed but never processed.
      if (claimedItem) {
        await this.releaseTaskItem(claimedItem)
      }
      // Leave the group so the account does not stay in it, then disconnect.
      await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity)
      await this.safeDisconnect(workerTgClient)
    }
  }

  /**
   * Best-effort disconnect: failures are logged, never thrown, so they cannot
   * mask the error that triggered the disconnect.
   */
  private async safeDisconnect(workerTgClient: TgClientManager): Promise<void> {
    try {
      await workerTgClient.disconnect()
    } catch (error) {
      const msg = error instanceof Error ? error.message : String(error)
      this.app.log.warn({ err: msg }, 'TelegramClient disconnect failed')
    }
  }

  /**
   * Best-effort: returns a claimed item to PENDING, but only if it is still
   * PROCESSING. Failures are logged, never thrown.
   */
  private async releaseTaskItem(item: TaskItem): Promise<void> {
    try {
      await this.taskItemRepo.update(
        { id: item.id, status: TaskItemStatus.PROCESSING },
        { status: TaskItemStatus.PENDING }
      )
    } catch (error) {
      const msg = error instanceof Error ? error.message : String(error)
      this.app.log.warn({ itemId: item.id, err: msg }, 'failed to release claimed task item')
    }
  }

  /**
   * Best-effort: marks an item FAILED after processTaskItem crashed, but only if
   * it is still PROCESSING, so an outcome that was already recorded is neither
   * overwritten nor counted twice. Failures are logged, never thrown.
   */
  private async failItemIfStillProcessing(
    taskId: number,
    item: TaskItem,
    sender: TgUser,
    errorMsg: string
  ): Promise<void> {
    try {
      const res = await this.taskItemRepo.update(
        { id: item.id, status: TaskItemStatus.PROCESSING },
        { status: TaskItemStatus.FAILED, operatingAt: new Date(), operationId: sender.id, errorMsg }
      )
      // `affected` is not reported by every driver; only count when it is.
      if ((res?.affected ?? 0) > 0) {
        await this.taskRepo.increment({ id: taskId }, 'processed', 1)
      }
    } catch (error) {
      const msg = error instanceof Error ? error.message : String(error)
      this.app.log.warn({ itemId: item.id, err: msg }, 'failed to mark crashed task item as failed')
    }
  }

  /**
   * Claims the next PENDING item of the task (lowest id first) inside a
   * transaction with a pessimistic write lock and switches it to PROCESSING.
   *
   * @returns The claimed item, or null when no PENDING item is left.
   */
  private async pickNextTaskItem(taskId: number): Promise<TaskItem | null> {
    const queryRunner = this.ds.createQueryRunner()
    await queryRunner.connect()
    await queryRunner.startTransaction()
    try {
      const repo = queryRunner.manager.getRepository(TaskItem)
      const item = await repo
        .createQueryBuilder('item')
        .setLock('pessimistic_write')
        .where('item.taskId = :taskId', { taskId })
        .andWhere('item.status = :status', { status: TaskItemStatus.PENDING })
        .orderBy('item.id', 'ASC')
        .getOne()

      if (item) {
        await repo.update(item.id, {
          status: TaskItemStatus.PROCESSING,
          operatingAt: new Date(),
          errorMsg: null
        })
      }
      await queryRunner.commitTransaction()
      return item ? { ...item, status: TaskItemStatus.PROCESSING } : null
    } catch (err) {
      await queryRunner.rollbackTransaction()
      throw err
    } finally {
      await queryRunner.release()
    }
  }

  /**
   * Dispatches one item by task type: INVITE_TO_GROUP goes to the invite flow,
   * everything else to the send-message flow.
   */
  private async processTaskItem(
    task: Task,
    item: TaskItem,
    sender: TgUser,
    workerTgClient: TgClientManager,
    inviteGroupEntity: any | null
  ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
    if (task.type === TaskType.INVITE_TO_GROUP) {
      return await this.processInviteToGroup(task, item, sender, workerTgClient, inviteGroupEntity)
    }
    return await this.processSendMessage(task, item, sender, workerTgClient)
  }

  /**
   * Persists the final outcome of one item and updates the task counters.
   *
   * @param outcome.errorMsg   null for success, otherwise the failure message.
   * @param outcome.countUsage Whether to bump the sender's usage counter.
   */
  private async recordItemOutcome(
    task: Task,
    item: TaskItem,
    sender: TgUser,
    outcome: { errorMsg: string | null; countUsage: boolean }
  ): Promise<void> {
    const succeeded = outcome.errorMsg === null
    await this.taskItemRepo.update(item.id, {
      status: succeeded ? TaskItemStatus.SUCCESS : TaskItemStatus.FAILED,
      operatingAt: new Date(),
      operationId: sender.id,
      errorMsg: outcome.errorMsg
    })
    if (outcome.countUsage) {
      await this.senderService.incrementUsageCount(sender.id)
    }
    await this.taskRepo.increment({ id: task.id }, 'processed', 1)
    if (succeeded) {
      await this.taskRepo.increment({ id: task.id }, 'success', 1)
    }
  }

  /**
   * Sends `task.payload.message` to the item's target.
   *
   * Telegram-side failures are recorded on the item and reported through the
   * return value. Bookkeeping happens outside the Telegram try/catch, so a
   * database error after a successful send is not recorded as a send failure.
   *
   * @returns Nothing on success or empty message; on failure
   *          `{ rotateAccount, reason }`, where rotation is requested only when
   *          the error indicates the sender's session is no longer valid.
   */
  private async processSendMessage(
    task: Task,
    item: TaskItem,
    sender: TgUser,
    workerTgClient: TgClientManager
  ): Promise<{ rotateAccount?: boolean; reason?: string } | void> {
    const message = String(task.payload?.message ?? '').trim()
    if (!message) {
      await this.recordItemOutcome(task, item, sender, { errorMsg: '消息内容为空', countUsage: false })
      return
    }

    let failure: string | null = null
    try {
      const parsedTarget = this.parseTarget(item.target)
      if (!parsedTarget) {
        throw new Error('target 格式错误,请检查是否正确')
      }
      const targetPeer = await workerTgClient.getTargetPeer(parsedTarget)
      if (!targetPeer) {
        throw new Error('target 无效,无法获取目标信息')
      }
      const client = workerTgClient.getClient()
      if (!client) {
        throw new Error('TelegramClient 未连接')
      }
      const canSendMessage = await this.checkCanSendMessage(client, targetPeer)
      if (!canSendMessage) {
        throw new Error('目标用户不允许接收消息或已被限制')
      }
      await workerTgClient.sendMessageToPeer(targetPeer, message)
      // Clean-up is best-effort and must not turn a sent message into a failure.
      await workerTgClient.clearConversation(targetPeer).catch(() => {})
      await workerTgClient.deleteTempContact((targetPeer as any).id).catch(() => {})
    } catch (error) {
      failure = error instanceof Error ? error.message : '未知错误'
    }

    if (failure === null) {
      await this.recordItemOutcome(task, item, sender, { errorMsg: null, countUsage: true })
      this.app.log.info(`✅ 发送成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
      return
    }

    await this.recordItemOutcome(task, item, sender, { errorMsg: failure, countUsage: true })
    this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${failure}`)
    // SEND_MESSAGE does not rotate on ordinary failures (that would drain the
    // account pool quickly), but a dead session can never succeed again.
    return { rotateAccount: this.isSessionRevokedMessage(failure), reason: failure }
  }

  /**
   * Invites the item's target into the group behind `task.payload.inviteLink`.
   *
   * The resolved group entity is returned on every path (success and failure)
   * so the worker can cache it and leave the group before switching accounts.
   * NOTE(review): this assumes `resolveGroupEntityByInviteLink` joins the group,
   * as the original comment states — confirm in TgClientManager.
   *
   * @returns `{ rotateAccount, reason?, inviteGroupEntity }`; rotation is
   *          requested on every failure except USER_ALREADY_PARTICIPANT, which
   *          is counted as a success.
   */
  private async processInviteToGroup(
    task: Task,
    item: TaskItem,
    sender: TgUser,
    workerTgClient: TgClientManager,
    inviteGroupEntity: any | null
  ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
    let groupEntity: any | null = inviteGroupEntity
    let failure: string | null = null

    try {
      const inviteLink = String(task.payload?.inviteLink ?? '').trim()
      if (!inviteLink) {
        throw new Error('群拉人任务:邀请链接为空,请检查任务配置信息')
      }
      const parsedTarget = this.parseTarget(item.target)
      if (!parsedTarget) {
        throw new Error('target 格式错误,请检查是否正确')
      }
      const targetUser = await workerTgClient.getTargetPeer(parsedTarget)
      if (!targetUser) {
        throw new Error('target 无效,无法获取目标信息')
      }
      const client = workerTgClient.getClient()
      if (!client) {
        throw new Error('TelegramClient 未连接')
      }

      // Resolve the group once per account; later items reuse the cached entity.
      if (!groupEntity) {
        groupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
      }
      if (!groupEntity) {
        throw new Error('群拉人任务:未获取到群组实体(inviteGroupEntity 为空)')
      }
      const chatId = groupEntity.chatId ?? groupEntity.id
      const accessHash = groupEntity.accessHash
      if (chatId === undefined || chatId === null || accessHash === undefined || accessHash === null) {
        throw new Error('群拉人任务:群组实体缺少 id/chatId/accessHash(请检查 resolveGroupEntityByInviteLink 返回值)')
      }

      const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
      await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
    } catch (error) {
      failure = error instanceof Error ? error.message : '未知错误'
    }

    if (failure === null) {
      await this.recordItemOutcome(task, item, sender, { errorMsg: null, countUsage: true })
      this.app.log.info(`✅ 邀请成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
      return { rotateAccount: false, inviteGroupEntity: groupEntity }
    }

    // Target is already a member: count it as a success.
    if (failure.includes('USER_ALREADY_PARTICIPANT')) {
      await this.recordItemOutcome(task, item, sender, { errorMsg: null, countUsage: false })
      this.app.log.info(`ℹ️ 成员已在群组中 taskId=${task.id}, itemId=${item.id}, target=${item.target}`)
      return { rotateAccount: false, inviteGroupEntity: groupEntity }
    }

    await this.recordItemOutcome(task, item, sender, { errorMsg: failure, countUsage: true })
    this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${failure}`)
    // INVITE_TO_GROUP rotates on any failure, so one frozen or limited account
    // cannot stall the whole task.
    return { rotateAccount: true, reason: failure, inviteGroupEntity: groupEntity }
  }

  /**
   * Makes the current account leave the invite group before it is rotated out
   * or disconnected. No-op when there is no cached group entity or it lacks an
   * id/accessHash. Never throws: a failed leave must not affect the task.
   */
  private async safeLeaveInviteGroup(workerTgClient: TgClientManager, inviteGroupEntity: any | null): Promise<void> {
    if (!inviteGroupEntity) return
    const chatId = inviteGroupEntity.chatId ?? inviteGroupEntity.id
    const accessHash = inviteGroupEntity.accessHash
    if (chatId === undefined || chatId === null || accessHash === undefined || accessHash === null) return
    try {
      const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
      await workerTgClient.leaveGroup(inputChannel)
    } catch {
      // Ignored on purpose (already left, missing permission, dead session, ...).
    }
  }

  /**
   * Settles the task once no item is PENDING or PROCESSING any more:
   * CANCELED when cancellation was requested, COMPLETED otherwise.
   * Does nothing while items remain or if the task no longer exists.
   */
  private async finalize(taskId: number): Promise<void> {
    const remain = await this.taskItemRepo
      .createQueryBuilder('item')
      .where('item.taskId = :taskId', { taskId })
      .andWhere('item.status IN (:...statuses)', { statuses: [TaskItemStatus.PENDING, TaskItemStatus.PROCESSING] })
      .getCount()
    if (remain > 0) return

    const task = await this.taskRepo.findOneBy({ id: taskId })
    if (!task) return

    const finalStatus = task.cancelRequested ? TaskStatus.CANCELED : TaskStatus.COMPLETED
    await this.taskRepo.update(taskId, { status: finalStatus })
  }

  /**
   * Parses an interval spec in seconds — a single number ("5") or a range whose
   * bounds are separated by any run of `-`, `~`, `,` or whitespace ("3-8",
   * "3~8", "3, 8") — and returns a random whole-second offset within the range,
   * in milliseconds.
   *
   * @returns 0 for empty, unparsable or non-finite input.
   */
  private pickIntervalMs(intervalTime?: string | null): number {
    const raw = String(intervalTime ?? '').trim()
    if (!raw) return 0

    const parts = raw.split(/[-~,\s]+/).filter(Boolean)
    if (parts.length === 0) return 0

    const first = Number(parts[0])
    const second = Number(parts.length > 1 ? parts[1] : parts[0])
    // Rejects NaN as well as Infinity, which would produce an unbounded sleep.
    if (!Number.isFinite(first) || !Number.isFinite(second)) return 0
    if (first < 0 || second < 0) return 0

    const minSec = Math.min(first, second)
    const maxSec = Math.max(first, second)
    const sec = minSec === maxSec ? minSec : Math.floor(Math.random() * (maxSec - minSec + 1)) + minSec
    return sec * 1000
  }

  /**
   * Reloads the account cache with non-deleted accounts, least recently and
   * least frequently used first, and resets the round-robin cursor.
   */
  private async refreshAccountCache(): Promise<void> {
    this.accountCache = await this.senderRepo.find({
      where: { delFlag: false },
      order: { lastUsageTime: 'ASC', usageCount: 'ASC' },
      take: this.currentAccountCacheTake
    })
    this.accountCursor = 0
  }

  /**
   * Picks the next usable account round-robin, skipping accounts that are
   * excluded or have reached the per-batch limit. When nothing qualifies it
   * first resets the usage counters, then the exclusion set, and retries.
   *
   * @throws If no account exists at all.
   */
  private async pickAccount(): Promise<TgUser> {
    if (this.accountCache.length === 0) {
      await this.refreshAccountCache()
    }
    if (this.accountCache.length === 0) {
      throw new Error('暂无可用 tgUser 账号')
    }

    const total = this.accountCache.length
    const tryPick = (): TgUser | null => {
      for (let offset = 0; offset < total; offset++) {
        const index = (this.accountCursor + offset) % total
        const account = this.accountCache[index]
        if (this.accountExcludedInBatch.has(account.id)) continue
        const used = this.accountUsageInBatch.get(account.id) ?? 0
        if (used < this.currentAccountLimit) {
          this.accountCursor = (index + 1) % total
          return account
        }
      }
      return null
    }

    const firstPick = tryPick()
    if (firstPick) return firstPick

    this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
    this.accountUsageInBatch.clear()
    this.accountCursor = 0
    const afterUsageReset = tryPick()
    if (afterUsageReset) return afterUsageReset

    // Everything is excluded: the whole batch looks broken or limited. Clear
    // the exclusion set and give the accounts one more chance.
    if (this.accountExcludedInBatch.size > 0) {
      this.app.log.warn('本批次所有 tgUser 均被排除,清空排除列表后重试')
      this.accountExcludedInBatch.clear()
      const afterExclusionReset = tryPick()
      if (afterExclusionReset) return afterExclusionReset
    }

    // Last resort: hand out the first account rather than failing the task.
    return this.accountCache[0]
  }

  /**
   * Returns the account's session string, building it from dcId + authKey and
   * persisting it when it is not stored yet.
   *
   * @throws If the account has neither a session string nor dcId + authKey.
   */
  private async ensureSessionString(tgUser: TgUser): Promise<string> {
    if (tgUser.sessionStr) {
      return tgUser.sessionStr
    }
    if (tgUser.dcId && tgUser.authKey) {
      const session = buildStringSessionByDcIdAndAuthKey(tgUser.dcId, tgUser.authKey)
      await this.senderRepo.update(tgUser.id, { sessionStr: session })
      return session
    }
    throw new Error(`tgUser=${tgUser.id} 缺少 session 信息`)
  }

  /**
   * Flags the account as deleted in the database and drops it from the cache.
   */
  private async handleSessionRevoked(tgUser: TgUser): Promise<void> {
    await this.senderRepo.update(tgUser.id, { delFlag: true })
    this.accountCache = this.accountCache.filter(a => a.id !== tgUser.id)
    this.accountCursor = 0
    this.app.log.warn(`tgUser=${tgUser.id} session 失效,已删除`)
  }

  /**
   * Checks whether a message may be sent to the target, based on the
   * `users.GetFullUser` response and flags on the peer itself.
   *
   * @returns false for blocked, deleted, fake or scam targets, bots without
   *          chat history, and privacy errors; true otherwise, including
   *          unrecognized lookup errors.
   * @throws On AUTH_KEY_UNREGISTERED. The message keeps the error code so
   *         callers can still recognize it as a session-revoked failure.
   */
  private async checkCanSendMessage(client: TelegramClient, targetPeer: any): Promise<boolean> {
    try {
      const fullUser = await client.invoke(new Api.users.GetFullUser({ id: targetPeer }))
      const fullUserData = fullUser.fullUser as any
      if (fullUserData?.blocked) return false
      if (targetPeer.bot && targetPeer.botChatHistory === false) return false
      if (targetPeer.deleted) return false
      if (targetPeer.fake || targetPeer.scam) return false
      return true
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : '未知错误'
      if (errorMessage.includes('AUTH_KEY_UNREGISTERED')) {
        throw new Error('AUTH_KEY_UNREGISTERED: 认证密钥未注册,请检查 session 是否有效或需要重新授权')
      }
      // Also covers USER_PRIVACY_RESTRICTED.
      if (errorMessage.includes('PRIVACY')) {
        return false
      }
      return true
    }
  }

  /**
   * Normalizes a raw target identifier.
   * - "@name" / "+digits" are returned as-is (trimmed).
   * - A string of digits only is treated as a phone number and gets a "+".
   * - A negative integer is returned as a numeric id. Because plain digits are
   *   always read as a phone number, this is the only numeric-id form accepted.
   *
   * @returns The normalized target, or null when the format is not recognized.
   */
  private parseTarget(targetId: string): string | number | null {
    const trimmed = String(targetId ?? '').trim()
    if (!trimmed) return null

    if (trimmed.startsWith('@') || trimmed.startsWith('+')) {
      return trimmed
    }
    if (/^\d+$/.test(trimmed)) {
      return `+${trimmed}`
    }
    if (/^-\d+$/.test(trimmed)) {
      return Number(trimmed)
    }
    return null
  }

  /**
   * Random whole number of seconds in [min, max] (inclusive).
   */
  private getRandomDelaySeconds(min: number = 5, max: number = 10): number {
    return Math.floor(Math.random() * (max - min + 1)) + min
  }

  /**
   * Whether an error message indicates the session can no longer be used.
   */
  private isSessionRevokedMessage(msg: string): boolean {
    return ['SESSION_REVOKED', 'AUTH_KEY_UNREGISTERED', 'AUTH_KEY_INVALID'].some(code => msg.includes(code))
  }

  /**
   * Resolves after the given number of milliseconds.
   */
  private async sleep(ms: number): Promise<void> {
    await new Promise<void>(resolve => setTimeout(resolve, ms))
  }
}
|