import { OperatorConfigService } from './../operator_config/operator_config.service' import { PhoneListService } from './../phone-list/phone-list.service' import { forwardRef, Inject, Injectable, InternalServerErrorException, Logger, NotFoundException, OnModuleInit } from '@nestjs/common' import { InjectRepository } from '@nestjs/typeorm' import { ConfusionType, Task, TaskStatus } from './entities/task.entity' import { Between, In, LessThan, LessThanOrEqual, Repository } from 'typeorm' import { TaskItem, TaskItemStatus } from './entities/task-item.entity' import { PageRequest } from '../common/dto/page-request' import { paginate, Pagination } from 'nestjs-typeorm-paginate' import { EventsGateway } from '../events/events.gateway' import { randomUUID } from 'crypto' import { setTimeout } from 'timers/promises' import { DeviceService } from '../device/device.service' import { SysConfigService } from '../sys-config/sys-config.service' import { Users } from '../users/entities/users.entity' import * as ExcelJS from 'exceljs' import * as moment from 'moment' import { BalanceService } from '../balance/balance.service' import { Role } from '../model/role.enum' import { Phone } from '../phone-list/entities/phone.entity' import { Cron, Interval } from '@nestjs/schedule' import * as AsyncLock from 'async-lock' import { UsersService } from '../users/users.service' import { addDays, addHours, addMinutes, endOfDay, startOfDay } from 'date-fns' import { SysConfig } from '../sys-config/entities/sys-config.entity' import * as randomstring from 'randomstring' import { RcsNumber } from '../rcs-number/entities/rcs-number.entity' import axios from 'axios' import { BalanceRecord, BalanceType } from '../balance/entities/balance-record.entity' import { Device } from '../device/entities/device.entity' import Decimal from 'decimal.js' import { CountryConfigService } from '../country-config/country-config.service' import { TaskResult } from './types' import { connectAsync as mqttConnect, 
MqttClient } from 'mqtt'

/** Number of task items handed to each device in one dispatch round (see dispatchTask). */
export const batchSize = 9

/**
 * Service that creates, bills, queues, dispatches and reports on send tasks.
 */
@Injectable()
export class TaskService implements OnModuleInit {
    private lock = new AsyncLock()
    private TAG = 'TaskService'
    private mqttClient: MqttClient

    constructor(
        @InjectRepository(Task)
        private taskRepository: Repository<Task>,
        @InjectRepository(TaskItem)
        private taskItemRepository: Repository<TaskItem>,
        @InjectRepository(Phone)
        private phoneRepository: Repository<Phone>,
        @InjectRepository(Users)
        private userRepository: Repository<Users>,
        @InjectRepository(RcsNumber)
        private rcsNumberRepository: Repository<RcsNumber>,
        @InjectRepository(BalanceRecord)
        private readonly balanceRecordRepository: Repository<BalanceRecord>,
        @Inject(forwardRef(() => EventsGateway))
        private readonly eventsGateway: EventsGateway,
        private readonly phoneListService: PhoneListService,
        @Inject(forwardRef(() => DeviceService))
        private readonly deviceService: DeviceService,
        private readonly sysConfigService: SysConfigService,
        private readonly balanceService: BalanceService,
        private readonly userService: UsersService,
        private readonly operatorConfigService: OperatorConfigService,
        private readonly countryConfigService: CountryConfigService
    ) {}

    /**
     * On start-up, puts the in-flight (PENDING) items of every PENDING task back to IDLE
     * so they can be dispatched again.
     *
     * The work runs under the same 'dispatchTask' lock key that scheduleTask uses and
     * keeps the lock for a further 10 s, so no dispatch round can start in that window.
     * The lock promise is intentionally not awaited, so module initialisation is not
     * delayed; failures are logged instead of surfacing as unhandled rejections.
     */
    async onModuleInit() {
        this.lock
            .acquire('dispatchTask', async () => {
                const tasks = await this.taskRepository.findBy({ status: TaskStatus.PENDING })
                if (tasks.length > 0) {
                    // Items carry TaskItemStatus; dispatchTask selects on TaskItemStatus.IDLE.
                    await this.taskItemRepository.update(
                        { taskId: In(tasks.map((task) => task.id)), status: TaskItemStatus.PENDING },
                        { status: TaskItemStatus.IDLE }
                    )
                }
                await setTimeout(10000)
            })
            .catch((e) => {
                Logger.error('Error resetting pending task items', e.stack, this.TAG)
            })
        // NOTE(review): a commented-out MQTT connection with inline broker credentials was
        // removed from here. If the connection is reinstated, read host/credentials from
        // configuration rather than from source.
    }

    private taskControllers: { [key: number]: AbortController } = {}

    /** Loads a task by primary key; resolves to null when no row matches. */
    async findById(id: number): Promise<Task> {
        return await this.taskRepository.findOneBy({ id })
    }

    /**
     * Returns one page of tasks and fills in `userName` for each task whose owner exists.
     * NOTE(review): `PageRequest` may originally have carried a type argument — confirm.
     */
    async findAllTask(req: PageRequest): Promise<Pagination<Task>> {
        const page = await paginate(this.taskRepository, req.page, req.search)
        if (page.items.length !== 0) {
            let items = page.items
            const userIds = items.map((item) => item.userId)
            const 
users = await this.userRepository.findBy({ id: In(userIds) })
            // Index the owners by id so each task resolves its user name in O(1).
            const usersById = new Map(users.map((user) => [user.id, user] as const))
            for (const item of items) {
                const owner = usersById.get(item.userId)
                if (owner) {
                    item.userName = owner.username
                }
            }
        }
        return page
    }

    /** Returns every task whose status is PENDING, CUTTING or VIP. */
    async findPendingTasks() {
        return await this.taskRepository.findBy({
            status: In([TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP])
        })
    }

    /** Returns one page of task items using the caller-supplied paging and search options. */
    async findAllTaskItem(req: PageRequest): Promise<Pagination<TaskItem>> {
        return await paginate(this.taskItemRepository, req.page, req.search)
    }

