| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229 |
- import { OperatorConfigService } from './../operator_config/operator_config.service'
- import { PhoneListService } from './../phone-list/phone-list.service'
- import {
- forwardRef,
- Inject,
- Injectable,
- InternalServerErrorException,
- Logger,
- NotFoundException,
- OnModuleInit
- } from '@nestjs/common'
- import { InjectRepository } from '@nestjs/typeorm'
- import { Task, TaskStatus } from './entities/task.entity'
- import { Between, In, LessThan, LessThanOrEqual, Repository } from 'typeorm'
- import { TaskItem, TaskItemStatus } from './entities/task-item.entity'
- import { PageRequest } from '../common/dto/page-request'
- import { paginate, Pagination } from 'nestjs-typeorm-paginate'
- import { EventsGateway } from '../events/events.gateway'
- import { randomUUID } from 'crypto'
- import { setTimeout } from 'timers/promises'
- import { DeviceService } from '../device/device.service'
- import { SysConfigService } from '../sys-config/sys-config.service'
- import { Users } from '../users/entities/users.entity'
- import * as ExcelJS from 'exceljs'
- import * as moment from 'moment'
- import { BalanceService } from '../balance/balance.service'
- import { Role } from '../model/role.enum'
- import { Phone } from '../phone-list/entities/phone.entity'
- import { Cron, Interval } from '@nestjs/schedule'
- import * as AsyncLock from 'async-lock'
- import { UsersService } from '../users/users.service'
- import { addDays, addHours, addMinutes, endOfDay, startOfDay } from 'date-fns'
- import { SysConfig } from '../sys-config/entities/sys-config.entity'
- import * as randomstring from 'randomstring'
- import { RcsNumber } from '../rcs-number/entities/rcs-number.entity'
- import axios from 'axios'
- import { BalanceRecord, BalanceType } from '../balance/entities/balance-record.entities'
- import { Device } from '../device/entities/device.entity'
- @Injectable()
- export class TaskService implements OnModuleInit {
/** Async lock; onModuleInit and scheduleTask both acquire it under the 'dispatchTask' key. */
private lock = new AsyncLock()

/** Context label passed to Logger calls made by this service. */
private TAG = 'TaskService'

/**
 * Wires up the TypeORM repositories and the collaborating services.
 *
 * EventsGateway and DeviceService are injected through forwardRef.
 */
constructor(
  @InjectRepository(Task) private taskRepository: Repository<Task>,
  @InjectRepository(TaskItem) private taskItemRepository: Repository<TaskItem>,
  @InjectRepository(Phone) private phoneRepository: Repository<Phone>,
  @InjectRepository(Users) private userRepository: Repository<Users>,
  @InjectRepository(RcsNumber) private rcsNumberRepository: Repository<RcsNumber>,
  @InjectRepository(BalanceRecord) private readonly balanceRecordRepository: Repository<BalanceRecord>,
  @Inject(forwardRef(() => EventsGateway)) private readonly eventsGateway: EventsGateway,
  private readonly phoneListService: PhoneListService,
  @Inject(forwardRef(() => DeviceService)) private readonly deviceService: DeviceService,
  private readonly sysConfigService: SysConfigService,
  private readonly balanceService: BalanceService,
  private readonly userService: UsersService,
  private readonly operatorConfigService: OperatorConfigService
) {}
/**
 * Startup recovery hook.
 *
 * Under the 'dispatchTask' lock, every task item that is still PENDING and belongs
 * to a PENDING task is put back to IDLE so it can be dispatched again
 * (presumably such items were in flight when the process stopped — confirm).
 * The lock is then held for another 10 seconds; scheduleTask acquires the same
 * key with a 1 ms timeout, so it cannot dispatch during that window.
 *
 * The returned promise is intentionally not awaited; failures are logged rather
 * than left as unhandled rejections.
 */
onModuleInit() {
  this.lock
    .acquire('dispatchTask', async () => {
      const pendingTasks = await this.taskRepository.findBy({
        status: TaskStatus.PENDING
      })
      if (pendingTasks.length > 0) {
        // One UPDATE for all affected tasks instead of one per task.
        await this.taskItemRepository.update(
          { taskId: In(pendingTasks.map((task) => task.id)), status: TaskItemStatus.PENDING },
          { status: TaskItemStatus.IDLE }
        )
      }
      await setTimeout(10000)
    })
    .catch((e) => {
      Logger.error('Error resetting pending task items', e?.stack, this.TAG)
    })
}
/** AbortController instances indexed by a numeric key; not referenced by the methods visible in this part of the file. */
private taskControllers: Record<number, AbortController> = {}
/**
 * Looks up one task by primary key.
 *
 * @param id task id
 * @returns whatever `findOneBy({ id })` resolves to for that id
 */
async findById(id: number): Promise<Task> {
  const task = await this.taskRepository.findOneBy({ id })
  return task
}
/**
 * Returns one page of tasks and fills `userName` on each item from the owning user.
 *
 * @param req paging options (`req.page`) and search/find options (`req.search`)
 * @returns the paginated tasks; items whose user cannot be found keep `userName` untouched
 */
async findAllTask(req: PageRequest<Task>): Promise<Pagination<Task>> {
  const page = await paginate<Task>(this.taskRepository, req.page, req.search)
  if (page.items.length === 0) {
    return page
  }
  // Query each user only once and index by id so the per-item lookup is O(1).
  const userIds = Array.from(new Set(page.items.map((item) => item.userId)))
  const users = await this.userRepository.findBy({ id: In(userIds) })
  const userById = new Map(users.map((user) => [user.id, user] as const))
  for (const item of page.items) {
    const user = userById.get(item.userId)
    if (user) {
      item.userName = user.username
    }
  }
  return page
}
/**
 * Lists the tasks whose status is PENDING, CUTTING or VIP.
 */
async findPendingTasks() {
  const activeStatuses = [TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP]
  return await this.taskRepository.findBy({ status: In(activeStatuses) })
}
/**
 * Returns one page of task items.
 *
 * @param req paging options (`req.page`) and search/find options (`req.search`)
 */
async findAllTaskItem(req: PageRequest<TaskItem>): Promise<Pagination<TaskItem>> {
  const { page, search } = req
  return await paginate<TaskItem>(this.taskItemRepository, page, search)
}
/**
 * Creates a task from a phone list and materialises one task item per number.
 *
 * Steps:
 *  1. Validate that the phone list exists and holds at least 100 numbers.
 *  2. Copy `total` / `country` onto the task and resolve `e2ee` from the operator config.
 *  3. If `startedAt` is set the task becomes SCHEDULED and is charged up front.
 *  4. Interleave the configured embed numbers (`embed_numbers`, comma separated)
 *     at regular intervals and bulk-insert all task items as IDLE.
 *
 * If the up-front deduction fails, the task is downgraded to an unpaid IDLE task.
 * Its items are still inserted so the downgraded task can be started manually,
 * and only then is the error raised.
 *
 * @param task task to create; `listId` and `userId` must be set
 * @returns the saved task
 * @throws NotFoundException when the phone list does not exist
 * @throws InternalServerErrorException when the list is empty or has fewer than 100 numbers
 * @throws Error when the balance is insufficient for a scheduled task or the deduction fails
 */
async createTask(task: Task): Promise<Task> {
  const phoneList = await this.phoneListService.findPhoneListById(task.listId)
  if (!phoneList) {
    throw new NotFoundException('Phone list not found')
  }
  const phones = await this.phoneListService.findPhoneByListId(task.listId)
  if (!phones || phones.length === 0) {
    throw new InternalServerErrorException('请先上传料子')
  }
  if (phones.length < 100) {
    throw new InternalServerErrorException('料子条数不能少于100条!')
  }
  task.total = phones.length
  task.country = phoneList.country
  if (task.country) {
    task.e2ee = (await this.operatorConfigService.findByCountry(task.country))?.e2ee || 0
  }

  // Scheduled task: verify the balance before saving, charge right after saving.
  let cost = 0
  if (task.startedAt) {
    task.status = TaskStatus.SCHEDULED
    const user = await this.userService.findById(task.userId)
    cost = await this.getCost(task, user)
    if (cost > (user.balance || 0)) {
      throw new Error('余额不足,请充值后创建定时任务!')
    }
    task.paid = true
  }
  task = await this.taskRepository.save(task)

  let deductionFailed = false
  if (task.paid) {
    try {
      await this.balanceService.feeDeduction(task.userId, cost, task.id)
    } catch (e) {
      Logger.error('Error deducting fee for scheduled task', e instanceof Error ? e.stack : String(e), this.TAG)
      task.status = TaskStatus.IDLE
      task.paid = false
      await this.taskRepository.update(task.id, { status: TaskStatus.IDLE, paid: false })
      // Raised after the items are inserted, see below.
      deductionFailed = true
    }
  }

  // Embed numbers. ''.split(',') yields [''], so blanks must be dropped or an
  // empty number would be inserted whenever the setting is unset.
  const embedNumbersString = await this.sysConfigService.getString('embed_numbers', '')
  const embedNumbers = embedNumbersString
    .split(',')
    .map((value) => value.trim())
    .filter((value) => value.length > 0)
  const finalPhones = [...phones]
  if (embedNumbers.length > 0) {
    const totalLength = finalPhones.length + embedNumbers.length
    const insertionStep = Math.floor(totalLength / (embedNumbers.length + 1))
    embedNumbers.forEach((embedNumber, index) => {
      const embedded = new Phone()
      embedded.number = embedNumber
      finalPhones.splice((index + 1) * insertionStep, 0, embedded)
    })
  }

  const embedNumberSet = new Set(embedNumbers)
  await this.taskItemRepository
    .createQueryBuilder()
    .insert()
    .values(
      finalPhones.map((phone) => {
        const taskItem = new TaskItem()
        taskItem.taskId = task.id
        taskItem.number = phone.number
        taskItem.embed = embedNumberSet.has(phone.number)
        taskItem.status = TaskItemStatus.IDLE
        return taskItem
      })
    )
    .updateEntity(false)
    .execute()

  if (deductionFailed) {
    throw new Error('定时任务扣款失败,已转为手动发送任务')
  }
  return task
}
/**
 * Builds the outgoing text for one send.
 *
 * Every dynamic-message entry that has a key and at least one value replaces all
 * occurrences of its key with one randomly chosen value; the result is then passed
 * through refineContent.
 *
 * @param task task providing `message` and optional `dynamicMessage`
 * @returns the message text to send
 */
getMessage(task: Task) {
  let text = task.message
  for (const entry of task.dynamicMessage ?? []) {
    if (!entry.key || !(entry.values?.length > 0)) {
      continue
    }
    const chosen = entry.values[Math.floor(Math.random() * entry.values.length)]
    text = text.replaceAll(`${entry.key}`, chosen)
  }
  return this.refineContent(text)
}
/**
 * Updates the editable settings of a task.
 *
 * Only the task owner or an admin may update. `message` and `dynamicMessage`
 * fall back to the stored values when the incoming ones are falsy; the other
 * fields are passed through as received.
 *
 * @param id task id
 * @param user user performing the update
 * @param data new values
 * @returns the result of the repository update
 * @throws Error when `id` is missing or the user is neither owner nor admin
 */
async updateTask(id: number, user: Users, data: Task) {
  if (!id) {
    throw new Error('Task id is required')
  }
  const existing = await this.taskRepository.findOneOrFail({ where: { id } })
  const allowed = existing.userId === user.id || user.roles.includes(Role.Admin)
  if (!allowed) {
    throw new Error('No permission to update task')
  }
  const {
    rcsWait,
    rcsInterval,
    cleanCount,
    requestNumberInterval,
    checkConnection,
    country,
    matchDevice,
    useBackup,
    e2ee,
    e2eeTimeout
  } = data
  return await this.taskRepository.update(
    { id },
    {
      message: data.message || existing.message,
      dynamicMessage: data.dynamicMessage || existing.dynamicMessage,
      rcsWait,
      rcsInterval,
      cleanCount,
      requestNumberInterval,
      checkConnection,
      country,
      matchDevice,
      useBackup,
      e2ee,
      e2eeTimeout
    }
  )
}
/**
 * Checks whether the task owner can afford the task.
 *
 * @param id task id
 * @returns 0 for admin owners, -1 when the balance is lower than the cost,
 *          otherwise the cost itself
 */
async balanceVerification(id: number) {
  const task = await this.taskRepository.findOneBy({ id })
  // Load the owner of the task.
  const owner = await this.userService.findById(task.userId)
  if (owner.roles.includes(Role.Admin)) {
    return 0
  }
  const cost: number = await this.getCost(task, owner)
  // Compare against the balance, treating a missing balance as 0.
  const available = owner.balance || 0
  return cost > available ? -1 : cost
}
/**
 * Deletes a task that has not been started.
 *
 * @param id task id
 * @returns the task as it was before deletion
 * @throws NotFoundException when no task has this id
 * @throws Error when the task is not in the IDLE state
 */
async delTask(id: number) {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    throw new NotFoundException('Task not found')
  }
  if (task.status !== TaskStatus.IDLE) {
    throw new Error('当前任务状态无法删除!')
  }
  await this.taskRepository.delete(id)
  return task
}
/**
 * Starts (or resumes) a task.
 *
 * Only IDLE, PAUSE and SCHEDULED tasks are affected; any other status is a no-op.
 * Unpaid tasks of non-admin users are charged first. The new status is:
 *  - VIP when the owner is a VIP user,
 *  - PENDING when fewer than `max_parallel` tasks are currently PENDING,
 *  - QUEUED otherwise.
 * `startedAt` is set to the current time.
 *
 * @param id task id
 * @throws Error when the owner's balance does not cover the cost
 */
