x1ongzhu 1 ano atrás
pai
commit
a1aaea3e78

+ 3 - 4
src/rcs-number/entities/rcs-number.entity.ts

@@ -40,21 +40,20 @@ export class RcsNumber {
     @Column({ nullable: false })
     number: string
 
-    @Column({ type: 'text' })
+    @Column({ type: 'text', nullable: true })
     message: string
 
     @Column({ type: 'enum', enum: RcsNumberStatus, nullable: false, default: RcsNumberStatus.PENDING })
     status: RcsNumberStatus
 
-    @Column({ type: 'text', transformer: new JsonTransformer() })
+    @Column({ type: 'text', transformer: new JsonTransformer(), nullable: true })
     extra: any
 
     @Column({ nullable: true })
     deviceId: string
 
-    @Column()
+    @Column({ nullable: true })
     taskId: string
-
     @Exclude()
     deviceName: string
 

+ 0 - 10
src/shared/mailer/mailer.module.ts

@@ -1,10 +0,0 @@
-import { Module } from '@nestjs/common'
-import { MailerService } from './mailer.service'
-import { ConfigModule } from '@nestjs/config'
-
-@Module({
-    imports: [ConfigModule.forRoot({ isGlobal: true })],
-    providers: [MailerService],
-    exports: [MailerService]
-})
-export class MailerModule {}

+ 0 - 19
src/shared/mailer/mailer.service.spec.ts

@@ -1,19 +0,0 @@
-import { ConfigService } from '@nestjs/config'
-import { Test, TestingModule } from '@nestjs/testing'
-import { MailerService } from './mailer.service'
-
-describe('MailerService', () => {
-    let service: MailerService
-
-    beforeEach(async () => {
-        const module: TestingModule = await Test.createTestingModule({
-            providers: [ConfigService, MailerService]
-        }).compile()
-
-        service = module.get<MailerService>(MailerService)
-    })
-
-    it('should be defined', () => {
-        expect(service).toBeDefined()
-    })
-})

+ 0 - 39
src/shared/mailer/mailer.service.ts

@@ -1,39 +0,0 @@
-import { Injectable } from '@nestjs/common'
-import { createTransport } from 'nodemailer'
-import * as Mail from 'nodemailer/lib/mailer'
-import { ConfigService } from '@nestjs/config'
-import * as hbs from 'nodemailer-express-handlebars'
-
-@Injectable()
-export class MailerService {
-    private nodemailerTransport: Mail
-
-    constructor(private readonly configService: ConfigService) {
-        this.nodemailerTransport = createTransport({
-            host: this.configService.get<string>('EMAIL_HOST'),
-            port: this.configService.get<number>('EMAIL_PORT'),
-            auth: {
-                user: this.configService.get<string>('EMAIL_AUTH_USER'),
-                pass: this.configService.get<string>('EMAIL_AUTH_PASSWORD')
-            },
-            debug: this.configService.get<boolean>('EMAIL_DEBUG'),
-            logger: false
-        })
-
-        const options = {
-            viewEngine: {
-                extname: '.hbs', // handlebars extension
-                layoutsDir: process.cwd() + `${this.configService.get<string>('EMAIL_LAYOUT_DIR')}`, // location of handlebars templates
-                defaultLayout: `${this.configService.get<string>('EMAIL_DEFAULT_LAYOUT')}`, // name of main template
-                partialsDir: process.cwd() + `${this.configService.get<string>('EMAIL_PARTIAL_DIR')}` // location of your subtemplates aka. header, footer etc
-            },
-            viewPath: process.cwd() + `${this.configService.get<string>('EMAIL_VIEW_PATH')}`,
-            extName: '.hbs'
-        }
-        this.nodemailerTransport.use('compile', hbs(options))
-    }
-
-    sendMail(options: any) {
-        return this.nodemailerTransport.sendMail(options)
-    }
-}

+ 2 - 2
src/task/entities/task.entity.ts

@@ -33,7 +33,7 @@ export class Task {
     @Column({ type: 'enum', enum: TaskStatus, nullable: false, default: TaskStatus.IDLE })
     status: TaskStatus
 
-    @Column()
+    @Column({ nullable: true })
     error: string
 
     @Column({ default: 0 })
@@ -45,7 +45,7 @@ export class Task {
     @Column({ default: 0 })
     cleanCount: number
 
-    @Column()
+    @Column({ nullable: true })
     successRate: string
 
     @Column({ default: 0 })

+ 85 - 95
src/task/task.service.ts

@@ -40,8 +40,7 @@ export class TaskService implements OnModuleInit {
         private readonly deviceService: DeviceService,
         private readonly sysConfigService: SysConfigService,
         private readonly balanceService: BalanceService
-    ) {
-    }
+    ) {}
 
     onModuleInit() {
         this.taskRepository.update({ status: TaskStatus.PENDING }, { status: TaskStatus.IDLE })
@@ -55,7 +54,11 @@ export class TaskService implements OnModuleInit {
         const taskItems = result.items
         if (taskItems.length > 0) {
             for (const task of taskItems) {
-                if (task.status === TaskStatus.PENDING || (task.status === TaskStatus.COMPLETED && moment(task.createdAt).isSameOrAfter(moment().subtract(12, 'hours')))) {
+                if (
+                    task.status === TaskStatus.PENDING ||
+                    (task.status === TaskStatus.COMPLETED &&
+                        moment(task.createdAt).isSameOrAfter(moment().subtract(12, 'hours')))
+                ) {
                     const id = task.id
                     const successCount = await this.taskItemRepository.countBy({
                         taskId: id,
@@ -136,13 +139,11 @@ export class TaskService implements OnModuleInit {
         const task = await this.taskRepository.findOneBy({ id })
 
         // 查询当前是否有任务执行
-        const num = await this.taskRepository.count(
-            {
-                where: {
-                    status: TaskStatus.PENDING
-                }
+        const num = await this.taskRepository.count({
+            where: {
+                status: TaskStatus.PENDING
             }
-        )
+        })
         // 当前有任务
         if (num > 0 && task.status === TaskStatus.IDLE) {
             // 当前任务进入队列
@@ -152,7 +153,11 @@ export class TaskService implements OnModuleInit {
         }
 
         // 队列没有任务或队列中下一个待执行任务,启动此任务
-        if ((task && task.status === TaskStatus.IDLE) || task.status === TaskStatus.PAUSE || task.status === TaskStatus.QUEUED) {
+        if (
+            (task && task.status === TaskStatus.IDLE) ||
+            task.status === TaskStatus.PAUSE ||
+            task.status === TaskStatus.QUEUED
+        ) {
             if (task.status !== TaskStatus.PAUSE) {
                 const user = await this.userRepository.findOneBy({
                     id: task.userId
@@ -193,14 +198,11 @@ export class TaskService implements OnModuleInit {
                     })
                     newTask.sent = sendCount
                     await this.taskRepository.save(newTask)
-
                 } catch (e) {
                     Logger.error('Error startTask ', e, 'RcsService')
                 }
-
             }
         }
-
     }
 
     async pauseTask(id: number): Promise<void> {
@@ -243,7 +245,7 @@ export class TaskService implements OnModuleInit {
             }
         ]
 
-        taskItems.forEach(item => {
+        taskItems.forEach((item) => {
             let valid = '有效'
             let status = '发送成功'
             const sendAt: Date = item.sendAt
@@ -300,6 +302,26 @@ export class TaskService implements OnModuleInit {
         return total
     }
 
+    async getConfig(name, defValue) {
+        try {
+            return await this.sysConfigService.getNumber(name, defValue)
+        } catch (error) {
+            Logger.error('Error getting rcs wait time', error, 'RcsService')
+        }
+        return defValue
+    }
+
+    async updateTaskItemStatus(taskIds: number[], status: TaskItemStatus) {
+        await this.taskItemRepository.update(
+            {
+                id: In(taskIds)
+            },
+            {
+                status: status
+            }
+        )
+    }
+
     async runTask(task: Task) {
         let controller = new AbortController()
         this.taskControllers[task.id] = controller
@@ -312,46 +334,11 @@ export class TaskService implements OnModuleInit {
                 }
 
                 task = await this.taskRepository.findOneBy({ id: task.id })
-                let rcsWait = 2000
-                let rcsInterval = 3000
-                let cleanCount = 20
-                let requestNumberInterval = 100
-                if (task.rcsWait > 0) {
-                    rcsWait = task.rcsWait
-                } else {
-                    try {
-                        rcsWait = await this.sysConfigService.getNumber('rcs_wait', 2000)
-                    } catch (error) {
-                        Logger.error('Error getting rcs wait time', error, 'RcsService')
-                    }
-                }
-                if (task.rcsInterval > 0) {
-                    rcsInterval = task.rcsInterval
-                } else {
-                    try {
-                        rcsInterval = await this.sysConfigService.getNumber('rcs_interval', 3000)
-                    } catch (error) {
-                        Logger.error('Error getting rcs interval time', error, 'RcsService')
-                    }
-                }
-                if (task.cleanCount > 0) {
-                    cleanCount = task.cleanCount
-                } else {
-                    try {
-                        cleanCount = await this.sysConfigService.getNumber('clean_count', 20)
-                    } catch (error) {
-                        Logger.error('Error getting clean count', error, 'RcsService')
-                    }
-                }
-                if (task.requestNumberInterval > 0) {
-                    requestNumberInterval = task.requestNumberInterval
-                } else {
-                    try {
-                        requestNumberInterval = await this.sysConfigService.getNumber('request_number_interval', 100)
-                    } catch (error) {
-                        Logger.error('Error getting request number interval', error, 'RcsService')
-                    }
-                }
+                let rcsWait = task.rcsWait || (await this.getConfig('rcs_wait', 2000))
+                let rcsInterval = task.rcsInterval || (await this.getConfig('rcs_interval', 3000))
+                let cleanCount = task.cleanCount || (await this.getConfig('clean_count', 20))
+                let requestNumberInterval =
+                    task.requestNumberInterval || (await this.getConfig('request_number_interval', 100))
 
                 let taskItems = await this.taskItemRepository.find({
                     where: { taskId: task.id, status: TaskItemStatus.IDLE },
@@ -363,15 +350,14 @@ export class TaskService implements OnModuleInit {
                     task.status = TaskStatus.COMPLETED
                     await this.taskRepository.save(task)
                     // 从队列中获取下一个任务
-                    const tasks = await this.taskRepository.find(
-                        {
-                            where: {
-                                status: TaskStatus.QUEUED
-                            },
-                            order: {
-                                createdAt: 'ASC'
-                            }
-                        })
+                    const tasks = await this.taskRepository.find({
+                        where: {
+                            status: TaskStatus.QUEUED
+                        },
+                        order: {
+                            createdAt: 'ASC'
+                        }
+                    })
                     // 异步执行startTask方法
                     if (tasks.length > 0) {
                         this.startTask(tasks[0].id)
@@ -391,11 +377,11 @@ export class TaskService implements OnModuleInit {
                         await setTimeout(2000)
                     }
                 }
-                taskItems.forEach((taskItem) => {
-                    taskItem.status = TaskItemStatus.PENDING
-                })
+                await this.updateTaskItemStatus(
+                    taskItems.map((i) => i.id),
+                    TaskItemStatus.PENDING
+                )
                 await this.deviceService.setBusy(device.id, true)
-                await this.taskItemRepository.save(taskItems)
                 Logger.log(`Send task to device ${device.id}(${device.model})`, 'RcsService')
 
                 Promise.race([
@@ -414,38 +400,42 @@ export class TaskService implements OnModuleInit {
                     setTimeout(60000).then(() => {
                         return Promise.resolve({ error: 'Timeout' })
                     })
-                ]).then(async (res: any) => {
-                    if (res.error) {
-                        Logger.error('Task timeout', 'RcsService')
-                        await this.taskItemRepository.update(
-                            taskItems.map((i) => i.id),
-                            {
-                                status: TaskItemStatus.FAIL,
-                                sendAt: new Date()
+                ])
+                    .then(async (res: any) => {
+                        if (res.error) {
+                            Logger.error('Task timeout', 'RcsService')
+                            await this.updateTaskItemStatus(
+                                taskItems.map((i) => i.id),
+                                TaskItemStatus.PENDING
+                            )
+                        } else {
+                            Logger.log(
+                                `Task completed: ${res.success.length} success, ${res.fail.length} fail, ${
+                                    res.retry?.length || 0
+                                } retry`,
+                                'RcsService'
+                            )
+                            if (res.success?.length > 0) {
+                                await this.updateTaskItemStatus(res.success, TaskItemStatus.SUCCESS)
+                            }
+                            if (res.fail?.length > 0) {
+                                await this.updateTaskItemStatus(res.fail, TaskItemStatus.FAIL)
+                            }
+                            if (res.retry?.length > 0) {
+                                await this.updateTaskItemStatus(res.retry, TaskItemStatus.PENDING)
                             }
-                        )
-                    } else {
-                        Logger.log(
-                            `Task completed: ${res.success.length} success, ${res.fail.length} fail`,
-                            'RcsService'
-                        )
-                        if (res.success?.length > 0) {
-                            await this.taskItemRepository.update(res.success, {
-                                status: TaskItemStatus.SUCCESS,
-                                sendAt: new Date()
-                            })
-                        }
-                        if (res.fail?.length > 0) {
-                            await this.taskItemRepository.update(res.fail, {
-                                status: TaskItemStatus.FAIL,
-                                sendAt: new Date()
-                            })
                         }
-                    }
-                })
+                    })
+                    .catch(async (e) => {
+                        Logger.error('Error running task', e.stack, 'RcsService')
+                        await this.updateTaskItemStatus(
+                            taskItems.map((i) => i.id),
+                            TaskItemStatus.PENDING
+                        )
+                    })
             }
         } catch (e) {
-            Logger.error('Error running task', e, 'RcsService')
+            Logger.error('Error running task', e.stack, 'RcsService')
             task.status = TaskStatus.ERROR
             task.error = e.message
             await this.taskRepository.save(task)