task.service.ts 46 KB


  1. import { OperatorConfigService } from './../operator_config/operator_config.service'
  2. import { PhoneListService } from './../phone-list/phone-list.service'
  3. import {
  4. forwardRef,
  5. Inject,
  6. Injectable,
  7. InternalServerErrorException,
  8. Logger,
  9. NotFoundException,
  10. OnModuleInit
  11. } from '@nestjs/common'
  12. import { InjectRepository } from '@nestjs/typeorm'
  13. import { Task, TaskStatus } from './entities/task.entity'
  14. import { Between, In, LessThan, LessThanOrEqual, Repository } from 'typeorm'
  15. import { TaskItem, TaskItemStatus } from './entities/task-item.entity'
  16. import { PageRequest } from '../common/dto/page-request'
  17. import { paginate, Pagination } from 'nestjs-typeorm-paginate'
  18. import { EventsGateway } from '../events/events.gateway'
  19. import { randomUUID } from 'crypto'
  20. import { setTimeout } from 'timers/promises'
  21. import { DeviceService } from '../device/device.service'
  22. import { SysConfigService } from '../sys-config/sys-config.service'
  23. import { Users } from '../users/entities/users.entity'
  24. import * as ExcelJS from 'exceljs'
  25. import * as moment from 'moment'
  26. import { BalanceService } from '../balance/balance.service'
  27. import { Role } from '../model/role.enum'
  28. import { Phone } from '../phone-list/entities/phone.entity'
  29. import { Cron, Interval } from '@nestjs/schedule'
  30. import * as AsyncLock from 'async-lock'
  31. import { UsersService } from '../users/users.service'
  32. import { addDays, addHours, addMinutes, endOfDay, startOfDay } from 'date-fns'
  33. import { SysConfig } from '../sys-config/entities/sys-config.entity'
  34. import * as randomstring from 'randomstring'
  35. import { RcsNumber } from '../rcs-number/entities/rcs-number.entity'
  36. import axios from 'axios'
  37. import { BalanceRecord, BalanceType } from '../balance/entities/balance-record.entities'
  38. import { Device } from '../device/entities/device.entity'
  39. @Injectable()
  40. export class TaskService implements OnModuleInit {
  41. private lock = new AsyncLock()
  42. private TAG = 'TaskService'
  43. constructor(
  44. @InjectRepository(Task)
  45. private taskRepository: Repository<Task>,
  46. @InjectRepository(TaskItem)
  47. private taskItemRepository: Repository<TaskItem>,
  48. @InjectRepository(Phone)
  49. private phoneRepository: Repository<Phone>,
  50. @InjectRepository(Users)
  51. private userRepository: Repository<Users>,
  52. @InjectRepository(RcsNumber)
  53. private rcsNumberRepository: Repository<RcsNumber>,
  54. @InjectRepository(BalanceRecord)
  55. private readonly balanceRecordRepository: Repository<BalanceRecord>,
  56. @Inject(forwardRef(() => EventsGateway))
  57. private readonly eventsGateway: EventsGateway,
  58. private readonly phoneListService: PhoneListService,
  59. @Inject(forwardRef(() => DeviceService))
  60. private readonly deviceService: DeviceService,
  61. private readonly sysConfigService: SysConfigService,
  62. private readonly balanceService: BalanceService,
  63. private readonly userService: UsersService,
  64. private readonly operatorConfigService: OperatorConfigService
  65. ) {}
  66. onModuleInit() {
  67. this.lock.acquire('dispatchTask', async () => {
  68. const tasks = await this.taskRepository.findBy({
  69. status: TaskStatus.PENDING
  70. })
  71. for (let task of tasks) {
  72. await this.taskItemRepository.update(
  73. { taskId: task.id, status: TaskItemStatus.PENDING },
  74. { status: TaskStatus.IDLE }
  75. )
  76. }
  77. await setTimeout(10000)
  78. })
  79. }
  80. private taskControllers: { [key: number]: AbortController } = {}
  81. async findById(id: number): Promise<Task> {
  82. return await this.taskRepository.findOneBy({ id })
  83. }
  84. async findAllTask(req: PageRequest<Task>): Promise<Pagination<Task>> {
  85. const page = await paginate<Task>(this.taskRepository, req.page, req.search)
  86. if (page.items.length !== 0) {
  87. let items = page.items
  88. const userIds = items.map((item) => item.userId)
  89. const users = await this.userRepository.findBy({
  90. id: In(userIds)
  91. })
  92. for (let i = 0; i < items.length; i++) {
  93. const item = items[i]
  94. const user = users.find((user) => user.id === item.userId)
  95. if (user) {
  96. item.userName = user.username
  97. }
  98. }
  99. }
  100. return page
  101. }
  102. async findPendingTasks() {
  103. return await this.taskRepository.findBy({
  104. status: In([TaskStatus.PENDING, TaskStatus.CUTTING, TaskStatus.VIP])
  105. })
  106. }
  107. async findAllTaskItem(req: PageRequest<TaskItem>): Promise<Pagination<TaskItem>> {
  108. return await paginate<TaskItem>(this.taskItemRepository, req.page, req.search)
  109. }
  110. async createTask(task: Task): Promise<Task> {
  111. const phoneList = await this.phoneListService.findPhoneListById(task.listId)
  112. if (!phoneList) {
  113. throw new NotFoundException('Phone list not found')
  114. }
  115. const phones = await this.phoneListService.findPhoneByListId(task.listId)
  116. if (!phones || phones.length === 0) {
  117. throw new InternalServerErrorException('请先上传料子')
  118. } else if (phones.length < 100) {
  119. throw new InternalServerErrorException('料子条数不能少于100条!')
  120. }
  121. task.total = phones.length
  122. task.country = phoneList.country
  123. if (task.country) {
  124. task.e2ee = (await this.operatorConfigService.findByCountry(task.country))?.e2ee || 0
  125. }
  126. // 定时任务
  127. let cost = 0
  128. if (task.startedAt) {
  129. task.status = TaskStatus.SCHEDULED
  130. const user = await this.userService.findById(task.userId)
  131. // 创建任务前扣费
  132. cost = await this.getCost(task, user)
  133. if (cost > (user.balance || 0)) {
  134. throw new Error('余额不足,请充值后创建定时任务!')
  135. }
  136. task.paid = true
  137. }
  138. task = await this.taskRepository.save(task)
  139. if (task.paid) {
  140. try {
  141. await this.balanceService.feeDeduction(task.userId, cost, task.id)
  142. } catch (e) {
  143. task.status = TaskStatus.IDLE
  144. task.paid = false
  145. await this.taskRepository.update(task.id, { status: TaskStatus.IDLE, paid: false })
  146. throw new Error('定时任务扣款失败,已转为手动发送任务')
  147. }
  148. }
  149. let finalPhones = [...phones]
  150. // 埋号
  151. const extraNumbersString = await this.sysConfigService.getString('embed_numbers', '')
  152. const extraNumbers = extraNumbersString.split(',')
  153. if (extraNumbers.length > 0) {
  154. const extraNumbersNum = extraNumbers.length
  155. const totalLength = finalPhones.length + extraNumbersNum
  156. const insertionStep = Math.floor(totalLength / (extraNumbersNum + 1))
  157. extraNumbers.forEach((extraNumber, index) => {
  158. const insertIndex = (index + 1) * insertionStep
  159. const cur = new Phone()
  160. cur.number = extraNumber
  161. finalPhones.splice(insertIndex, 0, cur)
  162. })
  163. }
  164. await this.taskItemRepository
  165. .createQueryBuilder()
  166. .insert()
  167. .values(
  168. finalPhones.map((phone) => {
  169. const taskItem = new TaskItem()
  170. taskItem.taskId = task.id
  171. taskItem.number = phone.number
  172. taskItem.embed = extraNumbers.includes(phone.number)
  173. taskItem.status = TaskStatus.IDLE
  174. return taskItem
  175. })
  176. )
  177. .updateEntity(false)
  178. .execute()
  179. return task
  180. }
  181. getMessage(task: Task) {
  182. let message = task.message
  183. task.dynamicMessage?.forEach((dm) => {
  184. if (dm.key && dm.values?.length > 0) {
  185. message = message.replaceAll(`${dm.key}`, dm.values[Math.floor(Math.random() * dm.values.length)])
  186. }
  187. })
  188. return this.refineContent(message)
  189. }
  190. async updateTask(id: number, user: Users, data: Task) {
  191. if (!id) throw new Error('Task id is required')
  192. const old = await this.taskRepository.findOneOrFail({
  193. where: { id }
  194. })
  195. if (old.userId !== user.id && !user.roles.includes(Role.Admin)) {
  196. throw new Error('No permission to update task')
  197. }
  198. return await this.taskRepository.update(
  199. { id },
  200. {
  201. message: data.message || old.message,
  202. dynamicMessage: data.dynamicMessage || old.dynamicMessage,
  203. rcsWait: data.rcsWait,
  204. rcsInterval: data.rcsInterval,
  205. cleanCount: data.cleanCount,
  206. requestNumberInterval: data.requestNumberInterval,
  207. checkConnection: data.checkConnection,
  208. country: data.country,
  209. matchDevice: data.matchDevice,
  210. useBackup: data.useBackup,
  211. e2ee: data.e2ee,
  212. e2eeTimeout: data.e2eeTimeout
  213. }
  214. )
  215. }
  216. async balanceVerification(id: number) {
  217. const task = await this.taskRepository.findOneBy({ id })
  218. // 获取用户信息
  219. const user = await this.userService.findById(task.userId)
  220. if (user.roles.includes(Role.Admin)) {
  221. return 0
  222. }
  223. const cost: number = await this.getCost(task, user)
  224. // 验证余额
  225. if (cost > (user.balance || 0)) {
  226. return -1
  227. } else {
  228. return cost
  229. }
  230. }
  231. async delTask(id: number) {
  232. const task = await this.taskRepository.findOneBy({ id })
  233. if (task.status !== TaskStatus.IDLE) {
  234. throw new Error('当前任务状态无法删除!')
  235. }
  236. await this.taskRepository.delete(id)
  237. return task
  238. }
  239. async startTask(id: number): Promise<void> {
  240. const task = await this.taskRepository.findOneOrFail({
  241. where: { id }
  242. })
  243. if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSE && task.status !== TaskStatus.SCHEDULED)
  244. return
  245. const user = await this.userService.findById(task.userId)
  246. if (!task.paid) {
  247. if (!user.roles.includes(Role.Admin)) {
  248. // 开始任务前扣费
  249. const cost = await this.getCost(task, user)
  250. if (cost > (user.balance || 0)) {
  251. throw new Error('Insufficient balance!')
  252. }
  253. await this.balanceService.feeDeduction(task.userId, cost, task.id)
  254. await this.taskRepository.update({ id }, { paid: true })
  255. }
  256. }
  257. let curStatus = TaskStatus.IDLE
  258. if (user.isVip) {
  259. // 专线发送
  260. curStatus = TaskStatus.VIP
  261. } else {
  262. // 最大并行数
  263. const maxParallel = await this.getConfig('max_parallel', 0)
  264. // 查询当前是否有任务执行
  265. const num = await this.taskRepository.count({
  266. where: {
  267. status: TaskStatus.PENDING
  268. }
  269. })
  270. if (num < maxParallel) {
  271. curStatus = TaskStatus.PENDING
  272. } else {
  273. // 如果当前任务数大于最大并行数,则将任务放入排队队列中
  274. curStatus = TaskStatus.QUEUED
  275. }
  276. }
  277. await this.taskRepository.update(
  278. { id },
  279. {
  280. status: curStatus,
  281. startedAt: new Date()
  282. }
  283. )
  284. }
  285. async queueCutting(id: number): Promise<void> {
  286. const task = await this.taskRepository.findOneBy({ id })
  287. if (task.status === TaskStatus.IDLE || task.status === TaskStatus.QUEUED || task.status === TaskStatus.PAUSE) {
  288. await this.taskRepository.update({ id }, { status: TaskStatus.CUTTING })
  289. }
  290. }
  291. async pauseTask(id: number): Promise<void> {
  292. const task = await this.taskRepository.findOneBy({ id })
  293. if (
  294. task.status === TaskStatus.PENDING ||
  295. task.status === TaskStatus.QUEUED ||
  296. task.status === TaskStatus.CUTTING ||
  297. task.status === TaskStatus.VIP
  298. ) {
  299. await this.taskRepository.update({ id }, { status: TaskStatus.PAUSE })
  300. }
  301. }
  302. async forceCompletion(id: number): Promise<void> {
  303. const task = await this.taskRepository.findOneBy({ id })
  304. if (task.status === TaskStatus.PAUSE || task.status === TaskStatus.QUEUED) {
  305. await this.taskRepository.update({ id }, { status: TaskStatus.COMPLETED })
  306. }
  307. }
  308. async unscheduledSending(id: number) {
  309. const task = await this.taskRepository.findOneBy({ id })
  310. if (task.status === TaskStatus.SCHEDULED) {
  311. await this.taskRepository.update({ id }, { status: TaskStatus.IDLE, paid: false })
  312. }
  313. const costBalanceRecord = await this.balanceRecordRepository.findOneBy({
  314. taskId: id,
  315. type: BalanceType.CONSUMPTION
  316. })
  317. // 退款
  318. await this.balanceService.feeRefund(task.userId, costBalanceRecord.amount, task.id)
  319. }
  320. async exportTaskItem(taskId: number) {
  321. const workbook = new ExcelJS.Workbook()
  322. const worksheet = workbook.addWorksheet('Sheet1')
  323. const task = await this.taskRepository.findOneBy({ id: taskId })
  324. let where: any = {}
  325. if (task.status === TaskStatus.COMPLETED) {
  326. where = {
  327. taskId: taskId,
  328. embed: false
  329. }
  330. } else if (task.status === TaskStatus.PAUSE) {
  331. where = {
  332. taskId: taskId,
  333. embed: false,
  334. status: In([TaskItemStatus.SUCCESS, TaskItemStatus.FAIL])
  335. }
  336. }
  337. const taskItems = await this.taskItemRepository.find({
  338. where,
  339. order: {
  340. status: 'ASC',
  341. sendAt: 'ASC'
  342. }
  343. })
  344. // 设置列头
  345. worksheet.columns = [
  346. { header: '手机号', key: 'number', width: 30, style: { alignment: { horizontal: 'center' } } },
  347. { header: '是否有效', key: 'isValid', width: 15, style: { alignment: { horizontal: 'center' } } },
  348. { header: '发送成功', key: 'status', width: 15, style: { alignment: { horizontal: 'center' } } },
  349. {
  350. header: '发送时间',
  351. key: 'sendAt',
  352. width: 30,
  353. style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
  354. }
  355. ]
  356. taskItems.forEach((item) => {
  357. let valid = '无效'
  358. let status = ''
  359. const sendAt: Date = item.sendAt
  360. const formattedSendAt = moment(sendAt).format('YYYY-MM-DD HH:mm:ss')
  361. if (item.status === TaskItemStatus.SUCCESS) {
  362. valid = '有效'
  363. status = '发送成功'
  364. }
  365. worksheet.addRow({
  366. number: item.number,
  367. isValid: valid,
  368. status: status,
  369. sendAt: formattedSendAt
  370. })
  371. })
  372. return await workbook.xlsx.writeBuffer()
  373. }
  374. async exportTask(req: any, data: any) {
  375. if (!data.startDate || !data.endDate) {
  376. throw new Error('请选择日期')
  377. }
  378. let where = {}
  379. if (req.user.roles.includes('superApi')) {
  380. const userIds = await this.userService.getApiInvitesIds(req.user.id)
  381. where = {
  382. userId: In(userIds),
  383. createdAt: Between(data.startDate, data.endDate)
  384. }
  385. } else if (req.user.roles.includes('api') || req.user.roles.includes('superApi')) {
  386. const userIds = await this.userService.getInvitesIds(req.user.id)
  387. where = {
  388. userId: In(userIds),
  389. createdAt: Between(data.startDate, data.endDate)
  390. }
  391. } else if (!req.user.roles.includes('admin')) {
  392. where = {
  393. userId: req.user.id,
  394. createdAt: Between(data.startDate, data.endDate)
  395. }
  396. } else {
  397. where = {
  398. createdAt: Between(data.startDate, data.endDate)
  399. }
  400. }
  401. const tasks = await this.taskRepository.find({
  402. where,
  403. order: {
  404. createdAt: 'desc'
  405. }
  406. })
  407. if (tasks.length == 0) {
  408. throw new Error('暂无日期内任务数据')
  409. }
  410. const workbook = new ExcelJS.Workbook()
  411. const worksheet = workbook.addWorksheet('Sheet1')
  412. // 设置列头
  413. worksheet.columns = [
  414. { header: '#', key: 'id', width: 30, style: { alignment: { horizontal: 'center' } } },
  415. { header: '任务名称', key: 'name', width: 15, style: { alignment: { horizontal: 'center' } } },
  416. { header: '任务创建人', key: 'userName', width: 15, style: { alignment: { horizontal: 'center' } } },
  417. { header: '已发送', key: 'sent', width: 15, style: { alignment: { horizontal: 'center' } } },
  418. { header: '发送成功数', key: 'successCount', width: 15, style: { alignment: { horizontal: 'center' } } },
  419. { header: '总数', key: 'total', width: 15, style: { alignment: { horizontal: 'center' } } },
  420. {
  421. header: '创建时间',
  422. key: 'createdAt',
  423. width: 30,
  424. style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
  425. },
  426. {
  427. header: '开始时间',
  428. key: 'startedAt',
  429. width: 30,
  430. style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
  431. },
  432. {
  433. header: '结束时间',
  434. key: 'updatedAt',
  435. width: 30,
  436. style: { alignment: { horizontal: 'center' }, numFmt: 'YYYY-MM-DD HH:mm:ss' }
  437. }
  438. ]
  439. const userIds = tasks.map((item) => item.userId)
  440. const users = await this.userRepository.findBy({
  441. id: In(userIds)
  442. })
  443. tasks.forEach((task) => {
  444. const user = users.find((user) => user.id === task.userId)
  445. worksheet.addRow({
  446. id: task.id,
  447. name: task.name,
  448. userName: user.username,
  449. sent: task.sent,
  450. successCount: task.successCount,
  451. total: task.total,
  452. createdAt: task.createdAt ? moment(task.createdAt).format('YYYY-MM-DD HH:mm:ss') : '',
  453. startedAt: task.startedAt ? moment(task.startedAt).format('YYYY-MM-DD HH:mm:ss') : '',
  454. updatedAt: task.updatedAt ? moment(task.updatedAt).format('YYYY-MM-DD HH:mm:ss') : ''
  455. })
  456. })
  457. return await workbook.xlsx.writeBuffer()
  458. }
  459. async homeStatistics(req: any) {
  460. let where = {}
  461. const res = {
  462. xData: [],
  463. sentData: [],
  464. successData: [],
  465. totalData: [],
  466. todayData: {
  467. sent: '0',
  468. success: '0',
  469. total: '0',
  470. code: '0'
  471. }
  472. }
  473. const sixDaysAgo = new Date()
  474. sixDaysAgo.setDate(sixDaysAgo.getDate() - 6)
  475. sixDaysAgo.setHours(0, 0, 0, 0)
  476. if (req.user.roles.includes('superApi')) {
  477. const userIds = await this.userService.getApiInvitesIds(req.user.id)
  478. where = {
  479. userId: In(userIds),
  480. createdAt: Between(sixDaysAgo, new Date())
  481. }
  482. } else if (req.user.roles.includes('api') || req.user.roles.includes('superApi')) {
  483. const userIds = await this.userService.getInvitesIds(req.user.id)
  484. where = {
  485. userId: In(userIds),
  486. createdAt: Between(sixDaysAgo, new Date())
  487. }
  488. } else if (!req.user.roles.includes('admin')) {
  489. where = {
  490. userId: req.user.id,
  491. createdAt: Between(sixDaysAgo, new Date())
  492. }
  493. } else {
  494. where = {
  495. // userId: Not(1),
  496. createdAt: Between(sixDaysAgo, new Date())
  497. }
  498. }
  499. if (req.user.roles.includes('admin') || req.user.roles.includes('superApi')) {
  500. res.todayData.code = await this.rcsNumberRepository
  501. .createQueryBuilder()
  502. .select('count(1) as sum')
  503. .where('status = :status', { status: 'success' })
  504. .andWhere('createdAt between :start and :end', {
  505. start: startOfDay(new Date()),
  506. end: endOfDay(new Date())
  507. })
  508. .getRawOne()
  509. .then((data) => {
  510. return data.sum
  511. })
  512. }
  513. return await this.taskRepository
  514. .createQueryBuilder()
  515. .select([
  516. 'DATE(createdAt) as day',
  517. 'SUM(sent) as sent',
  518. 'SUM(successCount) as success',
  519. 'SUM(total) as total'
  520. ])
  521. .where(where)
  522. .groupBy('day')
  523. .orderBy('day', 'ASC')
  524. .getRawMany()
  525. .then((rows) => {
  526. if (rows.length > 0) {
  527. rows.forEach((item) => {
  528. const day = moment(item.day).format('MM-DD')
  529. res.xData.push(day)
  530. res.sentData.push(item.sent)
  531. res.successData.push(item.success)
  532. res.totalData.push(item.total)
  533. if (moment(new Date()).format('MM-DD').includes(day)) {
  534. res.todayData.sent = item.sent
  535. res.todayData.success = item.success
  536. res.todayData.total = item.total
  537. }
  538. })
  539. }
  540. return res
  541. })
  542. }
  543. async codeStatistics() {
  544. const res = await this.rcsNumberRepository
  545. .createQueryBuilder('rcsNumber')
  546. .select(['rcsNumber.`from` as channel', 'DATE(rcsNumber.createdAt) as day', 'COUNT(1) as sum'])
  547. .where('rcsNumber.status = :status', { status: 'success' })
  548. .andWhere('createdAt between :start and :end', {
  549. start: startOfDay(addDays(new Date(), -1)),
  550. end: endOfDay(new Date())
  551. })
  552. .groupBy('DATE(rcsNumber.createdAt), rcsNumber.from')
  553. .orderBy('rcsNumber.from', 'ASC')
  554. .getRawMany()
  555. const cur = moment(new Date()).format('MM-DD')
  556. const groupedData = res.reduce((acc, item) => {
  557. const day = moment(item.day).format('MM-DD')
  558. const channel = item.channel
  559. if (!acc[channel]) {
  560. acc[channel] = {
  561. channel: channel,
  562. todayData: 0,
  563. yesterdayData: 0
  564. }
  565. }
  566. if (day === cur) {
  567. acc[channel].todayData = item.sum
  568. } else {
  569. acc[channel].yesterdayData = item.sum
  570. }
  571. return acc
  572. }, {})
  573. return Object.values(groupedData)
  574. }
  575. async balanceStatistics() {
  576. const res = {
  577. durian: 0,
  578. cloud033: 0,
  579. cloud034: 0,
  580. cloud037: 0,
  581. xyz: 0
  582. }
  583. const cloudInstance = axios.create({
  584. baseURL: 'http://52.77.17.214:9001/api/'
  585. })
  586. const xyzInstance = axios.create({
  587. baseURL: 'http://113.28.178.155:8003/api/'
  588. })
  589. await Promise.all([
  590. (async () => {
  591. try {
  592. const durianRes = await axios
  593. .create({
  594. baseURL: 'http://8.218.211.187/out/ext_api/',
  595. headers: {
  596. uhost: 'api.durianrcs.com',
  597. uprotocol: 'http'
  598. }
  599. })
  600. .get('getUserInfo', {
  601. params: {
  602. name: 'unsnap3094',
  603. ApiKey: 'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'
  604. }
  605. })
  606. if (durianRes.data.code === 200) {
  607. res.durian = durianRes.data.data.score
  608. }
  609. } catch (e) {}
  610. })(),
  611. (async () => {
  612. try {
  613. const xyz = await xyzInstance.get('v1', {
  614. params: {
  615. act: 'myinfo',
  616. token: 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJkYXRhIjp7InVpZCI6MjUsInJvbGVfaWQiOjF9fQ.VU1tvG72YaXooUT-FUaQj-YWVXnVrYBad1AsoWUT4pw'
  617. }
  618. })
  619. const parts = xyz.data.split('|').map((part) => part.trim())
  620. if (parts[0] === '0') {
  621. res.xyz = parts[1] < 0 ? 0 : parts[1]
  622. }
  623. } catch (e) {}
  624. })(),
  625. (async () => {
  626. try {
  627. const cloud033Res = await cloudInstance.get('userBalance', {
  628. params: {
  629. userid: '100033',
  630. token: '1e40ca9795b1fc038db76512175d59b5'
  631. }
  632. })
  633. if (cloud033Res.data.code === '1001') {
  634. res.cloud033 = cloud033Res.data.data.integral
  635. }
  636. } catch (e) {}
  637. })(),
  638. (async () => {
  639. try {
  640. const cloud034Res = await cloudInstance.get('userBalance', {
  641. params: {
  642. userid: '100034',
  643. token: 'ed7b3de69df3d6d9ddfaa7eb862272f5'
  644. }
  645. })
  646. if (cloud034Res.data.code === '1001') {
  647. res.cloud034 = cloud034Res.data.data.integral
  648. }
  649. } catch (e) {}
  650. })(),
  651. (async () => {
  652. try {
  653. const cloud037Res = await cloudInstance.get('userBalance', {
  654. params: {
  655. userid: '100037',
  656. token: 'aaec6c21e54dc53b92e472df21a95bb7'
  657. }
  658. })
  659. if (cloud037Res.data.code === '1001') {
  660. res.cloud037 = cloud037Res.data.data.integral
  661. }
  662. } catch (e) {}
  663. })()
  664. ])
  665. return res
  666. }
  667. async hourSentStatistics() {
  668. const twelveHoursAgo = new Date()
  669. twelveHoursAgo.setHours(twelveHoursAgo.getHours() - 12)
  670. return await this.taskItemRepository
  671. .createQueryBuilder()
  672. .select(['COUNT(*) AS sent', "DATE_FORMAT(sendAt, '%Y-%m-%d %H:00:00') AS hour"])
  673. .where('sendAt BETWEEN :start AND :end', { start: twelveHoursAgo, end: new Date() })
  674. .groupBy('hour')
  675. .orderBy('hour', 'DESC')
  676. .getRawMany()
  677. }
  678. async sentCountryStatistics(req: any) {
  679. let where = {}
  680. if (req.user.roles.includes('api') || req.user.roles.includes('superApi')) {
  681. const userIds = await this.userService.getInvitesIds(req.user.id)
  682. where = {
  683. userId: In(userIds)
  684. }
  685. } else if (!req.user.roles.includes('admin')) {
  686. where = {
  687. userId: req.user.id
  688. }
  689. }
  690. // 昨天
  691. const yesterday = new Date()
  692. yesterday.setDate(yesterday.getDate() - 1)
  693. yesterday.setHours(0, 0, 0, 0)
  694. const res = await this.taskRepository
  695. .createQueryBuilder()
  696. .select(['sum(total) as value', 'country as name'])
  697. .where(where)
  698. .andWhere('country is not null')
  699. .andWhere('status = :status', { status: TaskStatus.COMPLETED })
  700. .andWhere('createdAt between :start and :end', {
  701. start: startOfDay(yesterday),
  702. end: endOfDay(yesterday)
  703. })
  704. .groupBy('country')
  705. .orderBy('value', 'DESC')
  706. .getRawMany()
  707. const sixDaysAgo = new Date()
  708. sixDaysAgo.setDate(sixDaysAgo.getDate() - 7)
  709. sixDaysAgo.setHours(0, 0, 0, 0)
  710. const totalRes = await this.taskRepository
  711. .createQueryBuilder()
  712. .select(['sum(total) as value', 'country as name'])
  713. .where(where)
  714. .andWhere('country is not null')
  715. .andWhere('createdAt between :start and :end', {
  716. start: sixDaysAgo,
  717. end: new Date()
  718. })
  719. .groupBy('country')
  720. .orderBy('value', 'DESC')
  721. .getRawMany()
  722. return {
  723. completedData: res,
  724. totalData: totalRes
  725. }
  726. }
  727. async getSuccessNum(id: number) {
  728. return await this.taskItemRepository.count({
  729. where: {
  730. taskId: id,
  731. status: TaskItemStatus.SUCCESS,
  732. embed: false
  733. }
  734. })
  735. }
  736. async getToBeSentNum(id: number) {
  737. let total = 0
  738. const maxParallel = await this.getConfig('max_parallel', 1)
  739. const pendingNum = await this.taskRepository.count({
  740. where: {
  741. status: TaskStatus.PENDING
  742. }
  743. })
  744. if (pendingNum < maxParallel) {
  745. return total
  746. } else {
  747. // 任务队列
  748. const tasks = await this.taskRepository.find({
  749. where: {
  750. status: TaskStatus.QUEUED
  751. },
  752. order: {
  753. startedAt: 'ASC'
  754. }
  755. })
  756. if (tasks.length > 0) {
  757. let list = []
  758. for (let i = 0; i < tasks.length; i++) {
  759. if (tasks[i].id === id) {
  760. break
  761. } else {
  762. list.push(tasks[i].listId)
  763. }
  764. }
  765. if (list.length > 0) {
  766. // 队列中当前任务之前剩余任务发送数
  767. const number = await this.phoneRepository.countBy({
  768. listId: In(list)
  769. })
  770. total += number
  771. }
  772. }
  773. // 正在执行的任务
  774. const curTasks = await this.taskRepository.find({
  775. where: {
  776. status: TaskStatus.PENDING
  777. }
  778. })
  779. if (curTasks.length > 0) {
  780. const ids = curTasks.map((task) => task.id)
  781. const number = await this.taskItemRepository.countBy({
  782. taskId: In(ids),
  783. status: TaskItemStatus.IDLE,
  784. embed: false
  785. })
  786. total += number
  787. }
  788. return total
  789. }
  790. }
  791. async getConfig(name, defValue) {
  792. try {
  793. return await this.sysConfigService.getNumber(name, defValue)
  794. } catch (e) {
  795. Logger.error('Error getting rcs wait time', e.stack, this.TAG)
  796. }
  797. return defValue
  798. }
  799. async updateTaskItemStatus(taskIds: number[], status: TaskItemStatus) {
  800. await this.taskItemRepository.update(
  801. {
  802. id: In(taskIds)
  803. },
  804. {
  805. status: status
  806. }
  807. )
  808. }
  809. async updateTaskItemStatusAndSendAt(taskIds: number[], status: TaskItemStatus) {
  810. await this.taskItemRepository.update(
  811. {
  812. id: In(taskIds)
  813. },
  814. {
  815. status: status,
  816. sendAt: new Date()
  817. }
  818. )
  819. }
  820. async getCost(task: Task, user: Users) {
  821. const number: number = await this.phoneListService.findCountByListId(task.listId)
  822. // 费用 = 费率*需要发送量
  823. // const rate = new Decimal(String(user.rate))
  824. // const num = new Decimal(String(number))
  825. // const cost = rate.mul(num)
  826. return number
  827. }
  828. @Interval(2000)
  829. async scheduleTask() {
  830. this.lock
  831. .acquire(
  832. 'dispatchTask',
  833. async () => {
  834. const maxParallel = await this.getConfig('max_parallel', 1)
  835. const batchSize = 200
  836. const tasks = await this.taskRepository.find({
  837. where: {
  838. status: TaskStatus.PENDING
  839. },
  840. order: {
  841. startedAt: 'ASC'
  842. },
  843. take: maxParallel
  844. })
  845. // 少补
  846. if (tasks.length < maxParallel) {
  847. const nextTasks = await this.taskRepository.find({
  848. where: {
  849. status: TaskStatus.QUEUED
  850. },
  851. order: {
  852. startedAt: 'ASC',
  853. id: 'ASC'
  854. }
  855. })
  856. if (nextTasks.length > 0) {
  857. const userIdMap = new Map()
  858. tasks.forEach((task) => {
  859. userIdMap.set(task.userId, (userIdMap.get(task.userId) || 0) + 1)
  860. })
  861. // nextTasks筛选,从排队任务中筛选出最多2个同用户下的任务
  862. let filteredTasks = []
  863. const userIds = {}
  864. const limit = maxParallel - tasks.length
  865. for (const task of nextTasks) {
  866. if (!userIds[task.userId]) {
  867. userIds[task.userId] = 0
  868. }
  869. if ((userIdMap.get(task.userId) || 0) + userIds[task.userId] < 2) {
  870. filteredTasks.push(task)
  871. userIds[task.userId]++
  872. }
  873. if (filteredTasks.length >= limit) {
  874. break
  875. }
  876. }
  877. const nextTasksIds = filteredTasks.map((t) => t.id)
  878. if (nextTasksIds.length === 0) {
  879. nextTasksIds.push(...nextTasks.map((t) => t.id).slice(0, limit))
  880. }
  881. await this.taskRepository.update({ id: In(nextTasksIds) }, { status: TaskStatus.PENDING })
  882. tasks.push(...filteredTasks)
  883. }
  884. }
  885. // 专线任务,插队任务
  886. const cuttingTasks = await this.taskRepository.find({
  887. where: {
  888. status: In([TaskStatus.CUTTING, TaskStatus.VIP])
  889. }
  890. })
  891. if (cuttingTasks.length > 0) {
  892. tasks.push(...cuttingTasks)
  893. }
  894. if (tasks.length === 0) return
  895. const devices = await this.deviceService.findAllAvailableDevices()
  896. if (devices.length === 0) return
  897. const countryMapping: { [key: string]: string[] } = JSON.parse(
  898. (await this.sysConfigService.getString('countryMapping', '')) || '{}'
  899. )
  900. const res = tasks.map((task) => {
  901. return {
  902. task,
  903. useCountry: task.country ? countryMapping[task.country] || ['any'] : ['any'],
  904. devices: []
  905. }
  906. })
  907. devices.forEach((device) => {
  908. let candidateTasks = res.filter(
  909. (r) => Math.ceil((r.task.total - r.task.sent) / 5) > r.devices.length
  910. )
  911. if (device.matchCountry && device.pinCountry) {
  912. candidateTasks = candidateTasks.filter((r) => {
  913. return r.useCountry.includes(device.pinCountry.toUpperCase())
  914. })
  915. } else {
  916. candidateTasks = candidateTasks.filter((r) => {
  917. return (
  918. r.useCountry.includes('any') ||
  919. r.useCountry.includes(device.currentCountry?.toUpperCase())
  920. )
  921. })
  922. }
  923. if (candidateTasks.length > 0) {
  924. candidateTasks.sort((a, b) => {
  925. return a.devices.length - b.devices.length
  926. })
  927. candidateTasks[0].devices.push(device)
  928. }
  929. })
  930. for (let r of res) {
  931. if (r.devices.length > 0) {
  932. await this.dispatchTask(r.task, r.devices)
  933. }
  934. }
  935. },
  936. {
  937. timeout: 1
  938. }
  939. )
  940. .catch((e) => {
  941. if (e.message.includes('timed out')) return
  942. Logger.error('Error dispatchTask', e.stack, this.TAG)
  943. })
  944. }
  945. refineContent(content: string) {
  946. const regex =
  947. /https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)/g
  948. const urls = []
  949. let match
  950. while ((match = regex.exec(content)) !== null) {
  951. if (!urls.includes(match[0])) {
  952. urls.push(match[0])
  953. }
  954. }
  955. urls.forEach((url) => {
  956. try {
  957. const u = new URL(url)
  958. u.searchParams.append(randomstring.generate(6), randomstring.generate(6))
  959. content = content.replaceAll(url, u.toString())
  960. } catch (error) {
  961. Logger.error('Error parsing url', error.stack, this.TAG)
  962. }
  963. })
  964. return content
  965. }
  966. async dispatchTask(task: Task, devices: Device[]) {
  967. const taskItems = await this.taskItemRepository.find({
  968. where: {
  969. taskId: task.id,
  970. status: TaskItemStatus.IDLE
  971. },
  972. take: devices.length * 5
  973. })
  974. if (taskItems.length === 0) {
  975. return
  976. }
  977. if (devices.length === 0) {
  978. return
  979. }
  980. devices = devices.splice(0, Math.ceil(taskItems.length / 5))
  981. const taskConfig = {
  982. rcsWait: task.rcsWait || (await this.getConfig('rcs_wait', 2000)),
  983. rcsInterval: task.rcsInterval || (await this.getConfig('rcs_interval', 3000)),
  984. cleanCount: task.cleanCount || (await this.getConfig('clean_count', 20)),
  985. requestNumberInterval: task.requestNumberInterval || (await this.getConfig('request_number_interval', 100)),
  986. checkConnection: task.checkConnection,
  987. useBackup: task.useBackup,
  988. e2ee: task.e2ee,
  989. e2eeTimeout: task.e2eeTimeout || (await this.getConfig('e2ee_timeout', 5000))
  990. }
  991. await this.updateTaskItemStatusAndSendAt(
  992. taskItems.map((i) => i.id),
  993. TaskItemStatus.PENDING
  994. )
  995. await this.deviceService.updateDevice(
  996. devices.map((d) => d.id),
  997. { busy: true }
  998. )
  999. Promise.all(
  1000. devices.map(async (device, i) => {
  1001. const items = taskItems
  1002. .slice(i * 5, i * 5 + 5)
  1003. .map((item) => ({ ...item, message: this.getMessage(task) }))
  1004. if (items.length === 0) return
  1005. try {
  1006. const res: any = await Promise.race([
  1007. this.eventsGateway.sendForResult(
  1008. {
  1009. id: randomUUID(),
  1010. action: 'task',
  1011. data: {
  1012. config: { ...taskConfig, ...(device.configOverrides || {}) },
  1013. tasks: items,
  1014. taskId: task.id
  1015. }
  1016. },
  1017. device.socketId
  1018. ),
  1019. setTimeout(120000).then(() => {
  1020. return Promise.reject(new Error('timeout waiting for response'))
  1021. })
  1022. ])
  1023. Logger.log(
  1024. `Task completed: ${res.success.length} success, ${res.fail.length} fail, ${
  1025. res.retry?.length || 0
  1026. } retry`,
  1027. this.TAG
  1028. )
  1029. if (res.success?.length > 0) {
  1030. await this.updateTaskItemStatus(res.success, TaskItemStatus.SUCCESS)
  1031. }
  1032. if (res.fail?.length > 0) {
  1033. await this.updateTaskItemStatus(res.fail, TaskItemStatus.FAIL)
  1034. }
  1035. if (res.retry?.length > 0) {
  1036. await this.updateTaskItemStatus(res.retry, TaskItemStatus.IDLE)
  1037. }
  1038. } catch (e) {
  1039. Logger.error('Error running task 3', e.stack, this.TAG)
  1040. await this.updateTaskItemStatus(
  1041. items.map((i) => i.id),
  1042. TaskItemStatus.IDLE
  1043. )
  1044. }
  1045. })
  1046. ).then(async () => {
  1047. const counts = (
  1048. await this.taskItemRepository.manager.query(
  1049. `select status, count(*) as count
  1050. from task_item
  1051. where taskId = ${task.id} and embed = 0
  1052. group by status`
  1053. )
  1054. ).reduce((acc, item) => {
  1055. acc[item.status] = parseInt(item.count)
  1056. return acc
  1057. }, {})
  1058. Logger.log('Task counts', JSON.stringify(counts), this.TAG)
  1059. const successCount = counts.success || 0
  1060. const failCount = counts.fail || 0
  1061. const pendingCount = counts.pending || 0
  1062. const idleCount = counts.idle || 0
  1063. const finish = pendingCount === 0 && idleCount === 0
  1064. const data: Partial<Task> = {
  1065. sent: successCount + failCount,
  1066. successCount: successCount,
  1067. successRate:
  1068. successCount + failCount > 0
  1069. ? ((successCount / (successCount + failCount)) * 100).toFixed(1) + '%'
  1070. : '0%'
  1071. }
  1072. if (finish) {
  1073. data.status = TaskStatus.COMPLETED
  1074. await this.updateSend(task.userId)
  1075. }
  1076. await this.taskRepository.update({ id: task.id }, data)
  1077. })
  1078. }
  1079. @Interval(10000)
  1080. async fixDeadTask() {
  1081. const tasks = await this.taskRepository.findBy({
  1082. status: TaskStatus.PENDING
  1083. })
  1084. for (let task of tasks) {
  1085. const items = await this.taskItemRepository.findBy({
  1086. taskId: task.id,
  1087. status: TaskItemStatus.PENDING,
  1088. sendAt: LessThan(addMinutes(new Date(), -10))
  1089. })
  1090. if (items?.length > 0) {
  1091. await this.updateTaskItemStatus(
  1092. items.map((i) => i.id),
  1093. TaskItemStatus.IDLE
  1094. )
  1095. }
  1096. }
  1097. }
  1098. @Interval(60000)
  1099. async changeCheckAvailabilityNumbers() {
  1100. let config: SysConfig
  1101. try {
  1102. config = await this.sysConfigService.findByName('check_availability_numbers')
  1103. } catch (e) {
  1104. Logger.error('Error getting check_availability_numbers', e.stack, this.TAG)
  1105. config = new SysConfig()
  1106. config.name = 'check_availability_numbers'
  1107. }
  1108. for (let i = 0; i < 99; i++) {
  1109. const items = await this.taskItemRepository
  1110. .createQueryBuilder()
  1111. .select()
  1112. .where('status = :status', { status: TaskItemStatus.SUCCESS })
  1113. .andWhere('sendAt > :sendAt', { sendAt: addHours(new Date(), -6 * (i + 1)) })
  1114. .orderBy('RAND()')
  1115. .limit(5)
  1116. .getMany()
  1117. if (5 == items.length) {
  1118. config.value = items.map((i) => i.number).join(',')
  1119. break
  1120. }
  1121. }
  1122. await this.sysConfigService.save(config)
  1123. }
  1124. public async updateSend(id: number) {
  1125. const user = await this.userRepository.findOneBy({ id })
  1126. user.send = await this.balanceRecordRepository.sum('amount', {
  1127. userId: id,
  1128. type: BalanceType.CONSUMPTION
  1129. })
  1130. return await this.userRepository.save(user)
  1131. }
  1132. @Cron('0 0,30 * * * *')
  1133. async scheduledTaskExecution() {
  1134. console.log('The scheduled task starts,', new Date())
  1135. const tasks = await this.taskRepository.findBy({
  1136. status: TaskStatus.SCHEDULED,
  1137. startedAt: LessThanOrEqual(new Date())
  1138. })
  1139. for (const task of tasks) {
  1140. try {
  1141. await this.startTask(task.id)
  1142. console.log(`Task ${task.id} started successfully.`)
  1143. } catch (error) {
  1144. console.error(`Error starting task ${task.id}:`, error)
  1145. }
  1146. }
  1147. console.log('The scheduled task is executed.')
  1148. return tasks
  1149. }
  1150. }