|
|
@@ -32,6 +32,11 @@ export class TaskExecutor {
|
|
|
private accountCursor = 0
|
|
|
// 账号缓存
|
|
|
private accountCache: TgUser[] = []
|
|
|
+ // 本批次临时排除的账号(连接/功能异常等),避免在同一轮里反复选中导致任务整体失败
|
|
|
+ private accountExcludedInBatch: Set<string> = new Set()
|
|
|
+ // 并发 worker 的账号租约:同一时刻尽量保证一个 tgUser 只被一个 worker 使用
|
|
|
+ private accountLeaseByWorker: Map<number, string> = new Map()
|
|
|
+ private accountLeaseOwner: Map<string, number> = new Map()
|
|
|
|
|
|
constructor(private app: FastifyInstance) {
|
|
|
const ds = app.dataSource
|
|
|
@@ -54,10 +59,13 @@ export class TaskExecutor {
|
|
|
task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultAccountLimit
|
|
|
this.accountUsageInBatch.clear()
|
|
|
this.accountCursor = 0
|
|
|
+ this.accountLeaseByWorker.clear()
|
|
|
+ this.accountLeaseOwner.clear()
|
|
|
// 计算任务需要拉取的账号池大小
|
|
|
const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
|
|
|
this.currentAccountCacheTake = this.computeAccountCacheTake(task, concurrency)
|
|
|
await this.refreshAccountCache()
|
|
|
+ this.accountExcludedInBatch.clear()
|
|
|
|
|
|
await this.process(task)
|
|
|
await this.finalize(task.id)
|
|
|
@@ -117,7 +125,7 @@ export class TaskExecutor {
|
|
|
const workers: Promise<void>[] = []
|
|
|
|
|
|
for (let i = 0; i < concurrency; i++) {
|
|
|
- workers.push(this.workerLoop(task.id))
|
|
|
+ workers.push(this.workerLoop(task.id, i))
|
|
|
}
|
|
|
|
|
|
await Promise.all(workers)
|
|
|
@@ -126,10 +134,11 @@ export class TaskExecutor {
|
|
|
/**
|
|
|
* 单 worker 循环
|
|
|
*/
|
|
|
- private async workerLoop(taskId: number): Promise<void> {
|
|
|
+ private async workerLoop(taskId: number, workerId: number): Promise<void> {
|
|
|
let tgUser: TgUser | null = null
|
|
|
const workerTgClient = new TgClientManager()
|
|
|
let accountUsageInRound = 0
|
|
|
+ // 仅用于 INVITE_TO_GROUP:缓存当前账号已加入的群实体,避免每条都重复解析/入群
|
|
|
let inviteGroupEntity: any | null = null
|
|
|
|
|
|
try {
|
|
|
@@ -155,20 +164,38 @@ export class TaskExecutor {
|
|
|
|
|
|
// tgUser 轮换逻辑
|
|
|
if (!tgUser || accountUsageInRound >= this.currentAccountLimit) {
|
|
|
+ // 换号前:若当前 tgUser 已加入群聊,则先退出群聊再断开
|
|
|
+ await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
|
|
|
await workerTgClient.disconnect()
|
|
|
- tgUser = await this.pickAccount()
|
|
|
+ // 释放旧租约后再重新获取,避免一个 worker 永久占用账号
|
|
|
+ if (tgUser) {
|
|
|
+ this.releaseAccountLease(workerId, tgUser.id)
|
|
|
+ }
|
|
|
+ tgUser = await this.acquireAccount(workerId)
|
|
|
const sessionString = await this.ensureSessionString(tgUser)
|
|
|
|
|
|
try {
|
|
|
await workerTgClient.connect(sessionString)
|
|
|
} catch (error) {
|
|
|
const msg = error instanceof Error ? error.message : String(error)
|
|
|
+ // 会话失效:直接移除该账号
|
|
|
if (this.isSessionRevokedMessage(msg)) {
|
|
|
await this.handleSessionRevoked(tgUser)
|
|
|
tgUser = null
|
|
|
continue
|
|
|
}
|
|
|
- throw error
|
|
|
+ // 连接失败:不让错误冒泡导致任务被调度器判死;先排除此账号,换号继续
|
|
|
+ this.app.log.warn(
|
|
|
+ { taskId, sender: tgUser.id, err: msg },
|
|
|
+ 'TelegramClient connect failed, rotate account'
|
|
|
+ )
|
|
|
+ this.accountExcludedInBatch.add(tgUser.id)
|
|
|
+ this.releaseAccountLease(workerId, tgUser.id)
|
|
|
+ tgUser = null
|
|
|
+ accountUsageInRound = 0
|
|
|
+ inviteGroupEntity = null
|
|
|
+ await workerTgClient.disconnect().catch(() => {})
|
|
|
+ continue
|
|
|
}
|
|
|
|
|
|
accountUsageInRound = 0
|
|
|
@@ -181,15 +208,42 @@ export class TaskExecutor {
|
|
|
await this.sleep(delaySeconds * 1000)
|
|
|
|
|
|
try {
|
|
|
- await this.processTaskItem(task, taskItem, tgUser, workerTgClient)
|
|
|
+ const result = await this.processTaskItem(task, taskItem, tgUser, workerTgClient, inviteGroupEntity)
|
|
|
+ // 更新缓存(仅 INVITE_TO_GROUP 会返回)
|
|
|
+ if (result?.inviteGroupEntity !== undefined) {
|
|
|
+ inviteGroupEntity = result.inviteGroupEntity
|
|
|
+ }
|
|
|
+ // 邀请失败:按你的预期,立即换一个 tgUser 继续流程
|
|
|
+ if (result?.rotateAccount) {
|
|
|
+ this.app.log.info(
|
|
|
+ { taskId, itemId: taskItem.id, sender: tgUser.id, reason: result.reason ?? 'rotate' },
|
|
|
+ 'rotate account due to task item failure'
|
|
|
+ )
|
|
|
+ this.accountExcludedInBatch.add(tgUser.id)
|
|
|
+ // 换号前:退出群聊(如果已加入)
|
|
|
+ await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
|
|
|
+ await workerTgClient.disconnect().catch(() => {})
|
|
|
+ this.releaseAccountLease(workerId, tgUser.id)
|
|
|
+ tgUser = null
|
|
|
+ accountUsageInRound = 0
|
|
|
+ inviteGroupEntity = null
|
|
|
+ }
|
|
|
} catch (error) {
|
|
|
+ // 兜底:理论上 processTaskItem 不应抛错;如果抛错,也不要影响整任务
|
|
|
const msg = error instanceof Error ? error.message : '未知错误'
|
|
|
+ this.app.log.error({ taskId, itemId: taskItem.id, sender: tgUser?.id, err: msg }, 'processTaskItem crashed')
|
|
|
if (tgUser && this.isSessionRevokedMessage(msg)) {
|
|
|
await this.handleSessionRevoked(tgUser)
|
|
|
- await workerTgClient.disconnect()
|
|
|
- tgUser = null
|
|
|
- accountUsageInRound = 0
|
|
|
+ } else if (tgUser) {
|
|
|
+ this.accountExcludedInBatch.add(tgUser.id)
|
|
|
+ }
|
|
|
+ if (tgUser) {
|
|
|
+ this.releaseAccountLease(workerId, tgUser.id)
|
|
|
}
|
|
|
+ await workerTgClient.disconnect().catch(() => {})
|
|
|
+ tgUser = null
|
|
|
+ accountUsageInRound = 0
|
|
|
+ inviteGroupEntity = null
|
|
|
} finally {
|
|
|
accountUsageInRound++
|
|
|
if (tgUser) {
|
|
|
@@ -205,7 +259,12 @@ export class TaskExecutor {
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
|
+ // worker 结束前:尽量退出群聊,避免账号一直挂在群里
|
|
|
+ await this.safeLeaveInviteGroup(workerTgClient, inviteGroupEntity).catch(() => {})
|
|
|
await workerTgClient.disconnect()
|
|
|
+ if (tgUser) {
|
|
|
+ this.releaseAccountLease(workerId, tgUser.id)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -254,10 +313,11 @@ export class TaskExecutor {
|
|
|
task: Task,
|
|
|
item: TaskItem,
|
|
|
sender: TgUser,
|
|
|
- workerTgClient: TgClientManager
|
|
|
- ): Promise<void> {
|
|
|
+ workerTgClient: TgClientManager,
|
|
|
+ inviteGroupEntity: any | null
|
|
|
+ ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
|
|
|
if (task.type === TaskType.INVITE_TO_GROUP) {
|
|
|
- return await this.processInviteToGroup(task, item, sender, workerTgClient)
|
|
|
+ return await this.processInviteToGroup(task, item, sender, workerTgClient, inviteGroupEntity)
|
|
|
}
|
|
|
|
|
|
return await this.processSendMessage(task, item, sender, workerTgClient)
|
|
|
@@ -268,7 +328,7 @@ export class TaskExecutor {
|
|
|
item: TaskItem,
|
|
|
sender: TgUser,
|
|
|
workerTgClient: TgClientManager
|
|
|
- ): Promise<void> {
|
|
|
+ ): Promise<{ rotateAccount?: boolean; reason?: string } | void> {
|
|
|
const message = String(task.payload?.message ?? '').trim()
|
|
|
if (!message) {
|
|
|
await this.taskItemRepo.update(item.id, {
|
|
|
@@ -326,7 +386,8 @@ export class TaskExecutor {
|
|
|
await this.taskRepo.increment({ id: task.id }, 'processed', 1)
|
|
|
|
|
|
this.app.log.warn(`❌ 发送失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
|
|
|
- throw error
|
|
|
+ // SEND_MESSAGE 默认不强制换号(避免快速耗尽账号池);如需也换号,可在此处返回 rotateAccount: true
|
|
|
+ return { rotateAccount: false, reason: msg }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -334,8 +395,11 @@ export class TaskExecutor {
|
|
|
task: Task,
|
|
|
item: TaskItem,
|
|
|
sender: TgUser,
|
|
|
- workerTgClient: TgClientManager
|
|
|
- ): Promise<void> {
|
|
|
+ workerTgClient: TgClientManager,
|
|
|
+ inviteGroupEntity: any | null
|
|
|
+ ): Promise<{ rotateAccount?: boolean; reason?: string; inviteGroupEntity?: any | null } | void> {
|
|
|
+ // 注意:邀请失败时也需要能拿到已加入的群实体用于退群
|
|
|
+ let groupEntity: any | null = inviteGroupEntity
|
|
|
try {
|
|
|
const inviteLink = String(task.payload?.inviteLink ?? '').trim()
|
|
|
if (!inviteLink) {
|
|
|
@@ -357,14 +421,27 @@ export class TaskExecutor {
|
|
|
throw new Error('TelegramClient 未连接')
|
|
|
}
|
|
|
|
|
|
- // tgUser 加入群组,获取群组实体
|
|
|
- const inviteGroupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
|
|
|
- if (!inviteGroupEntity) {
|
|
|
+ // tgUser 加入群组,获取群组实体(每个账号缓存一次)
|
|
|
+ if (!groupEntity) {
|
|
|
+ groupEntity = await workerTgClient.resolveGroupEntityByInviteLink(inviteLink)
|
|
|
+ }
|
|
|
+ if (!groupEntity) {
|
|
|
throw new Error('群拉人任务:未获取到群组实体(inviteGroupEntity 为空)')
|
|
|
}
|
|
|
+ const chatId = groupEntity.chatId ?? groupEntity.id
|
|
|
+ const accessHash = groupEntity.accessHash
|
|
|
+ if (chatId === undefined || chatId === null) {
|
|
|
+ throw new Error('群拉人任务:群组实体缺少 id/chatId(请检查 resolveGroupEntityByInviteLink 返回值)')
|
|
|
+ }
|
|
|
|
|
|
- const inputChannel = await workerTgClient.getInputChannel(inviteGroupEntity.chatId, inviteGroupEntity.accessHash)
|
|
|
- await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
|
|
|
+ // 超级群/频道:有 accessHash,走 channels.InviteToChannel
|
|
|
+ if (accessHash !== undefined && accessHash !== null) {
|
|
|
+ const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
|
|
|
+ await workerTgClient.inviteMembersToChannelGroup(inputChannel, [targetUser])
|
|
|
+ } else {
|
|
|
+ // 普通群(Chat):没有 accessHash,走 messages.AddChatUser
|
|
|
+ await workerTgClient.inviteUserToBasicChat(chatId, targetUser)
|
|
|
+ }
|
|
|
|
|
|
await this.taskItemRepo.update(item.id, {
|
|
|
status: TaskItemStatus.SUCCESS,
|
|
|
@@ -378,6 +455,7 @@ export class TaskExecutor {
|
|
|
await this.taskRepo.increment({ id: task.id }, 'success', 1)
|
|
|
|
|
|
this.app.log.info(`✅ 邀请成功 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}`)
|
|
|
+ return { rotateAccount: false, inviteGroupEntity: groupEntity }
|
|
|
} catch (error) {
|
|
|
const msg = error instanceof Error ? error.message : '未知错误'
|
|
|
|
|
|
@@ -392,7 +470,7 @@ export class TaskExecutor {
|
|
|
await this.taskRepo.increment({ id: task.id }, 'processed', 1)
|
|
|
await this.taskRepo.increment({ id: task.id }, 'success', 1)
|
|
|
this.app.log.info(`ℹ️ 成员已在群组中 taskId=${task.id}, itemId=${item.id}, target=${item.target}`)
|
|
|
- return
|
|
|
+ return { rotateAccount: false }
|
|
|
}
|
|
|
|
|
|
await this.taskItemRepo.update(item.id, {
|
|
|
@@ -406,7 +484,33 @@ export class TaskExecutor {
|
|
|
await this.taskRepo.increment({ id: task.id }, 'processed', 1)
|
|
|
|
|
|
this.app.log.warn(`❌ 邀请失败 taskId=${task.id}, itemId=${item.id}, sender=${sender.id}, error: ${msg}`)
|
|
|
- throw error
|
|
|
+ // 关键:邀请失败时也要尽量退出刚加入的群,避免账号一直挂在群里
|
|
|
+ await this.safeLeaveInviteGroup(workerTgClient, groupEntity).catch(() => {})
|
|
|
+ // INVITE_TO_GROUP:按需求,失败就换号继续(避免单号被冻结/受限导致整体停滞)
|
|
|
+ // 同时清空缓存,避免后续误以为仍在群内
|
|
|
+ return { rotateAccount: true, reason: msg, inviteGroupEntity: null }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 换号/断开前:让当前账号退出已加入的群聊
|
|
|
+ * - 仅对 INVITE_TO_GROUP 有意义;其它任务 inviteGroupEntity 为空会直接跳过
|
|
|
+ * - 不抛错:避免退群失败影响整体任务流程
|
|
|
+ */
|
|
|
+ private async safeLeaveInviteGroup(workerTgClient: TgClientManager, inviteGroupEntity: any | null): Promise<void> {
|
|
|
+ if (!inviteGroupEntity) return
|
|
|
+ const chatId = inviteGroupEntity.chatId ?? inviteGroupEntity.id
|
|
|
+ const accessHash = inviteGroupEntity.accessHash
|
|
|
+ try {
|
|
|
+ if (chatId === undefined || chatId === null) return
|
|
|
+ if (accessHash !== undefined && accessHash !== null) {
|
|
|
+ const inputChannel = await workerTgClient.getInputChannel(chatId, accessHash)
|
|
|
+ await workerTgClient.leaveGroup(inputChannel)
|
|
|
+ } else {
|
|
|
+ await workerTgClient.leaveBasicChat(chatId)
|
|
|
+ }
|
|
|
+ } catch {
|
|
|
+ // 忽略退群异常(可能已不在群、权限问题等)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -477,9 +581,9 @@ export class TaskExecutor {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 选择可用的tgUser 账号
|
|
|
+ * 获取账号(带 worker 租约,尽量避免并发 worker 复用同一 tgUser)
|
|
|
*/
|
|
|
- private async pickAccount(): Promise<TgUser> {
|
|
|
+ private async acquireAccount(workerId: number): Promise<TgUser> {
|
|
|
if (this.accountCache.length === 0) {
|
|
|
this.accountCache = await this.senderRepo.find({
|
|
|
where: { delFlag: false },
|
|
|
@@ -494,20 +598,86 @@ export class TaskExecutor {
|
|
|
}
|
|
|
|
|
|
const total = this.accountCache.length
|
|
|
- for (let i = 0; i < total; i++) {
|
|
|
- const index = (this.accountCursor + i) % total
|
|
|
- const account = this.accountCache[index]
|
|
|
- const used = this.accountUsageInBatch.get(account.id) ?? 0
|
|
|
- if (used < this.currentAccountLimit) {
|
|
|
- this.accountCursor = (index + 1) % total
|
|
|
- return account
|
|
|
+ const tryPick = (respectLease: boolean): TgUser | null => {
|
|
|
+ for (let i = 0; i < total; i++) {
|
|
|
+ const index = (this.accountCursor + i) % total
|
|
|
+ const account = this.accountCache[index]
|
|
|
+ if (this.accountExcludedInBatch.has(account.id)) continue
|
|
|
+ if (respectLease) {
|
|
|
+ const leasedBy = this.accountLeaseOwner.get(account.id)
|
|
|
+ if (leasedBy !== undefined && leasedBy !== workerId) continue
|
|
|
+ }
|
|
|
+ const used = this.accountUsageInBatch.get(account.id) ?? 0
|
|
|
+ if (used < this.currentAccountLimit) {
|
|
|
+ this.accountCursor = (index + 1) % total
|
|
|
+ return account
|
|
|
+ }
|
|
|
}
|
|
|
+ return null
|
|
|
+ }
|
|
|
+
|
|
|
+ // 先按“租约”挑选:一个账号尽量只服务一个 worker
|
|
|
+ const picked1 = tryPick(true)
|
|
|
+ if (picked1) {
|
|
|
+ this.setAccountLease(workerId, picked1.id)
|
|
|
+ return picked1
|
|
|
}
|
|
|
|
|
|
this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
|
|
|
this.accountUsageInBatch.clear()
|
|
|
this.accountCursor = 0
|
|
|
- return this.accountCache[0]
|
|
|
+
|
|
|
+ const picked2 = tryPick(true)
|
|
|
+ if (picked2) {
|
|
|
+ this.setAccountLease(workerId, picked2.id)
|
|
|
+ return picked2
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果全部被排除,说明这一批账号都异常/受限:清空排除集再尝试一次
|
|
|
+ if (this.accountExcludedInBatch.size > 0) {
|
|
|
+ this.app.log.warn('本批次所有 tgUser 均被排除,清空排除列表后重试')
|
|
|
+ this.accountExcludedInBatch.clear()
|
|
|
+ const picked3 = tryPick(true)
|
|
|
+ if (picked3) {
|
|
|
+ this.setAccountLease(workerId, picked3.id)
|
|
|
+ return picked3
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 兜底:若账号不足/都被占用,允许复用(不阻塞任务),但给出明显告警
|
|
|
+ this.app.log.warn(
|
|
|
+ { workerId, totalAccounts: total },
|
|
|
+ '没有可用的未占用 tgUser(账号可能不足或都被占用),worker 将复用账号'
|
|
|
+ )
|
|
|
+ const fallback = tryPick(false) ?? this.accountCache[0]
|
|
|
+ this.setAccountLease(workerId, fallback.id)
|
|
|
+ return fallback
|
|
|
+ }
|
|
|
+
|
|
|
+ private setAccountLease(workerId: number, accountId: string): void {
|
|
|
+ // 一个 worker 只持有一个租约:覆盖前先释放旧租约
|
|
|
+ const old = this.accountLeaseByWorker.get(workerId)
|
|
|
+ if (old && old !== accountId) {
|
|
|
+ this.releaseAccountLease(workerId, old)
|
|
|
+ }
|
|
|
+ const owner = this.accountLeaseOwner.get(accountId)
|
|
|
+ if (owner !== undefined && owner !== workerId) {
|
|
|
+ // 理论上 acquireAccount(respectLease=true) 不会走到这里;兜底覆盖,避免状态不一致
|
|
|
+ this.accountLeaseByWorker.delete(owner)
|
|
|
+ }
|
|
|
+ this.accountLeaseByWorker.set(workerId, accountId)
|
|
|
+ this.accountLeaseOwner.set(accountId, workerId)
|
|
|
+ }
|
|
|
+
|
|
|
+ private releaseAccountLease(workerId: number, accountId: string): void {
|
|
|
+ const owned = this.accountLeaseByWorker.get(workerId)
|
|
|
+ if (owned === accountId) {
|
|
|
+ this.accountLeaseByWorker.delete(workerId)
|
|
|
+ }
|
|
|
+ const owner = this.accountLeaseOwner.get(accountId)
|
|
|
+ if (owner === workerId) {
|
|
|
+ this.accountLeaseOwner.delete(accountId)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|