| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651 |
- import { OperatorConfigService } from './../operator_config/operator_config.service'
- import { PhoneListService } from './../phone-list/phone-list.service'
- import {
- forwardRef,
- Inject,
- Injectable,
- InternalServerErrorException,
- Logger,
- NotFoundException,
- OnModuleInit
- } from '@nestjs/common'
- import { InjectRepository } from '@nestjs/typeorm'
- import { ConfusionType, Task, TaskStatus } from './entities/task.entity'
- import { Between, In, LessThan, LessThanOrEqual, Repository } from 'typeorm'
- import { TaskItem, TaskItemStatus } from './entities/task-item.entity'
- import { PageRequest } from '../common/dto/page-request'
- import { paginate, Pagination } from 'nestjs-typeorm-paginate'
- import { EventsGateway } from '../events/events.gateway'
- import { randomUUID } from 'crypto'
- import { setTimeout } from 'timers/promises'
- import { DeviceService } from '../device/device.service'
- import { SysConfigService } from '../sys-config/sys-config.service'
- import { Users } from '../users/entities/users.entity'
- import * as ExcelJS from 'exceljs'
- import * as moment from 'moment'
- import { BalanceService } from '../balance/balance.service'
- import { Role } from '../model/role.enum'
- import { Phone } from '../phone-list/entities/phone.entity'
- import { Cron, Interval } from '@nestjs/schedule'
- import * as AsyncLock from 'async-lock'
- import { UsersService } from '../users/users.service'
- import { addDays, addHours, addMinutes, endOfDay, startOfDay } from 'date-fns'
- import { SysConfig } from '../sys-config/entities/sys-config.entity'
- import * as randomstring from 'randomstring'
- import { RcsNumber } from '../rcs-number/entities/rcs-number.entity'
- import axios from 'axios'
- import { BalanceRecord, BalanceType } from '../balance/entities/balance-record.entity'
- import { Device } from '../device/entities/device.entity'
- import Decimal from 'decimal.js'
- import { CountryConfigService } from '../country-config/country-config.service'
- import { TaskResult } from './types'
- import { connectAsync as mqttConnect, MqttClient } from 'mqtt'
// Exported batch-size constant (9). Its consumers are not visible in this part of the file.
- export const batchSize = 9
- @Injectable()
- export class TaskService implements OnModuleInit {
// Keyed async lock; `onModuleInit` acquires the 'dispatchTask' key on it.
- private lock = new AsyncLock()
// Fixed string tag for this service — presumably a logging context (TODO confirm against callers).
- private TAG = 'TaskService'
// MQTT client handle; the only assignment visible here is commented out in `onModuleInit`.
- private mqttClient: MqttClient
/**
 * Receives the TypeORM repositories (Task, TaskItem, Phone, Users, RcsNumber, BalanceRecord)
 * and the collaborating services used by this class.
 * EventsGateway and DeviceService are injected through `forwardRef` — presumably to break
 * circular dependencies between modules (NOTE(review): confirm).
 */
- constructor(
- @InjectRepository(Task)
- private taskRepository: Repository<Task>,
- @InjectRepository(TaskItem)
- private taskItemRepository: Repository<TaskItem>,
- @InjectRepository(Phone)
- private phoneRepository: Repository<Phone>,
- @InjectRepository(Users)
- private userRepository: Repository<Users>,
- @InjectRepository(RcsNumber)
- private rcsNumberRepository: Repository<RcsNumber>,
- @InjectRepository(BalanceRecord)
- private readonly balanceRecordRepository: Repository<BalanceRecord>,
- @Inject(forwardRef(() => EventsGateway))
- private readonly eventsGateway: EventsGateway,
- private readonly phoneListService: PhoneListService,
- @Inject(forwardRef(() => DeviceService))
- private readonly deviceService: DeviceService,
- private readonly sysConfigService: SysConfigService,
- private readonly balanceService: BalanceService,
- private readonly userService: UsersService,
- private readonly operatorConfigService: OperatorConfigService,
- private readonly countryConfigService: CountryConfigService
- ) {}
/**
 * Startup recovery hook.
 *
 * While holding the 'dispatchTask' lock, resets every PENDING item of every PENDING task back to
 * IDLE, then keeps the lock for a further 10 seconds before releasing it.
 * The lock promise is deliberately not awaited, so this hook returns immediately; failures are
 * logged instead of surfacing as an unhandled promise rejection.
 */
async onModuleInit() {
  this.lock
    .acquire('dispatchTask', async () => {
      const tasks = await this.taskRepository.findBy({
        status: TaskStatus.PENDING
      })
      for (const task of tasks) {
        await this.taskItemRepository.update(
          { taskId: task.id, status: TaskItemStatus.PENDING },
          { status: TaskStatus.IDLE }
        )
      }
      // Keep holding the lock for 10s after the reset.
      await setTimeout(10000)
    })
    .catch((e) => {
      // Fire-and-forget: without a handler a failed query would be an unhandled rejection.
      Logger.error(
        `Failed to reset pending task items on startup: ${e instanceof Error ? e.message : String(e)}`,
        e instanceof Error ? e.stack : undefined,
        this.TAG
      )
    })
  // this.mqttClient = await mqttConnect({
  //   host: '47.98.225.28',
  //   port: 1883,
  //   username: 'rcs',
  //   password: '3edc#EDC',
  //   connectTimeout: 10000
  // })
}
// Map from a numeric key to an AbortController — presumably keyed by task id (usage is not visible here; TODO confirm).
- private taskControllers: { [key: number]: AbortController } = {}
/**
 * Looks up a single task by its primary key.
 *
 * @param id Task id.
 * @returns Whatever the repository's `findOneBy` resolves to for that id.
 */
async findById(id: number): Promise<Task> {
  const found = await this.taskRepository.findOneBy({ id })
  return found
}
/**
 * Returns one page of tasks, with `userName` filled in from the owning user where that user exists.
 *
 * @param req Page request carrying the pagination options (`page`) and query options (`search`).
 * @returns The paginated tasks.
 */
async findAllTask(req: PageRequest<Task>): Promise<Pagination<Task>> {
  const page = await paginate<Task>(this.taskRepository, req.page, req.search)
  if (page.items.length === 0) {
    return page
  }
  const pageTasks = page.items
  // One query for all owners on this page.
  const owners = await this.userRepository.findBy({
    id: In(pageTasks.map((task) => task.userId))
  })
  for (const task of pageTasks) {
    const owner = owners.find((candidate) => candidate.id === task.userId)
    if (owner) {
      task.userName = owner.username
    }
  }
  return page
}
/**
 * Loads every task whose status is PENDING, CUTTING or VIP.
 */
async findPendingTasks() {
  const wantedStatuses = [TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP]
  return await this.taskRepository.findBy({ status: In(wantedStatuses) })
}
/**
 * Returns one page of task items.
 *
 * @param req Page request carrying the pagination options (`page`) and query options (`search`).
 */
async findAllTaskItem(req: PageRequest<TaskItem>): Promise<Pagination<TaskItem>> {
  const result = await paginate<TaskItem>(this.taskItemRepository, req.page, req.search)
  return result
}
/**
 * Creates a task for a phone list and inserts one task item per number.
 *
 * Steps: validate the list (at least 100 numbers), copy `useBackup`/`e2ee` from the country
 * config, pre-charge tasks that carry a `startedAt` (scheduled tasks), persist the task,
 * optionally interleave the configured embed numbers, then bulk-insert all items as IDLE.
 *
 * @param task Task to create; it is mutated (total, country, flags, status, paid, singleQty).
 * @returns The saved task.
 * @throws NotFoundException when the phone list or the owning user does not exist.
 * @throws InternalServerErrorException when the list is empty or has fewer than 100 numbers.
 * @throws Error when the balance is insufficient or the up-front deduction fails.
 */
async createTask(task: Task): Promise<Task> {
  const phoneList = await this.phoneListService.findPhoneListById(task.listId)
  if (!phoneList) {
    throw new NotFoundException('Phone list not found')
  }
  const phones = await this.phoneListService.findPhoneByListId(task.listId)
  if (!phones || phones.length === 0) {
    throw new InternalServerErrorException('请先上传料子')
  } else if (phones.length < 100) {
    throw new InternalServerErrorException('料子条数不能少于100条!')
  }
  task.message = task.message || ''
  task.total = phones.length
  task.country = phoneList.country
  if (task.country) {
    const countryConfig = await this.countryConfigService.getDestConfig(task.country)
    task.useBackup = countryConfig.useBackup
    task.e2ee = countryConfig.e2ee
  }
  // Message obfuscation: selecting both 'head' and 'end' collapses to 'both'.
  if (task.confusion && task.confusion.includes('head') && task.confusion.includes('end')) {
    task.confusion = 'both'
  }
  // Scheduled task: verify the balance now and mark the task as paid.
  let cost = new Decimal(0)
  if (task.startedAt) {
    task.status = TaskStatus.SCHEDULED
    const user = await this.userService.findById(task.userId)
    // The fee is charged before the task is created.
    cost = await this.getCost(task, user)
    if (new Decimal(cost).comparedTo(user.balance || new Decimal(0)) > 0) {
      throw new Error('余额不足,请充值后创建定时任务!')
    }
    task.paid = true
  }
  // Per-user cap on how many messages a single number may send.
  const owner = await this.userRepository.findOneBy({ id: task.userId })
  if (!owner) {
    throw new NotFoundException('User not found')
  }
  if (owner.maxSend > 0) {
    task.singleQty = owner.maxSend
  }
  task = await this.taskRepository.save(task)
  if (task.paid) {
    try {
      await this.balanceService.feeDeduction(task.userId, cost, task.id)
    } catch (e) {
      // Deduction failed: fall back to an unpaid, manually started task.
      task.status = TaskStatus.IDLE
      task.paid = false
      await this.taskRepository.update(task.id, { status: TaskStatus.IDLE, paid: false })
      throw new Error('定时任务扣款失败,已转为手动发送任务')
    }
  }
  const finalPhones = [...phones]
  // Embed numbers
  let extraNumbers: string[] = []
  if (owner.isEmbedNumber) {
    const extraNumbersString = await this.sysConfigService.getString('embed_numbers', '')
    // ''.split(',') yields [''], so blank entries are dropped to avoid inserting an empty number.
    extraNumbers = extraNumbersString
      .split(',')
      .map((entry) => entry.trim())
      .filter((entry) => entry.length > 0)
    // Fewer than 2100 numbers: use only half of the embed numbers.
    if (task.total < 2100) {
      const half = Math.floor(extraNumbers.length / 2)
      extraNumbers.splice(half)
    }
    if (extraNumbers.length > 0) {
      const extraNumbersNum = extraNumbers.length
      const totalLength = finalPhones.length + extraNumbersNum
      // Spread the embed numbers at regular intervals through the list.
      const insertionStep = Math.floor(totalLength / (extraNumbersNum + 1))
      extraNumbers.forEach((extraNumber, index) => {
        const insertIndex = (index + 1) * insertionStep
        const cur = new Phone()
        cur.number = extraNumber
        finalPhones.splice(insertIndex, 0, cur)
      })
    }
  }
  // Set lookup instead of scanning the embed list once per inserted item.
  const embedNumbers = new Set(extraNumbers)
  await this.taskItemRepository
    .createQueryBuilder()
    .insert()
    .values(
      finalPhones.map((phone) => {
        const taskItem = new TaskItem()
        taskItem.taskId = task.id
        taskItem.number = phone.number
        taskItem.embed = embedNumbers.has(phone.number)
        taskItem.status = TaskStatus.IDLE
        return taskItem
      })
    )
    .updateEntity(false)
    .execute()
  return task
}
/**
 * Builds the text to send for a task.
 *
 * Each dynamic-message key found in the template is replaced by one randomly chosen value
 * (the same value for every occurrence of that key). Unless `confusion` is NONE, a
 * `<taskId>-<random>-msg-<unixSeconds>` marker is then added before and/or after the text.
 * The result is passed through `refineContent`.
 *
 * @param task Task whose `message`, `dynamicMessage`, `confusion` and `id` are read.
 * @returns The final message, or '' when the task has no message.
 */
getMessage(task: Task) {
  let message = task.message
  if (!message) {
    return ''
  }
  task.dynamicMessage?.forEach((dm) => {
    if (dm.key && dm.values?.length > 0) {
      const chosen = dm.values[Math.floor(Math.random() * dm.values.length)]
      // A replacer function inserts the value literally; a replacement *string* would
      // interpret `$$`, `$&`, `` $` `` and `$'` inside the value as special patterns.
      message = message.replaceAll(`${dm.key}`, () => `${chosen}`)
    }
  })
  // Content obfuscation
  if (task.confusion !== ConfusionType.NONE) {
    const timestamp = Math.round(Date.now() / 1000)
    // Random integer in [0, 1000000) — up to six digits, not zero-padded.
    const randomNumber = Math.floor(Math.random() * 1000000)
    const confusionText = `${task.id}-${randomNumber}-msg-${timestamp}`
    switch (task.confusion) {
      case ConfusionType.HEAD:
        message = `${confusionText}\n` + message
        break
      case ConfusionType.END:
        message += `\n${confusionText}`
        break
      case ConfusionType.BOTH:
        message = `${confusionText}\n` + message + `\n${confusionText}`
        break
      default:
    }
  }
  return this.refineContent(message)
}
/**
 * Updates the editable settings of a task.
 *
 * Only the task's owner or an admin may update it. `message`, `img` and `dynamicMessage` fall
 * back to the stored values when not supplied (an explicit empty message is kept as '').
 * `data.confusion` is normalised in place before being written.
 *
 * @param id   Task id (required).
 * @param user User performing the update.
 * @param data New values.
 * @returns The repository's update result.
 * @throws Error when the id is missing or the user lacks permission.
 */
async updateTask(id: number, user: Users, data: Task) {
  if (!id) throw new Error('Task id is required')
  const existing = await this.taskRepository.findOneOrFail({
    where: { id }
  })
  const isOwner = existing.userId === user.id
  if (!isOwner && !user.roles.includes(Role.Admin)) {
    throw new Error('No permission to update task')
  }
  // Message obfuscation: 'head' + 'end' collapses to 'both'; an empty selection means none.
  const wantsHeadAndEnd = data.confusion && data.confusion.includes('head') && data.confusion.includes('end')
  if (wantsHeadAndEnd) {
    data.confusion = 'both'
  } else if (data.confusion?.length === 0) {
    data.confusion = ConfusionType.NONE
  }
  const message = data.message === '' ? '' : data.message || existing.message
  const img = data.img || existing.img
  const dynamicMessage = data.dynamicMessage || existing.dynamicMessage
  return await this.taskRepository.update(
    { id },
    {
      message,
      img,
      dynamicMessage,
      singleQty: data.singleQty,
      singleTimeout: data.singleTimeout,
      singleDelay: data.singleDelay,
      groupMode: data.groupMode,
      groupQty: data.groupQty,
      groupSize: data.groupSize,
      groupTimeout: data.groupTimeout,
      groupDelay: data.groupDelay,
      cleanCount: data.cleanCount,
      checkConnection: data.checkConnection,
      country: data.country,
      matchDevice: data.matchDevice,
      useBackup: data.useBackup,
      e2ee: data.e2ee,
      e2eeTimeout: data.e2eeTimeout,
      confusion: data.confusion,
      remark: data.remark
    }
  )
}
/**
 * Checks whether the task's owner can afford the task.
 *
 * @param id Task id.
 * @returns 0 for admin owners, -1 when the cost exceeds the owner's balance, otherwise the cost.
 * @throws NotFoundException when the task does not exist.
 */
async balanceVerification(id: number) {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    // Previously a missing task surfaced as a TypeError on `task.userId`.
    throw new NotFoundException('Task not found')
  }
  // Load the owning user.
  const user = await this.userService.findById(task.userId)
  if (user.roles.includes(Role.Admin)) {
    return 0
  }
  const cost = await this.getCost(task, user)
  // Compare the cost against the balance (a missing balance counts as 0).
  if (cost.comparedTo(user.balance || new Decimal(0)) > 0) {
    return -1
  }
  return cost
}
/**
 * Deletes a task that is still IDLE.
 *
 * @param id Task id.
 * @returns The task as it was before deletion.
 * @throws NotFoundException when the task does not exist.
 * @throws Error when the task is in any status other than IDLE.
 */
