x1ongzhu před 1 rokem
rodič
revize
97be58e7b1

+ 1 - 0
package.json

@@ -77,6 +77,7 @@
     "mysql2": "^3.1.2",
     "nestjs-typeorm-paginate": "^4.0.3",
     "nodemailer": "^6.9.1",
+    "p-queue": "^6.6.2",
     "p-timeout": "^6.1.1",
     "passport": "^0.6.0",
     "passport-http-bearer": "^1.0.1",

+ 9 - 1
src/device/device.module.ts

@@ -5,9 +5,17 @@ import { TypeOrmModule } from '@nestjs/typeorm'
 import { Device } from './entities/device.entity'
 import { TaskModule } from 'src/task/task.module'
 import { OperaterConfigModule } from '../operator_config/operator_config.module'
+import { EventsModule } from '../events/events.module'
+import { DeviceTask } from './entities/device-task.entity'
+import { DeviceTaskItem } from './entities/device-task-item.entity'
 
 @Module({
-    imports: [TypeOrmModule.forFeature([Device]), forwardRef(() => TaskModule), OperaterConfigModule],
+    imports: [
+        TypeOrmModule.forFeature([Device, DeviceTask, DeviceTaskItem]),
+        forwardRef(() => TaskModule),
+        OperaterConfigModule,
+        forwardRef(() => EventsModule)
+    ],
     providers: [DeviceService],
     controllers: [DeviceController],
     exports: [DeviceService]

+ 99 - 1
src/device/device.service.ts

@@ -1,3 +1,4 @@
+import { compare } from 'bcrypt'
 import { forwardRef, Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common'
 import { PageRequest } from '../common/dto/page-request'
 import { Device } from './entities/device.entity'
@@ -9,6 +10,12 @@ import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity
 import { TaskService } from '../task/task.service'
 import { OperatorConfigService } from '../operator_config/operator_config.service'
 import { Interval } from '@nestjs/schedule'
+import { EventsGateway } from 'src/events/events.gateway'
+import { DeviceTask, DeviceTaskStatus } from './entities/device-task.entity'
+import { DeviceTaskItem, DeviceTaskItemStatus } from './entities/device-task-item.entity'
+import PQueue from 'p-queue'
+import { randomUUID } from 'crypto'
+import { setTimeout } from 'timers/promises'
 
 @Injectable()
 export class DeviceService implements OnModuleInit {
@@ -18,7 +25,13 @@ export class DeviceService implements OnModuleInit {
         private deviceRepository: Repository<Device>,
         @Inject(forwardRef(() => TaskService))
         private taskService: TaskService,
-        private operatorConfigService: OperatorConfigService
+        private operatorConfigService: OperatorConfigService,
+        @Inject(forwardRef(() => EventsGateway))
+        private eventsGateway: EventsGateway,
+        @InjectRepository(DeviceTask)
+        private deviceTaskRepository: Repository<DeviceTask>,
+        @InjectRepository(DeviceTaskItem)
+        private deviceTaskItemRepository: Repository<DeviceTaskItem>
     ) {}
 
     async onModuleInit() {
@@ -214,4 +227,89 @@ export class DeviceService implements OnModuleInit {
             Logger.error(e)
         }
     }
+
+    async createTask({ taskType, payload }: { taskType: string; payload: any; total: number }) {
+        const devices = await this.deviceRepository.findBy({
+            online: true
+        })
+        const task = await this.deviceTaskRepository.save({
+            taskType,
+            payload,
+            total: devices.length,
+            progress: 0
+        })
+        const taskItems = devices.map((device) => {
+            return new DeviceTaskItem({
+                taskId: task.id,
+                deviceId: device.id,
+                status: DeviceTaskItemStatus.IDLE
+            })
+        })
+        await this.deviceTaskItemRepository.save(taskItems)
+        this.runTask(task)
+        return task
+    }
+
+    async runTask(task: DeviceTask) {
+        await this.deviceTaskRepository.update(task.id, {
+            status: DeviceTaskStatus.PENDING
+        })
+        const taskItems = await this.deviceTaskItemRepository.findBy({ taskId: task.id })
+        const queue = new PQueue({ concurrency: 5 })
+        queue.on('next', async () => {
+            const progress = await this.deviceTaskItemRepository.countBy({
+                taskId: task.id,
+                status: In([DeviceTaskItemStatus.FAILED, DeviceTaskItemStatus.SUCCESS])
+            })
+            this.deviceTaskRepository.update(task.id, {
+                progress
+            })
+        })
+        for (const taskItem of taskItems) {
+            queue.add(async () => {
+                await this.deviceTaskItemRepository.update(taskItem.id, {
+                    status: DeviceTaskItemStatus.PENDING
+                })
+                let error = ''
+                try {
+                    const device = await this.deviceRepository.findOneBy({
+                        id: taskItem.deviceId
+                    })
+                    if (device) {
+                        const res = await Promise.race([
+                            this.eventsGateway.sendForResult(
+                                {
+                                    id: randomUUID(),
+                                    action: task.taskType,
+                                    data: task.payload
+                                },
+                                device.socketId
+                            ),
+                            setTimeout(5 * 60 * 100, 'timeout')
+                        ])
+                        if (res === 'timeout') {
+                            error = 'timeout'
+                        }
+                    }
+                } catch (e) {
+                    error = e.message
+                    Logger.error(e)
+                }
+                if (error) {
+                    await this.deviceTaskItemRepository.update(taskItem.id, {
+                        status: DeviceTaskItemStatus.FAILED,
+                        error
+                    })
+                } else {
+                    await this.deviceTaskItemRepository.update(taskItem.id, {
+                        status: DeviceTaskItemStatus.SUCCESS
+                    })
+                }
+            })
+        }
+        await queue.onIdle()
+        await this.deviceTaskRepository.update(task.id, {
+            status: DeviceTaskStatus.COMPLETE
+        })
+    }
 }

+ 44 - 0
src/device/entities/device-task-item.entity.ts

@@ -0,0 +1,44 @@
+import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn } from 'typeorm'
+
+export enum DeviceTaskItemStatus {
+    IDLE = 'idle',
+    PENDING = 'pending',
+    SUCCESS = 'success',
+    FAILED = 'failed'
+}
+
+@Entity()
+export class DeviceTaskItem {
+    constructor(partial?: Partial<DeviceTaskItem>) {
+        if (partial) {
+            Object.assign(this, partial)
+        }
+    }
+
+    @PrimaryGeneratedColumn()
+    id: number
+
+    @CreateDateColumn()
+    createdAt: Date
+
+    @Column()
+    @Index()
+    taskId: number
+
+    @Column({ length: 50 })
+    @Index()
+    deviceId: string
+
+    @Column({ type: 'enum', enum: DeviceTaskItemStatus, default: DeviceTaskItemStatus.IDLE })
+    @Index()
+    status: DeviceTaskItemStatus
+
+    @Column({ nullable: true })
+    startedAt: Date
+
+    @Column({ nullable: true })
+    completedAt: Date
+
+    @Column({ type: 'text', nullable: true })
+    error: string
+}

+ 38 - 0
src/device/entities/device-task.entity.ts

@@ -0,0 +1,38 @@
+import { JsonTransformer } from '../../transformers/json.transformer'
+import { Column, CreateDateColumn, Entity, PrimaryGeneratedColumn } from 'typeorm'
+
+export enum DeviceTaskStatus {
+    IDLE = 'idle',
+    PENDING = 'pending',
+    COMPLETE = 'complete'
+}
+
+@Entity()
+export class DeviceTask {
+    @PrimaryGeneratedColumn()
+    id: number
+
+    @CreateDateColumn()
+    createdAt: Date
+
+    @Column()
+    taskType: string
+
+    @Column({ type: 'text', nullable: false, transformer: new JsonTransformer() })
+    payload: any
+
+    @Column({ type: 'enum', enum: DeviceTaskStatus, default: DeviceTaskStatus.IDLE })
+    status: DeviceTaskStatus
+
+    @Column({ nullable: true })
+    startedAt: Date
+
+    @Column({ nullable: true })
+    completedAt: Date
+
+    @Column()
+    progress: number
+
+    @Column()
+    total: number
+}

+ 21 - 4
src/events/events.gateway.ts

@@ -1,4 +1,4 @@
-import { Inject, Logger, forwardRef } from '@nestjs/common'
+import { Inject, Logger, UseFilters, forwardRef } from '@nestjs/common'
 import {
     MessageBody,
     OnGatewayConnection,
@@ -10,6 +10,7 @@ import {
 } from '@nestjs/websockets'
 import { Server, Socket } from 'socket.io'
 import { DeviceService } from '../device/device.service'
+import { AllExceptionsSocketFilter } from '../filters/all-exceptions-socket.filter'
 
 export interface Message {
     id: string
@@ -24,6 +25,7 @@ export interface Message {
         credentials: true
     }
 })
+@UseFilters(new AllExceptionsSocketFilter())
 export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
     @WebSocketServer()
     server: Server
@@ -44,8 +46,13 @@ export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGate
         const model = client.handshake.query.model as string
         const name = client.handshake.query.name as string
         const version = client.handshake.query.version as any
-        Logger.log(`Client connected: id=${id}, model=${model}, name=${name}, version=${version || ''}`, 'EventsGateway')
-        this.deviceService.deviceConnect(id, client.id, model, name, version)
+        Logger.log(
+            `Client connected: id=${id}, model=${model}, name=${name}, version=${version || ''}`,
+            'EventsGateway'
+        )
+        if (id && model && name) {
+            this.deviceService.deviceConnect(id, client.id, model, name, version)
+        }
     }
 
     handleDisconnect(client: any) {
@@ -67,7 +74,7 @@ export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGate
 
     @SubscribeMessage('callback')
     handleCallback(client: Socket, data: any) {
-        // Logger.log(`Received callback: ${JSON.stringify(data)}`, 'EventsGateway')
+        Logger.log(`Received callback: ${JSON.stringify(data)}`, 'EventsGateway')
         if (this.callbacks[data.id]) {
             const success = data.status === 0
             if (success) {
@@ -79,6 +86,16 @@ export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGate
         }
     }
 
+    @SubscribeMessage('redirect')
+    async redirect(client: Socket, data: any) {
+        Logger.log(`Received redirect: ${JSON.stringify(data)}`, 'EventsGateway')
+        try {
+            await this.sendForResult(data.message, data.to)
+        } catch (e) {
+            Logger.error(`Error redirect message`, e.stack, 'EventsGateway')
+        }
+    }
+
     public emitEvent(event: string, data: any) {
         this.server.emit(event, data)
     }

+ 7 - 0
src/filters/all-exceptions-socket.filter.ts

@@ -0,0 +1,7 @@
+import { ArgumentsHost, Catch } from '@nestjs/common'
+import { BaseWsExceptionFilter } from '@nestjs/websockets'
+
+@Catch()
+export class AllExceptionsSocketFilter extends BaseWsExceptionFilter {
+    catch(exception: any, host: ArgumentsHost) {}
+}

+ 25 - 0
yarn.lock

@@ -3660,6 +3660,11 @@ etag@~1.8.1:
   resolved "https://registry.npmmirror.com/etag/-/etag-1.8.1.tgz#41ae2eeb65efa62268aebfea83ac7d79299b0887"
   integrity sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg==
 
+eventemitter3@^4.0.4:
+  version "4.0.7"
+  resolved "https://registry.npmmirror.com/eventemitter3/-/eventemitter3-4.0.7.tgz#2de9b68f6528d5644ef5c59526a1b4a07306169f"
+  integrity sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==
+
 events@^3.2.0:
   version "3.3.0"
   resolved "https://registry.npmmirror.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400"
@@ -6418,6 +6423,11 @@ osx-release@^1.0.0:
   dependencies:
     minimist "^1.1.0"
 
+p-finally@^1.0.0:
+  version "1.0.0"
+  resolved "https://registry.npmmirror.com/p-finally/-/p-finally-1.0.0.tgz#3fbcfb15b899a44123b34b6dcc18b724336a2cae"
+  integrity sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==
+
 p-limit@^2.2.0:
   version "2.3.0"
   resolved "https://registry.npmmirror.com/p-limit/-/p-limit-2.3.0.tgz#3dd33c647a214fdfffd835933eb086da0dc21db1"
@@ -6446,6 +6456,21 @@ p-locate@^5.0.0:
   dependencies:
     p-limit "^3.0.2"
 
+p-queue@^6.6.2:
+  version "6.6.2"
+  resolved "https://registry.npmmirror.com/p-queue/-/p-queue-6.6.2.tgz#2068a9dcf8e67dd0ec3e7a2bcb76810faa85e426"
+  integrity sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==
+  dependencies:
+    eventemitter3 "^4.0.4"
+    p-timeout "^3.2.0"
+
+p-timeout@^3.2.0:
+  version "3.2.0"
+  resolved "https://registry.npmmirror.com/p-timeout/-/p-timeout-3.2.0.tgz#c7e17abc971d2a7962ef83626b35d635acf23dfe"
+  integrity sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==
+  dependencies:
+    p-finally "^1.0.0"
+
 p-timeout@^6.1.1:
   version "6.1.2"
   resolved "https://registry.npmmirror.com/p-timeout/-/p-timeout-6.1.2.tgz#22b8d8a78abf5e103030211c5fc6dee1166a6aa5"