Browse Source

定时任务

wuyi 1 năm trước cách đây
mục cha
commit
c88592a8f3
3 tập tin đã thay đổi với 59 bổ sung6 xóa
  1. 1 0
      src/task/entities/task.entity.ts
  2. 8 0
      src/task/task.controller.ts
  3. 50 6
      src/task/task.service.ts

+ 1 - 0
src/task/entities/task.entity.ts

@@ -9,6 +9,7 @@ export enum TaskStatus {
     PAUSE = 'pause',
     COMPLETED = 'completed',
     QUEUED = 'queued',
+    SCHEDULED = 'scheduled',
     ERROR = 'error'
 }
 

+ 8 - 0
src/task/task.controller.ts

@@ -120,4 +120,12 @@ export class TaskController {
         return await this.taskService.balanceStatistics()
     }
 
+    @Get('/scheduled')
+    async scheduledTaskExecution(@Req() req) {
+        if (!req.user.roles.includes('admin')) {
+            return
+        }
+        return await this.taskService.scheduledTaskExecution()
+    }
+
 }

+ 50 - 6
src/task/task.service.ts

@@ -10,7 +10,7 @@ import {
 } from '@nestjs/common'
 import { InjectRepository } from '@nestjs/typeorm'
 import { Task, TaskStatus } from './entities/task.entity'
-import { Between, FindOptionsWhere, In, LessThan, Not, Repository } from 'typeorm'
+import { Between, In, LessThan, LessThanOrEqual, Repository } from 'typeorm'
 import { TaskItem, TaskItemStatus } from './entities/task-item.entity'
 import { PageRequest } from '../common/dto/page-request'
 import { paginate, Pagination } from 'nestjs-typeorm-paginate'
@@ -26,7 +26,7 @@ import { BalanceService } from '../balance/balance.service'
 import Decimal from 'decimal.js'
 import { Role } from '../model/role.enum'
 import { Phone } from '../phone-list/entities/phone.entity'
-import { Interval } from '@nestjs/schedule'
+import { Cron, Interval } from '@nestjs/schedule'
 import * as AsyncLock from 'async-lock'
 import { UsersService } from 'src/users/users.service'
 import { addHours, addMinutes } from 'date-fns'
@@ -62,7 +62,8 @@ export class TaskService implements OnModuleInit {
         private readonly sysConfigService: SysConfigService,
         private readonly balanceService: BalanceService,
         private readonly userService: UsersService
-    ) {}
+    ) {
+    }
 
     onModuleInit() {
         this.lock.acquire('dispatchTask', async () => {
@@ -121,7 +122,29 @@ export class TaskService implements OnModuleInit {
         }
         task.total = phones.length
         task.country = phoneList.country
+        // 定时任务
+        let cost = 0
+        if (task.startedAt) {
+            task.status = TaskStatus.SCHEDULED
+            const user = await this.userService.findById(task.userId)
+            // 创建任务前扣费
+            cost = await this.getCost(task, user)
+            if (cost > (user.balance || 0)) {
+                throw new Error('余额不足,请充值后创建定时任务!')
+            }
+            task.paid = true
+        }
         task = await this.taskRepository.save(task)
+        if (task.paid) {
+            try {
+                await this.balanceService.feeDeduction(task.userId, cost, task.id)
+            } catch (e) {
+                task.status = TaskStatus.IDLE
+                task.paid = false
+                await this.taskRepository.update(task.id, { status: TaskStatus.IDLE, paid: false })
+                throw new Error('定时任务扣款失败,已转为手动发送任务')
+            }
+        }
         let finalPhones = [...phones]
 
         // 埋号
@@ -224,7 +247,7 @@ export class TaskService implements OnModuleInit {
             where: { id }
         })
 
-        if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSE) return
+        if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSE && task.status !== TaskStatus.SCHEDULED) return
 
         if (!task.paid) {
             const user = await this.userService.findById(task.userId)
@@ -568,7 +591,8 @@ export class TaskService implements OnModuleInit {
             if (durianRes.data.code === 200) {
                 res.durian = durianRes.data.data.score
             }
-        } catch (e) {}
+        } catch (e) {
+        }
 
         try {
             const cloudInstance = axios.create({
@@ -592,7 +616,8 @@ export class TaskService implements OnModuleInit {
             if (cloud034Res.data.code === '1001') {
                 res.cloud034 = cloud034Res.data.data.integral
             }
-        } catch (e) {}
+        } catch (e) {
+        }
 
         return res
     }
@@ -987,4 +1012,23 @@ export class TaskService implements OnModuleInit {
         })
         return await this.userRepository.save(user)
     }
+
+    @Cron('0 0,30 * * * *')
+    async scheduledTaskExecution() {
+        console.log('The scheduled task starts,', new Date())
+        const tasks = await this.taskRepository.findBy({
+            status: TaskStatus.SCHEDULED,
+            startedAt: LessThanOrEqual(new Date())
+        })
+        for (const task of tasks) {
+            try {
+                await this.startTask(task.id)
+                console.log(`Task ${task.id} started successfully.`)
+            } catch (error) {
+                console.error(`Error starting task ${task.id}:`, error)
+            }
+        }
+        console.log('The scheduled task is executed.')
+        return tasks
+    }
 }