x1ongzhu 1 jaar geleden
bovenliggende
commit
baa32d4fa0
4 gewijzigde bestanden met toevoegingen van 59 en 12 verwijderingen
  1. 1 1
      .env
  2. 1 0
      src/rcs/entities/task.entity.ts
  3. 5 0
      src/rcs/rcs.controller.ts
  4. 52 11
      src/rcs/rcs.service.ts

+ 1 - 1
.env

@@ -30,7 +30,7 @@ TYPEORM_HOST="rdsave1o67m1ido6gwp6public.mysql.rds.aliyuncs.com"
 TYPEORM_PORT=3306
 TYPEORM_USERNAME=zouma
 TYPEORM_PASSWORD="2wsx@WSX#EDC"
-TYPEORM_DATABASE=rcs
+TYPEORM_DATABASE=rcs_test
 TYPEORM_AUTO_SCHEMA_SYNC=true
 TYPEORM_ENTITIES="dist/**/*.entity.js"
 TYPEORM_SUBSCRIBERS="dist/subscriber/**/*.js"

+ 1 - 0
src/rcs/entities/task.entity.ts

@@ -3,6 +3,7 @@ import { Column, CreateDateColumn, Entity, PrimaryGeneratedColumn } from 'typeor
 export enum TaskStatus {
     IDLE = 'idle',
     PENDING = 'pending',
+    PAUSE = 'pause',
     COMPLETED = 'completed',
     ERROR = 'error'
 }

+ 5 - 0
src/rcs/rcs.controller.ts

@@ -80,6 +80,11 @@ export class RcsController {
         return await this.rcsService.startTask(parseInt(id))
     }
 
+    @Post('/task/:id/pause')
+    async pauseTask(@Param('id') id: string) {
+        return await this.rcsService.pauseTask(parseInt(id))
+    }
+
     @Post('/device')
     async findAllDevice(@Body() page: PageRequest<Device>) {
         return await this.rcsService.findAllDevice(page)

+ 52 - 11
src/rcs/rcs.service.ts

@@ -36,6 +36,8 @@ export class RcsService implements OnModuleInit {
         private readonly USACodeApiService: USACodeApiService
     ) {}
 
+    private taskControllers: { [key: number]: AbortController } = {}
+
     onModuleInit() {
         this.deviceRepository.update({}, { online: false, busy: false })
         this.taskRepository.update({ status: TaskStatus.PENDING }, { status: TaskStatus.IDLE })
@@ -106,17 +108,36 @@ export class RcsService implements OnModuleInit {
 
     async startTask(id: number): Promise<void> {
         const task = await this.taskRepository.findOneBy({ id })
-        if (task && task.status === TaskStatus.IDLE) {
+        if ((task && task.status === TaskStatus.IDLE) || task.status === TaskStatus.PAUSE) {
             task.status = TaskStatus.PENDING
             await this.taskRepository.save(task)
             this.runTask(task)
         }
     }
 
+    async pauseTask(id: number): Promise<void> {
+        const task = await this.taskRepository.findOneBy({ id })
+        if (task && task.status === TaskStatus.PENDING) {
+            task.status = TaskStatus.PAUSE
+            await this.taskRepository.save(task)
+            const controller = this.taskControllers[task.id]
+            if (controller) {
+                controller.abort()
+            }
+        }
+    }
+
     async runTask(task: Task) {
+        let controller = new AbortController()
+        this.taskControllers[task.id] = controller
         try {
             let finish = false
             while (!finish) {
+                if (controller.signal.aborted) {
+                    Logger.log('Task aborted', 'RcsService')
+                    return
+                }
+
                 let taskItems = await this.taskItemRepository.find({
                     where: { taskId: task.id, status: TaskItemStatus.IDLE },
                     take: 5
@@ -131,6 +152,10 @@ export class RcsService implements OnModuleInit {
 
                 let device = null
                 while (device === null) {
+                    if (controller.signal.aborted) {
+                        Logger.log('Task aborted', 'RcsService')
+                        return
+                    }
                     device = await this.deviceRepository.findOne({
                         where: { online: true, canSend: true, busy: false }
                     })
@@ -145,28 +170,39 @@ export class RcsService implements OnModuleInit {
                 device.busy = true
                 await this.deviceRepository.save(device)
                 await this.taskItemRepository.save(taskItems)
-                Logger.log(`Sending task to device ${device.id}(${device.model})`, 'RcsService')
-                this.eventsGateway
-                    .sendForResult(
+                Logger.log(`Send task to device ${device.id}(${device.model})`, 'RcsService')
+                Promise.race([
+                    this.eventsGateway.sendForResult(
                         {
                             id: randomUUID(),
                             action: 'task',
                             data: taskItems
                         },
                         device.socketId
-                    )
-                    .then(async (res: any) => {
+                    ),
+                    setTimeout(60000).then(() => {
+                        return Promise.resolve({ error: 'Timeout' })
+                    })
+                ]).then(async (res: any) => {
+                    if (res.error) {
+                        Logger.error('Task timeout', 'RcsService')
+                        await this.taskItemRepository.update(
+                            taskItems.map((i) => i.id),
+                            { status: TaskItemStatus.FAIL }
+                        )
+                    } else {
+                        Logger.log(
+                            `Task completed: ${res.success.length} success, ${res.fail.length} fail`,
+                            'RcsService'
+                        )
                         if (res.success?.length > 0) {
                             await this.taskItemRepository.update(res.success, { status: TaskItemStatus.SUCCESS })
                         }
                         if (res.fail?.length > 0) {
                             await this.taskItemRepository.update(res.fail, { status: TaskItemStatus.FAIL })
                         }
-                        Logger.log(
-                            `Task completed: ${res.success.length} success, ${res.fail.length} fail`,
-                            'RcsService'
-                        )
-                    })
+                    }
+                })
             }
         } catch (e) {
             Logger.error('Error running task', e, 'RcsService')
@@ -174,6 +210,8 @@ export class RcsService implements OnModuleInit {
             task.error = e.message
             await this.taskRepository.save(task)
         }
+
+        this.taskControllers[task.id] = null
     }
 
     async findAllDevice(req: PageRequest<Device>): Promise<Pagination<Device>> {
@@ -191,6 +229,7 @@ export class RcsService implements OnModuleInit {
         device.name = name
         device.online = true
         device.canSend = canSend
+        device.busy = false
         await this.deviceRepository.save(device)
     }
 
@@ -198,6 +237,8 @@ export class RcsService implements OnModuleInit {
         const device = await this.deviceRepository.findOneBy({ id })
         if (device) {
             device.online = false
+            device.busy = false
+            device.canSend = false
             await this.deviceRepository.save(device)
         }
     }