async startTask(id: number): Promise<void> {
  const task = await this.taskRepository.findOneOrFail({ where: { id } })
  const startable = [TaskStatus.IDLE, TaskStatus.PAUSE, TaskStatus.SCHEDULED]
  if (!startable.includes(task.status)) {
    return
  }

  const owner = await this.userService.findById(task.userId)
  if (!task.paid && !owner.roles.includes(Role.Admin)) {
    // Charge before the task is started.
    const cost = await this.getCost(task, owner)
    if (cost > (owner.balance || 0)) {
      throw new Error('Insufficient balance!')
    }
    await this.balanceService.feeDeduction(task.userId, cost, task.id)
    await this.taskRepository.update({ id }, { paid: true })
  }

  let nextStatus: TaskStatus
  if (owner.isVip) {
    // Dedicated line.
    nextStatus = TaskStatus.VIP
  } else {
    // Maximum number of tasks allowed to run in parallel.
    const maxParallel = await this.getConfig('max_parallel', 0)
    // How many tasks are running right now.
    const running = await this.taskRepository.count({
      where: { status: TaskStatus.PENDING }
    })
    // When the limit is reached the task waits in the queue.
    nextStatus = running < maxParallel ? TaskStatus.PENDING : TaskStatus.QUEUED
  }

  await this.taskRepository.update({ id }, { status: nextStatus, startedAt: new Date() })
}
/**
 * Moves an IDLE, QUEUED or PAUSE task to CUTTING (queue jumping); other statuses are left alone.
 *
 * @param id task id
 */
