task.service.ts 65 KB


  1. import { OperatorConfigService } from './../operator_config/operator_config.service'
  2. import { PhoneListService } from './../phone-list/phone-list.service'
  3. import {
  4. forwardRef,
  5. Inject,
  6. Injectable,
  7. InternalServerErrorException,
  8. Logger,
  9. NotFoundException,
  10. OnModuleInit
  11. } from '@nestjs/common'
  12. import { InjectRepository } from '@nestjs/typeorm'
  13. import { ConfusionType, Task, TaskStatus } from './entities/task.entity'
  14. import { Between, In, LessThan, LessThanOrEqual, Repository } from 'typeorm'
  15. import { TaskItem, TaskItemStatus } from './entities/task-item.entity'
  16. import { PageRequest } from '../common/dto/page-request'
  17. import { paginate, Pagination } from 'nestjs-typeorm-paginate'
  18. import { EventsGateway } from '../events/events.gateway'
  19. import { randomUUID } from 'crypto'
  20. import { setTimeout } from 'timers/promises'
  21. import { DeviceService } from '../device/device.service'
  22. import { SysConfigService } from '../sys-config/sys-config.service'
  23. import { Users } from '../users/entities/users.entity'
  24. import * as ExcelJS from 'exceljs'
  25. import * as moment from 'moment'
  26. import { BalanceService } from '../balance/balance.service'
  27. import { Role } from '../model/role.enum'
  28. import { Phone } from '../phone-list/entities/phone.entity'
  29. import { Cron, Interval } from '@nestjs/schedule'
  30. import * as AsyncLock from 'async-lock'
  31. import { UsersService } from '../users/users.service'
  32. import { addDays, addHours, addMinutes, endOfDay, startOfDay } from 'date-fns'
  33. import { SysConfig } from '../sys-config/entities/sys-config.entity'
  34. import * as randomstring from 'randomstring'
  35. import { RcsNumber } from '../rcs-number/entities/rcs-number.entity'
  36. import axios from 'axios'
  37. import { BalanceRecord, BalanceType } from '../balance/entities/balance-record.entity'
  38. import { Device } from '../device/entities/device.entity'
  39. import Decimal from 'decimal.js'
  40. import { CountryConfigService } from '../country-config/country-config.service'
  41. import { TaskResult } from './types'
  /**
   * Exported batch size constant.
   * NOTE(review): its consumers are outside the visible part of this file — presumably the number
   * of task items handled per dispatch batch; confirm before changing the value.
   */
  export const batchSize = 9
  43. @Injectable()
  44. export class TaskService implements OnModuleInit {
  /** In-process lock; the 'dispatchTask' key is taken in onModuleInit. */
  private lock = new AsyncLock()
  /** Context label passed to Logger calls made by this service. */
  private TAG = 'TaskService'

  /**
   * All collaborators are provided by Nest dependency injection: TypeORM repositories for the
   * entities this service reads/writes, plus the services it delegates to.
   * EventsGateway and DeviceService are injected through forwardRef — presumably because they
   * depend on this service in turn (circular dependency); verify against their modules.
   */
  constructor(
    @InjectRepository(Task) private taskRepository: Repository<Task>,
    @InjectRepository(TaskItem) private taskItemRepository: Repository<TaskItem>,
    @InjectRepository(Phone) private phoneRepository: Repository<Phone>,
    @InjectRepository(Users) private userRepository: Repository<Users>,
    @InjectRepository(RcsNumber) private rcsNumberRepository: Repository<RcsNumber>,
    @InjectRepository(BalanceRecord) private readonly balanceRecordRepository: Repository<BalanceRecord>,
    @Inject(forwardRef(() => EventsGateway)) private readonly eventsGateway: EventsGateway,
    private readonly phoneListService: PhoneListService,
    @Inject(forwardRef(() => DeviceService)) private readonly deviceService: DeviceService,
    private readonly sysConfigService: SysConfigService,
    private readonly balanceService: BalanceService,
    private readonly userService: UsersService,
    private readonly operatorConfigService: OperatorConfigService,
    private readonly countryConfigService: CountryConfigService
  ) {}
  /**
   * Nest lifecycle hook, run once when the module is initialised.
   *
   * While holding the 'dispatchTask' lock it resets every PENDING item of every PENDING task back
   * to IDLE, then keeps the lock for a further 10 seconds before releasing it.
   *
   * The lock promise is intentionally not awaited (as before), so module start-up is not held up
   * by the 10 second hold. Unlike before, a failure inside the critical section is now logged
   * instead of surfacing as an unhandled promise rejection.
   *
   * A legacy commented-out MQTT bootstrap that carried inline credentials was removed from this
   * method; if it is ever reinstated, read the credentials from configuration.
   */
  async onModuleInit() {
    this.lock
      .acquire('dispatchTask', async () => {
        const pendingTasks = await this.taskRepository.findBy({
          status: TaskStatus.PENDING
        })
        for (const task of pendingTasks) {
          // NOTE(review): the new item status is written with TaskStatus.IDLE (not a TaskItemStatus
          // member), exactly as the original code did; createTask does the same.
          await this.taskItemRepository.update(
            { taskId: task.id, status: TaskItemStatus.PENDING },
            { status: TaskStatus.IDLE }
          )
        }
        // Keep holding the lock for 10 s after the reset.
        await setTimeout(10000)
      })
      .catch((e: unknown) => {
        const err = e instanceof Error ? e : new Error(String(e))
        Logger.error(`Failed to reset pending task items on startup: ${err.message}`, err.stack, this.TAG)
      })
  }
  /**
   * AbortController instances indexed by a numeric key.
   * NOTE(review): presumably keyed by task id to cancel a running task — usage is not visible in
   * this part of the file; confirm.
   */
  private taskControllers: Record<number, AbortController> = {}
  /**
   * Fetches a single task by primary key.
   * @param id task id
   * @returns the result of `findOneBy({ id })` unchanged (no not-found handling is done here)
   */
  async findById(id: number): Promise<Task> {
    const task = await this.taskRepository.findOneBy({ id })
    return task
  }
  /**
   * Returns one page of tasks and fills in `userName` on each item from the owning user.
   * @param req pagination options (`req.page`) and search/find options (`req.search`)
   * @returns the page produced by `paginate`, with `userName` set where the user was found
   */
  async findAllTask(req: PageRequest<Task>): Promise<Pagination<Task>> {
    const page = await paginate<Task>(this.taskRepository, req.page, req.search)
    if (page.items.length === 0) {
      return page
    }

    const owners = await this.userRepository.findBy({
      id: In(page.items.map((task) => task.userId))
    })

    // Index users by id, keeping the first occurrence (same pick as a linear find).
    const ownerById = new Map<number, Users>()
    for (const owner of owners) {
      if (!ownerById.has(owner.id)) {
        ownerById.set(owner.id, owner)
      }
    }

    for (const task of page.items) {
      const owner = ownerById.get(task.userId)
      if (owner) {
        task.userName = owner.username
      }
    }
    return page
  }
  /**
   * Lists every task whose status is PENDING, CUTTING or VIP.
   */
  async findPendingTasks() {
    const activeStatuses = [TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP]
    const tasks = await this.taskRepository.findBy({ status: In(activeStatuses) })
    return tasks
  }
  /**
   * Returns one page of task items.
   * @param req pagination options (`req.page`) and search/find options (`req.search`)
   */
  async findAllTaskItem(req: PageRequest<TaskItem>): Promise<Pagination<TaskItem>> {
    const { page, search } = req
    const result = await paginate<TaskItem>(this.taskItemRepository, page, search)
    return result
  }
  /**
   * Creates a task from a phone list and materialises one task item per phone number.
   *
   * Steps:
   *  1. Validate the phone list (must exist and contain at least 100 numbers).
   *  2. Fill in derived task fields (total, country, group mode, per-country flags, confusion).
   *  3. If `startedAt` is set the task is SCHEDULED and paid for up front; the balance is checked
   *     before saving and charged right after saving.
   *  4. Optionally interleave "embed" numbers (from the `embed_numbers` sys-config) into the list.
   *  5. Bulk-insert the task items.
   *
   * Fixes compared with the previous version:
   *  - A failed up-front charge downgrades the task to a manual (IDLE, unpaid) task. The error is
   *    now thrown only after the task items were inserted, so the downgraded task is actually
   *    usable instead of being left without any items.
   *  - Blank entries in `embed_numbers` (e.g. an empty config value, which splits to `['']`) are
   *    ignored instead of being inserted as items with an empty number; entries are trimmed.
   *  - A non-existent `task.userId` produces a NotFoundException instead of a null dereference.
   *
   * @param task task to create; `listId` and `userId` must be set
   * @returns the saved task
   * @throws NotFoundException when the phone list or the user does not exist
   * @throws InternalServerErrorException when the list is empty or has fewer than 100 numbers
   * @throws Error when the balance is insufficient for a scheduled task, or when charging failed
   *         (in the latter case the task has been created as a manual task)
   */
  async createTask(task: Task): Promise<Task> {
    const phoneList = await this.phoneListService.findPhoneListById(task.listId)
    if (!phoneList) {
      throw new NotFoundException('Phone list not found')
    }
    const phones = await this.phoneListService.findPhoneByListId(task.listId)
    if (!phones || phones.length === 0) {
      throw new InternalServerErrorException('请先上传料子')
    } else if (phones.length < 100) {
      throw new InternalServerErrorException('料子条数不能少于100条!')
    }

    // Resolve the owner before anything is written so a bad userId fails cleanly.
    const owner = await this.userRepository.findOneBy({ id: task.userId })
    if (!owner) {
      throw new NotFoundException('User not found')
    }

    task.message = task.message || ''
    task.total = phones.length
    task.country = phoneList.country
    task.groupMode = task.groupMode ?? (await this.sysConfigService.getBoolean('group_mode', false))
    if (task.country) {
      const countryConfig = await this.countryConfigService.getDestConfig(task.country)
      task.useBackup = countryConfig.useBackup
      task.e2ee = countryConfig.e2ee
    }

    // Message confusion: a value containing both 'head' and 'end' collapses to 'both'.
    if (task.confusion && task.confusion.includes('head') && task.confusion.includes('end')) {
      task.confusion = 'both'
    }

    // Scheduled task: must be affordable now; it is charged right after it has been saved.
    let cost = new Decimal(0)
    if (task.startedAt) {
      task.status = TaskStatus.SCHEDULED
      const user = await this.userService.findById(task.userId)
      cost = await this.getCost(task, user)
      if (new Decimal(cost).comparedTo(user.balance || new Decimal(0)) > 0) {
        throw new Error('余额不足,请充值后创建定时任务!')
      }
      task.paid = true
    }

    // Per-user cap on how many messages a single number may send.
    if (owner.maxSend > 0) {
      task.singleQty = owner.maxSend
    }

    const saved = await this.taskRepository.save(task)

    let chargeFailed = false
    if (saved.paid) {
      try {
        await this.balanceService.feeDeduction(saved.userId, cost, saved.id)
      } catch (e) {
        const reason = e instanceof Error ? e.message : String(e)
        Logger.warn(`Fee deduction failed for scheduled task ${saved.id}: ${reason}`, this.TAG)
        // Downgrade to a manual task; the error is reported once the items exist (see below).
        saved.status = TaskStatus.IDLE
        saved.paid = false
        await this.taskRepository.update(saved.id, { status: TaskStatus.IDLE, paid: false })
        chargeFailed = true
      }
    }

    const finalPhones = [...phones]

    // Embed numbers: spread the configured extra numbers evenly through the list.
    let extraNumbers: string[] = []
    if (owner.isEmbedNumber) {
      const extraNumbersString = await this.sysConfigService.getString('embed_numbers', '')
      extraNumbers = extraNumbersString
        .split(',')
        .map((entry) => entry.trim())
        .filter((entry) => entry.length > 0)
      // Lists below 2100 numbers only get the first half of the embed numbers.
      if (saved.total < 2100) {
        extraNumbers.splice(Math.floor(extraNumbers.length / 2))
      }
      if (extraNumbers.length > 0) {
        const totalLength = finalPhones.length + extraNumbers.length
        const insertionStep = Math.floor(totalLength / (extraNumbers.length + 1))
        extraNumbers.forEach((extraNumber, index) => {
          const embedded = new Phone()
          embedded.number = extraNumber
          finalPhones.splice((index + 1) * insertionStep, 0, embedded)
        })
      }
    }

    // An item counts as embedded when its number is one of the embed numbers.
    const embedNumberSet = new Set(extraNumbers)
    await this.taskItemRepository
      .createQueryBuilder()
      .insert()
      .values(
        finalPhones.map((phone) => {
          const taskItem = new TaskItem()
          taskItem.taskId = saved.id
          taskItem.number = phone.number
          taskItem.embed = embedNumberSet.has(phone.number)
          taskItem.status = TaskStatus.IDLE
          return taskItem
        })
      )
      .updateEntity(false)
      .execute()

    if (chargeFailed) {
      throw new Error('定时任务扣款失败,已转为手动发送任务')
    }
    return saved
  }
  /**
   * Builds the concrete message text for one send of the given task.
   *
   *  - Every dynamic-message key found in the template is replaced by one randomly chosen value
   *    (the same value for all occurrences of that key within this call).
   *  - Unless `task.confusion` is NONE, a confusion line `<taskId>-<random>-msg-<unixSeconds>` is
   *    added before, after, or on both sides of the message, depending on the confusion type.
   *  - The result is passed through `refineContent` (defined elsewhere in this class).
   *
   * Fix: the chosen value is inserted through a replacer function, so `$$`, `$&`, `` $` `` and `$'`
   * inside a value are inserted literally instead of being interpreted as replacement patterns.
   *
   * @param task task providing message, dynamicMessage, confusion and id
   * @returns the final text, or '' when the task has no message
   */
  getMessage(task: Task) {
    let message = task.message
    if (!message) {
      return ''
    }

    task.dynamicMessage?.forEach((dm) => {
      if (dm.key && dm.values?.length > 0) {
        const picked = dm.values[Math.floor(Math.random() * dm.values.length)]
        message = message.replaceAll(`${dm.key}`, () => picked)
      }
    })

    // Content confusion
    if (task.confusion !== ConfusionType.NONE) {
      const timestamp = Math.round(Date.now() / 1000)
      // Random integer in [0, 1000000) — up to six digits, not zero-padded.
      const randomNumber = Math.floor(Math.random() * 1000000)
      const confusionText = `${task.id}-${randomNumber}-msg-${timestamp}`
      switch (task.confusion) {
        case ConfusionType.HEAD:
          message = `${confusionText}\n` + message
          break
        case ConfusionType.END:
          message += `\n${confusionText}`
          break
        case ConfusionType.BOTH:
          message = `${confusionText}\n` + message + `\n${confusionText}`
          break
        default:
        // Any other value: leave the message untouched.
      }
    }
    return this.refineContent(message)
  }
  /**
   * Updates the editable settings of a task.
   *
   * Only the task owner or an admin may update. `message`, `img` and `dynamicMessage` fall back to
   * the stored values when the incoming value is falsy (an explicit empty-string message is kept);
   * every other field is passed through from `data` as-is.
   *
   * @param id task id (required)
   * @param user the acting user
   * @param data incoming values
   * @returns the TypeORM update result
   * @throws Error when `id` is missing or the user is neither owner nor admin
   */
  async updateTask(id: number, user: Users, data: Task) {
    if (!id) throw new Error('Task id is required')

    const existing = await this.taskRepository.findOneOrFail({ where: { id } })
    const isOwner = existing.userId === user.id
    if (!isOwner && !user.roles.includes(Role.Admin)) {
      throw new Error('No permission to update task')
    }

    // Normalise confusion: head + end => 'both'; an empty selection => NONE.
    const wantsBoth = data.confusion && data.confusion.includes('head') && data.confusion.includes('end')
    if (wantsBoth) {
      data.confusion = 'both'
    } else if (data.confusion?.length === 0) {
      data.confusion = ConfusionType.NONE
    }

    // '' is an intentional "clear the message"; any other falsy value keeps the stored text.
    const message = data.message === '' || data.message ? data.message : existing.message
    const img = data.img ? data.img : existing.img
    const dynamicMessage = data.dynamicMessage ? data.dynamicMessage : existing.dynamicMessage

    const changes = {
      message,
      img,
      dynamicMessage,
      singleQty: data.singleQty,
      singleTimeout: data.singleTimeout,
      singleDelay: data.singleDelay,
      groupMode: data.groupMode,
      groupQty: data.groupQty,
      groupSize: data.groupSize,
      groupTimeout: data.groupTimeout,
      groupDelay: data.groupDelay,
      cleanCount: data.cleanCount,
      checkConnection: data.checkConnection,
      country: data.country,
      matchDevice: data.matchDevice,
      useBackup: data.useBackup,
      e2ee: data.e2ee,
      e2eeTimeout: data.e2eeTimeout,
      confusion: data.confusion,
      remark: data.remark
    }
    const result = await this.taskRepository.update({ id }, changes)
    return result
  }
  /**
   * Checks whether the owner of a task can afford it.
   *
   * Fix: a missing task or user now raises NotFoundException instead of a null dereference.
   *
   * @param id task id
   * @returns `0` for admin owners (no charge), `-1` when the balance is insufficient, otherwise
   *          the cost computed by `getCost` (defined elsewhere in this class)
   * @throws NotFoundException when the task or its user does not exist
   */
  async balanceVerification(id: number) {
    const task = await this.taskRepository.findOneBy({ id })
    if (!task) {
      throw new NotFoundException('Task not found')
    }
    const user = await this.userService.findById(task.userId)
    if (!user) {
      throw new NotFoundException('User not found')
    }
    if (user.roles.includes(Role.Admin)) {
      return 0
    }
    const cost = await this.getCost(task, user)
    // A missing balance is treated as zero.
    if (cost.comparedTo(user.balance || new Decimal(0)) > 0) {
      return -1
    }
    return cost
  }
  /**
   * Deletes a task; only tasks in IDLE status may be deleted.
   *
   * Fix: a non-existent id now raises NotFoundException instead of a null dereference.
   *
   * @param id task id
   * @returns the task as it was before deletion
   * @throws NotFoundException when the task does not exist
   * @throws Error when the task is not IDLE
   */
  async delTask(id: number) {
    const task = await this.taskRepository.findOneBy({ id })
    if (!task) {
      throw new NotFoundException('Task not found')
    }
    if (task.status !== TaskStatus.IDLE) {
      throw new Error('当前任务状态无法删除!')
    }
    await this.taskRepository.delete(id)
    return task
  }
  /**
   * Starts (or resumes) a task.
   *
   * Only IDLE, PAUSE and SCHEDULED tasks are affected; any other status is a silent no-op.
   * An unpaid task owned by a non-admin is charged first. The task then becomes:
   *  - VIP when the owner is a VIP user, otherwise
   *  - PENDING when fewer than `max_parallel` tasks are currently PENDING, otherwise
   *  - QUEUED.
   * `startedAt` is set to the current time.
   *
   * @param id task id
   * @throws Error 'Insufficient balance!' when the owner cannot afford the task
   */
  async startTask(id: number): Promise<void> {
    const task = await this.taskRepository.findOneOrFail({ where: { id } })

    const startable = [TaskStatus.IDLE, TaskStatus.PAUSE, TaskStatus.SCHEDULED]
    if (!startable.includes(task.status)) {
      return
    }

    const user = await this.userService.findById(task.userId)

    // Charge before starting, unless already paid or owned by an admin.
    if (!task.paid && !user.roles.includes(Role.Admin)) {
      const cost = await this.getCost(task, user)
      const balance = user.balance || new Decimal(0)
      if (cost.comparedTo(balance) > 0) {
        throw new Error('Insufficient balance!')
      }
      await this.balanceService.feeDeduction(task.userId, cost, task.id)
      await this.taskRepository.update({ id }, { paid: true })
    }

    let nextStatus: TaskStatus
    if (user.isVip) {
      // Dedicated line
      nextStatus = TaskStatus.VIP
    } else {
      // Run immediately while below the parallel limit, otherwise wait in the queue.
      const maxParallel = await this.getConfig('max_parallel', 0)
      const runningCount = await this.taskRepository.count({
        where: { status: TaskStatus.PENDING }
      })
      nextStatus = runningCount < maxParallel ? TaskStatus.PENDING : TaskStatus.QUEUED
    }

    await this.taskRepository.update({ id }, { status: nextStatus, startedAt: new Date() })
  }
  /**
   * Moves a task to CUTTING status if it is currently IDLE, QUEUED or PAUSE; otherwise no-op.
   *
   * Fixes: a non-existent id raises NotFoundException instead of a null dereference, and the
   * status precondition is part of the UPDATE's WHERE clause so the transition cannot overwrite a
   * status that changed between the read and the write.
   *
   * @param id task id
   * @throws NotFoundException when the task does not exist
   */
  async queueCutting(id: number): Promise<void> {
    const task = await this.taskRepository.findOneBy({ id })
    if (!task) {
      throw new NotFoundException('Task not found')
    }
    await this.taskRepository.update(
      { id, status: In([TaskStatus.IDLE, TaskStatus.QUEUED, TaskStatus.PAUSE]) },
      { status: TaskStatus.CUTTING }
    )
  }
  /**
   * Pauses a task that is PENDING, QUEUED, CUTTING or VIP; any other status is a no-op.
   *
   * Fixes: a non-existent id raises NotFoundException instead of a null dereference, and the
   * status precondition is part of the UPDATE's WHERE clause so a task whose status changed
   * between the read and the write (e.g. it just completed) is not forced back to PAUSE.
   *
   * @param id task id
   * @throws NotFoundException when the task does not exist
   */
  async pauseTask(id: number): Promise<void> {
    const task = await this.taskRepository.findOneBy({ id })
    if (!task) {
      throw new NotFoundException('Task not found')
    }
    await this.taskRepository.update(
      {
        id,
        status: In([TaskStatus.PENDING, TaskStatus.QUEUED, TaskStatus.CUTTING, TaskStatus.VIP])
      },
      { status: TaskStatus.PAUSE }
    )
  }
  /**
   * Marks a PAUSE or QUEUED task as COMPLETED; any other status is a no-op.
   *
   * Fixes: a non-existent id raises NotFoundException instead of a null dereference, and the
   * status precondition is part of the UPDATE's WHERE clause so the transition is atomic.
   *
   * @param id task id
   * @throws NotFoundException when the task does not exist
   */
  async forceCompletion(id: number): Promise<void> {
    const task = await this.taskRepository.findOneBy({ id })
    if (!task) {
      throw new NotFoundException('Task not found')
    }
    await this.taskRepository.update(
      { id, status: In([TaskStatus.PAUSE, TaskStatus.QUEUED]) },
      { status: TaskStatus.COMPLETED }
    )
  }
  /**
   * Cancels the schedule of a SCHEDULED task: the task becomes an unpaid IDLE (manual) task and
   * the up-front charge is refunded.
   *
   * Fixes:
   *  - Previously the refund was issued regardless of the task's status, so calling this on a task
   *    that was not (or no longer) SCHEDULED — including calling it twice — paid out a refund each
   *    time. The refund now happens only when this call performed the SCHEDULED -> IDLE transition.
   *  - A missing task raises NotFoundException; a missing consumption record is logged and skipped
   *    instead of causing a null dereference.
   *
   * @param id task id
   * @throws NotFoundException when the task does not exist
   */
  async unscheduledSending(id: number) {
    const task = await this.taskRepository.findOneBy({ id })
    if (!task) {
      throw new NotFoundException('Task not found')
    }
    if (task.status !== TaskStatus.SCHEDULED) {
      return
    }

    // Conditional update: only one concurrent caller can win the SCHEDULED -> IDLE transition.
    const transition = await this.taskRepository.update(
      { id, status: TaskStatus.SCHEDULED },
      { status: TaskStatus.IDLE, paid: false }
    )
    if (transition.affected === 0) {
      return
    }

    const costBalanceRecord = await this.balanceRecordRepository.findOneBy({
      taskId: id,
      type: BalanceType.CONSUMPTION
    })
    if (!costBalanceRecord) {
      Logger.warn(`No consumption record found for task ${id}; nothing to refund`, this.TAG)
      return
    }
    // Refund
    await this.balanceService.feeRefund(task.userId, costBalanceRecord.amount, task.id)
  }
  /**
   * Exports the (non-embedded) items of one task as an XLSX buffer.
   *
   * For a COMPLETED task all items are exported; for any other status only items that already
   * finished (SUCCESS or FAIL) are exported. Rows are ordered by status, then send time.
   *
   * Fixes:
   *  - Previously, for any status other than COMPLETED/PAUSE the filter stayed empty and the
   *    export contained the items of EVERY task. The query is now always scoped to `taskId`.
   *  - Items without a send time produce an empty cell instead of "Invalid date".
   *  - A non-existent task raises NotFoundException instead of a null dereference.
   *
   * @param taskId task id
   * @returns the workbook serialised by ExcelJS
   * @throws NotFoundException when the task does not exist
   */
  async exportTaskItem(taskId: number) {
    const task = await this.taskRepository.findOneBy({ id: taskId })
    if (!task) {
      throw new NotFoundException('Task not found')
    }

    // Always restrict to this task and hide embedded numbers.
    const where: any = { taskId, embed: false }
    if (task.status !== TaskStatus.COMPLETED) {
      where.status = In([TaskItemStatus.SUCCESS, TaskItemStatus.FAIL])
    }
    const taskItems = await this.taskItemRepository.find({
      where,
      order: {
        status: 'ASC',
        sendAt: 'ASC'
      }
    })

    const workbook = new ExcelJS.Workbook()
    const worksheet = workbook.addWorksheet('Sheet1')
    // Column headers
    worksheet.columns = [
      { header: '手机号', key: 'number', width: 30, style: { alignment: { horizontal: 'center' } } },
      { header: '是否有效', key: 'isValid', width: 15, style: { alignment: { horizontal: 'center' } } },
      { header: '发送成功', key: 'status', width: 15, style: { alignment: { horizontal: 'center' } } },
      {
        header: '发送时间',
        key: 'sendAt',
        width: 30,
        style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
      }
    ]

    for (const item of taskItems) {
      const succeeded = item.status === TaskItemStatus.SUCCESS
      worksheet.addRow({
        number: item.number,
        isValid: succeeded ? '有效' : '无效',
        status: succeeded ? '发送成功' : '',
        sendAt: item.sendAt ? moment(item.sendAt).format('YYYY-MM-DD HH:mm:ss') : ''
      })
    }
    return await workbook.xlsx.writeBuffer()
  }
  /**
   * Exports the tasks created within a date range as an XLSX buffer.
   *
   * Visibility depends on the caller's roles:
   *  - 'superApi': tasks of the users returned by `getApiInvitesIds`
   *  - 'api': tasks of the users returned by `getInvitesIds`
   *  - 'admin': all tasks
   *  - anyone else: only their own tasks
   *
   * Fixes:
   *  - A task whose owner can no longer be found exports an empty creator cell instead of
   *    crashing on `user.username`.
   *  - User lookup is a Map lookup instead of a linear search per task; user ids are de-duplicated
   *    before querying.
   *  - Removed the unreachable `|| superApi` operand of the second branch; strict equality.
   *
   * @param req request carrying `user.id` and `user.roles`
   * @param data `{ startDate, endDate }`, both required
   * @returns the workbook serialised by ExcelJS
   * @throws Error when a date is missing or no task falls inside the range
   */
  async exportTask(req: any, data: any) {
    if (!data.startDate || !data.endDate) {
      throw new Error('请选择日期')
    }

    const roles = req.user.roles
    const createdAt = Between(data.startDate, data.endDate)
    let where = {}
    if (roles.includes('superApi')) {
      const userIds = await this.userService.getApiInvitesIds(req.user.id)
      where = { userId: In(userIds), createdAt }
    } else if (roles.includes('api')) {
      const userIds = await this.userService.getInvitesIds(req.user.id)
      where = { userId: In(userIds), createdAt }
    } else if (!roles.includes('admin')) {
      where = { userId: req.user.id, createdAt }
    } else {
      where = { createdAt }
    }

    const tasks = await this.taskRepository.find({
      where,
      order: {
        createdAt: 'desc'
      }
    })
    if (tasks.length === 0) {
      throw new Error('暂无日期内任务数据')
    }

    const workbook = new ExcelJS.Workbook()
    const worksheet = workbook.addWorksheet('Sheet1')
    const centered = { alignment: { horizontal: 'center' as const } }
    const centeredDate = { ...centered, numFmt: 'YYYY-MM-DD HH:mm:ss' }
    // Column headers
    worksheet.columns = [
      { header: '#', key: 'id', width: 30, style: centered },
      { header: '任务名称', key: 'name', width: 15, style: centered },
      { header: '任务创建人', key: 'userName', width: 15, style: centered },
      { header: '已发送', key: 'sent', width: 15, style: centered },
      { header: '发送成功数', key: 'successCount', width: 15, style: centered },
      { header: '总数', key: 'total', width: 15, style: centered },
      { header: '创建时间', key: 'createdAt', width: 30, style: centeredDate },
      { header: '开始时间', key: 'startedAt', width: 30, style: centeredDate },
      { header: '结束时间', key: 'updatedAt', width: 30, style: centeredDate }
    ]

    const ownerIds = Array.from(new Set(tasks.map((task) => task.userId)))
    const owners = await this.userRepository.findBy({ id: In(ownerIds) })
    const usernameById = new Map<number, string>()
    for (const owner of owners) {
      usernameById.set(owner.id, owner.username)
    }

    const formatDate = (value: Date | null | undefined) =>
      value ? moment(value).format('YYYY-MM-DD HH:mm:ss') : ''

    for (const task of tasks) {
      worksheet.addRow({
        id: task.id,
        name: task.name,
        // The owner may have been removed since the task was created.
        userName: usernameById.get(task.userId) ?? '',
        sent: task.sent,
        successCount: task.successCount,
        total: task.total,
        createdAt: formatDate(task.createdAt),
        startedAt: formatDate(task.startedAt),
        updatedAt: formatDate(task.updatedAt)
      })
    }
    return await workbook.xlsx.writeBuffer()
  }
  /**
   * Dashboard statistics for the last seven calendar days (today and the six days before).
   *
   * Returns per-day series (`xData` labels in MM-DD, `sentData`, `successData`, `totalData`)
   * aggregated over the tasks visible to the caller, plus `todayData` with today's totals.
   * For 'admin' and 'superApi' callers `todayData.code` is the number of RcsNumber rows with
   * status 'success' created today; for everyone else it stays '0'.
   *
   * @param req request carrying `user.id` and `user.roles`
   */
  async homeStatistics(req: any) {
    const res = {
      xData: [],
      sentData: [],
      successData: [],
      totalData: [],
      todayData: {
        sent: '0',
        success: '0',
        total: '0',
        code: '0'
      }
    }

    // Start of the window: midnight, six days ago.
    const sixDaysAgo = new Date()
    sixDaysAgo.setDate(sixDaysAgo.getDate() - 6)
    sixDaysAgo.setHours(0, 0, 0, 0)

    const roles = req.user.roles
    let where = {}
    if (roles.includes('superApi')) {
      const userIds = await this.userService.getApiInvitesIds(req.user.id)
      where = { userId: In(userIds), createdAt: Between(sixDaysAgo, new Date()) }
    } else if (roles.includes('api') || roles.includes('superApi')) {
      const userIds = await this.userService.getInvitesIds(req.user.id)
      where = { userId: In(userIds), createdAt: Between(sixDaysAgo, new Date()) }
    } else if (!roles.includes('admin')) {
      where = { userId: req.user.id, createdAt: Between(sixDaysAgo, new Date()) }
    } else {
      where = { createdAt: Between(sixDaysAgo, new Date()) }
    }

    if (roles.includes('admin') || roles.includes('superApi')) {
      const codeRow = await this.rcsNumberRepository
        .createQueryBuilder()
        .select('count(1) as sum')
        .where('status = :status', { status: 'success' })
        .andWhere('createdAt between :start and :end', {
          start: startOfDay(new Date()),
          end: endOfDay(new Date())
        })
        .getRawOne()
      res.todayData.code = codeRow.sum
    }

    const rows = await this.taskRepository
      .createQueryBuilder()
      .select(['DATE(createdAt) as day', 'SUM(sent) as sent', 'SUM(successCount) as success', 'SUM(total) as total'])
      .where(where)
      .groupBy('day')
      .orderBy('day', 'ASC')
      .getRawMany()

    for (const row of rows) {
      const day = moment(row.day).format('MM-DD')
      res.xData.push(day)
      res.sentData.push(row.sent)
      res.successData.push(row.success)
      res.totalData.push(row.total)
      // The row for the current day also feeds the "today" summary.
      if (moment(new Date()).format('MM-DD').includes(day)) {
        res.todayData.sent = row.sent
        res.todayData.success = row.success
        res.todayData.total = row.total
      }
    }
    return res
  }
  /**
   * Counts RcsNumber rows with status 'success' per channel (`from` column) for today and
   * yesterday.
   *
   * @returns one `{ channel, todayData, yesterdayData }` entry per channel seen in the window;
   *          a count defaults to 0 when the channel has no row for that day
   */
  async codeStatistics() {
    const rows = await this.rcsNumberRepository
      .createQueryBuilder('rcsNumber')
      .select(['rcsNumber.`from` as channel', 'DATE(rcsNumber.createdAt) as day', 'COUNT(1) as sum'])
      .where('rcsNumber.status = :status', { status: 'success' })
      .andWhere('createdAt between :start and :end', {
        start: startOfDay(addDays(new Date(), -1)),
        end: endOfDay(new Date())
      })
      .groupBy('DATE(rcsNumber.createdAt), rcsNumber.from')
      .orderBy('rcsNumber.from', 'ASC')
      .getRawMany()

    const todayLabel = moment(new Date()).format('MM-DD')
    const perChannel = {}
    for (const row of rows) {
      const channel = row.channel
      if (!perChannel[channel]) {
        perChannel[channel] = { channel: channel, todayData: 0, yesterdayData: 0 }
      }
      // Any row that is not dated today is counted as yesterday.
      const isToday = moment(row.day).format('MM-DD') === todayLabel
      if (isToday) {
        perChannel[channel].todayData = row.sum
      } else {
        perChannel[channel].yesterdayData = row.sum
      }
    }
    return Object.values(perChannel)
  }
  /**
   * Queries the remaining balance of one external provider account.
   *
   * Supported platforms: durian, durian02, xyz, usapanel, dashboard, smspva, smspva02, smstiger,
   * smsman, globalcode. Each platform has one fetcher below that performs the HTTP request and
   * extracts the balance from the provider-specific response; a fetcher returns 0 when the
   * response does not signal success. A failed request is logged as a warning and also yields 0.
   * The request duration is logged.
   *
   * This replaces ~300 lines of copy-pasted per-platform branches with a lookup table; requests,
   * response parsing, log messages and the returned shape are unchanged. Non-Error throwables in
   * the catch path are now stringified instead of logging `undefined`.
   *
   * SECURITY NOTE(review): the provider API keys/tokens below are hard-coded in source (as they
   * were before). They should be moved to configuration/secret storage and rotated.
   * NOTE(review): no request timeout is configured, so a hanging provider blocks this call.
   *
   * @param platform platform key, one of the names listed above
   * @returns `{ [platform]: balance }`; the balance keeps whatever type the provider response
   *          yields (number or string), or 0
   * @throws NotFoundException when `platform` is not supported
   */
  async balanceStatistics(platform: string) {
    // The two durian accounts share endpoint and response shape.
    const fetchDurian = async (name: string, apiKey: string) => {
      const { data } = await axios
        .create({
          baseURL: 'http://8.218.211.187/out/ext_api/',
          headers: {
            uhost: 'api.durianrcs.com',
            uprotocol: 'http'
          }
        })
        .get('getUserInfo', { params: { name, ApiKey: apiKey } })
      return data.code === 200 ? data.data.score : 0
    }

    // The two smspva accounts share endpoint and response shape.
    const fetchSmspva = async (apikey: string) => {
      const { data } = await axios
        .create({ baseURL: 'https://api.smspva.com/activation/' })
        .get('balance', { headers: { apikey } })
      return data.statusCode === 200 ? data.data.balance : 0
    }

    // Provider payloads are untyped, hence `any`.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const fetchers: Record<string, () => Promise<any>> = {
      durian: () => fetchDurian('unsnap3094', 'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'),
      durian02: () => fetchDurian('unsnap30941', 'RHJGV1paR1BFWjlFbCtnakUza2xJdz09'),
      xyz: async () => {
        const { data } = await axios.create({ baseURL: 'http://113.28.178.155:8003/api/' }).get('v1', {
          params: {
            act: 'myinfo',
            token:
              'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjp7InVpZCI6MjUsInJvbGVfaWQiOjF9fQ.VU1tvG72YaXooUT-FUaQj-YWVXnVrYBad1AsoWUT4pw'
          }
        })
        // Response is a '|'-separated string; the first field '0' means success, the second is
        // the balance. Negative balances are reported as 0.
        const parts = data.split('|').map((part: string) => part.trim())
        if (parts[0] !== '0') {
          return 0
        }
        return Number(parts[1]) < 0 ? 0 : parts[1]
      },
      usapanel: async () => {
        const { data } = await axios.create({ baseURL: 'https://panel.hellomeetyou.com/api/' }).get('account', {
          headers: {
            'Content-Type': 'application/json',
            Accept: 'application/json',
            'X-API-Key': 'wpJohESEZsjW1LtlyoGwZw53'
          }
        })
        return data ? data.balance : 0
      },
      dashboard: async () => {
        const { data } = await axios.create({ baseURL: 'https://code.smscodes.io/api/sms/' }).get('GetBalance', {
          params: {
            key: '6619f084-363c-4518-b5b8-57b5e01a9c95'
          }
        })
        return data.Status === 'Success' ? data.Balance : 0
      },
      smspva: () => fetchSmspva('uNW56fGr0zstfs87Xn0e1l2gCYVnb1'),
      smspva02: () => fetchSmspva('rTTL8pZtKkQ60zjU82bvbMEP7G6XGU'),
      smstiger: async () => {
        const { data } = await axios.create({ baseURL: 'https://api.tiger-sms.com/stubs/' }).get('handler_api.php', {
          params: {
            api_key: 't6AV7f5KgwRmWsK9M5xN6uTTtHiog8EQ',
            action: 'getBalance'
          }
        })
        // Response looks like 'ACCESS_BALANCE:<amount>'.
        const fields = data.split(':')
        return fields[0] === 'ACCESS_BALANCE' ? fields[1] : 0
      },
      smsman: async () => {
        const { data } = await axios.create({ baseURL: 'https://api.sms-man.com/control/' }).get('get-balance', {
          params: {
            token: 'BFEjAMIAEsd8EhmyReGZyKxdGPolhpSx'
          }
        })
        return data && data.balance ? parseFloat(data.balance) : 0
      },
      globalcode: async () => {
        const { data } = await axios.create({ baseURL: 'https://api.globalsimc.com/serviceapi/' }).get('getbalance/', {
          params: {
            token: '6555dc93c3fecf222ff4daea6a3697a6ee66a8d60ea47bdb2e19281e226f3b61'
          }
        })
        // Strip the '$' sign and convert to a number.
        return data.code === 1 && data.data ? parseFloat(data.data.replace('$', '')) : 0
      }
    }

    // Validate the platform (own keys only, so 'constructor' etc. are rejected).
    if (!Object.prototype.hasOwnProperty.call(fetchers, platform)) {
      throw new NotFoundException(`不支持的平台: ${platform}。`)
    }

    const perfLogs: Record<string, { startTime: number; endTime?: number; duration?: number }> = {}
    const startTime = Date.now()
    perfLogs[platform] = { startTime }

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let balance: any = 0
    try {
      balance = await fetchers[platform]()
    } catch (e) {
      // Best effort: a provider failure is reported as balance 0.
      const reason = e instanceof Error ? e.message : String(e)
      Logger.warn(`${platform} API调用失败: ${reason}`, this.TAG)
    }

    const endTime = Date.now()
    perfLogs[platform].endTime = endTime
    perfLogs[platform].duration = endTime - startTime
    Logger.log(`API调用性能统计 - ${platform}: ${perfLogs[platform].duration}ms`, this.TAG)
    Logger.log(`API调用性能统计total: ${JSON.stringify(perfLogs)}`, this.TAG)
    return { [platform]: balance }
  }
  /**
   * Counts task items per clock hour of their `sendAt` over the trailing 25 hours.
   *
   * @returns Raw rows with `sent` (item count) and `hour` (formatted `YYYY-MM-DD HH:00:00`),
   *          ordered newest hour first.
   */
  async hourSentStatistics() {
    const HOURS_BACK = 25
    const windowStart = new Date()
    windowStart.setHours(windowStart.getHours() - HOURS_BACK)

    const columns = ['COUNT(*) AS sent', "DATE_FORMAT(sendAt, '%Y-%m-%d %H:00:00') AS hour"]
    const perHour = this.taskItemRepository
      .createQueryBuilder()
      .select(columns)
      .where('sendAt BETWEEN :start AND :end', { start: windowStart, end: new Date() })
      .groupBy('hour')
      .orderBy('hour', 'DESC')

    return perHour.getRawMany()
  }
  /**
   * Collects dashboard counters: number of tasks started yesterday and today, plus
   * yesterday's active sending time (see `sentHourStatistics`).
   *
   * The three lookups run concurrently. A failing lookup is logged and leaves its
   * field at the default `0` instead of failing the whole call.
   * `totalHoursYesterday` and `totalHoursToday` are not populated here and stay `0`.
   *
   * @returns The counters object; fields that could not be computed remain `0`.
   */
  async numStatistics() {
    const res = {
      totalHoursYesterday: 0,
      totalHoursToday: 0,
      orderCountYesterday: 0,
      orderCountToday: 0,
      hourData: 0
    }

    // Runs one lookup; on failure logs it (previously swallowed silently) and keeps the default
    const guarded = async (label: string, job: () => Promise<void>) => {
      try {
        await job()
      } catch (e) {
        Logger.warn(`numStatistics: ${label} failed: ${e?.message ?? e}`, this.TAG)
      }
    }

    // The alias is given explicitly because the predicates reference `task.startedAt`.
    // NOTE(review): without it TypeORM appears to alias the table with the entity name,
    // which MySQL setups with case-sensitive aliases would reject - confirm.
    const countStartedBetween = (fromExpr: string, toExpr: string) =>
      this.taskRepository
        .createQueryBuilder('task')
        .select('COUNT(1)', 'sum')
        .where(`task.startedAt >= ${fromExpr}`)
        .andWhere(`task.startedAt < ${toExpr}`)
        .getRawOne()

    await Promise.all([
      guarded('yesterday order count', async () => {
        // Tasks started yesterday
        const row = await countStartedBetween('CURDATE() - INTERVAL 1 DAY', 'CURDATE()')
        res.orderCountYesterday = row.sum
      }),
      guarded('today order count', async () => {
        // Tasks started today
        const row = await countStartedBetween('CURDATE()', 'CURDATE() + INTERVAL 1 DAY')
        res.orderCountToday = row.sum
      }),
      guarded('sent hour statistics', async () => {
        res.hourData = await this.sentHourStatistics()
      })
    ])
    return res
  }
  /**
   * Measures how long the system was actively sending yesterday.
   *
   * The query buckets yesterday's non-idle task items by the minute of `sendAt`,
   * counts the distinct minutes and divides by 60.
   *
   * @returns Active sending time in hours, or `0` when there is no data.
   */
  async sentHourStatistics() {
    const activeMinutesSql = `
      SELECT SUM(c) / 60 AS hour
      FROM (
        SELECT 1 AS c, UNIX_TIMESTAMP(sendAt) DIV 60 * 60 AS "time"
        FROM task_item
        WHERE sendAt >= CURDATE() - INTERVAL 1 DAY
        AND sendAt < CURDATE()
        AND status != 'idle'
        GROUP BY time
      ) tmp
    `
    const rows = await this.taskItemRepository.query(activeMinutesSql)
    const hours = Number(rows[0]?.hour)
    // SUM() over an empty set yields NULL, which becomes NaN above
    return hours || 0
  }
  /**
   * Aggregates task volume (`sum(total)`) per country for the requesting user's scope.
   *
   * Scope: `api` / `superApi` users see the users returned by `getInvitesIds`,
   * `admin` sees everything, anyone else sees only their own tasks.
   *
   * @param req Request object; reads `req.user.id` and `req.user.roles`.
   * @returns `completedData`: COMPLETED tasks created yesterday;
   *          `totalData`: tasks of any status created since the start of the day
   *          seven days ago. Both are raw `{ value, name }` rows, largest first.
   */
  async sentCountryStatistics(req: any) {
    const { id: userId, roles } = req.user
    let where = {}
    if (roles.includes('api') || roles.includes('superApi')) {
      const userIds = await this.userService.getInvitesIds(userId)
      where = { userId: In(userIds) }
    } else if (!roles.includes('admin')) {
      where = { userId }
    }

    const now = new Date()
    const yesterday = addDays(now, -1)
    const weekWindowStart = startOfDay(addDays(now, -7))

    // Shared part of both aggregates; each call builds a fresh query
    const totalsByCountry = () =>
      this.taskRepository
        .createQueryBuilder()
        .select(['sum(total) as value', 'country as name'])
        .where(where)
        .andWhere('country is not null')

    // The two aggregates are independent, so run them concurrently
    const [completedData, totalData] = await Promise.all([
      totalsByCountry()
        .andWhere('status = :status', { status: TaskStatus.COMPLETED })
        .andWhere('createdAt between :start and :end', {
          start: startOfDay(yesterday),
          end: endOfDay(yesterday)
        })
        .groupBy('country')
        .orderBy('value', 'DESC')
        .getRawMany(),
      totalsByCountry()
        .andWhere('createdAt between :start and :end', {
          start: weekWindowStart,
          end: now
        })
        .groupBy('country')
        .orderBy('value', 'DESC')
        .getRawMany()
    ])

    return {
      completedData,
      totalData
    }
  }
  /**
   * Summarises RCS number backups per `stockFlag` (1, 2, 3) for three windows:
   * all time, yesterday and today (by `createdAt`).
   *
   * The three queries run concurrently; a failing query is logged and its
   * column falls back to `0` in every row.
   *
   * @returns Three rows (one per stock flag) with `flag` (display label),
   *          `yesterday`, `today` and `total` counts.
   */
  async backupStatistics() {
    type FlagCount = { stockFlag: number | string; count: number | string }
    const stockFlags = [1, 2, 3]

    // Counts rows per stockFlag, optionally restricted by extra createdAt predicates
    const countByStockFlag = async (label: string, dateFilters: string[]): Promise<FlagCount[] | null> => {
      try {
        const qb = this.rcsNumberRepository
          .createQueryBuilder()
          .select('COUNT(1)', 'count')
          .addSelect('stockFlag', 'stockFlag')
          .where('stockFlag IN (:...stockFlags)', { stockFlags })
        for (const filter of dateFilters) {
          qb.andWhere(filter)
        }
        return await qb.groupBy('stockFlag').getRawMany()
      } catch (e) {
        Logger.error(`backupStatistics: ${label} query failed`, e?.stack, this.TAG)
        return null
      }
    }

    const [totalData, yesterdayData, todayData] = await Promise.all([
      // All backups
      countByStockFlag('total', []),
      // Backups created yesterday
      countByStockFlag('yesterday', ['createdAt >= CURDATE() - INTERVAL 1 DAY', 'createdAt < CURDATE()']),
      // Backups created today
      countByStockFlag('today', ['createdAt >= CURDATE()'])
    ])

    // Number() keeps the match working if the driver returns the raw column as a string
    const countFor = (rows: FlagCount[] | null, flag: number) =>
      rows?.find((item) => Number(item.stockFlag) === flag)?.count ?? 0

    const summaryRow = (label: string, flag: number) => ({
      flag: label,
      yesterday: countFor(yesterdayData, flag),
      today: countFor(todayData, flag),
      total: countFor(totalData, flag)
    })

    return [summaryRow('剩余备份', 1), summaryRow('使用成功', 2), summaryRow('使用失败', 3)]
  }
  /**
   * Counts the successfully sent, non-embedded items of a task.
   *
   * @param id Task id.
   * @returns Number of task items with status SUCCESS and `embed = false`.
   */
  async getSuccessNum(id: number) {
    const succeeded = {
      taskId: id,
      status: TaskItemStatus.SUCCESS,
      embed: false
    }
    return this.taskItemRepository.countBy(succeeded)
  }
  /**
   * Estimates how many messages are still ahead of the given task.
   *
   * Returns `0` while fewer than `max_parallel` tasks are PENDING. Otherwise the
   * result is the number of phones in the lists of QUEUED tasks ordered before
   * the given task (all queued tasks if it is not in the queue), plus the IDLE
   * non-embedded items of the PENDING tasks.
   *
   * @param id Task id whose queue position is evaluated.
   * @returns Number of messages to be sent before this task.
   */
  async getToBeSentNum(id: number) {
    const maxParallel = await this.getConfig('max_parallel', 1)
    const runningCount = await this.taskRepository.count({
      where: {
        status: TaskStatus.PENDING
      }
    })
    // A free slot exists, so nothing is ahead of the task
    if (runningCount < maxParallel) {
      return 0
    }

    let remaining = 0

    // Queued tasks that come before the given one
    const queued = await this.taskRepository.find({
      where: {
        status: TaskStatus.QUEUED
      },
      order: {
        startedAt: 'ASC'
      }
    })
    const position = queued.findIndex((queuedTask) => queuedTask.id === id)
    const ahead = position === -1 ? queued : queued.slice(0, position)
    const listIdsAhead = ahead.map((queuedTask) => queuedTask.listId)
    if (listIdsAhead.length > 0) {
      const phonesAhead = await this.phoneRepository.countBy({
        listId: In(listIdsAhead)
      })
      remaining += phonesAhead
    }

    // Unsent items of the tasks currently being executed
    const running = await this.taskRepository.find({
      where: {
        status: TaskStatus.PENDING
      }
    })
    if (running.length > 0) {
      const idleItems = await this.taskItemRepository.countBy({
        taskId: In(running.map((runningTask) => runningTask.id)),
        status: TaskItemStatus.IDLE,
        embed: false
      })
      remaining += idleItems
    }

    return remaining
  }
  /**
   * Reads a numeric system config value, falling back to a default on failure.
   *
   * @param name Config key passed to `sysConfigService.getNumber`.
   * @param defValue Value returned when the lookup throws.
   * @returns The configured number, or `defValue` if it could not be read.
   */
  async getConfig(name, defValue) {
    try {
      return await this.sysConfigService.getNumber(name, defValue)
    } catch (e) {
      // Name the key that failed; the previous message was copied from another lookup
      Logger.error(`Error getting config "${name}", using default ${defValue}`, e?.stack, this.TAG)
    }
    return defValue
  }
  /**
   * Sets the status of the given task items.
   *
   * @param taskIds Ids of the task ITEMS to update (despite the parameter name).
   * @param status New status for all of them.
   */
  async updateTaskItemStatus(taskIds: number[], status: TaskItemStatus) {
    // Nothing to update; also avoids issuing an UPDATE with an empty IN list
    if (!taskIds || taskIds.length === 0) {
      return
    }
    await this.taskItemRepository.update(
      {
        id: In(taskIds)
      },
      {
        status: status
      }
    )
  }
  /**
   * Sets the status of the given task items and stamps `sendAt` with the current time.
   *
   * @param taskIds Ids of the task ITEMS to update (despite the parameter name).
   * @param status New status for all of them.
   */
  async updateTaskItemStatusAndSendAt(taskIds: number[], status: TaskItemStatus) {
    // Nothing to update; also avoids issuing an UPDATE with an empty IN list
    if (!taskIds || taskIds.length === 0) {
      return
    }
    await this.taskItemRepository.update(
      {
        id: In(taskIds)
      },
      {
        status: status,
        sendAt: new Date()
      }
    )
  }
  /**
   * Computes the cost of a task as `task.total` times a content multiplier:
   * text only = 1, image only = 2, text and image = 3.
   *
   * @param task Task to price; reads `total`, `message` and `img`.
   * @param user Owner of the task; not used by the current calculation.
   * @returns The cost as a `Decimal`.
   * @throws Error when the task has neither a message nor an image.
   */
  async getCost(task: Task, user: Users) {
    const recipients = task.total
    // Text contributes 1, an image contributes 2
    const unitsPerRecipient = (task.message ? 1 : 0) + (task.img ? 2 : 0)
    if (unitsPerRecipient === 0) {
      throw new Error('发送任务文案错误,请联系管理员修改!')
    }
    return new Decimal(recipients).mul(unitsPerRecipient)
  }
  /**
   * Scheduler tick (every 2 s): tops up the set of PENDING tasks from the queue
   * and dispatches every pending task to its devices.
   *
   * The work runs under the `dispatchTask` lock with a 1 ms wait timeout, so a
   * tick that arrives while the previous one is still running is dropped.
   * When promoting QUEUED tasks, at most two tasks per user (pending + promoted)
   * are preferred; if that filter selects nothing, the oldest queued tasks are
   * promoted regardless.
   */
  @Interval(2000)
  async scheduleTask() {
    try {
      await this.lock.acquire(
        'dispatchTask',
        async () => {
          const maxParallel = await this.getConfig('max_parallel', 1)
          const running = await this.taskRepository.find({
            where: {
              status: TaskStatus.PENDING
            },
            order: {
              startedAt: 'ASC'
            },
            take: maxParallel
          })

          // Fill free slots from the queue
          if (running.length < maxParallel) {
            const queued = await this.taskRepository.find({
              where: {
                status: TaskStatus.QUEUED
              },
              order: {
                startedAt: 'ASC',
                id: 'ASC'
              }
            })
            if (queued.length > 0) {
              const freeSlots = maxParallel - running.length

              // Tasks per user that are already pending or selected in this tick
              const activePerUser = new Map<Task['userId'], number>()
              for (const task of running) {
                activePerUser.set(task.userId, (activePerUser.get(task.userId) || 0) + 1)
              }

              // Select queued tasks while keeping each user at no more than 2 active tasks
              const promoteIds: number[] = []
              for (const task of queued) {
                const active = activePerUser.get(task.userId) || 0
                if (active < 2) {
                  promoteIds.push(task.id)
                  activePerUser.set(task.userId, active + 1)
                  if (promoteIds.length >= freeSlots) {
                    break
                  }
                }
              }

              // Every queued user is at the cap: promote the oldest tasks anyway
              if (promoteIds.length === 0) {
                promoteIds.push(...queued.map((t) => t.id).slice(0, freeSlots))
              }

              await this.taskRepository.update(
                { id: In(promoteIds) },
                {
                  status: TaskStatus.PENDING,
                  startedAt: new Date()
                }
              )
            }
          }

          // Rough shuffle so no task is always served first
          const pendingTasks = (await this.findPendingTasks()).sort(() => Math.random() - 0.5)
          await Promise.all(
            pendingTasks.map(async (task) => {
              const devices = await this.deviceService.findByTaskId(task.id)
              if (devices.length === 0) return
              await this.dispatchTask(task, devices)
            })
          )
        },
        {
          timeout: 1
        }
      )
    } catch (e) {
      // The lock rejects with a "timed out" error when the previous tick still holds it
      if (e?.message?.includes('timed out')) return
      Logger.error('Error dispatchTask', e?.stack, this.TAG)
    }
  }
  /**
   * Appends a random query parameter to every http(s) URL found in the content.
   *
   * All occurrences of the same URL receive the same random parameter. URLs are
   * rewritten in a single pass at their matched positions, so a URL that is a
   * prefix of another one (e.g. `http://a.com` and `http://a.com/path`) no
   * longer corrupts the longer URL.
   *
   * @param content Message text.
   * @returns The text with every URL rewritten; a URL that cannot be parsed is left unchanged.
   */
  refineContent(content: string) {
    const urlPattern =
      /https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)/g
    // Original URL -> rewritten URL, so repeated URLs share one random parameter
    const rewritten = new Map<string, string>()

    return content.replace(urlPattern, (url) => {
      let replacement = rewritten.get(url)
      if (replacement === undefined) {
        try {
          const u = new URL(url)
          u.searchParams.append(randomstring.generate(6), randomstring.generate(6))
          replacement = u.toString()
        } catch (error) {
          Logger.error('Error parsing url', error?.stack, this.TAG)
          replacement = url
        }
        rewritten.set(url, replacement)
      }
      return replacement
    })
  }
  /**
   * Sends the next batch of IDLE items of a task to the given devices.
   *
   * Up to `batchSize` items are assigned per device. The items are marked PENDING
   * (with `sendAt` = now) and the devices busy before the batches are pushed over
   * the events gateway. The device round trip is NOT awaited: results are applied
   * and the task counters refreshed in the background.
   *
   * @param task Task whose items are dispatched.
   * @param devices Devices available to the task. The devices that receive a batch
   *                are removed from this array (`splice`).
   */
  async dispatchTask(task: Task, devices: Device[]) {
    // Checked first: with no devices the query below would have a limit of 0
    if (devices.length === 0) {
      return
    }
    const taskItems = await this.taskItemRepository.find({
      where: {
        taskId: task.id,
        status: TaskItemStatus.IDLE
      },
      take: devices.length * batchSize
    })
    if (taskItems.length === 0) {
      return
    }
    // Keep only as many devices as there are batches to send
    devices = devices.splice(0, Math.ceil(taskItems.length / batchSize))
    // Per-task settings win; falsy values fall back to the system config
    const taskConfig = {
      singleQty: task.singleQty || (await this.getConfig('single_qty', 1)),
      singleTimeout: task.singleTimeout || (await this.getConfig('single_timeout', 2000)),
      singleDelay: task.singleDelay || (await this.getConfig('single_delay', 3000)),
      cleanCount: task.cleanCount || (await this.getConfig('clean_count', 20)),
      groupMode: task.groupMode,
      groupQty: task.groupQty || (await this.getConfig('group_qty', 3)),
      groupSize: task.groupSize || (await this.getConfig('group_size', 9)),
      groupTimeout: task.groupTimeout || (await this.getConfig('group_timeout', 2000)),
      groupDelay: task.groupDelay || (await this.getConfig('group_delay', 3000)),
      checkConnection: task.checkConnection,
      useBackup: task.useBackup,
      e2ee: task.e2ee,
      e2eeTimeout: task.e2eeTimeout || (await this.getConfig('e2ee_timeout', 5000))
    }
    await this.updateTaskItemStatusAndSendAt(
      taskItems.map((i) => i.id),
      TaskItemStatus.PENDING
    )
    await this.deviceService.updateDevice(
      devices.map((d) => d.id),
      { busy: true }
    )

    // Fire and forget: the caller must not wait for the devices to answer
    void Promise.all(
      devices.map(async (device, i) => {
        const items = taskItems
          .slice(i * batchSize, i * batchSize + batchSize)
          .map((item) => ({ ...item, message: this.getMessage(task), img: task.img }))
        if (items.length === 0) return
        try {
          const res: any = await this.eventsGateway.sendForResult(
            {
              id: randomUUID(),
              action: 'task',
              data: {
                config: { ...taskConfig, ...(device.configOverrides || {}) },
                tasks: items,
                taskId: task.id
              }
            },
            device.socketId
          )
          Logger.log(`task result: ${JSON.stringify(res)}`, this.TAG)
          // Result shape 1: object with id lists per outcome
          if (res.success?.length > 0) {
            await this.updateTaskItemStatus(res.success, TaskItemStatus.SUCCESS)
          }
          if (res.fail?.length > 0) {
            await this.updateTaskItemStatus(res.fail, TaskItemStatus.FAIL)
          }
          if (res.retry?.length > 0) {
            await this.updateTaskItemStatus(res.retry, TaskItemStatus.IDLE)
          }
          // Result shape 2: array with one entry per item
          if (res instanceof Array) {
            const results = res as TaskResult[]
            for (const result of results) {
              const sent = result.sent
              // true / 1 = sent, false / 0 = failed, anything else = retry later
              const status =
                sent === true || sent === 1
                  ? TaskItemStatus.SUCCESS
                  : sent === false || sent === 0
                    ? TaskItemStatus.FAIL
                    : TaskItemStatus.IDLE
              await this.taskItemRepository.update(
                { id: result.id },
                {
                  status,
                  delivery: result.delivery,
                  numberId: result.numberId
                }
              )
            }
          }
        } catch (e) {
          Logger.error('Error running task 3', e?.stack, this.TAG)
          // Put the batch back so it is picked up again
          await this.updateTaskItemStatus(
            items.map((i) => i.id),
            TaskItemStatus.IDLE
          )
        }
      })
    )
      .then(async () => {
        // Item counts per status, excluding embedded items; task id is bound as a parameter
        const counts = (
          await this.taskItemRepository.manager.query(
            `select status, count(*) as count
             from task_item
             where taskId = ? and embed = 0
             group by status`,
            [task.id]
          )
        ).reduce((acc, item) => {
          acc[item.status] = parseInt(item.count)
          return acc
        }, {})
        Logger.log('Task counts', JSON.stringify(counts), this.TAG)
        const successCount = counts.success || 0
        const failCount = counts.fail || 0
        const pendingCount = counts.pending || 0
        const idleCount = counts.idle || 0
        const sentCount = successCount + failCount
        const finish = pendingCount === 0 && idleCount === 0
        // Delivered items
        const deliveryCount = await this.taskItemRepository.countBy({ taskId: task.id, delivery: 1 })
        const data: Partial<Task> = {
          sent: sentCount,
          successCount: successCount,
          successRate: sentCount > 0 ? ((successCount / sentCount) * 100).toFixed(1) + '%' : '0%',
          deliveryCount: deliveryCount,
          // Guard on sentCount as well: dividing by zero would yield "Infinity%"
          deliveryRate:
            deliveryCount > 0 && sentCount > 0 ? ((deliveryCount / sentCount) * 100).toFixed(1) + '%' : '0%'
        }
        if (finish) {
          data.status = TaskStatus.COMPLETED
          await this.updateSend(task.userId)
        }
        await this.taskRepository.update({ id: task.id }, data)
      })
      .catch((e) => {
        // Nobody awaits this chain, so a failure must be handled here
        Logger.error(`Error updating statistics of task ${task.id}`, e?.stack, this.TAG)
      })
  }
  /**
   * Counts the tasks that are currently in PENDING status.
   *
   * @returns Number of PENDING tasks.
   */
  async checkPendingTaskNum() {
    const pendingOnly = { status: TaskStatus.PENDING }
    return this.taskRepository.countBy(pendingOnly)
  }
  /**
   * Recovery job (every 10 s): items of active tasks (PENDING, CUTTING, VIP) that
   * have been PENDING for more than 10 minutes since `sendAt` are reset to IDLE
   * so they can be dispatched again.
   *
   * Done with a single bulk UPDATE instead of loading the stuck items per task.
   */
  @Interval(10000)
  async fixDeadTask() {
    const activeTasks = await this.taskRepository.findBy({
      status: In([TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP])
    })
    if (activeTasks.length === 0) {
      return
    }
    const staleBefore = addMinutes(new Date(), -10)
    await this.taskItemRepository.update(
      {
        taskId: In(activeTasks.map((task) => task.id)),
        status: TaskItemStatus.PENDING,
        sendAt: LessThan(staleBefore)
      },
      {
        status: TaskItemStatus.IDLE
      }
    )
  }
  /**
   * Refresh job (every 60 s) for the `check_availability_numbers` config.
   *
   * Picks 5 random numbers from successfully sent items. The lookback window
   * starts at 6 hours and grows by 6 hours per attempt (up to 99 attempts) until
   * a full sample of 5 is found. The config is saved in every case; its value is
   * only replaced when a full sample was found.
   */
  @Interval(60000)
  async changeCheckAvailabilityNumbers() {
    const CONFIG_NAME = 'check_availability_numbers'
    const SAMPLE_SIZE = 5
    const WINDOW_HOURS = 6
    const MAX_ATTEMPTS = 99

    let config: SysConfig
    try {
      config = await this.sysConfigService.findByName(CONFIG_NAME)
    } catch (e) {
      Logger.error('Error getting check_availability_numbers', e.stack, this.TAG)
      // Lookup failed: start from a fresh config entry
      config = new SysConfig()
      config.name = CONFIG_NAME
    }

    for (let attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
      const since = addHours(new Date(), -WINDOW_HOURS * attempt)
      const sample = await this.taskItemRepository
        .createQueryBuilder()
        .select()
        .where('status = :status', { status: TaskItemStatus.SUCCESS })
        .andWhere('sendAt > :sendAt', { sendAt: since })
        .orderBy('RAND()')
        .limit(SAMPLE_SIZE)
        .getMany()
      if (sample.length !== SAMPLE_SIZE) {
        continue
      }
      config.value = sample.map((item) => item.number).join(',')
      break
    }

    await this.sysConfigService.save(config)
  }
  /**
   * Recomputes a user's `send` total from the sum of their CONSUMPTION balance
   * records and persists it.
   *
   * @param id User id.
   * @returns The saved user entity.
   * @throws NotFoundException when no user with this id exists.
   */
  public async updateSend(id: number) {
    const sum = await this.balanceService.sumAmount(id, BalanceType.CONSUMPTION)
    const user = await this.userRepository.findOneBy({ id })
    // findOneBy resolves to null for an unknown id; fail with a clear error
    if (!user) {
      throw new NotFoundException(`User ${id} not found`)
    }
    user.send = sum.toNumber()
    return await this.userRepository.save(user)
  }
  /**
   * Cron job (at minute 0 and 30 of every hour): starts every SCHEDULED task
   * whose `startedAt` is due.
   *
   * Tasks are started one after another; a task that fails to start is logged
   * and does not stop the remaining ones.
   *
   * @returns The tasks that were due, whether or not they started successfully.
   */
  @Cron('0 0,30 * * * *')
  async scheduledTaskExecution() {
    Logger.log(`The scheduled task starts, ${new Date().toISOString()}`, this.TAG)
    const tasks = await this.taskRepository.findBy({
      status: TaskStatus.SCHEDULED,
      startedAt: LessThanOrEqual(new Date())
    })
    for (const task of tasks) {
      try {
        await this.startTask(task.id)
        Logger.log(`Task ${task.id} started successfully.`, this.TAG)
      } catch (error) {
        Logger.error(`Error starting task ${task.id}: ${error?.message ?? error}`, error?.stack, this.TAG)
      }
    }
    Logger.log('The scheduled task is executed.', this.TAG)
    return tasks
  }
  1568. }