async delTask(id: number) {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    // Previously a missing task surfaced as a TypeError on `task.status`.
    throw new NotFoundException('Task not found')
  }
  if (task.status !== TaskStatus.IDLE) {
    throw new Error('当前任务状态无法删除!')
  }
  await this.taskRepository.delete(id)
  return task
}
/**
 * Starts (or resumes) a task.
 *
 * Does nothing unless the task is IDLE, PAUSE or SCHEDULED. An unpaid task of a non-admin user
 * is charged first. The task then becomes VIP for VIP users; otherwise PENDING while the number
 * of PENDING tasks is below the `max_parallel` setting, else QUEUED. `startedAt` is set to now.
 *
 * @param id Task id.
 * @throws Error when the user's balance does not cover the cost.
 */
async startTask(id: number): Promise<void> {
  const task = await this.taskRepository.findOneOrFail({
    where: { id }
  })
  const startable =
    task.status === TaskStatus.IDLE || task.status === TaskStatus.PAUSE || task.status === TaskStatus.SCHEDULED
  if (!startable) {
    return
  }
  const user = await this.userService.findById(task.userId)
  const mustCharge = !task.paid && !user.roles.includes(Role.Admin)
  if (mustCharge) {
    // Charge the fee before the task starts.
    const cost = await this.getCost(task, user)
    const balance = user.balance || new Decimal(0)
    if (cost.comparedTo(balance) > 0) {
      throw new Error('Insufficient balance!')
    }
    await this.balanceService.feeDeduction(task.userId, cost, task.id)
    await this.taskRepository.update({ id }, { paid: true })
  }
  let nextStatus = TaskStatus.IDLE
  if (user.isVip) {
    // Dedicated line for VIP users.
    nextStatus = TaskStatus.VIP
  } else {
    // Maximum number of tasks allowed to run in parallel.
    const maxParallel = await this.getConfig('max_parallel', 0)
    // How many tasks are running right now.
    const running = await this.taskRepository.count({
      where: {
        status: TaskStatus.PENDING
      }
    })
    // At or above the limit the task waits in the queue.
    nextStatus = running < maxParallel ? TaskStatus.PENDING : TaskStatus.QUEUED
  }
  await this.taskRepository.update(
    { id },
    {
      status: nextStatus,
      startedAt: new Date()
    }
  )
}
/**
 * Moves an IDLE, QUEUED or PAUSE task to CUTTING; tasks in any other status are left untouched.
 *
 * @param id Task id.
 * @throws NotFoundException when the task does not exist.
 */