async queueCutting(id: number): Promise<void> {
  const task = await this.taskRepository.findOneBy({ id })
  const eligible = [TaskStatus.IDLE, TaskStatus.QUEUED, TaskStatus.PAUSE]
  if (!eligible.includes(task.status)) {
    return
  }
  await this.taskRepository.update({ id }, { status: TaskStatus.CUTTING })
}
/**
 * Pauses a task that is PENDING, QUEUED, CUTTING or VIP; other statuses are left alone.
 *
 * @param id task id
 */
async pauseTask(id: number): Promise<void> {
  const task = await this.taskRepository.findOneBy({ id })
  const pausable = [TaskStatus.PENDING, TaskStatus.QUEUED, TaskStatus.CUTTING, TaskStatus.VIP]
  if (!pausable.includes(task.status)) {
    return
  }
  await this.taskRepository.update({ id }, { status: TaskStatus.PAUSE })
}
/**
 * Marks a PAUSE or QUEUED task as COMPLETED; other statuses are left alone.
 *
 * @param id task id
 */
async forceCompletion(id: number): Promise<void> {
  const task = await this.taskRepository.findOneBy({ id })
  const closable = [TaskStatus.PAUSE, TaskStatus.QUEUED]
  if (!closable.includes(task.status)) {
    return
  }
  await this.taskRepository.update({ id }, { status: TaskStatus.COMPLETED })
}
/**
 * Cancels the scheduling of a task and refunds the up-front charge.
 *
 * Only a task that is currently SCHEDULED is affected: it is reset to an unpaid
 * IDLE task and the amount of its CONSUMPTION balance record is refunded.
 * For any other status the call is a no-op, so repeating the call cannot refund
 * the same charge more than once.
 *
 * @param id task id
 * @throws NotFoundException when no task has this id
 */
async unscheduledSending(id: number) {
  const task = await this.taskRepository.findOneBy({ id })
  if (!task) {
    throw new NotFoundException('Task not found')
  }
  if (task.status !== TaskStatus.SCHEDULED) {
    return
  }
  await this.taskRepository.update({ id }, { status: TaskStatus.IDLE, paid: false })
  if (!task.paid) {
    // Nothing was charged, so there is nothing to refund.
    return
  }
  const costBalanceRecord = await this.balanceRecordRepository.findOneBy({
    taskId: id,
    type: BalanceType.CONSUMPTION
  })
  if (!costBalanceRecord) {
    Logger.warn(`No consumption record found for task ${id}, refund skipped`, this.TAG)
    return
  }
  // Refund.
  await this.balanceService.feeRefund(task.userId, costBalanceRecord.amount, task.id)
}
/**
 * Exports the non-embedded items of one task as an XLSX buffer.
 *
 * A COMPLETED task exports every item. Any other status exports only the items
 * that already have a final result (SUCCESS or FAIL). The query is always
 * restricted to the given task.
 *
 * @param taskId task id
 * @returns the workbook serialised by ExcelJS
 * @throws NotFoundException when no task has this id
 */
