|
|
@@ -6,7 +6,7 @@ import { Task, TaskStatus, TaskType } from '../entities/task.entity'
|
|
|
import { TaskItem, TaskItemStatus } from '../entities/task-item.entity'
|
|
|
import { TgUser } from '../entities/tg-user.entity'
|
|
|
import { TgUserService } from '../services/tg-user.service'
|
|
|
-import { TgClientService } from '../services/tgClient.service'
|
|
|
+import { TgClientManager } from '../services/clients/tg-client.manager'
|
|
|
import { buildStringSessionByDcIdAndAuthKey } from '../utils/tg.util'
|
|
|
|
|
|
export class TaskExecutor {
|
|
|
@@ -16,11 +16,22 @@ export class TaskExecutor {
|
|
|
private senderRepo: Repository<TgUser>
|
|
|
private senderService: TgUserService
|
|
|
|
|
|
- private readonly defaultSenderSendLimit = 5
|
|
|
- private currentSenderSendLimit = this.defaultSenderSendLimit
|
|
|
- private senderUsageInBatch: Map<string, number> = new Map()
|
|
|
- private senderCursor = 0
|
|
|
- private senderCache: TgUser[] = []
|
|
|
+ // 默认账号使用上限
|
|
|
+ private readonly defaultAccountLimit = 5
|
|
|
+ // 当前账号使用上限
|
|
|
+ private currentAccountLimit = this.defaultAccountLimit
|
|
|
+ // 账号缓存默认拉取上限
|
|
|
+ private readonly defaultAccountCacheTake = 100
|
|
|
+ // 账号缓存最大拉取上限
|
|
|
+ private readonly maxAccountCacheTake = 1000
|
|
|
+ // 当前任务的账号缓存拉取上限
|
|
|
+ private currentAccountCacheTake = this.defaultAccountCacheTake
|
|
|
+ // 账号使用批次
|
|
|
+ private accountUsageInBatch: Map<string, number> = new Map()
|
|
|
+ // 账号游标
|
|
|
+ private accountCursor = 0
|
|
|
+ // 账号缓存
|
|
|
+ private accountCache: TgUser[] = []
|
|
|
|
|
|
constructor(private app: FastifyInstance) {
|
|
|
const ds = app.dataSource
|
|
|
@@ -38,12 +49,15 @@ export class TaskExecutor {
|
|
|
try {
|
|
|
await this.beforeExecute(task)
|
|
|
|
|
|
- // 初始化 sender 配置
|
|
|
- this.currentSenderSendLimit =
|
|
|
- task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultSenderSendLimit
|
|
|
- this.senderUsageInBatch.clear()
|
|
|
- this.senderCursor = 0
|
|
|
- await this.refreshSenderCache()
|
|
|
+ // 初始化 tgUser 配置
|
|
|
+ this.currentAccountLimit =
|
|
|
+ task.accountLimit && Number(task.accountLimit) > 0 ? Number(task.accountLimit) : this.defaultAccountLimit
|
|
|
+ this.accountUsageInBatch.clear()
|
|
|
+ this.accountCursor = 0
|
|
|
+ // 计算任务需要拉取的账号池大小
|
|
|
+ const concurrency = Math.min(10, Math.max(1, Number(task.threads ?? 1)))
|
|
|
+ this.currentAccountCacheTake = this.computeAccountCacheTake(task, concurrency)
|
|
|
+ await this.refreshAccountCache()
|
|
|
|
|
|
await this.process(task)
|
|
|
await this.finalize(task.id)
|
|
|
@@ -53,6 +67,31 @@ export class TaskExecutor {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 计算本次任务需要拉取多少个 tgUser 账号进入缓存
|
|
|
+ * - 优先支持 task.payload.accountPoolLimit/accountCacheTake 覆盖
|
|
|
+ * - 否则使用 ceil(total / accountLimit),并至少不小于 threads(减少并发下账号共享)
|
|
|
+ * - 最终受 maxAccountCacheTake 硬上限保护
|
|
|
+ */
|
|
|
+ private computeAccountCacheTake(task: Task, concurrency: number): number {
|
|
|
+ const maxTake = this.maxAccountCacheTake
|
|
|
+
|
|
|
+ const overrideRaw = task.payload?.accountPoolLimit ?? task.payload?.accountCacheTake
|
|
|
+ if (overrideRaw !== undefined && overrideRaw !== null) {
|
|
|
+ const override = Number(overrideRaw)
|
|
|
+ if (!Number.isNaN(override) && override > 0) {
|
|
|
+ return Math.min(maxTake, Math.max(1, Math.floor(override)))
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ const total = Number(task.total ?? 0)
|
|
|
+ const perAccount = Math.max(1, Number(this.currentAccountLimit || 1))
|
|
|
+ const needByTotal = total > 0 ? Math.ceil(total / perAccount) : 0
|
|
|
+
|
|
|
+ const desired = Math.max(concurrency, needByTotal || this.defaultAccountCacheTake)
|
|
|
+ return Math.min(maxTake, desired)
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 执行前校验 & 标记
|
|
|
*/
|
|
|
@@ -88,9 +127,9 @@ export class TaskExecutor {
|
|
|
* 单 worker 循环
|
|
|
*/
|
|
|
private async workerLoop(taskId: number): Promise<void> {
|
|
|
- let sender: TgUser | null = null
|
|
|
- const workerTgClient = new TgClientService()
|
|
|
- let senderSentInRound = 0
|
|
|
+ let tgUser: TgUser | null = null
|
|
|
+ const workerTgClient = new TgClientManager()
|
|
|
+ let accountUsageInRound = 0
|
|
|
let inviteGroupEntity: any | null = null
|
|
|
|
|
|
try {
|
|
|
@@ -114,42 +153,33 @@ export class TaskExecutor {
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- // Sender 轮换逻辑
|
|
|
- if (!sender || senderSentInRound >= this.currentSenderSendLimit) {
|
|
|
+ // tgUser 轮换逻辑
|
|
|
+ if (!tgUser || accountUsageInRound >= this.currentAccountLimit) {
|
|
|
await workerTgClient.disconnect()
|
|
|
- sender = await this.pickSender()
|
|
|
- const sessionString = await this.ensureSessionString(sender)
|
|
|
+ tgUser = await this.pickAccount()
|
|
|
+ const sessionString = await this.ensureSessionString(tgUser)
|
|
|
|
|
|
try {
|
|
|
await workerTgClient.connect(sessionString)
|
|
|
} catch (error) {
|
|
|
const msg = error instanceof Error ? error.message : String(error)
|
|
|
if (this.isSessionRevokedMessage(msg)) {
|
|
|
- await this.handleSessionRevoked(sender)
|
|
|
- sender = null
|
|
|
+ await this.handleSessionRevoked(tgUser)
|
|
|
+ tgUser = null
|
|
|
continue
|
|
|
}
|
|
|
throw error
|
|
|
}
|
|
|
|
|
|
- senderSentInRound = 0
|
|
|
+ accountUsageInRound = 0
|
|
|
inviteGroupEntity = null
|
|
|
|
|
|
- // 获取当前账号信息并延迟
|
|
|
- const me = await workerTgClient
|
|
|
- .getClient()
|
|
|
- ?.getMe()
|
|
|
- .catch(() => null)
|
|
|
+ // 延迟
|
|
|
const delaySeconds = this.getRandomDelaySeconds()
|
|
|
- const displayName = `${me?.firstName ?? ''} ${me?.lastName ?? ''}`.trim() || me?.username || ''
|
|
|
- this.app.log.info(
|
|
|
- `当前登录账号: id: ${me?.id ?? sender.id}, name: ${
|
|
|
- displayName || sender.id
|
|
|
- },延迟 ${delaySeconds}s 后开始发送`
|
|
|
- )
|
|
|
+ this.app.log.info(`延迟 ${delaySeconds}s 后开始处理任务`)
|
|
|
await this.sleep(delaySeconds * 1000)
|
|
|
|
|
|
- // 邀请任务:每次换号后,确保已加入目标群并拿到群实体
|
|
|
+ // 群拉人任务:每次换号后,确保已加入目标群并拿到群实体
|
|
|
if (task.type === TaskType.INVITE_TO_GROUP) {
|
|
|
const inviteLink = String(task.payload?.inviteLink ?? '').trim()
|
|
|
if (!inviteLink) {
|
|
|
@@ -160,20 +190,20 @@ export class TaskExecutor {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- await this.processTaskItem(task, taskItem, sender, workerTgClient, inviteGroupEntity)
|
|
|
+ await this.processTaskItem(task, taskItem, tgUser, workerTgClient, inviteGroupEntity)
|
|
|
} catch (error) {
|
|
|
const msg = error instanceof Error ? error.message : '未知错误'
|
|
|
- if (sender && this.isSessionRevokedMessage(msg)) {
|
|
|
- await this.handleSessionRevoked(sender)
|
|
|
+ if (tgUser && this.isSessionRevokedMessage(msg)) {
|
|
|
+ await this.handleSessionRevoked(tgUser)
|
|
|
await workerTgClient.disconnect()
|
|
|
- sender = null
|
|
|
- senderSentInRound = 0
|
|
|
+ tgUser = null
|
|
|
+ accountUsageInRound = 0
|
|
|
}
|
|
|
} finally {
|
|
|
- senderSentInRound++
|
|
|
- if (sender) {
|
|
|
- const used = (this.senderUsageInBatch.get(sender.id) ?? 0) + 1
|
|
|
- this.senderUsageInBatch.set(sender.id, used)
|
|
|
+ accountUsageInRound++
|
|
|
+ if (tgUser) {
|
|
|
+ const used = (this.accountUsageInBatch.get(tgUser.id) ?? 0) + 1
|
|
|
+ this.accountUsageInBatch.set(tgUser.id, used)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -233,7 +263,7 @@ export class TaskExecutor {
|
|
|
task: Task,
|
|
|
item: TaskItem,
|
|
|
sender: TgUser,
|
|
|
- workerTgClient: TgClientService,
|
|
|
+ workerTgClient: TgClientManager,
|
|
|
inviteGroupEntity: any | null
|
|
|
): Promise<void> {
|
|
|
if (task.type === TaskType.INVITE_TO_GROUP) {
|
|
|
@@ -247,7 +277,7 @@ export class TaskExecutor {
|
|
|
task: Task,
|
|
|
item: TaskItem,
|
|
|
sender: TgUser,
|
|
|
- workerTgClient: TgClientService
|
|
|
+ workerTgClient: TgClientManager
|
|
|
): Promise<void> {
|
|
|
const message = String(task.payload?.message ?? '').trim()
|
|
|
if (!message) {
|
|
|
@@ -314,7 +344,7 @@ export class TaskExecutor {
|
|
|
task: Task,
|
|
|
item: TaskItem,
|
|
|
sender: TgUser,
|
|
|
- workerTgClient: TgClientService,
|
|
|
+ workerTgClient: TgClientManager,
|
|
|
inviteGroupEntity: any | null
|
|
|
): Promise<void> {
|
|
|
try {
|
|
|
@@ -471,74 +501,76 @@ export class TaskExecutor {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 刷新发送者缓存
|
|
|
+ * 刷新 tgUser 账号缓存
|
|
|
*/
|
|
|
- private async refreshSenderCache(): Promise<void> {
|
|
|
- this.senderCache = await this.senderRepo.find({
|
|
|
+ private async refreshAccountCache(): Promise<void> {
|
|
|
+ this.accountCache = await this.senderRepo.find({
|
|
|
where: { delFlag: false },
|
|
|
- order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
|
|
|
+ order: { lastUsageTime: 'ASC', usageCount: 'ASC' },
|
|
|
+ take: this.currentAccountCacheTake
|
|
|
})
|
|
|
- this.senderCursor = 0
|
|
|
+ this.accountCursor = 0
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 选择可用的发送者
|
|
|
+ * 选择可用的tgUser 账号
|
|
|
*/
|
|
|
- private async pickSender(): Promise<TgUser> {
|
|
|
- if (this.senderCache.length === 0) {
|
|
|
- this.senderCache = await this.senderRepo.find({
|
|
|
+ private async pickAccount(): Promise<TgUser> {
|
|
|
+ if (this.accountCache.length === 0) {
|
|
|
+ this.accountCache = await this.senderRepo.find({
|
|
|
where: { delFlag: false },
|
|
|
- order: { lastUsageTime: 'ASC', usageCount: 'ASC' }
|
|
|
+ order: { lastUsageTime: 'ASC', usageCount: 'ASC' },
|
|
|
+ take: this.currentAccountCacheTake
|
|
|
})
|
|
|
- this.senderCursor = 0
|
|
|
+ this.accountCursor = 0
|
|
|
}
|
|
|
|
|
|
- if (this.senderCache.length === 0) {
|
|
|
- throw new Error('暂无可用 sender 账号')
|
|
|
+ if (this.accountCache.length === 0) {
|
|
|
+ throw new Error('暂无可用 tgUser 账号')
|
|
|
}
|
|
|
|
|
|
- const total = this.senderCache.length
|
|
|
+ const total = this.accountCache.length
|
|
|
for (let i = 0; i < total; i++) {
|
|
|
- const index = (this.senderCursor + i) % total
|
|
|
- const sender = this.senderCache[index]
|
|
|
- const used = this.senderUsageInBatch.get(sender.id) ?? 0
|
|
|
- if (used < this.currentSenderSendLimit) {
|
|
|
- this.senderCursor = (index + 1) % total
|
|
|
- return sender
|
|
|
+ const index = (this.accountCursor + i) % total
|
|
|
+ const account = this.accountCache[index]
|
|
|
+ const used = this.accountUsageInBatch.get(account.id) ?? 0
|
|
|
+ if (used < this.currentAccountLimit) {
|
|
|
+ this.accountCursor = (index + 1) % total
|
|
|
+ return account
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- this.app.log.info('所有 sender 均已达到当前批次上限,重置计数后重新轮询')
|
|
|
- this.senderUsageInBatch.clear()
|
|
|
- this.senderCursor = 0
|
|
|
- return this.senderCache[0]
|
|
|
+ this.app.log.info('所有 tgUser 均已达到当前批次上限,重置计数后重新轮询')
|
|
|
+ this.accountUsageInBatch.clear()
|
|
|
+ this.accountCursor = 0
|
|
|
+ return this.accountCache[0]
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 确保发送者有有效的会话字符串
|
|
|
*/
|
|
|
- private async ensureSessionString(sender: TgUser): Promise<string> {
|
|
|
- if (sender.sessionStr) {
|
|
|
- return sender.sessionStr
|
|
|
+ private async ensureSessionString(tgUser: TgUser): Promise<string> {
|
|
|
+ if (tgUser.sessionStr) {
|
|
|
+ return tgUser.sessionStr
|
|
|
}
|
|
|
|
|
|
- if (sender.dcId && sender.authKey) {
|
|
|
- const session = buildStringSessionByDcIdAndAuthKey(sender.dcId, sender.authKey)
|
|
|
- await this.senderRepo.update(sender.id, { sessionStr: session })
|
|
|
+ if (tgUser.dcId && tgUser.authKey) {
|
|
|
+ const session = buildStringSessionByDcIdAndAuthKey(tgUser.dcId, tgUser.authKey)
|
|
|
+ await this.senderRepo.update(tgUser.id, { sessionStr: session })
|
|
|
return session
|
|
|
}
|
|
|
|
|
|
- throw new Error(`sender=${sender.id} 缺少 session 信息`)
|
|
|
+ throw new Error(`tgUser=${tgUser.id} 缺少 session 信息`)
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 处理会话被撤销的情况
|
|
|
*/
|
|
|
- private async handleSessionRevoked(sender: TgUser): Promise<void> {
|
|
|
- await this.senderRepo.update(sender.id, { delFlag: true })
|
|
|
- this.senderCache = this.senderCache.filter(s => s.id !== sender.id)
|
|
|
- this.senderCursor = 0
|
|
|
- this.app.log.warn(`sender=${sender.id} session 失效,已删除`)
|
|
|
+ private async handleSessionRevoked(tgUser: TgUser): Promise<void> {
|
|
|
+ await this.senderRepo.update(tgUser.id, { delFlag: true })
|
|
|
+ this.accountCache = this.accountCache.filter(a => a.id !== tgUser.id)
|
|
|
+ this.accountCursor = 0
|
|
|
+ this.app.log.warn(`tgUser=${tgUser.id} session 失效,已删除`)
|
|
|
}
|
|
|
|
|
|
/**
|