async queueCutting(id: number): Promise<void> {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    // Previously a missing task surfaced as a TypeError on `task.status`.
    throw new NotFoundException('Task not found')
  }
  if (task.status === TaskStatus.IDLE || task.status === TaskStatus.QUEUED || task.status === TaskStatus.PAUSE) {
    await this.taskRepository.update({ id }, { status: TaskStatus.CUTTING })
  }
}
/**
 * Pauses a task that is PENDING, QUEUED, CUTTING or VIP; other statuses are left untouched.
 *
 * @param id Task id.
 * @throws NotFoundException when the task does not exist.
 */
async pauseTask(id: number): Promise<void> {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    // Previously a missing task surfaced as a TypeError on `task.status`.
    throw new NotFoundException('Task not found')
  }
  if (
    task.status === TaskStatus.PENDING ||
    task.status === TaskStatus.QUEUED ||
    task.status === TaskStatus.CUTTING ||
    task.status === TaskStatus.VIP
  ) {
    await this.taskRepository.update({ id }, { status: TaskStatus.PAUSE })
  }
}
/**
 * Marks a PAUSE or QUEUED task as COMPLETED; other statuses are left untouched.
 *
 * @param id Task id.
 * @throws NotFoundException when the task does not exist.
 */
async forceCompletion(id: number): Promise<void> {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    // Previously a missing task surfaced as a TypeError on `task.status`.
    throw new NotFoundException('Task not found')
  }
  if (task.status === TaskStatus.PAUSE || task.status === TaskStatus.QUEUED) {
    await this.taskRepository.update({ id }, { status: TaskStatus.COMPLETED })
  }
}
/**
 * Cancels the schedule of a SCHEDULED task: the task goes back to IDLE/unpaid and the fee
 * recorded as CONSUMPTION for it is refunded.
 *
 * Tasks in any other status are left untouched and nothing is refunded. Previously the refund
 * ran regardless of status, so it could be triggered repeatedly or for running/finished tasks.
 *
 * @param id Task id.
 * @throws NotFoundException when the task does not exist.
 */
async unscheduledSending(id: number) {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    throw new NotFoundException('Task not found')
  }
  if (task.status !== TaskStatus.SCHEDULED) {
    return
  }
  await this.taskRepository.update({ id }, { status: TaskStatus.IDLE, paid: false })
  const costBalanceRecord = await this.balanceRecordRepository.findOneBy({
    taskId: id,
    type: BalanceType.CONSUMPTION
  })
  if (!costBalanceRecord) {
    // No consumption record means nothing was charged, so there is nothing to refund.
    return
  }
  // Refund
  await this.balanceService.feeRefund(task.userId, costBalanceRecord.amount, task.id)
}
/**
 * Exports the non-embed items of one task as an XLSX buffer.
 *
 * For a PAUSE task only items that already ended in SUCCESS or FAIL are included; for every
 * other status all non-embed items of the task are included. The filter is always scoped to
 * `taskId` — previously any status other than COMPLETED/PAUSE produced an empty filter and
 * exported the items of every task.
 *
 * @param taskId Task id.
 * @returns The workbook serialised by `workbook.xlsx.writeBuffer()`.
 * @throws NotFoundException when the task does not exist.
 */