async exportTaskItem(taskId: number) {
  const task = await this.taskRepository.findOneBy({ id: taskId })
  if (!task) {
    throw new NotFoundException('Task not found')
  }

  // Always scope to this task; an empty filter would export the whole table.
  const where: any = { taskId, embed: false }
  if (task.status !== TaskStatus.COMPLETED) {
    where.status = In([TaskItemStatus.SUCCESS, TaskItemStatus.FAIL])
  }
  const taskItems = await this.taskItemRepository.find({
    where,
    order: {
      status: 'ASC',
      sendAt: 'ASC'
    }
  })

  const workbook = new ExcelJS.Workbook()
  const worksheet = workbook.addWorksheet('Sheet1')
  // Column headers.
  worksheet.columns = [
    { header: '手机号', key: 'number', width: 30, style: { alignment: { horizontal: 'center' } } },
    { header: '是否有效', key: 'isValid', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '发送成功', key: 'status', width: 15, style: { alignment: { horizontal: 'center' } } },
    {
      header: '发送时间',
      key: 'sendAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    }
  ]
  for (const item of taskItems) {
    const succeeded = item.status === TaskItemStatus.SUCCESS
    worksheet.addRow({
      number: item.number,
      isValid: succeeded ? '有效' : '无效',
      status: succeeded ? '发送成功' : '',
      // Items that were never sent have no timestamp; leave the cell empty.
      sendAt: item.sendAt ? moment(item.sendAt).format('YYYY-MM-DD HH:mm:ss') : ''
    })
  }
  return await workbook.xlsx.writeBuffer()
}
/**
 * Exports the tasks created within a date range as an XLSX buffer.
 *
 * Visibility depends on the caller's roles:
 *  - `superApi`: tasks of the users returned by getApiInvitesIds
 *  - `api`: tasks of the users returned by getInvitesIds
 *  - `admin`: all tasks
 *  - anyone else: only their own tasks
 *
 * @param req request carrying the authenticated `user` (`id`, `roles`)
 * @param data object with `startDate` and `endDate`
 * @returns the workbook serialised by ExcelJS
 * @throws Error when a date is missing or no task falls into the range
 */
async exportTask(req: any, data: any) {
  if (!data.startDate || !data.endDate) {
    throw new Error('请选择日期')
  }
  const roles = req.user.roles
  let where = {}
  if (roles.includes('superApi')) {
    const userIds = await this.userService.getApiInvitesIds(req.user.id)
    where = {
      userId: In(userIds),
      createdAt: Between(data.startDate, data.endDate)
    }
  } else if (roles.includes('api')) {
    const userIds = await this.userService.getInvitesIds(req.user.id)
    where = {
      userId: In(userIds),
      createdAt: Between(data.startDate, data.endDate)
    }
  } else if (!roles.includes('admin')) {
    where = {
      userId: req.user.id,
      createdAt: Between(data.startDate, data.endDate)
    }
  } else {
    where = {
      createdAt: Between(data.startDate, data.endDate)
    }
  }
  const tasks = await this.taskRepository.find({
    where,
    order: {
      createdAt: 'desc'
    }
  })
  if (tasks.length === 0) {
    throw new Error('暂无日期内任务数据')
  }

  const workbook = new ExcelJS.Workbook()
  const worksheet = workbook.addWorksheet('Sheet1')
  // Column headers.
  worksheet.columns = [
    { header: '#', key: 'id', width: 30, style: { alignment: { horizontal: 'center' } } },
    { header: '任务名称', key: 'name', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '任务创建人', key: 'userName', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '已发送', key: 'sent', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '发送成功数', key: 'successCount', width: 15, style: { alignment: { horizontal: 'center' } } },
    { header: '总数', key: 'total', width: 15, style: { alignment: { horizontal: 'center' } } },
    {
      header: '创建时间',
      key: 'createdAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    },
    {
      header: '开始时间',
      key: 'startedAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    },
    {
      header: '结束时间',
      key: 'updatedAt',
      width: 30,
      style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    }
  ]

  // Index the creators by id; a task whose user row no longer exists gets an empty name.
  const userIds = Array.from(new Set(tasks.map((task) => task.userId)))
  const users = await this.userRepository.findBy({ id: In(userIds) })
  const userById = new Map(users.map((user) => [user.id, user] as const))
  const formatDate = (value?: Date) => (value ? moment(value).format('YYYY-MM-DD HH:mm:ss') : '')

  for (const task of tasks) {
    worksheet.addRow({
      id: task.id,
      name: task.name,
      userName: userById.get(task.userId)?.username ?? '',
      sent: task.sent,
      successCount: task.successCount,
      total: task.total,
      createdAt: formatDate(task.createdAt),
      startedAt: formatDate(task.startedAt),
      updatedAt: formatDate(task.updatedAt)
    })
  }
  return await workbook.xlsx.writeBuffer()
}
/**
 * Dashboard statistics for the window starting at 00:00 six days ago.
 *
 * Aggregates `sent`, `successCount` and `total` of the visible tasks per creation
 * day and additionally reports today's figures. For `admin` and `superApi`
 * callers `todayData.code` holds the number of RcsNumber rows created today with
 * status 'success'.
 *
 * Task visibility by role matches exportTask: `superApi` uses getApiInvitesIds,
 * `api` uses getInvitesIds, `admin` sees everything and everyone else sees only
 * their own tasks.
 *
 * @param req request carrying the authenticated `user` (`id`, `roles`)
 * @returns chart series (`xData`, `sentData`, `successData`, `totalData`) plus `todayData`
 */
async homeStatistics(req: any) {
  const res = {
    xData: [],
    sentData: [],
    successData: [],
    totalData: [],
    todayData: {
      sent: '0',
      success: '0',
      total: '0',
      code: '0'
    }
  }

  const windowStart = new Date()
  windowStart.setDate(windowStart.getDate() - 6)
  windowStart.setHours(0, 0, 0, 0)

  const roles = req.user.roles
  let where = {}
  if (roles.includes('superApi')) {
    const userIds = await this.userService.getApiInvitesIds(req.user.id)
    where = { userId: In(userIds), createdAt: Between(windowStart, new Date()) }
  } else if (roles.includes('api')) {
    const userIds = await this.userService.getInvitesIds(req.user.id)
    where = { userId: In(userIds), createdAt: Between(windowStart, new Date()) }
  } else if (!roles.includes('admin')) {
    where = { userId: req.user.id, createdAt: Between(windowStart, new Date()) }
  } else {
    where = { createdAt: Between(windowStart, new Date()) }
  }

  if (roles.includes('admin') || roles.includes('superApi')) {
    const codeRow = await this.rcsNumberRepository
      .createQueryBuilder()
      .select('count(1) as sum')
      .where('status = :status', { status: 'success' })
      .andWhere('createdAt between :start and :end', {
        start: startOfDay(new Date()),
        end: endOfDay(new Date())
      })
      .getRawOne()
    res.todayData.code = codeRow.sum
  }

  const rows = await this.taskRepository
    .createQueryBuilder()
    .select(['DATE(createdAt) as day', 'SUM(sent) as sent', 'SUM(successCount) as success', 'SUM(total) as total'])
    .where(where)
    .groupBy('day')
    .orderBy('day', 'ASC')
    .getRawMany()

  for (const row of rows) {
    const label = moment(row.day).format('MM-DD')
    res.xData.push(label)
    res.sentData.push(row.sent)
    res.successData.push(row.success)
    res.totalData.push(row.total)
    // Both sides are 'MM-DD' strings, so this picks out today's row.
    if (moment(new Date()).format('MM-DD') === label) {
      res.todayData.sent = row.sent
      res.todayData.success = row.success
      res.todayData.total = row.total
    }
  }
  return res
}
/**
 * Counts successful RcsNumber rows per channel (`from` column) for today and yesterday.
 *
 * @returns one entry per channel: `{ channel, todayData, yesterdayData }`
 */
async codeStatistics() {
  const rows = await this.rcsNumberRepository
    .createQueryBuilder('rcsNumber')
    .select(['rcsNumber.`from` as channel', 'DATE(rcsNumber.createdAt) as day', 'COUNT(1) as sum'])
    .where('rcsNumber.status = :status', { status: 'success' })
    .andWhere('createdAt between :start and :end', {
      start: startOfDay(addDays(new Date(), -1)),
      end: endOfDay(new Date())
    })
    .groupBy('DATE(rcsNumber.createdAt), rcsNumber.from')
    .orderBy('rcsNumber.from', 'ASC')
    .getRawMany()

  const today = moment(new Date()).format('MM-DD')
  const perChannel = {}
  for (const row of rows) {
    const channel = row.channel
    if (!perChannel[channel]) {
      perChannel[channel] = { channel: channel, todayData: 0, yesterdayData: 0 }
    }
    // Any row that is not dated today counts as yesterday.
    const isToday = moment(row.day).format('MM-DD') === today
    if (isToday) {
      perChannel[channel].todayData = row.sum
    } else {
      perChannel[channel].yesterdayData = row.sum
    }
  }
  return Object.values(perChannel)
}
/**
 * Queries the remaining balance at each upstream number provider.
 *
 * All providers are queried in parallel and each lookup is best-effort: a
 * provider that fails or answers with an unexpected code keeps the value 0.
 * Failures are logged instead of being swallowed silently.
 *
 * SECURITY NOTE(review): the endpoints, account ids and API tokens below are
 * hard-coded in source. They should be moved to configuration or a secret store
 * and rotated; they are kept here unchanged so behavior stays the same.
 *
 * @returns balances keyed by provider: `durian`, `cloud033`, `cloud034`, `cloud037`, `xyz`
 */
async balanceStatistics() {
  const res = {
    durian: 0,
    cloud033: 0,
    cloud034: 0,
    cloud037: 0,
    xyz: 0
  }
  const logFailure = (provider: string, e: unknown) => {
    Logger.warn(`Error fetching ${provider} balance: ${e instanceof Error ? e.message : String(e)}`, this.TAG)
  }

  const cloudInstance = axios.create({
    baseURL: 'http://52.77.17.214:9001/api/'
  })
  const xyzInstance = axios.create({
    baseURL: 'http://113.28.178.155:8003/api/'
  })

  // The three cloud accounts share one endpoint and response shape.
  const fetchCloud = async (key: 'cloud033' | 'cloud034' | 'cloud037', userid: string, token: string) => {
    try {
      const cloudRes = await cloudInstance.get('userBalance', { params: { userid, token } })
      if (cloudRes.data.code === '1001') {
        res[key] = cloudRes.data.data.integral
      }
    } catch (e) {
      logFailure(key, e)
    }
  }

  const fetchDurian = async () => {
    try {
      const durianRes = await axios
        .create({
          baseURL: 'http://8.218.211.187/out/ext_api/',
          headers: {
            uhost: 'api.durianrcs.com',
            uprotocol: 'http'
          }
        })
        .get('getUserInfo', {
          params: {
            name: 'unsnap3094',
            ApiKey: 'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'
          }
        })
      if (durianRes.data.code === 200) {
        res.durian = durianRes.data.data.score
      }
    } catch (e) {
      logFailure('durian', e)
    }
  }

  const fetchXyz = async () => {
    try {
      const xyz = await xyzInstance.get('v1', {
        params: {
          act: 'myinfo',
          token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjp7InVpZCI6MjUsInJvbGVfaWQiOjF9fQ.VU1tvG72YaXooUT-FUaQj-YWVXnVrYBad1AsoWUT4pw'
        }
      })
      // The response is a '|' separated string: "<code>|<balance>|...".
      const parts: string[] = String(xyz.data)
        .split('|')
        .map((part) => part.trim())
      const balance = Number(parts[1])
      // Convert to a real number and clamp negatives to 0; ignore unparsable values.
      if (parts[0] === '0' && Number.isFinite(balance)) {
        res.xyz = Math.max(balance, 0)
      }
    } catch (e) {
      logFailure('xyz', e)
    }
  }

  await Promise.all([
    fetchDurian(),
    fetchXyz(),
    fetchCloud('cloud033', '100033', '1e40ca9795b1fc038db76512175d59b5'),
    fetchCloud('cloud034', '100034', 'ed7b3de69df3d6d9ddfaa7eb862272f5'),
    fetchCloud('cloud037', '100037', 'aaec6c21e54dc53b92e472df21a95bb7')
  ])
  return res
}
/**
 * Counts task items per hour of `sendAt` over the last twelve hours, newest hour first.
 *
 * @returns raw rows with `sent` (count) and `hour` ('YYYY-MM-DD HH:00:00')
 */
async hourSentStatistics() {
  const windowStart = new Date()
  windowStart.setHours(windowStart.getHours() - 12)

  const query = this.taskItemRepository
    .createQueryBuilder()
    .select(['COUNT(*) AS sent', "DATE_FORMAT(sendAt, '%Y-%m-%d %H:00:00') AS hour"])
    .where('sendAt BETWEEN :start AND :end', { start: windowStart, end: new Date() })
    .groupBy('hour')
    .orderBy('hour', 'DESC')
  return await query.getRawMany()
}
/**
 * Per-country totals of the visible tasks.
 *
 * `completedData` sums `total` of COMPLETED tasks created yesterday;
 * `totalData` sums `total` of all tasks created since 00:00 seven days ago.
 * Both are grouped by country, largest first, and skip tasks without a country.
 *
 * Visibility: `api` / `superApi` callers see the users returned by getInvitesIds,
 * `admin` sees everything, everyone else sees only their own tasks.
 *
 * @param req request carrying the authenticated `user` (`id`, `roles`)
 */
async sentCountryStatistics(req: any) {
  const roles = req.user.roles
  let where = {}
  if (roles.includes('api') || roles.includes('superApi')) {
    const userIds = await this.userService.getInvitesIds(req.user.id)
    where = { userId: In(userIds) }
  } else if (!roles.includes('admin')) {
    where = { userId: req.user.id }
  }

  // Yesterday at 00:00.
  const yesterday = new Date()
  yesterday.setDate(yesterday.getDate() - 1)
  yesterday.setHours(0, 0, 0, 0)
  const completedData = await this.taskRepository
    .createQueryBuilder()
    .select(['sum(total) as value', 'country as name'])
    .where(where)
    .andWhere('country is not null')
    .andWhere('status = :status', { status: TaskStatus.COMPLETED })
    .andWhere('createdAt between :start and :end', {
      start: startOfDay(yesterday),
      end: endOfDay(yesterday)
    })
    .groupBy('country')
    .orderBy('value', 'DESC')
    .getRawMany()

  // Seven days ago at 00:00.
  const sevenDaysAgo = new Date()
  sevenDaysAgo.setDate(sevenDaysAgo.getDate() - 7)
  sevenDaysAgo.setHours(0, 0, 0, 0)
  const totalData = await this.taskRepository
    .createQueryBuilder()
    .select(['sum(total) as value', 'country as name'])
    .where(where)
    .andWhere('country is not null')
    .andWhere('createdAt between :start and :end', {
      start: sevenDaysAgo,
      end: new Date()
    })
    .groupBy('country')
    .orderBy('value', 'DESC')
    .getRawMany()

  return { completedData, totalData }
}
/**
 * Counts the non-embedded items of a task that were sent successfully.
 *
 * @param id task id
 */
async getSuccessNum(id: number) {
  return await this.taskItemRepository.countBy({
    taskId: id,
    status: TaskItemStatus.SUCCESS,
    embed: false
  })
}
/**
 * Estimates how many numbers will be sent before the given queued task gets its turn.
 *
 * Returns 0 while fewer than `max_parallel` tasks are PENDING. Otherwise it adds
 *  - the phone count of the lists of all QUEUED tasks ordered before the given
 *    task (all of them if the task is not in the queue), and
 *  - the non-embedded IDLE items of the tasks that are currently PENDING.
 *
 * @param id task id
 */
async getToBeSentNum(id: number) {
  const maxParallel = await this.getConfig('max_parallel', 1)
  const runningCount = await this.taskRepository.count({
    where: { status: TaskStatus.PENDING }
  })
  if (runningCount < maxParallel) {
    return 0
  }

  let total = 0

  // Queue, oldest first.
  const queued = await this.taskRepository.find({
    where: { status: TaskStatus.QUEUED },
    order: { startedAt: 'ASC' }
  })
  const position = queued.findIndex((task) => task.id === id)
  const ahead = position === -1 ? queued : queued.slice(0, position)
  const listIds = ahead.map((task) => task.listId)
  if (listIds.length > 0) {
    // Numbers still to be sent by the tasks queued ahead of this one.
    total += await this.phoneRepository.countBy({ listId: In(listIds) })
  }

  // Tasks that are running right now.
  const running = await this.taskRepository.find({
    where: { status: TaskStatus.PENDING }
  })
  if (running.length > 0) {
    total += await this.taskItemRepository.countBy({
      taskId: In(running.map((task) => task.id)),
      status: TaskItemStatus.IDLE,
      embed: false
    })
  }
  return total
}
/**
 * Reads a numeric system setting, falling back to a default when the lookup fails.
 *
 * @param name setting key
 * @param defValue value passed to the config service as default and returned if the lookup throws
 * @returns the configured number, or `defValue` on error
 */
async getConfig(name: string, defValue: number) {
  try {
    return await this.sysConfigService.getNumber(name, defValue)
  } catch (e) {
    // Name the key that failed; this helper serves every setting, not just one.
    Logger.error(`Error getting config "${name}"`, e instanceof Error ? e.stack : String(e), this.TAG)
    return defValue
  }
}
/**
 * Sets the status of the task items with the given ids.
 *
 * @param taskIds ids matched against the task item `id` column
 * @param status new status
 */
async updateTaskItemStatus(taskIds: number[], status: TaskItemStatus) {
  const criteria = { id: In(taskIds) }
  await this.taskItemRepository.update(criteria, { status })
}
/**
 * Sets the status of the task items with the given ids and stamps `sendAt` with the current time.
 *
 * @param taskIds ids matched against the task item `id` column
 * @param status new status
 */
async updateTaskItemStatusAndSendAt(taskIds: number[], status: TaskItemStatus) {
  const criteria = { id: In(taskIds) }
  const changes = { status, sendAt: new Date() }
  await this.taskItemRepository.update(criteria, changes)
}
/**
 * Computes what a task costs.
 *
 * The cost currently equals the number of phones in the task's list; `user` is
 * accepted but not used (a per-user rate multiplication existed only as
 * commented-out code).
 *
 * @param task task whose `listId` is counted
 * @param user owner of the task (unused)
 * @returns the phone count of the list
 */
async getCost(task: Task, user: Users) {
  const phoneCount: number = await this.phoneListService.findCountByListId(task.listId)
  return phoneCount
}
/**
 * Scheduler tick, run every 2 seconds.
 *
 * The whole tick runs under the 'dispatchTask' lock, acquired with a 1 ms
 * timeout; a tick that cannot get the lock is dropped silently. One tick:
 *  1. Loads up to `max_parallel` PENDING tasks, oldest first.
 *  2. If slots are free, promotes QUEUED tasks to PENDING, counting running and
 *     promoted tasks so that a user gets at most two; if that rule selects
 *     nothing, the oldest queued tasks are promoted regardless (those are only
 *     picked up on a later tick).
 *  3. Adds all CUTTING and VIP tasks.
 *  4. Assigns each available device to the eligible task that has the fewest
 *     devices so far. A task is eligible while it needs more devices
 *     (one device per 5 remaining numbers) and its country mapping matches the
 *     device's pinned country (when `matchCountry` and `pinCountry` are set),
 *     or otherwise is 'any' or matches the device's current country.
 *  5. Dispatches every task that received at least one device.
 */
@Interval(2000)
async scheduleTask() {
  const tick = async () => {
    const maxParallel = await this.getConfig('max_parallel', 1)
    const tasks = await this.taskRepository.find({
      where: { status: TaskStatus.PENDING },
      order: { startedAt: 'ASC' },
      take: maxParallel
    })

    // Top up from the queue when fewer than maxParallel tasks are running.
    if (tasks.length < maxParallel) {
      const queued = await this.taskRepository.find({
        where: { status: TaskStatus.QUEUED },
        order: { startedAt: 'ASC', id: 'ASC' }
      })
      if (queued.length > 0) {
        const runningPerUser = new Map()
        for (const task of tasks) {
          runningPerUser.set(task.userId, (runningPerUser.get(task.userId) || 0) + 1)
        }
        // Pick queued tasks so that no user ends up with more than 2 tasks.
        const promoted = []
        const promotedPerUser = new Map()
        const freeSlots = maxParallel - tasks.length
        for (const task of queued) {
          const alreadyPromoted = promotedPerUser.get(task.userId) || 0
          if ((runningPerUser.get(task.userId) || 0) + alreadyPromoted < 2) {
            promoted.push(task)
            promotedPerUser.set(task.userId, alreadyPromoted + 1)
          }
          if (promoted.length >= freeSlots) {
            break
          }
        }
        const promotedIds = promoted.map((t) => t.id)
        if (promotedIds.length === 0) {
          promotedIds.push(...queued.map((t) => t.id).slice(0, freeSlots))
        }
        await this.taskRepository.update({ id: In(promotedIds) }, { status: TaskStatus.PENDING })
        tasks.push(...promoted)
      }
    }

    // Dedicated-line (VIP) and queue-jumping (CUTTING) tasks.
    const priorityTasks = await this.taskRepository.find({
      where: { status: In([TaskStatus.CUTTING, TaskStatus.VIP]) }
    })
    if (priorityTasks.length > 0) {
      tasks.push(...priorityTasks)
    }
    if (tasks.length === 0) return

    const devices = await this.deviceService.findAllAvailableDevices()
    if (devices.length === 0) return

    const countryMapping: { [key: string]: string[] } = JSON.parse(
      (await this.sysConfigService.getString('countryMapping', '')) || '{}'
    )
    const plan = tasks.map((task) => ({
      task,
      useCountry: task.country ? countryMapping[task.country] || ['any'] : ['any'],
      devices: [] as Device[]
    }))

    for (const device of devices) {
      // Tasks that still need more devices: one device per 5 remaining numbers.
      const needsDevice = plan.filter((r) => Math.ceil((r.task.total - r.task.sent) / 5) > r.devices.length)
      const pinned = device.matchCountry && device.pinCountry
      const candidates = pinned
        ? needsDevice.filter((r) => r.useCountry.includes(device.pinCountry.toUpperCase()))
        : needsDevice.filter(
            (r) => r.useCountry.includes('any') || r.useCountry.includes(device.currentCountry?.toUpperCase())
          )
      if (candidates.length > 0) {
        // First candidate with the fewest devices so far.
        const target = candidates.reduce((best, r) => (r.devices.length < best.devices.length ? r : best))
        target.devices.push(device)
      }
    }

    for (const entry of plan) {
      if (entry.devices.length > 0) {
        await this.dispatchTask(entry.task, entry.devices)
      }
    }
  }

  this.lock.acquire('dispatchTask', tick, { timeout: 1 }).catch((e) => {
    if (e.message.includes('timed out')) return
    Logger.error('Error dispatchTask', e.stack, this.TAG)
  })
}
/**
 * Appends a random query parameter (6-char key and value) to every http(s) URL in the text.
 *
 * Identical URLs within one call receive the same parameter. The text is
 * rewritten in a single pass, so a URL that is a prefix of another URL
 * (e.g. `http://a.com` and `http://a.com/x`) cannot corrupt the longer one, and
 * `$` sequences in a rewritten URL are inserted literally.
 * A URL that cannot be parsed is logged and left unchanged.
 *
 * @param content message text
 * @returns the text with every URL rewritten
 */
refineContent(content: string) {
  const regex =
    /https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)/g
  const rewritten = new Map<string, string>()
  return content.replace(regex, (url) => {
    const cached = rewritten.get(url)
    if (cached !== undefined) {
      return cached
    }
    let result = url
    try {
      const u = new URL(url)
      u.searchParams.append(randomstring.generate(6), randomstring.generate(6))
      result = u.toString()
    } catch (error) {
      Logger.error('Error parsing url', error instanceof Error ? error.stack : String(error), this.TAG)
    }
    rewritten.set(url, result)
    return result
  })
}
/**
 * Hands idle items of `task` to `devices` in batches of up to five items per device.
 *
 * Loads up to `devices.length * 5` IDLE items, marks them PENDING and the used devices busy,
 * then sends each device its batch without waiting for the replies. When a reply arrives the
 * reported items are marked SUCCESS or FAIL, or IDLE again for retries; on an error or when no
 * reply arrives within 120 s the whole batch goes back to IDLE. Once every device has settled,
 * the task's counters are recomputed from `task_item` (rows with `embed = 0`) and the task is
 * marked COMPLETED when no idle or pending items remain.
 *
 * The returned promise resolves once the batches have been handed off, not when the devices
 * have finished.
 *
 * @param task The task whose items are dispatched.
 * @param devices Candidate devices; only the first `ceil(items / 5)` are used. The array is not modified.
 */
async dispatchTask(task: Task, devices: Device[]) {
    // Number of items handed to a single device per dispatch.
    const batchSize = 5
    if (devices.length === 0) {
        return
    }
    const taskItems = await this.taskItemRepository.find({
        where: {
            taskId: task.id,
            status: TaskItemStatus.IDLE
        },
        take: devices.length * batchSize
    })
    if (taskItems.length === 0) {
        return
    }
    // slice() rather than splice(): keep only as many devices as there are batches
    // without removing entries from the caller's array.
    const activeDevices = devices.slice(0, Math.ceil(taskItems.length / batchSize))
    const taskConfig = {
        rcsWait: task.rcsWait || (await this.getConfig('rcs_wait', 2000)),
        rcsInterval: task.rcsInterval || (await this.getConfig('rcs_interval', 3000)),
        cleanCount: task.cleanCount || (await this.getConfig('clean_count', 20)),
        requestNumberInterval: task.requestNumberInterval || (await this.getConfig('request_number_interval', 100)),
        checkConnection: task.checkConnection,
        useBackup: task.useBackup,
        e2ee: task.e2ee,
        e2eeTimeout: task.e2eeTimeout || (await this.getConfig('e2ee_timeout', 5000))
    }
    await this.updateTaskItemStatusAndSendAt(
        taskItems.map((i) => i.id),
        TaskItemStatus.PENDING
    )
    await this.deviceService.updateDevice(
        activeDevices.map((d) => d.id),
        { busy: true }
    )
    // Deliberately not awaited: the caller continues while the devices work.
    // The trailing catch keeps a failure in this chain from becoming an unhandled rejection.
    void Promise.all(
        activeDevices.map(async (device, i) => {
            const items = taskItems
                .slice(i * batchSize, (i + 1) * batchSize)
                .map((item) => ({ ...item, message: this.getMessage(task) }))
            if (items.length === 0) return
            try {
                const res: any = await Promise.race([
                    this.eventsGateway.sendForResult(
                        {
                            id: randomUUID(),
                            action: 'task',
                            data: {
                                config: { ...taskConfig, ...(device.configOverrides || {}) },
                                tasks: items,
                                taskId: task.id
                            }
                        },
                        device.socketId
                    ),
                    setTimeout(120000).then(() => {
                        return Promise.reject(new Error('timeout waiting for response'))
                    })
                ])
                // Guard every list: a reply that omits one must not throw here, otherwise
                // the whole batch would be reset to IDLE by the catch below.
                Logger.log(
                    `Task completed: ${res.success?.length ?? 0} success, ${res.fail?.length ?? 0} fail, ${
                        res.retry?.length ?? 0
                    } retry`,
                    this.TAG
                )
                if (res.success?.length > 0) {
                    await this.updateTaskItemStatus(res.success, TaskItemStatus.SUCCESS)
                }
                if (res.fail?.length > 0) {
                    await this.updateTaskItemStatus(res.fail, TaskItemStatus.FAIL)
                }
                if (res.retry?.length > 0) {
                    await this.updateTaskItemStatus(res.retry, TaskItemStatus.IDLE)
                }
            } catch (e) {
                Logger.error('Error running task 3', e.stack, this.TAG)
                // Put the batch back to IDLE so it is eligible for dispatch again.
                await this.updateTaskItemStatus(
                    items.map((i) => i.id),
                    TaskItemStatus.IDLE
                )
            }
        })
    )
        .then(async () => {
            const counts = (
                await this.taskItemRepository.manager.query(
                    `select status, count(*) as count
from task_item
where taskId = ${task.id} and embed = 0
group by status`
                )
            ).reduce((acc, item) => {
                acc[item.status] = parseInt(item.count, 10)
                return acc
            }, {})
            Logger.log('Task counts', JSON.stringify(counts), this.TAG)
            const successCount = counts.success || 0
            const failCount = counts.fail || 0
            const pendingCount = counts.pending || 0
            const idleCount = counts.idle || 0
            const finish = pendingCount === 0 && idleCount === 0
            const data: Partial<Task> = {
                sent: successCount + failCount,
                successCount: successCount,
                successRate:
                    successCount + failCount > 0
                        ? ((successCount / (successCount + failCount)) * 100).toFixed(1) + '%'
                        : '0%'
            }
            if (finish) {
                data.status = TaskStatus.COMPLETED
                await this.updateSend(task.userId)
            }
            await this.taskRepository.update({ id: task.id }, data)
        })
        .catch((e) => {
            Logger.error('Error updating task progress', e?.stack, this.TAG)
        })
}
/**
 * Runs on a 10000 ms interval and releases task items that are stuck in PENDING.
 *
 * For every task with status PENDING, items that are still PENDING and whose `sendAt`
 * is more than ten minutes in the past are set back to IDLE.
 */
@Interval(10000)
async fixDeadTask() {
    const pendingTasks = await this.taskRepository.findBy({
        status: TaskStatus.PENDING
    })
    for (const { id: taskId } of pendingTasks) {
        const staleItems = await this.taskItemRepository.findBy({
            taskId,
            status: TaskItemStatus.PENDING,
            sendAt: LessThan(addMinutes(new Date(), -10))
        })
        // Nothing stale for this task: move on to the next one.
        if (!staleItems || staleItems.length === 0) {
            continue
        }
        const staleIds = staleItems.map((item) => item.id)
        await this.updateTaskItemStatus(staleIds, TaskItemStatus.IDLE)
    }
}
/**
 * Runs on a 60000 ms interval and refreshes the `check_availability_numbers` system config
 * with the numbers of five randomly picked task items whose status is SUCCESS.
 *
 * The lookback window on `sendAt` starts at 6 hours and grows by 6 hours per attempt
 * (99 attempts at most) until a query returns exactly five items. The config is saved
 * in every case; when no attempt yields five items its value is left as it was.
 * If loading the existing config throws, a new `SysConfig` with that name is used instead.
 */
@Interval(60000)
async changeCheckAvailabilityNumbers() {
    const configName = 'check_availability_numbers'
    const sampleSize = 5
    const maxAttempts = 99
    const hoursPerAttempt = 6

    let config: SysConfig
    try {
        config = await this.sysConfigService.findByName(configName)
    } catch (e) {
        Logger.error('Error getting check_availability_numbers', e.stack, this.TAG)
        config = new SysConfig()
        config.name = configName
    }

    for (let attempt = 1; attempt <= maxAttempts; attempt++) {
        const sample = await this.taskItemRepository
            .createQueryBuilder()
            .select()
            .where('status = :status', { status: TaskItemStatus.SUCCESS })
            .andWhere('sendAt > :sendAt', { sendAt: addHours(new Date(), -hoursPerAttempt * attempt) })
            .orderBy('RAND()')
            .limit(sampleSize)
            .getMany()
        // Too few rows in this window: widen it and try again.
        if (sample.length !== sampleSize) {
            continue
        }
        config.value = sample.map((item) => item.number).join(',')
        break
    }

    await this.sysConfigService.save(config)
}
/**
 * Recomputes a user's `send` total as the sum of `amount` over their
 * CONSUMPTION balance records and saves the user.
 *
 * @param id Id of the user to refresh.
 * @returns The saved user entity.
 * @throws Error when no user with this id exists.
 */
public async updateSend(id: number) {
    const user = await this.userRepository.findOneBy({ id })
    if (!user) {
        // Fail with a clear message instead of a TypeError on `user.send` below.
        throw new Error(`User ${id} not found`)
    }
    const consumed = await this.balanceRecordRepository.sum('amount', {
        userId: id,
        type: BalanceType.CONSUMPTION
    })
    // sum() yields null when no record matches; store 0 rather than null.
    user.send = consumed ?? 0
    return await this.userRepository.save(user)
}
/**
 * Cron job (`0 0,30 * * * *`) that starts every SCHEDULED task whose `startedAt`
 * is not later than the current time.
 *
 * Tasks are started one after another; a failure to start one task is logged
 * and does not stop the remaining ones.
 *
 * @returns The tasks that were due, whether or not each one started successfully.
 */
@Cron('0 0,30 * * * *')
async scheduledTaskExecution() {
    console.log('The scheduled task starts,', new Date())

    // Starts a single task and reports the outcome without letting an error escape.
    const startOne = async (taskId: Task['id']) => {
        try {
            await this.startTask(taskId)
            console.log(`Task ${taskId} started successfully.`)
        } catch (error) {
            console.error(`Error starting task ${taskId}:`, error)
        }
    }

    const dueTasks = await this.taskRepository.findBy({
        status: TaskStatus.SCHEDULED,
        startedAt: LessThanOrEqual(new Date())
    })
    for (const dueTask of dueTasks) {
        await startOne(dueTask.id)
    }

    console.log('The scheduled task is executed.')
    return dueTasks
}
- }
|