    /**
     * Creates a task from an uploaded phone list and inserts one task item per number
     * (plus optional embedded monitoring numbers).
     *
     * Tasks with `startedAt` set are stored as SCHEDULED and charged up front. The task
     * items are written before the charge is attempted, so when the charge fails the task
     * that is left behind (IDLE, unpaid) is still a complete, manually startable task.
     *
     * @param task task to persist; mutated with derived fields (total, country, status, ...)
     * @returns the saved task
     * @throws NotFoundException when the phone list or the owning user does not exist
     * @throws InternalServerErrorException when the list is empty or has fewer than 100 numbers
     * @throws Error when the balance is insufficient or the up-front charge fails
     */
    async createTask(task: Task): Promise<Task> {
        const phoneList = await this.phoneListService.findPhoneListById(task.listId)
        if (!phoneList) {
            throw new NotFoundException('Phone list not found')
        }
        const phones = await this.phoneListService.findPhoneByListId(task.listId)
        if (!phones || phones.length === 0) {
            throw new InternalServerErrorException('请先上传料子')
        } else if (phones.length < 100) {
            throw new InternalServerErrorException('料子条数不能少于100条!')
        }
        task.message = task.message || ''
        task.total = phones.length
        task.country = phoneList.country
        if (task.country) {
            const countryConfig = await this.countryConfigService.getDestConfig(task.country)
            if (countryConfig) {
                task.useBackup = countryConfig.useBackup
                task.e2ee = countryConfig.e2ee
            }
        }
        // Obfuscation: selecting both "head" and "end" collapses to "both".
        if (task.confusion && task.confusion.includes('head') && task.confusion.includes('end')) {
            task.confusion = 'both'
        }
        // Scheduled task: verify the balance now, charge after the task has been saved.
        let cost = new Decimal(0)
        if (task.startedAt) {
            task.status = TaskStatus.SCHEDULED
            const user = await this.userService.findById(task.userId)
            if (!user) {
                throw new NotFoundException('User not found')
            }
            cost = await this.getCost(task, user)
            if (new Decimal(cost).comparedTo(user.balance || new Decimal(0)) > 0) {
                throw new Error('余额不足,请充值后创建定时任务!')
            }
            task.paid = true
        }
        // A per-user cap on sends per number overrides the task's own value.
        const users = await this.userRepository.findOneBy({ id: task.userId })
        if (!users) {
            throw new NotFoundException('User not found')
        }
        if (users.maxSend > 0) {
            task.singleQty = users.maxSend
        }
        task = await this.taskRepository.save(task)

        let finalPhones = [...phones]
        // Embedded numbers, spread evenly through the list.
        let extraNumbers: string[] = []
        if (users.isEmbedNumber) {
            const extraNumbersString = await this.sysConfigService.getString('embed_numbers', '')
            // ''.split(',') yields [''] — drop blanks so no empty number is ever inserted.
            extraNumbers = extraNumbersString
                .split(',')
                .map((value) => value.trim())
                .filter((value) => value.length > 0)
            // Lists under 2100 numbers only get the first half of the embedded numbers.
            if (task.total < 2100) {
                extraNumbers.splice(Math.floor(extraNumbers.length / 2))
            }
            if (extraNumbers.length > 0) {
                const totalLength = finalPhones.length + extraNumbers.length
                const insertionStep = Math.floor(totalLength / (extraNumbers.length + 1))
                extraNumbers.forEach((extraNumber, index) => {
                    const cur = new Phone()
                    cur.number = extraNumber
                    finalPhones.splice((index + 1) * insertionStep, 0, cur)
                })
            }
        }
        const embedNumbers = new Set(extraNumbers)
        await this.taskItemRepository
            .createQueryBuilder()
            .insert()
            .values(
                finalPhones.map((phone) => {
                    const taskItem = new TaskItem()
                    taskItem.taskId = task.id
                    taskItem.number = phone.number
                    taskItem.embed = embedNumbers.has(phone.number)
                    taskItem.status = TaskItemStatus.IDLE
                    return taskItem
                })
            )
            .updateEntity(false)
            .execute()

        if (task.paid) {
            try {
                await this.balanceService.feeDeduction(task.userId, cost, task.id)
            } catch (e) {
                // Charge failed: downgrade to an unpaid manual task and tell the caller.
                task.status = TaskStatus.IDLE
                task.paid = false
                await this.taskRepository.update(task.id, { status: TaskStatus.IDLE, paid: false })
                throw new Error('定时任务扣款失败,已转为手动发送任务')
            }
        }
        return task
    }

    /**
     * Builds the text for one outgoing message: substitutes each dynamic key with a random
     * value from its list, adds the obfuscation marker required by `task.confusion`, and
     * passes the result through refineContent.
     *
     * @returns the final message, or '' when the task has no message text
     */
    getMessage(task: Task) {
        let message = task.message
        if (!message) {
            return ''
        }
        task.dynamicMessage?.forEach((dm) => {
            if (dm.key && dm.values?.length > 0) {
                const value = dm.values[Math.floor(Math.random() * dm.values.length)]
                // Replacer function: the value is inserted verbatim, "$" has no special meaning.
                message = message.replaceAll(`${dm.key}`, () => value)
            }
        })
        // Content obfuscation
        if (task.confusion !== ConfusionType.NONE) {
            const timestamp = Math.round(Date.now() / 1000)
            // Random integer below 1,000,000 (not zero-padded)
            const randomNumber = Math.floor(Math.random() * 1000000)
            const confusionText = `${task.id}-${randomNumber}-msg-${timestamp}`
            switch (task.confusion) {
                case ConfusionType.HEAD:
                    message = `${confusionText}\n` + message
                    break
                case ConfusionType.END:
                    message += `\n${confusionText}`
                    break
                case ConfusionType.BOTH:
                    message = `${confusionText}\n` + message + `\n${confusionText}`
                    break
                default:
            }
        }
        return this.refineContent(message)
    }

    /**
     * Updates the editable settings of a task. Only the owner or an admin may do so.
     *
     * @throws Error when `id` is missing or the caller lacks permission
     */
    async updateTask(id: number, user: Users, data: Task) {
        if (!id) throw new Error('Task id is required')
        const old = await this.taskRepository.findOneOrFail({ where: { id } })
        const isOwner = old.userId === user.id
        if (!isOwner && !user.roles.includes(Role.Admin)) {
            throw new Error('No permission to update task')
        }
        // Obfuscation: "head" + "end" collapses to "both"; an empty selection means none.
        if (data.confusion && data.confusion.includes('head') && data.confusion.includes('end')) {
            data.confusion = 'both'
        } else if (data.confusion?.length === 0) {
            data.confusion = ConfusionType.NONE
        }
        const changes = {
            // An explicit '' clears the message; any other falsy value keeps the old one.
            message: data.message === '' ? '' : data.message || old.message,
            img: data.img || old.img,
            dynamicMessage: data.dynamicMessage || old.dynamicMessage,
            singleQty: data.singleQty,
            singleTimeout: data.singleTimeout,
            singleDelay: data.singleDelay,
            groupMode: data.groupMode,
            groupQty: data.groupQty,
            groupSize: data.groupSize,
            groupTimeout: data.groupTimeout,
            groupDelay: data.groupDelay,
            cleanCount: data.cleanCount,
            checkConnection: data.checkConnection,
            country: data.country,
            matchDevice: data.matchDevice,
            useBackup: data.useBackup,
            e2ee: data.e2ee,
            e2eeTimeout: data.e2eeTimeout,
            confusion: data.confusion,
            remark: data.remark
        }
        return await this.taskRepository.update({ id }, changes)
    }