async exportTaskItem(taskId: number) {
  const task = await this.taskRepository.findOneBy({ id: taskId })
  if (!task) {
    throw new NotFoundException('Task not found')
  }
  const where: any = {
    taskId: taskId,
    embed: false
  }
  if (task.status === TaskStatus.PAUSE) {
    where.status = In([TaskItemStatus.SUCCESS, TaskItemStatus.FAIL])
  }
  const taskItems = await this.taskItemRepository.find({
    where,
    order: {
      status: 'ASC',
      sendAt: 'ASC'
    }
  })
  const workbook = new ExcelJS.Workbook()
  const worksheet = workbook.addWorksheet('Sheet1')
  // Column headers
  worksheet.columns = [
    { header: '手机号', key: 'number', width: 30, style: { alignment: { horizontal: 'center' } } },
    { header: '是否有效', key: 'isValid', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '发送成功', key: 'status', width: 15, style: { alignment: { horizontal: 'center' } } },
    {
      header: '发送时间',
      key: 'sendAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    }
  ]
  taskItems.forEach((item) => {
    const succeeded = item.status === TaskItemStatus.SUCCESS
    worksheet.addRow({
      number: item.number,
      isValid: succeeded ? '有效' : '无效',
      status: succeeded ? '发送成功' : '',
      // Items that were never sent have no timestamp; leave the cell empty instead of "Invalid date".
      sendAt: item.sendAt ? moment(item.sendAt).format('YYYY-MM-DD HH:mm:ss') : ''
    })
  })
  return await workbook.xlsx.writeBuffer()
}
/**
 * Exports the tasks created within a date range as an XLSX buffer.
 *
 * Visibility depends on the caller's roles: 'superApi' sees the users returned by
 * `getApiInvitesIds`, 'api' sees those returned by `getInvitesIds`, 'admin' sees everything,
 * and anyone else sees only their own tasks.
 *
 * @param req  Request whose `user` carries `id` and `roles`.
 * @param data Object with `startDate` and `endDate`.
 * @returns The workbook serialised by `workbook.xlsx.writeBuffer()`.
 * @throws Error when a date is missing or no task falls in the range.
 */
async exportTask(req: any, data: any) {
  if (!data.startDate || !data.endDate) {
    throw new Error('请选择日期')
  }
  const roles = req.user.roles
  let where = {}
  if (roles.includes('superApi')) {
    const userIds = await this.userService.getApiInvitesIds(req.user.id)
    where = {
      userId: In(userIds),
      createdAt: Between(data.startDate, data.endDate)
    }
  } else if (roles.includes('api')) {
    // 'superApi' is already handled by the branch above.
    const userIds = await this.userService.getInvitesIds(req.user.id)
    where = {
      userId: In(userIds),
      createdAt: Between(data.startDate, data.endDate)
    }
  } else if (!roles.includes('admin')) {
    where = {
      userId: req.user.id,
      createdAt: Between(data.startDate, data.endDate)
    }
  } else {
    where = {
      createdAt: Between(data.startDate, data.endDate)
    }
  }
  const tasks = await this.taskRepository.find({
    where,
    order: {
      createdAt: 'desc'
    }
  })
  if (tasks.length === 0) {
    throw new Error('暂无日期内任务数据')
  }
  const workbook = new ExcelJS.Workbook()
  const worksheet = workbook.addWorksheet('Sheet1')
  // Column headers
  worksheet.columns = [
    { header: '#', key: 'id', width: 30, style: { alignment: { horizontal: 'center' } } },
    { header: '任务名称', key: 'name', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '任务创建人', key: 'userName', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '已发送', key: 'sent', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '发送成功数', key: 'successCount', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '总数', key: 'total', width: 15, style: { alignment: { horizontal: 'center' } } },
    {
      header: '创建时间',
      key: 'createdAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    },
    {
      header: '开始时间',
      key: 'startedAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    },
    {
      header: '结束时间',
      key: 'updatedAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    }
  ]
  const userIds = Array.from(new Set(tasks.map((item) => item.userId)))
  const users = await this.userRepository.findBy({
    id: In(userIds)
  })
  // Index user names by id so each row is a constant-time lookup.
  const userNameById = new Map()
  for (const user of users) {
    userNameById.set(user.id, user.username)
  }
  const formatDate = (value: Date) => (value ? moment(value).format('YYYY-MM-DD HH:mm:ss') : '')
  tasks.forEach((task) => {
    worksheet.addRow({
      id: task.id,
      name: task.name,
      // A task whose user no longer exists gets an empty creator instead of crashing the export.
      userName: userNameById.get(task.userId) ?? '',
      sent: task.sent,
      successCount: task.successCount,
      total: task.total,
      createdAt: formatDate(task.createdAt),
      startedAt: formatDate(task.startedAt),
      updatedAt: formatDate(task.updatedAt)
    })
  })
  return await workbook.xlsx.writeBuffer()
}
/**
 * Dashboard statistics for the last seven calendar days (today and the six days before it).
 *
 * Returns per-day series of sent / success / total sums over tasks visible to the caller,
 * plus today's figures. For 'admin' and 'superApi' callers, `todayData.code` holds the count
 * of RcsNumber rows with status 'success' created today.
 *
 * @param req Request whose `user` carries `id` and `roles`.
 */
async homeStatistics(req: any) {
  const res = {
    xData: [],
    sentData: [],
    successData: [],
    totalData: [],
    todayData: {
      sent: '0',
      success: '0',
      total: '0',
      code: '0'
    }
  }
  const sixDaysAgo = new Date()
  sixDaysAgo.setDate(sixDaysAgo.getDate() - 6)
  sixDaysAgo.setHours(0, 0, 0, 0)
  // Evaluated at the point of use so the upper bound is "now" at that moment.
  const sinceSixDaysAgo = () => Between(sixDaysAgo, new Date())
  const roles = req.user.roles
  let where = {}
  if (roles.includes('superApi')) {
    const userIds = await this.userService.getApiInvitesIds(req.user.id)
    where = { userId: In(userIds), createdAt: sinceSixDaysAgo() }
  } else if (roles.includes('api')) {
    const userIds = await this.userService.getInvitesIds(req.user.id)
    where = { userId: In(userIds), createdAt: sinceSixDaysAgo() }
  } else if (!roles.includes('admin')) {
    where = { userId: req.user.id, createdAt: sinceSixDaysAgo() }
  } else {
    where = {
      // userId: Not(1),
      createdAt: sinceSixDaysAgo()
    }
  }
  if (roles.includes('admin') || roles.includes('superApi')) {
    const codeRow = await this.rcsNumberRepository
      .createQueryBuilder()
      .select('count(1) as sum')
      .where('status = :status', { status: 'success' })
      .andWhere('createdAt between :start and :end', {
        start: startOfDay(new Date()),
        end: endOfDay(new Date())
      })
      .getRawOne()
    res.todayData.code = codeRow.sum
  }
  const rows = await this.taskRepository
    .createQueryBuilder()
    .select([
      'DATE(createdAt) as day',
      'SUM(sent) as sent',
      'SUM(successCount) as success',
      'SUM(total) as total'
    ])
    .where(where)
    .groupBy('day')
    .orderBy('day', 'ASC')
    .getRawMany()
  for (const row of rows) {
    const day = moment(row.day).format('MM-DD')
    res.xData.push(day)
    res.sentData.push(row.sent)
    res.successData.push(row.success)
    res.totalData.push(row.total)
    // The row for today also feeds the "today" summary.
    if (moment(new Date()).format('MM-DD').includes(day)) {
      res.todayData.sent = row.sent
      res.todayData.success = row.success
      res.todayData.total = row.total
    }
  }
  return res
}
/**
 * Counts RcsNumber rows with status 'success' per channel (`from`) for today and yesterday.
 *
 * @returns One `{ channel, todayData, yesterdayData }` entry per channel.
 */
async codeStatistics() {
  const rows = await this.rcsNumberRepository
    .createQueryBuilder('rcsNumber')
    .select(['rcsNumber.`from` as channel', 'DATE(rcsNumber.createdAt) as day', 'COUNT(1) as sum'])
    .where('rcsNumber.status = :status', { status: 'success' })
    .andWhere('createdAt between :start and :end', {
      start: startOfDay(addDays(new Date(), -1)),
      end: endOfDay(new Date())
    })
    .groupBy('DATE(rcsNumber.createdAt), rcsNumber.from')
    .orderBy('rcsNumber.from', 'ASC')
    .getRawMany()
  const today = moment(new Date()).format('MM-DD')
  const perChannel = {}
  for (const row of rows) {
    const channel = row.channel
    if (!perChannel[channel]) {
      perChannel[channel] = {
        channel: channel,
        todayData: 0,
        yesterdayData: 0
      }
    }
    // Any day other than today falls into the "yesterday" bucket.
    const bucket = moment(row.day).format('MM-DD') === today ? 'todayData' : 'yesterdayData'
    perChannel[channel][bucket] = row.sum
  }
  return Object.values(perChannel)
}
/**
 * Queries the account balance at each external provider concurrently and returns them keyed by
 * provider. Every lookup is best-effort: a failed or unexpected response leaves that provider's
 * value at 0, and the failure is logged as a warning instead of being silently discarded.
 *
 * NOTE(review): provider endpoints and API credentials are hard-coded below; they should be
 * moved to configuration / a secret store and rotated.
 */
async balanceStatistics() {
  const res = {
    durian: 0,
    cloud033: 0,
    cloud034: 0,
    cloud037: 0,
    cloud041: 0,
    cloud050: 0,
    xyz: 0,
    cowboy: 0,
    usapanel: 0,
    dashboard: 0,
    smspva: 0
  }
  const cloudInstance = axios.create({
    baseURL: 'http://52.77.17.214:9001/api/'
  })
  const xyzInstance = axios.create({
    baseURL: 'http://113.28.178.155:8003/api/'
  })
  const panelInstance = axios.create({
    baseURL: 'https://panel.hellomeetyou.com/api/'
  })
  const dashboardInstance = axios.create({
    baseURL: 'https://code.smscodes.io/api/sms/'
  })
  const smspvaInstance = axios.create({
    baseURL: 'https://api.smspva.com/activation/'
  })
  // The five "cloud" accounts share one endpoint and response shape.
  const cloudAccounts = [
    { field: 'cloud033', userid: '100033', token: '1e40ca9795b1fc038db76512175d59b5' },
    { field: 'cloud034', userid: '100034', token: '54bdd0d9dd6707b2b40d8deb5edb1385' },
    { field: 'cloud037', userid: '100037', token: 'aaec6c21e54dc53b92e472df21a95bb7' },
    { field: 'cloud041', userid: '100041', token: '8174f3107605645d17fd6c5edc0bfb7d' },
    { field: 'cloud050', userid: '100050', token: '6c0f25c802b82d2a5c78f01fb627be2c' }
  ] as const
  // Runs one provider lookup; a failure is logged and leaves that provider's balance untouched.
  const attempt = async (provider: string, lookup: () => Promise<void>) => {
    try {
      await lookup()
    } catch (e) {
      Logger.warn(`Balance lookup failed for ${provider}: ${e instanceof Error ? e.message : String(e)}`, this.TAG)
    }
  }
  await Promise.all([
    attempt('durian', async () => {
      const durianRes = await axios
        .create({
          baseURL: 'http://8.218.211.187/out/ext_api/',
          headers: {
            uhost: 'api.durianrcs.com',
            uprotocol: 'http'
          }
        })
        .get('getUserInfo', {
          params: {
            name: 'unsnap3094',
            ApiKey: 'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'
          }
        })
      if (durianRes.data.code === 200) {
        res.durian = durianRes.data.data.score
      }
    }),
    attempt('cowboy', async () => {
      const cowboyRes = await axios
        .create({
          baseURL: 'http://8.218.211.187/',
          headers: {
            uhost: 'api.cowboymsg.com',
            uprotocol: 'http'
          }
        })
        .get('getUserInfo', {
          params: {
            name: 'launch',
            ApiKey: 'NUdVcVMxelBQTXlpcTBwbk1XQUhzQT09'
          }
        })
      if (cowboyRes.data.code === 200) {
        res.cowboy = cowboyRes.data.data.score
      }
    }),
    attempt('xyz', async () => {
      const xyz = await xyzInstance.get('v1', {
        params: {
          act: 'myinfo',
          token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjp7InVpZCI6MjUsInJvbGVfaWQiOjF9fQ.VU1tvG72YaXooUT-FUaQj-YWVXnVrYBad1AsoWUT4pw'
        }
      })
      // The response is a '|'-separated string; a first field of '0' is treated as success.
      const parts = xyz.data.split('|').map((part) => part.trim())
      if (parts[0] === '0') {
        res.xyz = parts[1] < 0 ? 0 : parts[1]
      }
    }),
    ...cloudAccounts.map((account) =>
      attempt(account.field, async () => {
        const cloudRes = await cloudInstance.get('userBalance', {
          params: {
            userid: account.userid,
            token: account.token
          }
        })
        if (cloudRes.data.code === '1001') {
          res[account.field] = cloudRes.data.data.integral
        }
      })
    ),
    attempt('usapanel', async () => {
      const panelRes = await panelInstance.get('account', {
        headers: {
          'Content-Type': 'application/json',
          Accept: 'application/json',
          'X-API-Key': 'wpJohESEZsjW1LtlyoGwZw53'
        }
      })
      if (panelRes.data) {
        res.usapanel = panelRes.data.balance
      }
    }),
    attempt('dashboard', async () => {
      const dashboardRes = await dashboardInstance.get('GetBalance', {
        params: {
          key: '6619f084-363c-4518-b5b8-57b5e01a9c95'
        }
      })
      if (dashboardRes.data.Status === 'Success') {
        res.dashboard = dashboardRes.data.Balance
      }
    }),
    attempt('smspva', async () => {
      const smspvaRes = await smspvaInstance.get('balance', {
        headers: {
          apikey: 'uNW56fGr0zstfs87Xn0e1l2gCYVnb1'
        }
      })
      if (smspvaRes.data.statusCode === 200) {
        res.smspva = smspvaRes.data.data.balance
      }
    })
  ])
  return res
}
/**
 * Counts task items per clock hour of `sendAt` over the last 25 hours, newest hour first.
 *
 * @returns Raw rows of `{ sent, hour }`.
 */
async hourSentStatistics() {
  const now = new Date()
  // Window start: 25 hours before now.
  const windowStart = new Date(now.getTime())
  windowStart.setHours(windowStart.getHours() - 25)
  const query = this.taskItemRepository
    .createQueryBuilder()
    .select(['COUNT(*) AS sent', "DATE_FORMAT(sendAt, '%Y-%m-%d %H:00:00') AS hour"])
    .where('sendAt BETWEEN :start AND :end', { start: windowStart, end: new Date() })
    .groupBy('hour')
    .orderBy('hour', 'DESC')
  return await query.getRawMany()
}
/**
 * Collects order counts for yesterday and today plus yesterday's active sending hours.
 *
 * The three lookups run concurrently and are best-effort: a failed lookup leaves its field at 0
 * and is logged as a warning. `totalHoursYesterday` and `totalHoursToday` are returned but never
 * populated by this method.
 *
 * The query builder is given the explicit alias `task`, so the `task.startedAt` references in
 * the conditions resolve regardless of the default alias TypeORM would otherwise pick.
 */
async numStatistics() {
  const res = {
    totalHoursYesterday: 0,
    totalHoursToday: 0,
    orderCountYesterday: 0,
    orderCountToday: 0,
    hourData: 0
  }
  // Runs one lookup; a failure is logged and leaves the corresponding field at its default.
  const attempt = async (label: string, lookup: () => Promise<void>) => {
    try {
      await lookup()
    } catch (e) {
      Logger.warn(`numStatistics: ${label} failed: ${e instanceof Error ? e.message : String(e)}`, this.TAG)
    }
  }
  await Promise.all([
    attempt('yesterday order count', async () => {
      // Tasks started yesterday.
      const yesterdayOrderCount = await this.taskRepository
        .createQueryBuilder('task')
        .select('COUNT(1)', 'sum')
        .where('task.startedAt >= CURDATE() - INTERVAL 1 DAY')
        .andWhere('task.startedAt < CURDATE()')
        .getRawOne()
      res.orderCountYesterday = yesterdayOrderCount.sum
    }),
    attempt('today order count', async () => {
      // Tasks started today.
      const todayOrderCount = await this.taskRepository
        .createQueryBuilder('task')
        .select('COUNT(1)', 'sum')
        .where('task.startedAt >= CURDATE()')
        .andWhere('task.startedAt < CURDATE() + INTERVAL 1 DAY')
        .getRawOne()
      res.orderCountToday = todayOrderCount.sum
    }),
    attempt('sent hour statistics', async () => {
      res.hourData = await this.sentHourStatistics()
    })
  ])
  return res
}
/**
 * Number of hours during which items were sent yesterday: the count of distinct minutes that
 * have at least one non-idle task item with `sendAt` in yesterday's range, divided by 60.
 *
 * @returns The hour figure as a number, or 0 when the query yields no usable value.
 */
async sentHourStatistics() {
  const sql = `
    SELECT SUM(c) / 60 AS hour
    FROM (
      SELECT 1 AS c, UNIX_TIMESTAMP(sendAt) DIV 60 * 60 AS "time"
      FROM task_item
      WHERE sendAt >= CURDATE() - INTERVAL 1 DAY
        AND sendAt < CURDATE()
        AND status != 'idle'
      GROUP BY time
    ) tmp
  `
  const rows = await this.taskItemRepository.query(sql)
  const hours = Number(rows[0]?.hour)
  // NaN (no row / null sum) and 0 both collapse to 0.
  return hours || 0
}
/**
 * Sums task totals per country for the tasks visible to the caller.
 *
 * `completedData` covers COMPLETED tasks created yesterday; `totalData` covers all tasks created
 * from midnight seven days ago until now. 'api'/'superApi' callers see the users returned by
 * `getInvitesIds`, 'admin' sees everything, anyone else sees only their own tasks.
 *
 * @param req Request whose `user` carries `id` and `roles`.
 */
async sentCountryStatistics(req: any) {
  let scope = {}
  if (req.user.roles.includes('api') || req.user.roles.includes('superApi')) {
    const userIds = await this.userService.getInvitesIds(req.user.id)
    scope = { userId: In(userIds) }
  } else if (!req.user.roles.includes('admin')) {
    scope = { userId: req.user.id }
  }
  // Yesterday, at midnight.
  const yesterday = new Date()
  yesterday.setDate(yesterday.getDate() - 1)
  yesterday.setHours(0, 0, 0, 0)
  const completedData = await this.taskRepository
    .createQueryBuilder()
    .select(['sum(total) as value', 'country as name'])
    .where(scope)
    .andWhere('country is not null')
    .andWhere('status = :status', { status: TaskStatus.COMPLETED })
    .andWhere('createdAt between :start and :end', {
      start: startOfDay(yesterday),
      end: endOfDay(yesterday)
    })
    .groupBy('country')
    .orderBy('value', 'DESC')
    .getRawMany()
  // Seven days ago, at midnight.
  const windowStart = new Date()
  windowStart.setDate(windowStart.getDate() - 7)
  windowStart.setHours(0, 0, 0, 0)
  const totalData = await this.taskRepository
    .createQueryBuilder()
    .select(['sum(total) as value', 'country as name'])
    .where(scope)
    .andWhere('country is not null')
    .andWhere('createdAt between :start and :end', {
      start: windowStart,
      end: new Date()
    })
    .groupBy('country')
    .orderBy('value', 'DESC')
    .getRawMany()
  return { completedData, totalData }
}
/**
 * Builds the backup-number summary table.
 *
 * Three grouped counts over `stockFlag` values 1, 2 and 3 are loaded
 * concurrently: all rows, rows created yesterday, and rows created today.
 * A failing lookup is printed with `console.log` and leaves its column at 0.
 *
 * @returns Three rows (one per stock flag) with `flag`, `yesterday`, `today`
 * and `total`.
 */
async backupStatistics() {
    const res = {
        todayData: null,
        totalData: null,
        yesterdayData: null
    }

    // Common SELECT part: row count per stockFlag
    const countPerFlag = () =>
        this.rcsNumberRepository.createQueryBuilder().select('COUNT(1)', 'count').addSelect('stockFlag', 'stockFlag')

    // Runs one lookup and reports (rather than propagates) a failure
    const attempt = async (job: () => Promise<void>) => {
        try {
            await job()
        } catch (e) {
            console.log(e)
        }
    }

    await Promise.all([
        // Overall count per flag
        attempt(async () => {
            res.totalData = await countPerFlag()
                .where('stockFlag IN (:...stockFlags)', { stockFlags: [1, 2, 3] })
                .groupBy('stockFlag')
                .getRawMany()
        }),
        // Rows created yesterday, per flag
        attempt(async () => {
            res.yesterdayData = await countPerFlag()
                .where('createdAt >= CURDATE() - INTERVAL 1 DAY')
                .andWhere('createdAt < CURDATE()')
                .andWhere('stockFlag IN (:...stockFlags)', { stockFlags: [1, 2, 3] })
                .groupBy('stockFlag')
                .getRawMany()
        }),
        // Rows created today, per flag
        attempt(async () => {
            res.todayData = await countPerFlag()
                .where('createdAt >= CURDATE()')
                .andWhere('stockFlag IN (:...stockFlags)', { stockFlags: [1, 2, 3] })
                .groupBy('stockFlag')
                .getRawMany()
        })
    ])

    // Count for one flag within a result set; 0 when the set or the flag is missing
    const countOf = (rows: any, stockFlag: number) => rows?.find((item) => item.stockFlag === stockFlag)?.count ?? 0

    const summaryRow = (flag: string, stockFlag: number) => ({
        flag,
        yesterday: countOf(res.yesterdayData, stockFlag),
        today: countOf(res.todayData, stockFlag),
        total: countOf(res.totalData, stockFlag)
    })

    // Labels: remaining backups / used successfully / use failed
    return [summaryRow('剩余备份', 1), summaryRow('使用成功', 2), summaryRow('使用失败', 3)]
}
/**
 * Counts the non-embedded items of a task that were sent successfully.
 *
 * @param id Task id.
 * @returns Number of matching task items.
 */
async getSuccessNum(id: number) {
    const where = {
        taskId: id,
        status: TaskItemStatus.SUCCESS,
        embed: false
    }
    const successful = await this.taskItemRepository.count({ where })
    return successful
}
/**
 * Estimates how many messages stand ahead of the given task.
 *
 * Returns 0 while fewer tasks are PENDING than the `max_parallel` setting.
 * Otherwise the estimate is the number of phones in the lists of the QUEUED
 * tasks ordered before this one (all queued tasks when the id is not in the
 * queue) plus the IDLE, non-embedded items of the tasks currently PENDING.
 *
 * @param id Task id to locate in the queue.
 * @returns The estimated number of messages still to be sent first.
 */
async getToBeSentNum(id: number) {
    const maxParallel = await this.getConfig('max_parallel', 1)
    const pendingNum = await this.taskRepository.count({
        where: { status: TaskStatus.PENDING }
    })
    if (pendingNum < maxParallel) {
        return 0
    }

    // Queued tasks, oldest start time first
    const queued = await this.taskRepository.find({
        where: { status: TaskStatus.QUEUED },
        order: { startedAt: 'ASC' }
    })
    const position = queued.findIndex((queuedTask) => queuedTask.id === id)
    const ahead = position === -1 ? queued : queued.slice(0, position)
    const listIdsAhead = ahead.map((queuedTask) => queuedTask.listId)

    // Phones belonging to the tasks queued before this one
    let queuedAhead = 0
    if (listIdsAhead.length > 0) {
        queuedAhead = await this.phoneRepository.countBy({
            listId: In(listIdsAhead)
        })
    }

    // Items not yet sent by the tasks that are running right now
    const running = await this.taskRepository.find({
        where: { status: TaskStatus.PENDING }
    })
    let runningRemaining = 0
    if (running.length > 0) {
        runningRemaining = await this.taskItemRepository.countBy({
            taskId: In(running.map((runningTask) => runningTask.id)),
            status: TaskItemStatus.IDLE,
            embed: false
        })
    }

    return queuedAhead + runningRemaining
}
/**
 * Reads a numeric system setting, falling back to a default on failure.
 *
 * @param name Setting key passed to `sysConfigService.getNumber`.
 * @param defValue Value returned when the lookup throws.
 * @returns The stored number, or `defValue` if it could not be read.
 */
async getConfig(name, defValue) {
    try {
        return await this.sysConfigService.getNumber(name, defValue)
    } catch (e) {
        // Name the key that failed; this helper serves every setting, not one specific key
        Logger.error(`Error getting config "${name}"`, e instanceof Error ? e.stack : String(e), this.TAG)
    }
    return defValue
}
/**
 * Sets the status of the task items with the given ids.
 *
 * @param taskIds Ids of the task items to update (matched against the item
 * `id` column, despite the parameter name).
 * @param status Status to assign.
 */
async updateTaskItemStatus(taskIds: number[], status: TaskItemStatus) {
    // Nothing to update: skip the round trip instead of issuing an empty IN () filter
    if (!taskIds || taskIds.length === 0) {
        return
    }
    await this.taskItemRepository.update({ id: In(taskIds) }, { status })
}
/**
 * Sets the status of the given task items and stamps `sendAt` with the
 * current time.
 *
 * @param taskIds Ids of the task items to update (matched against the item
 * `id` column, despite the parameter name).
 * @param status Status to assign.
 */
async updateTaskItemStatusAndSendAt(taskIds: number[], status: TaskItemStatus) {
    // Nothing to update: skip the round trip instead of issuing an empty IN () filter
    if (!taskIds || taskIds.length === 0) {
        return
    }
    await this.taskItemRepository.update({ id: In(taskIds) }, { status, sendAt: new Date() })
}
/**
 * Computes the cost units of a task from its recipient count and content type.
 *
 * The recipient count of the task's phone list is multiplied by 1 for
 * text-only content, 2 for image-only content and 3 for text plus image.
 * `user` is accepted but not read; no per-user rate is applied here.
 *
 * @param task Task whose `listId`, `message` and `img` are inspected.
 * @param user Owner of the task (currently unused).
 * @returns The cost as a `Decimal`.
 * @throws Error when the task has neither a message nor an image.
 */
async getCost(task: Task, user: Users) {
    const recipients: number = await this.phoneListService.findCountByListId(task.listId)

    const hasText = Boolean(task.message)
    const hasImage = Boolean(task.img)

    let multiplier: number
    if (hasText && hasImage) {
        multiplier = 3
    } else if (hasText) {
        multiplier = 1
    } else if (hasImage) {
        multiplier = 2
    } else {
        // Neither text nor image: the task content is invalid
        throw new Error('发送任务文案错误,请联系管理员修改!')
    }

    return new Decimal(recipients).mul(multiplier)
}
- // @Interval(2000)
- // async scheduleTask() {
- // this.lock
- // .acquire(
- // 'dispatchTask',
- // async () => {
- // const maxParallel = await this.getConfig('max_parallel', 1)
- // const batchSize = 200
- // let tasks = await this.taskRepository.find({
- // where: {
- // status: TaskStatus.PENDING
- // },
- // order: {
- // startedAt: 'ASC'
- // },
- // take: maxParallel
- // })
- // // 少补
- // if (tasks.length < maxParallel) {
- // const nextTasks = await this.taskRepository.find({
- // where: {
- // status: TaskStatus.QUEUED
- // },
- // order: {
- // startedAt: 'ASC',
- // id: 'ASC'
- // }
- // })
- // if (nextTasks.length > 0) {
- // const userIdMap = new Map()
- // tasks.forEach((task) => {
- // userIdMap.set(task.userId, (userIdMap.get(task.userId) || 0) + 1)
- // })
- // // nextTasks筛选,从排队任务中筛选出最多2个同用户下的任务
- // let filteredTasks = []
- // const userIds = {}
- // const limit = maxParallel - tasks.length
- // for (const task of nextTasks) {
- // if (!userIds[task.userId]) {
- // userIds[task.userId] = 0
- // }
- // if ((userIdMap.get(task.userId) || 0) + userIds[task.userId] < 2) {
- // filteredTasks.push(task)
- // userIds[task.userId]++
- // }
- // if (filteredTasks.length >= limit) {
- // break
- // }
- // }
- // const nextTasksIds = filteredTasks.map((t) => t.id)
- // if (nextTasksIds.length === 0) {
- // nextTasksIds.push(...nextTasks.map((t) => t.id).slice(0, limit))
- // }
- // await this.taskRepository.update({ id: In(nextTasksIds) }, { status: TaskStatus.PENDING })
- // tasks.push(...filteredTasks)
- // }
- // }
- // const pendingTasks = (await this.findPendingTasks()).sort(() => Math.random() - 0.5)
- // if (pendingTasks.length === 0) return
- // const devices = await this.deviceService.findAllAvailableDevices()
- // if (devices.length === 0) return
- // const countryMapping = await this.countryConfigService.getAllDestConfig(
- // pendingTasks.map((t) => t.country)
- // )
- // const res = pendingTasks.map((task) => {
- // return {
- // task,
- // useCountry: countryMapping.find((c) => c.id === task.country)?.useCountry || ['any'],
- // exclude: countryMapping.find((c) => c.id === task.country)?.exclude || [],
- // devices: []
- // }
- // })
- // devices.forEach((device) => {
- // let candidateTasks = res.filter(
- // (r) => Math.ceil((r.task.total - r.task.sent) / batchSize) > r.devices.length
- // )
- // if (device.matchCountry && device.pinCountry) {
- // candidateTasks = candidateTasks.filter((r) => {
- // return (
- // r.useCountry.includes(device.pinCountry.toUpperCase()) &&
- // !r.exclude.includes(device.pinCountry.toUpperCase())
- // )
- // })
- // } else {
- // candidateTasks = candidateTasks.filter((r) => {
- // return (
- // (r.useCountry.includes('any') ||
- // r.useCountry.includes(device.currentCountry?.toUpperCase())) &&
- // !r.exclude.includes(device.currentCountry?.toUpperCase())
- // )
- // })
- // }
- // if (candidateTasks.length > 0) {
- // candidateTasks.sort((a, b) => {
- // return a.devices.length - b.devices.length
- // })
- // candidateTasks[0].devices.push(device)
- // }
- // })
- // for (let r of res) {
- // if (r.devices.length > 0) {
- // await this.dispatchTask(r.task, r.devices)
- // }
- // }
- // },
- // {
- // timeout: 1
- // }
- // )
- // .catch((e) => {
- // if (e.message.includes('timed out')) return
- // Logger.error('Error dispatchTask', e.stack, this.TAG)
- // })
- // }
/**
 * Periodic scheduler tick: tops up the set of running tasks and dispatches
 * every pending task to its devices.
 *
 * The whole tick runs under the `dispatchTask` lock; a tick that cannot get
 * the lock within the configured timeout is dropped silently (its
 * "timed out" error is ignored), any other error is logged.
 *
 * Steps inside the lock:
 * 1. Load up to `max_parallel` PENDING tasks.
 * 2. If there is spare capacity, promote QUEUED tasks (oldest first) to
 *    PENDING, allowing at most two running tasks per user. If that cap would
 *    leave nothing to promote, the oldest queued tasks are promoted anyway.
 * 3. Dispatch every pending task that has devices, in random order. One
 *    failing task is logged on its own and does not end the tick early, so
 *    the lock is held until every dispatch call has returned.
 */
@Interval(2000)
async scheduleTask() {
    this.lock
        .acquire(
            'dispatchTask',
            async () => {
                const maxParallel = await this.getConfig('max_parallel', 1)
                const running = await this.taskRepository.find({
                    where: {
                        status: TaskStatus.PENDING
                    },
                    order: {
                        startedAt: 'ASC'
                    },
                    take: maxParallel
                })

                // Top up when fewer tasks are running than allowed
                if (running.length < maxParallel) {
                    const queued = await this.taskRepository.find({
                        where: {
                            status: TaskStatus.QUEUED
                        },
                        order: {
                            startedAt: 'ASC',
                            id: 'ASC'
                        }
                    })
                    if (queued.length > 0) {
                        const limit = maxParallel - running.length

                        // Tasks per user: already running plus the ones picked below
                        const perUser = new Map()
                        for (const task of running) {
                            perUser.set(task.userId, (perUser.get(task.userId) || 0) + 1)
                        }

                        // Pick queued tasks while keeping each user at two tasks at most
                        const promoted = []
                        for (const task of queued) {
                            const used = perUser.get(task.userId) || 0
                            if (used < 2) {
                                promoted.push(task)
                                perUser.set(task.userId, used + 1)
                            }
                            if (promoted.length >= limit) {
                                break
                            }
                        }

                        const promotedIds = promoted.map((t) => t.id)
                        if (promotedIds.length === 0) {
                            // Every queued user is at the cap: fall back to plain queue order
                            promotedIds.push(...queued.slice(0, limit).map((t) => t.id))
                        }
                        await this.taskRepository.update(
                            { id: In(promotedIds) },
                            {
                                status: TaskStatus.PENDING,
                                startedAt: new Date()
                            }
                        )
                    }
                }

                const pendingTasks = await this.findPendingTasks()
                // Unbiased in-place shuffle (Fisher-Yates)
                for (let i = pendingTasks.length - 1; i > 0; i--) {
                    const j = Math.floor(Math.random() * (i + 1))
                    const swapped = pendingTasks[i]
                    pendingTasks[i] = pendingTasks[j]
                    pendingTasks[j] = swapped
                }

                const outcomes = await Promise.allSettled(
                    pendingTasks.map(async (task) => {
                        const devices = await this.deviceService.findByTaskId(task.id)
                        if (devices.length === 0) return
                        await this.dispatchTask(task, devices)
                    })
                )
                outcomes.forEach((outcome, index) => {
                    if (outcome.status === 'rejected') {
                        const reason = outcome.reason
                        Logger.error(
                            `Error dispatchTask for task ${pendingTasks[index].id}`,
                            reason instanceof Error ? reason.stack : String(reason),
                            this.TAG
                        )
                    }
                })
            },
            {
                timeout: 1
            }
        )
        .catch((e) => {
            if (e.message.includes('timed out')) return
            Logger.error('Error dispatchTask', e.stack, this.TAG)
        })
}
/**
 * Appends a random query parameter to every http(s) URL found in the text.
 *
 * Each distinct URL gets one random 6-character key and value, and every
 * occurrence of that URL is rewritten to the same result. The text is
 * rewritten in a single pass so that a URL which is a prefix of another
 * matched URL cannot corrupt the longer one. A URL that fails to parse is
 * logged and left unchanged.
 *
 * @param content Message text that may contain URLs.
 * @returns The text with rewritten URLs.
 */
refineContent(content: string) {
    if (!content) {
        return content
    }
    const regex =
        /https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)/g
    // Original URL -> rewritten URL, so repeated occurrences share one random parameter
    const rewritten = new Map<string, string>()

    return content.replace(regex, (url) => {
        const known = rewritten.get(url)
        if (known !== undefined) {
            return known
        }
        let replacement = url
        try {
            const u = new URL(url)
            u.searchParams.append(randomstring.generate(6), randomstring.generate(6))
            replacement = u.toString()
        } catch (error) {
            Logger.error('Error parsing url', error instanceof Error ? error.stack : String(error), this.TAG)
        }
        rewritten.set(url, replacement)
        return replacement
    })
}
/**
 * Hands a batch of idle task items to the given devices and records the
 * outcome.
 *
 * Up to `devices.length * batchSize` IDLE items of the task are loaded,
 * marked PENDING (stamping `sendAt`), and the devices actually needed are
 * flagged busy. Each of those devices then receives one slice of at most
 * `batchSize` items through the events gateway. The method resolves once the
 * bookkeeping is written; the device round trips and the refresh of the
 * task's progress counters continue in the background and log their own
 * failures.
 *
 * @param task Task whose idle items should be sent.
 * @param devices Candidate devices; only as many as there are batches are
 * used. The array is not modified.
 */
async dispatchTask(task: Task, devices: Device[]) {
    // Check devices first so no item query is issued with a zero-sized page
    if (devices.length === 0) {
        return
    }
    const taskItems = await this.taskItemRepository.find({
        where: {
            taskId: task.id,
            status: TaskItemStatus.IDLE
        },
        take: devices.length * batchSize
    })
    if (taskItems.length === 0) {
        return
    }
    // One device per batch; slice (not splice) leaves the caller's array intact
    const assigned = devices.slice(0, Math.ceil(taskItems.length / batchSize))

    // Per-task settings win; missing ones fall back to system configuration
    const taskConfig = {
        singleQty: task.singleQty || (await this.getConfig('single_qty', 1)),
        singleTimeout: task.singleTimeout || (await this.getConfig('single_timeout', 2000)),
        singleDelay: task.singleDelay || (await this.getConfig('single_delay', 3000)),
        cleanCount: task.cleanCount || (await this.getConfig('clean_count', 20)),
        groupMode: task.groupMode,
        groupQty: task.groupQty || (await this.getConfig('group_qty', 3)),
        groupSize: task.groupSize || (await this.getConfig('group_size', 9)),
        groupTimeout: task.groupTimeout || (await this.getConfig('group_timeout', 2000)),
        groupDelay: task.groupDelay || (await this.getConfig('group_delay', 3000)),
        checkConnection: task.checkConnection,
        useBackup: task.useBackup,
        e2ee: task.e2ee,
        e2eeTimeout: task.e2eeTimeout || (await this.getConfig('e2ee_timeout', 5000))
    }

    await this.updateTaskItemStatusAndSendAt(
        taskItems.map((i) => i.id),
        TaskItemStatus.PENDING
    )
    await this.deviceService.updateDevice(
        assigned.map((d) => d.id),
        { busy: true }
    )

    // Deliberately not awaited: sending and progress refresh run in the background
    Promise.all(
        assigned.map(async (device, i) => {
            const items = taskItems
                .slice(i * batchSize, i * batchSize + batchSize)
                .map((item) => ({ ...item, message: this.getMessage(task), img: task.img }))
            if (items.length === 0) return
            try {
                const res: any = await this.eventsGateway.sendForResult(
                    {
                        id: randomUUID(),
                        action: 'task',
                        data: {
                            config: { ...taskConfig, ...(device.configOverrides || {}) },
                            tasks: items,
                            taskId: task.id
                        }
                    },
                    device.socketId
                )
                Logger.log(`task result: ${JSON.stringify(res)}`, this.TAG)

                // Result shape 1: object with id lists per outcome
                if (res.success?.length > 0) {
                    await this.updateTaskItemStatus(res.success, TaskItemStatus.SUCCESS)
                }
                if (res.fail?.length > 0) {
                    await this.updateTaskItemStatus(res.fail, TaskItemStatus.FAIL)
                }
                if (res.retry?.length > 0) {
                    await this.updateTaskItemStatus(res.retry, TaskItemStatus.IDLE)
                }

                // Result shape 2: array with one entry per item
                if (res instanceof Array) {
                    const results = res as TaskResult[]
                    for (const result of results) {
                        // sent: 0 -> failed, 1 -> success, anything else -> back to idle
                        const status =
                            result.sent === 0
                                ? TaskItemStatus.FAIL
                                : result.sent === 1
                                ? TaskItemStatus.SUCCESS
                                : TaskItemStatus.IDLE
                        await this.taskItemRepository.update(
                            { id: result.id },
                            {
                                status,
                                delivery: result.delivery,
                                numberId: result.numberId
                            }
                        )
                    }
                }
            } catch (e) {
                // The device call failed: release its items so they can be picked up again
                Logger.error('Error running task 3', e.stack, this.TAG)
                await this.updateTaskItemStatus(
                    items.map((i) => i.id),
                    TaskItemStatus.IDLE
                )
            }
        })
    )
        .then(async () => {
            // Recount the task's non-embedded items per status (task id bound as a parameter)
            const counts = (
                await this.taskItemRepository.manager.query(
                    `select status, count(*) as count
                    from task_item
                    where taskId = ? and embed = 0
                    group by status`,
                    [task.id]
                )
            ).reduce((acc, item) => {
                acc[item.status] = parseInt(item.count, 10)
                return acc
            }, {})
            Logger.log('Task counts', JSON.stringify(counts), this.TAG)

            const successCount = counts.success || 0
            const failCount = counts.fail || 0
            const pendingCount = counts.pending || 0
            const idleCount = counts.idle || 0
            const sentCount = successCount + failCount
            const finish = pendingCount === 0 && idleCount === 0

            // Items confirmed as delivered
            const deliveryCount = await this.taskItemRepository.countBy({ taskId: task.id, delivery: 1 })

            const data: Partial<Task> = {
                sent: sentCount,
                successCount: successCount,
                successRate: sentCount > 0 ? ((successCount / sentCount) * 100).toFixed(1) + '%' : '0%',
                deliveryCount: deliveryCount,
                // Guard on sentCount as well so the rate can never become "Infinity%"
                deliveryRate:
                    deliveryCount > 0 && sentCount > 0 ? ((deliveryCount / sentCount) * 100).toFixed(1) + '%' : '0%'
            }
            if (finish) {
                data.status = TaskStatus.COMPLETED
                await this.updateSend(task.userId)
            }
            await this.taskRepository.update({ id: task.id }, data)
        })
        .catch((e) => {
            // Without this handler a failure here would surface as an unhandled rejection
            Logger.error(
                `Error updating progress of task ${task.id}`,
                e instanceof Error ? e.stack : String(e),
                this.TAG
            )
        })
}
/**
 * Counts the tasks that are currently in PENDING status.
 *
 * @returns Number of pending tasks.
 */
async checkPendingTaskNum() {
    const pendingOnly = { status: TaskStatus.PENDING }
    const pendingTotal = await this.taskRepository.countBy(pendingOnly)
    return pendingTotal
}
/**
 * Every 10 seconds, releases task items that have been stuck in PENDING.
 *
 * For all tasks in PENDING, CUTTING or VIP status, items that are still
 * PENDING and whose `sendAt` is more than 10 minutes old are set back to
 * IDLE. The reset is one conditional update, so an item that finishes in the
 * meantime no longer matches and is not overwritten.
 */
@Interval(10000)
async fixDeadTask() {
    const tasks = await this.taskRepository.findBy({
        status: In([TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP])
    })
    if (tasks.length === 0) {
        return
    }
    const staleBefore = addMinutes(new Date(), -10)
    await this.taskItemRepository.update(
        {
            taskId: In(tasks.map((task) => task.id)),
            status: TaskItemStatus.PENDING,
            sendAt: LessThan(staleBefore)
        },
        { status: TaskItemStatus.IDLE }
    )
}
/**
 * Every 60 seconds, refreshes the `check_availability_numbers` setting with
 * five randomly chosen numbers that were sent successfully.
 *
 * The look-back window starts at 6 hours and grows by 6 hours per attempt
 * (99 attempts at most) until five items are found. The setting is saved
 * either way; its value only changes when a full sample was found.
 */
@Interval(60000)
async changeCheckAvailabilityNumbers() {
    const CONFIG_NAME = 'check_availability_numbers'
    const SAMPLE_SIZE = 5
    const WINDOW_STEP_HOURS = 6
    const MAX_ATTEMPTS = 99

    let config: SysConfig | null = null
    try {
        config = await this.sysConfigService.findByName(CONFIG_NAME)
    } catch (e) {
        Logger.error('Error getting check_availability_numbers', e instanceof Error ? e.stack : String(e), this.TAG)
    }
    // Covers a thrown lookup and, should findByName resolve without a row, an empty result
    if (!config) {
        config = new SysConfig()
        config.name = CONFIG_NAME
    }

    for (let attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
        const items = await this.taskItemRepository
            .createQueryBuilder()
            .select()
            .where('status = :status', { status: TaskItemStatus.SUCCESS })
            .andWhere('sendAt > :sendAt', { sendAt: addHours(new Date(), -WINDOW_STEP_HOURS * (attempt + 1)) })
            .orderBy('RAND()')
            .limit(SAMPLE_SIZE)
            .getMany()
        if (items.length === SAMPLE_SIZE) {
            config.value = items.map((item) => item.number).join(',')
            break
        }
    }
    await this.sysConfigService.save(config)
}
/**
 * Recomputes a user's `send` total from their consumption balance records
 * and stores it.
 *
 * @param id User id.
 * @returns The saved user entity.
 * @throws Error when no user with this id exists.
 */
public async updateSend(id: number) {
    const user = await this.userRepository.findOneBy({ id })
    if (!user) {
        // Fail with a clear message instead of a TypeError on `user.send`
        throw new Error(`Cannot update send total: user ${id} not found`)
    }
    const sum = await this.balanceService.sumAmount(id, BalanceType.CONSUMPTION)
    user.send = sum.toNumber()
    return await this.userRepository.save(user)
}
/**
 * Cron job (second 0 of minutes 0 and 30, every hour) that starts the
 * SCHEDULED tasks whose `startedAt` is now or in the past.
 *
 * Tasks are started one after another; a task that fails to start is
 * reported on the console and does not stop the remaining ones.
 *
 * @returns The tasks that were due, whether or not they started.
 */
@Cron('0 0,30 * * * *')
async scheduledTaskExecution() {
    console.log('The scheduled task starts,', new Date())

    const dueTasks = await this.taskRepository.findBy({
        status: TaskStatus.SCHEDULED,
        startedAt: LessThanOrEqual(new Date())
    })

    for (let i = 0; i < dueTasks.length; i++) {
        const taskId = dueTasks[i].id
        try {
            await this.startTask(taskId)
            console.log(`Task ${taskId} started successfully.`)
        } catch (error) {
            console.error(`Error starting task ${taskId}:`, error)
        }
    }

    console.log('The scheduled task is executed.')
    return dueTasks
}
- }
|