x1ongzhu 1 год назад
Родитель
Сommit
fc993e8252

+ 23 - 9
src/events/events.gateway.ts

@@ -10,6 +10,13 @@ import {
 } from '@nestjs/websockets'
 import { Server, Socket } from 'socket.io'
 import { RcsService } from '../rcs/rcs.service'
+import { randomUUID } from 'crypto'
+
+export interface Message {
+    id: string
+    action: string
+    data?: any
+}
 
 @WebSocketGateway({
     cors: {
@@ -32,16 +39,12 @@ export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGate
     }
 
     handleConnection(client: Socket, ...args: any[]) {
-        const id = client.handshake.query.id
-        const model = client.handshake.query.model
-        const name = client.handshake.query.name
+        const id = client.handshake.query.id as string
+        const model = client.handshake.query.model as string
+        const name = client.handshake.query.name as string
+        const canSend = client.handshake.query.canSend === 'true'
         Logger.log(`Client connected: id=${id}, model=${model}, name=${name}`, 'EventsGateway')
-        this.rcsService.deviceConnect(
-            client.handshake.query.id as string,
-            client.handshake.query.device as string,
-            client.handshake.query.name as string,
-            (client.handshake.query.canSend as string) === 'true'
-        )
+        this.rcsService.deviceConnect(id, client.id, model, name, canSend)
     }
 
     handleDisconnect(client: any) {
@@ -61,4 +64,15 @@ export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGate
     public emitEvent(event: string, data: any) {
         this.server.emit(event, data)
     }
+
+    public sendForResult(message: Message, to?: string) {
+        this.server.once(message.id, (data) => {
+            Logger.log(`Received response for ${message.id}: ${data}`, 'EventsGateway')
+        })
+        if (to) {
+            this.server.to(to).emit('message', message)
+        } else {
+            this.server.emit('message', message)
+        }
+    }
 }

+ 2 - 1
src/rcs/entities/device.entity.ts

@@ -20,5 +20,6 @@ export class Device {
     @Column()
     canSend: boolean
 
-    
+    @Column()
+    socketId: string
 }

+ 3 - 0
src/rcs/entities/task.entity.ts

@@ -29,4 +29,7 @@ export class Task {
 
     @Column({ type: 'enum', enum: TaskStatus, nullable: false, default: TaskStatus.IDLE })
     status: TaskStatus
+
+    @Column()
+    error: string
 }

+ 3 - 1
src/rcs/rcs.controller.ts

@@ -7,6 +7,7 @@ import { Phone } from './entities/phone.entity'
 import { PhoneList } from './entities/phone-list.entity'
 import { Device } from './entities/device.entity'
 import { FileInterceptor } from '@nestjs/platform-express'
+import { Public } from 'src/auth/public.decorator'
 
 @Controller('rcs')
 export class RcsController {
@@ -85,7 +86,8 @@ export class RcsController {
     }
 
     @Post('/updateDevice/:id')
+    @Public()
     async deviceConnect(@Param('id') id: string, @Req() req) {
-        return await this.rcsService.deviceConnect(id, req.body)
+        return await this.rcsService.updateDevice(id, req.body)
     }
 }

+ 39 - 6
src/rcs/rcs.service.ts

@@ -1,4 +1,4 @@
-import { Inject, Injectable, OnModuleInit, forwardRef } from '@nestjs/common'
+import { Inject, Injectable, Logger, OnModuleInit, forwardRef } from '@nestjs/common'
 import { InjectRepository } from '@nestjs/typeorm'
 import { PhoneList } from './entities/phone-list.entity'
 import { Repository } from 'typeorm'
@@ -12,6 +12,8 @@ import axios from 'axios'
 import { USACodeApiService } from './usacode-api-service'
 import { Device } from './entities/device.entity'
 import { NumberRequest } from './entities/number-request.entity'
+import { randomUUID } from 'crypto'
+import { setTimeout } from 'timers/promises'
 
 @Injectable()
 export class RcsService implements OnModuleInit {
@@ -36,6 +38,7 @@ export class RcsService implements OnModuleInit {
 
     onModuleInit() {
         this.deviceRepository.update({}, { online: false })
+        this.taskRepository.update({ status: TaskStatus.PENDING }, { status: TaskStatus.IDLE })
     }
 
     async findAllPhoneList(req: PageRequest<PhoneList>): Promise<Pagination<PhoneList>> {
@@ -111,10 +114,39 @@ export class RcsService implements OnModuleInit {
     }
 
     async runTask(task: Task) {
-        const taskItems = await this.taskItemRepository.findBy({ taskId: task.id, status: TaskStatus.IDLE })
-        for (const taskItem of taskItems) {
-            taskItem.status = TaskStatus.PENDING
-            await this.taskItemRepository.save(taskItem)
+        try {
+            let taskItems = await this.taskItemRepository.find({
+                where: { taskId: task.id, status: TaskStatus.IDLE },
+                take: 10
+            })
+            while (taskItems && taskItems.length > 0) {
+                let device = null
+                while (device === null) {
+                    device = this.deviceRepository.findOne({
+                        where: { online: true, canSend: true }
+                    })
+                    if (device === null) {
+                        await setTimeout(2000)
+                    }
+                }
+                await this.eventsGateway.sendForResult(
+                    {
+                        id: randomUUID(),
+                        action: 'task',
+                        data: taskItems
+                    },
+                    device.socketId
+                )
+                taskItems = await this.taskItemRepository.find({
+                    where: { taskId: task.id, status: TaskStatus.IDLE },
+                    take: 10
+                })
+            }
+        } catch (e) {
+            Logger.error('Error running task', e, 'RcsService')
+            task.status = TaskStatus.ERROR
+            task.error = e.message
+            await this.taskRepository.save(task)
         }
     }
 
@@ -122,12 +154,13 @@ export class RcsService implements OnModuleInit {
         return await paginate<Device>(this.deviceRepository, req.page, req.search)
     }
 
-    async deviceConnect(id: string, model: string, name?: string, canSend: boolean = false) {
+    async deviceConnect(id: string, socketId: string, model: string, name?: string, canSend: boolean = false) {
         let device = await this.deviceRepository.findOneBy({ id })
         if (!device) {
             device = new Device()
         }
         device.id = id
+        device.socketId = socketId
         device.model = model
         device.name = name
         device.online = true