    /**
     * Checks whether the task owner can afford the task.
     *
     * @returns 0 for admins, -1 when the balance is insufficient, otherwise the cost
     * @throws NotFoundException when the task or its owner does not exist
     */
    async balanceVerification(id: number) {
        const task = await this.taskRepository.findOneBy({ id })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        const user = await this.userService.findById(task.userId)
        if (!user) {
            throw new NotFoundException('User not found')
        }
        if (user.roles.includes(Role.Admin)) {
            return 0
        }
        const cost = await this.getCost(task, user)
        return cost.comparedTo(user.balance || new Decimal(0)) > 0 ? -1 : cost
    }

    /**
     * Deletes a task that is still IDLE.
     *
     * @returns the task as it was before deletion
     * @throws NotFoundException when the task does not exist
     * @throws Error when the task is not IDLE
     */
    async delTask(id: number) {
        const task = await this.taskRepository.findOneBy({ id })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        if (task.status !== TaskStatus.IDLE) {
            throw new Error('当前任务状态无法删除!')
        }
        await this.taskRepository.delete(id)
        return task
    }

    /**
     * Starts an IDLE, PAUSE or SCHEDULED task: charges it if still unpaid, then marks it
     * VIP, PENDING or QUEUED. Tasks in any other state are left untouched.
     */
    async startTask(id: number): Promise<void> {
        const task = await this.taskRepository.findOneOrFail({ where: { id } })
        if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSE && task.status !== 
TaskStatus.SCHEDULED) return
        const user = await this.userService.findById(task.userId)
        if (!user) {
            throw new NotFoundException('User not found')
        }
        // Charge before the first start; admins are never charged.
        if (!task.paid && !user.roles.includes(Role.Admin)) {
            const cost = await this.getCost(task, user)
            if (cost.comparedTo(user.balance || new Decimal(0)) > 0) {
                throw new Error('Insufficient balance!')
            }
            await this.balanceService.feeDeduction(task.userId, cost, task.id)
            await this.taskRepository.update({ id }, { paid: true })
        }
        let curStatus = TaskStatus.IDLE
        if (user.isVip) {
            // Dedicated line: VIP tasks bypass the parallel-task limit.
            curStatus = TaskStatus.VIP
        } else {
            const maxParallel = await this.getConfig('max_parallel', 0)
            const running = await this.taskRepository.count({ where: { status: TaskStatus.PENDING } })
            // Run immediately while below the limit, otherwise wait in the queue.
            curStatus = running < maxParallel ? TaskStatus.PENDING : TaskStatus.QUEUED
        }
        await this.taskRepository.update({ id }, { status: curStatus, startedAt: new Date() })
    }

    /**
     * Marks an IDLE, QUEUED or PAUSE task as CUTTING; other states are left untouched.
     * @throws NotFoundException when the task does not exist
     */
    async queueCutting(id: number): Promise<void> {
        const task = await this.taskRepository.findOneBy({ id })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        const allowed = [TaskStatus.IDLE, TaskStatus.QUEUED, TaskStatus.PAUSE]
        if (allowed.includes(task.status)) {
            await this.taskRepository.update({ id }, { status: TaskStatus.CUTTING })
        }
    }

    /**
     * Pauses a PENDING, QUEUED, CUTTING or VIP task; other states are left untouched.
     * @throws NotFoundException when the task does not exist
     */
    async pauseTask(id: number): Promise<void> {
        const task = await this.taskRepository.findOneBy({ id })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        const allowed = [TaskStatus.PENDING, TaskStatus.QUEUED, TaskStatus.CUTTING, TaskStatus.VIP]
        if (allowed.includes(task.status)) {
            await this.taskRepository.update({ id }, { status: TaskStatus.PAUSE })
        }
    }

    /**
     * Marks a PAUSE or QUEUED task as COMPLETED; other states are left untouched.
     * @throws NotFoundException when the task does not exist
     */
    async forceCompletion(id: number): Promise<void> {
        const task = await this.taskRepository.findOneBy({ id })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        if (task.status === TaskStatus.PAUSE || task.status === TaskStatus.QUEUED) {
            await this.taskRepository.update({ id }, { status: TaskStatus.COMPLETED })
        }
    }

    /**
     * Cancels the schedule of a SCHEDULED task: the task becomes an unpaid IDLE task and
     * the up-front charge is refunded.
     *
     * Nothing happens unless the task is currently SCHEDULED, so calling this twice (or on
     * a running task) can no longer trigger a second refund.
     *
     * @throws NotFoundException when the task does not exist
     */
    async unscheduledSending(id: number) {
        const task = await this.taskRepository.findOneBy({ id })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        if (task.status !== TaskStatus.SCHEDULED) {
            return
        }
        await this.taskRepository.update({ id }, { status: TaskStatus.IDLE, paid: false })
        if (!task.paid) {
            // Never charged, so there is nothing to give back.
            return
        }
        const costBalanceRecord = await this.balanceRecordRepository.findOneBy({
            taskId: id,
            type: BalanceType.CONSUMPTION
        })
        if (!costBalanceRecord) {
            Logger.warn(`No consumption record found for task ${id}, refund skipped`, this.TAG)
            return
        }
        // Refund
        await this.balanceService.feeRefund(task.userId, costBalanceRecord.amount, task.id)
    }

    /**
     * Exports the (non-embedded) items of one task as an xlsx buffer.
     *
     * A COMPLETED task exports all of its items; in any other state only items that have
     * a final result (SUCCESS or FAIL) are exported. The query is always restricted to
     * the given task.
     *
     * @throws NotFoundException when the task does not exist
     */
    async exportTaskItem(taskId: number) {
        const task = await this.taskRepository.findOneBy({ id: taskId })
        if (!task) {
            throw new NotFoundException('Task not found')
        }
        const where: any = { taskId: taskId, embed: false }
        if (task.status !== TaskStatus.COMPLETED) {
            where.status = In([TaskItemStatus.SUCCESS, TaskItemStatus.FAIL])
        }
        const taskItems = await this.taskItemRepository.find({
            where,
            order: { status: 'ASC', sendAt: 'ASC' }
        })

        const workbook = new ExcelJS.Workbook()
        const worksheet = workbook.addWorksheet('Sheet1')
        // Column headers
        worksheet.columns = [
            { header: '手机号', key: 'number', width: 30, style: { alignment: { horizontal: 'center' } } },
            { header: '是否有效', key: 'isValid', width: 15, style: { alignment: { horizontal: 'center' } } },
            { header: '发送成功', key: 'status', width: 15, style: { alignment: { horizontal: 'center' } } },
            {
                header: '发送时间',
                key: 'sendAt',
                width: 30,
                style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
            }
        ]
        taskItems.forEach((item) => {
            const succeeded = item.status === TaskItemStatus.SUCCESS
            worksheet.addRow({
                number: item.number,
                isValid: succeeded ? '有效' : '无效',
                status: succeeded ? '发送成功' : '',
                // Items that were never sent have no timestamp; leave the cell empty.
                sendAt: item.sendAt ? moment(item.sendAt).format('YYYY-MM-DD HH:mm:ss') : ''
            })
        })
        return await workbook.xlsx.writeBuffer()
    }

    /**
     * Exports the tasks created within [data.startDate, data.endDate] that the caller is
     * allowed to see, as an xlsx buffer.
     *
     * @throws Error when a date is missing or no task falls in the range
     */
    async exportTask(req: any, data: any) {
        if (!data.startDate || !data.endDate) {
            throw new Error('请选择日期')
        }
        let where = {}
        if (req.user.roles.includes('superApi')) {
            const userIds = await this.userService.getApiInvitesIds(req.user.id)
            where = { userId: In(userIds), createdAt: Between(data.startDate, data.endDate) }
        } else if (req.user.roles.includes('api')) {
const userIds = await this.userService.getInvitesIds(req.user.id)
            where = { userId: In(userIds), createdAt: Between(data.startDate, data.endDate) }
        } else if (!req.user.roles.includes('admin')) {
            where = { userId: req.user.id, createdAt: Between(data.startDate, data.endDate) }
        } else {
            where = { createdAt: Between(data.startDate, data.endDate) }
        }
        const tasks = await this.taskRepository.find({ where, order: { createdAt: 'desc' } })
        if (tasks.length === 0) {
            throw new Error('暂无日期内任务数据')
        }

        const workbook = new ExcelJS.Workbook()
        const worksheet = workbook.addWorksheet('Sheet1')
        // Column headers
        worksheet.columns = [
            { header: '#', key: 'id', width: 30, style: { alignment: { horizontal: 'center' } } },
            { header: '任务名称', key: 'name', width: 15, style: { alignment: { horizontal: 'center' } } },
            { header: '任务创建人', key: 'userName', width: 15, style: { alignment: { horizontal: 'center' } } },
            { header: '已发送', key: 'sent', width: 15, style: { alignment: { horizontal: 'center' } } },
            { header: '发送成功数', key: 'successCount', width: 15, style: { alignment: { horizontal: 'center' } } },
            { header: '总数', key: 'total', width: 15, style: { alignment: { horizontal: 'center' } } },
            {
                header: '创建时间',
                key: 'createdAt',
                width: 30,
                style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
            },
            {
                header: '开始时间',
                key: 'startedAt',
                width: 30,
                style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
            },
            {
                header: '结束时间',
                key: 'updatedAt',
                width: 30,
                style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
            }
        ]

        // Resolve each distinct owner once and index by id.
        const ownerIds = [...new Set(tasks.map((item) => item.userId))]
        const users = await this.userRepository.findBy({ id: In(ownerIds) })
        const usersById = new Map(users.map((user) => [user.id, user] as const))
        const formatDate = (value: Date) => (value ? moment(value).format('YYYY-MM-DD HH:mm:ss') : '')
        tasks.forEach((task) => {
            worksheet.addRow({
                id: task.id,
                name: task.name,
                // The owner may no longer exist; export an empty name instead of crashing.
                userName: usersById.get(task.userId)?.username ?? '',
                sent: task.sent,
                successCount: task.successCount,
                total: task.total,
                createdAt: formatDate(task.createdAt),
                startedAt: formatDate(task.startedAt),
                updatedAt: formatDate(task.updatedAt)
            })
        })
        return await workbook.xlsx.writeBuffer()
    }

    /**
     * Dashboard figures for the last seven calendar days (today included): per-day sums
     * of sent / successful / total numbers, today's figures, and — for admin and superApi
     * callers — today's count of RCS numbers with status 'success'.
     * The task scope depends on the caller's roles.
     */
    async homeStatistics(req: any) {
        let where = {}
        const res = {
            xData: [],
            sentData: [],
            successData: [],
            totalData: [],
            todayData: {
                sent: '0',
                success: '0',
                total: '0',
                code: '0'
            }
        }
        // Midnight six days ago: the window covers today plus the six previous days.
        const sixDaysAgo = new Date()
        sixDaysAgo.setDate(sixDaysAgo.getDate() - 6)
        sixDaysAgo.setHours(0, 0, 0, 0)
        const roles = req.user.roles
        if (roles.includes('superApi')) {
            const userIds = await this.userService.getApiInvitesIds(req.user.id)
            where = { userId: In(userIds), createdAt: Between(sixDaysAgo, new Date()) }
        } else if (roles.includes('api')) {
            const userIds = await this.userService.getInvitesIds(req.user.id)
            where = { userId: In(userIds), createdAt: Between(sixDaysAgo, new Date()) }
        } else if (!roles.includes('admin')) {
            where = { userId: req.user.id, createdAt: Between(sixDaysAgo, new Date()) }
        } else {
            where = { createdAt: Between(sixDaysAgo, new Date()) }
        }
        if (roles.includes('admin') || roles.includes('superApi')) {
            const todayCodes = await this.rcsNumberRepository
                .createQueryBuilder()
                .select('count(1) as sum')
                .where('status = :status', { status: 'success' })
                .andWhere('createdAt between :start and :end', {
                    start: startOfDay(new Date()),
                    end: endOfDay(new Date())
                })
                .getRawOne()
            res.todayData.code = todayCodes.sum
        }
        return await this.taskRepository
            .createQueryBuilder()
            .select([
                'DATE(createdAt) as day',
                'SUM(sent) as sent',
                'SUM(successCount) as success',
                'SUM(total) as total'
            ])
            .where(where)
            .groupBy('day')
            .orderBy('day', 'ASC')
            .getRawMany()
            .then((rows) => {
                if (rows.length > 0) {
                    rows.forEach((item) => {
                        const day = moment(item.day).format('MM-DD')
                        res.xData.push(day)
                        res.sentData.push(item.sent)
res.successData.push(item.success)
                        res.totalData.push(item.total)
                        // The row for the current day also feeds the "today" summary.
                        if (moment().format('MM-DD') === day) {
                            res.todayData.sent = item.sent
                            res.todayData.success = item.success
                            res.todayData.total = item.total
                        }
                    })
                }
                return res
            })
    }

    /**
     * Counts RCS numbers with status 'success' per channel (`from` column) for yesterday
     * and today.
     *
     * @returns one `{ channel, todayData, yesterdayData }` entry per channel
     */
    async codeStatistics() {
        const rows = await this.rcsNumberRepository
            .createQueryBuilder('rcsNumber')
            .select(['rcsNumber.`from` as channel', 'DATE(rcsNumber.createdAt) as day', 'COUNT(1) as sum'])
            .where('rcsNumber.status = :status', { status: 'success' })
            .andWhere('createdAt between :start and :end', {
                start: startOfDay(addDays(new Date(), -1)),
                end: endOfDay(new Date())
            })
            .groupBy('DATE(rcsNumber.createdAt), rcsNumber.from')
            .orderBy('rcsNumber.from', 'ASC')
            .getRawMany()
        const today = moment(new Date()).format('MM-DD')
        const byChannel = {}
        for (const row of rows) {
            const channel = row.channel
            if (!byChannel[channel]) {
                byChannel[channel] = { channel: channel, todayData: 0, yesterdayData: 0 }
            }
            // The query spans two days only, so anything that is not today is yesterday.
            if (moment(row.day).format('MM-DD') === today) {
                byChannel[channel].todayData = row.sum
            } else {
                byChannel[channel].yesterdayData = row.sum
            }
        }
        return Object.values(byChannel)
    }

    /**
     * Fetches the remaining balance of every external provider account in parallel.
     * A provider that fails or does not answer in time keeps its default of 0.
     *
     * NOTE(review): the provider endpoints and API keys/tokens below are hard-coded in
     * source. They should be moved to configuration/secret storage and rotated.
     */
    async balanceStatistics() {
        const res = {
            durian: 0,
            cloud033: 0,
            cloud034: 0,
            cloud037: 0,
            cloud041: 0,
            cloud050: 0,
            xyz: 0,
            cowboy: 0,
            usapanel: 0,
            dashboard: 0,
            smspva: 0
        }
        // axios has no timeout by default; without one a single stalled provider would
        // keep the Promise.all below (and therefore this request) waiting indefinitely.
        const timeout = 10000
        const cloudInstance = axios.create({ baseURL: 'http://52.77.17.214:9001/api/', timeout })
        const xyzInstance = axios.create({ baseURL: 'http://113.28.178.155:8003/api/', timeout })
        const panelInstance = axios.create({ baseURL: 'https://panel.hellomeetyou.com/api/', timeout })
        const dashboardInstance = axios.create({ baseURL: 'https://code.smscodes.io/api/sms/', timeout })
        const smspvaInstance = axios.create({ baseURL: 'https://api.smspva.com/activation/', timeout })
        await Promise.all([
            (async () => {
                try {
                    const durianRes = await axios
                        .create({
                            baseURL: 'http://8.218.211.187/out/ext_api/',
                            headers: { uhost: 'api.durianrcs.com', uprotocol: 'http' },
                            timeout
                        })
                        .get('getUserInfo', {
                            params: {
                                name: 'unsnap3094',
                                ApiKey: 
'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'
                            }
                        })
                    if (durianRes.data.code === 200) {
                        res.durian = durianRes.data.data.score
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch durian balance: ${e.message}`, this.TAG)
                }
            })(),
            (async () => {
                try {
                    const cowboyRes = await axios
                        .create({
                            baseURL: 'http://8.218.211.187/',
                            headers: { uhost: 'api.cowboymsg.com', uprotocol: 'http' },
                            timeout: 10000
                        })
                        .get('getUserInfo', {
                            params: { name: 'launch', ApiKey: 'NUdVcVMxelBQTXlpcTBwbk1XQUhzQT09' }
                        })
                    if (cowboyRes.data.code === 200) {
                        res.cowboy = cowboyRes.data.data.score
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch cowboy balance: ${e.message}`, this.TAG)
                }
            })(),
            (async () => {
                try {
                    const xyz = await xyzInstance.get('v1', {
                        params: {
                            act: 'myinfo',
                            token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjp7InVpZCI6MjUsInJvbGVfaWQiOjF9fQ.VU1tvG72YaXooUT-FUaQj-YWVXnVrYBad1AsoWUT4pw'
                        }
                    })
                    // The response is read as "<code>|<balance>|..."; only code "0" is used.
                    const parts = xyz.data.split('|').map((part) => part.trim())
                    if (parts[0] === '0') {
                        // Negative balances are reported as 0.
                        res.xyz = parts[1] < 0 ? 0 : parts[1]
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch xyz balance: ${e.message}`, this.TAG)
                }
            })(),
            // The five cloud accounts share one endpoint and differ only in credentials.
            ...(
                [
                    { field: 'cloud033', userid: '100033', token: '1e40ca9795b1fc038db76512175d59b5' },
                    { field: 'cloud034', userid: '100034', token: '54bdd0d9dd6707b2b40d8deb5edb1385' },
                    { field: 'cloud037', userid: '100037', token: 'aaec6c21e54dc53b92e472df21a95bb7' },
                    { field: 'cloud041', userid: '100041', token: '8174f3107605645d17fd6c5edc0bfb7d' },
                    { field: 'cloud050', userid: '100050', token: '6c0f25c802b82d2a5c78f01fb627be2c' }
                ] as const
            ).map(async ({ field, userid, token }) => {
                try {
                    const cloudRes = await cloudInstance.get('userBalance', { params: { userid, token } })
                    if (cloudRes.data.code === '1001') {
                        res[field] = cloudRes.data.data.integral
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch ${field} balance: ${e.message}`, this.TAG)
                }
            }),
            (async () => {
                try {
                    const panelRes = await panelInstance.get('account', {
                        headers: {
                            'Content-Type': 'application/json',
                            Accept: 'application/json',
                            'X-API-Key': 'wpJohESEZsjW1LtlyoGwZw53'
                        }
                    })
                    if (panelRes.data) {
                        res.usapanel = panelRes.data.balance
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch usapanel balance: ${e.message}`, this.TAG)
                }
            })(),
            (async () => {
                try {
                    const dashboardRes = await dashboardInstance.get('GetBalance', {
                        params: { key: '6619f084-363c-4518-b5b8-57b5e01a9c95' }
                    })
                    if (dashboardRes.data.Status === 'Success') {
                        res.dashboard = dashboardRes.data.Balance
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch dashboard balance: ${e.message}`, this.TAG)
                }
            })(),
            (async () => {
                try {
                    const smspvaRes = await smspvaInstance.get('balance', {
                        headers: { apikey: 'uNW56fGr0zstfs87Xn0e1l2gCYVnb1' }
                    })
                    if (smspvaRes.data.statusCode === 200) {
                        res.smspva = smspvaRes.data.data.balance
                    }
                } catch (e) {
                    Logger.warn(`Failed to fetch smspva balance: ${e.message}`, this.TAG)
                }
            })()
        ])
        return res
    }

    /**
     * Number of task items sent per clock hour over the last 25 hours, newest hour first.
     */
    async hourSentStatistics() {
        const windowStart = new Date()
        windowStart.setHours(windowStart.getHours() - 25)
        return await this.taskItemRepository
            .createQueryBuilder()
            .select(['COUNT(*) AS sent', "DATE_FORMAT(sendAt, '%Y-%m-%d %H:00:00') AS hour"])
            .where('sendAt BETWEEN :start AND :end', { start: windowStart, end: new Date() })
            .groupBy('hour')
            .orderBy('hour', 'DESC')
            .getRawMany()
    }

    /**
     * Order counts (tasks by `startedAt`) for yesterday and today, plus yesterday's
     * sending hours from sentHourStatistics. Each figure is fetched independently; one
     * that fails is logged and keeps its default of 0.
     */
    async numStatistics() {
        const res = {
            totalHoursYesterday: 0,
            totalHoursToday: 0,
            orderCountYesterday: 0,
            orderCountToday: 0,
            hourData: 0
        }
        await Promise.all([
            (async () => {
                try {
                    // Yesterday's order count. The alias is given explicitly because the
                    // conditions below reference columns as `task.<column>`.
                    const yesterdayOrderCount = await this.taskRepository
                        .createQueryBuilder('task')
                        .select('COUNT(1)', 'sum')
                        .where('task.startedAt >= CURDATE() - INTERVAL 1 DAY')
                        .andWhere('task.startedAt < CURDATE()')
                        .getRawOne()
                    res.orderCountYesterday = yesterdayOrderCount.sum
                } catch (e) {
                    Logger.error("Error counting yesterday's orders", e.stack, this.TAG)
                }
            })(),
            (async () => {
                try {
                    // Today's order count (same explicit alias as above).
                    const todayOrderCount = await this.taskRepository
                        .createQueryBuilder('task')
.select('COUNT(1)', 'sum')
                        .where('task.startedAt >= CURDATE()')
                        .andWhere('task.startedAt < CURDATE() + INTERVAL 1 DAY')
                        .getRawOne()
                    res.orderCountToday = todayOrderCount.sum
                } catch (e) {
                    Logger.error("Error counting today's orders", e.stack, this.TAG)
                }
            })(),
            (async () => {
                try {
                    res.hourData = await this.sentHourStatistics()
                } catch (e) {
                    Logger.error('Error computing sent hours', e.stack, this.TAG)
                }
            })()
        ])
        return res
    }

    /**
     * Yesterday's sending time in hours: the number of distinct minutes in which at least
     * one non-idle task item has a `sendAt`, divided by 60.
     *
     * @returns the hour figure, or 0 when the query yields nothing numeric
     */
    async sentHourStatistics() {
        const result = await this.taskItemRepository.query(` SELECT SUM(c) / 60 AS hour FROM ( SELECT 1 AS c, UNIX_TIMESTAMP(sendAt) DIV 60 * 60 AS "time" FROM task_item WHERE sendAt >= CURDATE() - INTERVAL 1 DAY AND sendAt < CURDATE() AND status != 'idle' GROUP BY time ) tmp `)
        return Number(result[0]?.hour) || 0
    }

    /**
     * Per-country totals of task sizes, scoped by the caller's roles.
     *
     * @returns `completedData`: COMPLETED tasks created yesterday;
     *          `totalData`: all tasks created since midnight seven days ago
     *
     * NOTE(review): exportTask and homeStatistics resolve 'superApi' callers through
     * getApiInvitesIds, while this method uses getInvitesIds for both 'api' and
     * 'superApi' — confirm which is intended.
     */
    async sentCountryStatistics(req: any) {
        let where = {}
        if (req.user.roles.includes('api') || req.user.roles.includes('superApi')) {
            const userIds = await this.userService.getInvitesIds(req.user.id)
            where = { userId: In(userIds) }
        } else if (!req.user.roles.includes('admin')) {
            where = { userId: req.user.id }
        }
        // Yesterday
        const yesterday = addDays(new Date(), -1)
        const res = await this.taskRepository
            .createQueryBuilder()
            .select(['sum(total) as value', 'country as name'])
            .where(where)
            .andWhere('country is not null')
            .andWhere('status = :status', { status: TaskStatus.COMPLETED })
            .andWhere('createdAt between :start and :end', {
                start: startOfDay(yesterday),
                end: endOfDay(yesterday)
            })
            .groupBy('country')
            .orderBy('value', 'DESC')
            .getRawMany()
        // Midnight seven days ago up to now
        const sevenDaysAgo = startOfDay(addDays(new Date(), -7))
        const totalRes = await this.taskRepository
            .createQueryBuilder()
            .select(['sum(total) as value', 'country as name'])
            .where(where)
            .andWhere('country is not null')
            .andWhere('createdAt between :start and :end', {
                start: sevenDaysAgo,
                end: new Date()
            })
            .groupBy('country')
            .orderBy('value', 'DESC')
            .getRawMany()
        return {
            completedData: res,
            totalData: totalRes
        }
    }

    /**
     * Counts RCS numbers per `stockFlag` (1, 2, 3) overall, for yesterday and for today,
     * and returns one labelled row per flag.
     */
    async backupStatistics() {
        const res = {
            todayData: null,
            totalData: null,
yesterdayData: null
        }
        await Promise.all([
            (async () => {
                try {
                    // All-time count per stock flag
                    res.totalData = await this.rcsNumberRepository
                        .createQueryBuilder()
                        .select('COUNT(1)', 'count')
                        .addSelect('stockFlag', 'stockFlag')
                        .where('stockFlag IN (:...stockFlags)', { stockFlags: [1, 2, 3] })
                        .groupBy('stockFlag')
                        .getRawMany()
                } catch (e) {
                    Logger.error('Error counting total backups', e.stack, this.TAG)
                }
            })(),
            (async () => {
                try {
                    // Yesterday's count per stock flag
                    res.yesterdayData = await this.rcsNumberRepository
                        .createQueryBuilder()
                        .select('COUNT(1)', 'count')
                        .addSelect('stockFlag', 'stockFlag')
                        .where('createdAt >= CURDATE() - INTERVAL 1 DAY')
                        .andWhere('createdAt < CURDATE()')
                        .andWhere('stockFlag IN (:...stockFlags)', { stockFlags: [1, 2, 3] })
                        .groupBy('stockFlag')
                        .getRawMany()
                } catch (e) {
                    Logger.error("Error counting yesterday's backups", e.stack, this.TAG)
                }
            })(),
            (async () => {
                try {
                    // Today's count per stock flag
                    res.todayData = await this.rcsNumberRepository
                        .createQueryBuilder()
                        .select('COUNT(1)', 'count')
                        .addSelect('stockFlag', 'stockFlag')
                        .where('createdAt >= CURDATE()')
                        .andWhere('stockFlag IN (:...stockFlags)', { stockFlags: [1, 2, 3] })
                        .groupBy('stockFlag')
                        .getRawMany()
                } catch (e) {
                    Logger.error("Error counting today's backups", e.stack, this.TAG)
                }
            })()
        ])
        // Count for one flag in a result set; 0 when the query failed or has no such row.
        const countOf = (rows: any[] | null, stockFlag: number) =>
            rows?.find((item) => item.stockFlag === stockFlag)?.count ?? 0
        const toRow = (flag: string, stockFlag: number) => ({
            flag,
            yesterday: countOf(res.yesterdayData, stockFlag),
            today: countOf(res.todayData, stockFlag),
            total: countOf(res.totalData, stockFlag)
        })
        return [toRow('剩余备份', 1), toRow('使用成功', 2), toRow('使用失败', 3)]
    }

    /** Number of successfully sent, non-embedded items of a task. */
    async getSuccessNum(id: number) {
        return await this.taskItemRepository.count({
            where: { taskId: id, status: TaskItemStatus.SUCCESS, embed: false }
        })
    }

    /**
     * Estimates how many numbers will be sent before the given queued task gets its turn.
     *
     * @returns 0 while fewer than `max_parallel` tasks are PENDING; otherwise the phone
     *          count of the lists of all QUEUED tasks ahead of `id`, plus the IDLE
     *          non-embedded items of the tasks that are currently PENDING
     */
    async getToBeSentNum(id: number) {
        let total = 0
        const maxParallel = await this.getConfig('max_parallel', 1)
        const pendingNum = await this.taskRepository.count({ where: { status: TaskStatus.PENDING } })
        if (pendingNum < maxParallel) {
            return total
        }
        // Queue, oldest first
        const queued = await this.taskRepository.find({
            where: { status: TaskStatus.QUEUED },
            order: { startedAt: 'ASC' }
        })
        // Everything in front of the given task; the whole queue when it is not queued.
        const position = queued.findIndex((task) => task.id === id)
        const ahead = position === -1 ? queued : queued.slice(0, position)
        if (ahead.length > 0) {
            total += await this.phoneRepository.countBy({ listId: In(ahead.map((task) => task.listId)) })
        }
        // Tasks that are running right now
        const curTasks = await this.taskRepository.find({ where: { status: TaskStatus.PENDING } })
        if (curTasks.length > 0) {
            total += await this.taskItemRepository.countBy({
                taskId: In(curTasks.map((task) => task.id)),
                status: TaskItemStatus.IDLE,
                embed: false
            })
        }
        return total
    }

    /**
     * Reads a numeric system setting.
     *
     * @param name setting key
     * @param defValue value passed to the lookup as default and returned when it throws
     */
    async getConfig(name: string, defValue: number) {
        try {
            return await this.sysConfigService.getNumber(name, defValue)
        } catch (e) {
            Logger.error(`Error getting config "${name}"`, e.stack, this.TAG)
        }
        return defValue
    }

    /**
     * Sets the status of the given task items.
     *
     * @param taskIds ids of task items (not tasks); an empty list is a no-op
     */
    async updateTaskItemStatus(taskIds: number[], status: TaskItemStatus) {
        if (!taskIds || taskIds.length === 0) {
            return
        }
        await this.taskItemRepository.update({ id: In(taskIds) }, { status: status })
    }

    /**
     * Sets the status of the given task items and stamps `sendAt` with the current time.
     *
     * @param taskIds ids of task items (not tasks); an empty list is a no-op
     */
    async updateTaskItemStatusAndSendAt(taskIds: number[], status: TaskItemStatus) {
        if (!taskIds || taskIds.length === 0) {
            return
        }
        await this.taskItemRepository.update({ id: In(taskIds) }, { status: status, sendAt: new Date() })
    }

    /**
     * Cost of a task: the size of its phone list times a content multiplier
     * (text only = 1, image only = 2, text and image = 3).
     *
     * @throws Error when the task has neither text nor image
     */
    async getCost(task: Task, user: Users) {
        const number: number = await this.phoneListService.findCountByListId(task.listId)
        // A disabled earlier formula here used the user's rate (rate × count);
        // the cost now depends on the content type only.
        const multiplier = task.message ? 
(task.img ? '3' : '1') : task.img ? '2' : '0'
        if (multiplier === '0') {
            throw new Error('发送任务文案错误,请联系管理员修改!')
        }
        return new Decimal(number).mul(parseInt(multiplier, 10))
    }

    /**
     * Scheduler tick, run every 2 s.
     *
     * Under the 'dispatchTask' lock it (1) promotes QUEUED tasks to PENDING while fewer
     * than `max_parallel` tasks are PENDING, preferring at most two concurrent tasks per
     * user, and (2) dispatches every dispatchable task to the devices assigned to it.
     * The lock is requested with a 1 ms timeout, so a tick that finds the previous one
     * still running is skipped silently.
     */
    @Interval(2000)
    async scheduleTask() {
        this.lock
            .acquire(
                'dispatchTask',
                async () => {
                    const maxParallel = await this.getConfig('max_parallel', 1)
                    const running = await this.taskRepository.find({
                        where: { status: TaskStatus.PENDING },
                        order: { startedAt: 'ASC' },
                        take: maxParallel
                    })
                    // Top up from the queue when there is spare capacity.
                    if (running.length < maxParallel) {
                        const nextTasks = await this.taskRepository.find({
                            where: { status: TaskStatus.QUEUED },
                            order: { startedAt: 'ASC', id: 'ASC' }
                        })
                        if (nextTasks.length > 0) {
                            const runningPerUser = new Map<number, number>()
                            running.forEach((task) => {
                                runningPerUser.set(task.userId, (runningPerUser.get(task.userId) || 0) + 1)
                            })
                            // Pick queued tasks in order, keeping each user at two tasks at most.
                            const limit = maxParallel - running.length
                            const pickedPerUser = new Map<number, number>()
                            const picked = []
                            for (const task of nextTasks) {
                                const alreadyRunning = runningPerUser.get(task.userId) || 0
                                const alreadyPicked = pickedPerUser.get(task.userId) || 0
                                if (alreadyRunning + alreadyPicked < 2) {
                                    picked.push(task)
                                    pickedPerUser.set(task.userId, alreadyPicked + 1)
                                }
                                if (picked.length >= limit) {
                                    break
                                }
                            }
                            const nextTasksIds = picked.map((t) => t.id)
                            // Nobody qualifies under the per-user cap: use the free slots anyway.
                            if (nextTasksIds.length === 0) {
                                nextTasksIds.push(...nextTasks.map((t) => t.id).slice(0, limit))
                            }
                            await this.taskRepository.update(
                                { id: In(nextTasksIds) },
                                { status: TaskStatus.PENDING, startedAt: new Date() }
                            )
                        }
                    }

                    // Unbiased (Fisher–Yates) shuffle so no task is systematically served first.
                    const pendingTasks = await this.findPendingTasks()
                    for (let i = pendingTasks.length - 1; i > 0; i--) {
                        const j = Math.floor(Math.random() * (i + 1))
                        ;[pendingTasks[i], pendingTasks[j]] = [pendingTasks[j], pendingTasks[i]]
                    }
                    await Promise.all(
                        pendingTasks.map(async (task) => {
                            // Failures are contained per task, so one failing task neither hides
                            // the others nor releases the lock while they are still dispatching.
                            try {
                                const devices = await this.deviceService.findByTaskId(task.id)
                                if (devices.length === 0) return
                                await this.dispatchTask(task, devices)
                            } catch (e) {
                                Logger.error(`Error dispatching task ${task.id}`, e.stack, this.TAG)
                            }
                        })
                    )
                },
                {
                    timeout: 1
                }
            )
            .catch((e) => {
                // Lock still held by the previous tick: skip this one.
                if (e.message.includes('timed out')) return
                Logger.error('Error dispatchTask', e.stack, this.TAG)
            })
    }

    /**
     * Appends a random 6-character query parameter to every http(s) URL in the text.
     *
     * The text is rewritten in a single pass, so a URL that is a prefix of another URL
     * cannot corrupt it, and repeated occurrences of the same URL receive the same
     * parameter. A URL that cannot be parsed is logged and left unchanged.
     */
    refineContent(content: string) {
        const regex =
            /https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)/g
        const rewritten = new Map<string, string>()
        return content.replace(regex, (url) => {
            let replacement = rewritten.get(url)
            if (replacement === undefined) {
                try {
                    const u = new URL(url)
                    u.searchParams.append(randomstring.generate(6), randomstring.generate(6))
                    replacement = u.toString()
                } catch (error) {
                    Logger.error('Error parsing url', error.stack, this.TAG)
                    replacement = url
                }
                rewritten.set(url, replacement)
            }
            return replacement
        })
    }

    // dispatchTask continues past this span; its opening statements are reproduced unchanged.
    async dispatchTask(task: Task, devices: Device[]) {
        const taskItems = await this.taskItemRepository.find({
            where: { taskId: task.id, status: TaskItemStatus.IDLE },
            take: devices.length * batchSize
        })
        if (taskItems.length === 0) {
            return
        }
        if (devices.length === 0) {
            return
        }
        devices = devices.splice(0, Math.ceil(taskItems.length / batchSize))
        const taskConfig = {
            singleQty: task.singleQty
|| (await this.getConfig('single_qty', 1)), singleTimeout: task.singleTimeout || (await this.getConfig('single_timeout', 2000)), singleDelay: task.singleDelay || (await this.getConfig('single_delay', 3000)), cleanCount: task.cleanCount || (await this.getConfig('clean_count', 20)), groupMode: task.groupMode, groupQty: task.groupQty || (await this.getConfig('group_qty', 3)), groupSize: task.groupSize || (await this.getConfig('group_size', 9)), groupTimeout: task.groupTimeout || (await this.getConfig('group_timeout', 2000)), groupDelay: task.groupDelay || (await this.getConfig('group_delay', 3000)), checkConnection: task.checkConnection, useBackup: task.useBackup, e2ee: task.e2ee, e2eeTimeout: task.e2eeTimeout || (await this.getConfig('e2ee_timeout', 5000)) } await this.updateTaskItemStatusAndSendAt( taskItems.map((i) => i.id), TaskItemStatus.PENDING ) await this.deviceService.updateDevice( devices.map((d) => d.id), { busy: true } ) Promise.all( devices.map(async (device, i) => { const items = taskItems .slice(i * batchSize, i * batchSize + batchSize) .map((item) => ({ ...item, message: this.getMessage(task), img: task.img })) if (items.length === 0) return try { const res: any = await this.eventsGateway.sendForResult( { id: randomUUID(), action: 'task', data: { config: { ...taskConfig, ...(device.configOverrides || {}) }, tasks: items, taskId: task.id } }, device.socketId ) Logger.log(`task result: ${JSON.stringify(res)}`, this.TAG) if (res.success?.length > 0) { await this.updateTaskItemStatus(res.success, TaskItemStatus.SUCCESS) } if (res.fail?.length > 0) { await this.updateTaskItemStatus(res.fail, TaskItemStatus.FAIL) } if (res.retry?.length > 0) { await this.updateTaskItemStatus(res.retry, TaskItemStatus.IDLE) } if (res instanceof Array) { const results = res as TaskResult[] for (let result of results) { await this.taskItemRepository.update( { id: result.id }, { status: result.sent === 0 ? TaskItemStatus.FAIL : result.sent === 1 ? 
TaskItemStatus.SUCCESS : TaskItemStatus.IDLE, delivery: result.delivery, numberId: result.numberId } ) } } } catch (e) { Logger.error('Error running task 3', e.stack, this.TAG) await this.updateTaskItemStatus( items.map((i) => i.id), TaskItemStatus.IDLE ) } }) ).then(async () => { const counts = ( await this.taskItemRepository.manager.query( `select status, count(*) as count from task_item where taskId = ${task.id} and embed = 0 group by status` ) ).reduce((acc, item) => { acc[item.status] = parseInt(item.count) return acc }, {}) Logger.log('Task counts', JSON.stringify(counts), this.TAG) const successCount = counts.success || 0 const failCount = counts.fail || 0 const pendingCount = counts.pending || 0 const idleCount = counts.idle || 0 const finish = pendingCount === 0 && idleCount === 0 // 送达率 const deliveryCount = await this.taskItemRepository.countBy({ taskId: task.id, delivery: 1 }) const data: Partial = { sent: successCount + failCount, successCount: successCount, successRate: successCount + failCount > 0 ? ((successCount / (successCount + failCount)) * 100).toFixed(1) + '%' : '0%', deliveryCount: deliveryCount, deliveryRate: deliveryCount > 0 ? 
((deliveryCount / (successCount + failCount)) * 100).toFixed(1) + '%' : '0%' } if (finish) { data.status = TaskStatus.COMPLETED await this.updateSend(task.userId) } await this.taskRepository.update({ id: task.id }, data) }) } async checkPendingTaskNum() { return await this.taskRepository.countBy({ status: TaskStatus.PENDING }) } @Interval(10000) async fixDeadTask() { const tasks = await this.taskRepository.findBy({ status: In([TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP]) }) for (let task of tasks) { const items = await this.taskItemRepository.findBy({ taskId: task.id, status: TaskItemStatus.PENDING, sendAt: LessThan(addMinutes(new Date(), -10)) }) if (items?.length > 0) { await this.updateTaskItemStatus( items.map((i) => i.id), TaskItemStatus.IDLE ) } } } @Interval(60000) async changeCheckAvailabilityNumbers() { let config: SysConfig try { config = await this.sysConfigService.findByName('check_availability_numbers') } catch (e) { Logger.error('Error getting check_availability_numbers', e.stack, this.TAG) config = new SysConfig() config.name = 'check_availability_numbers' } for (let i = 0; i < 99; i++) { const items = await this.taskItemRepository .createQueryBuilder() .select() .where('status = :status', { status: TaskItemStatus.SUCCESS }) .andWhere('sendAt > :sendAt', { sendAt: addHours(new Date(), -6 * (i + 1)) }) .orderBy('RAND()') .limit(5) .getMany() if (5 == items.length) { config.value = items.map((i) => i.number).join(',') break } } await this.sysConfigService.save(config) } public async updateSend(id: number) { const sum = await this.balanceService.sumAmount(id, BalanceType.CONSUMPTION) const user = await this.userRepository.findOneBy({ id }) user.send = sum.toNumber() return await this.userRepository.save(user) } @Cron('0 0,30 * * * *') async scheduledTaskExecution() { console.log('The scheduled task starts,', new Date()) const tasks = await this.taskRepository.findBy({ status: TaskStatus.SCHEDULED, startedAt: LessThanOrEqual(new Date()) }) 
for (const task of tasks) { try { await this.startTask(task.id) console.log(`Task ${task.id} started successfully.`) } catch (error) { console.error(`Error starting task ${task.id}:`, error) } } console.log('The scheduled task is executed.') return tasks } }