Просмотр исходного кода

chore: Update DeviceService to find available devices by country

x1ongzhu 1 год назад
Родитель
Сommit
a3268d4b9e
3 измененных файлов с 100 добавлено и 80 удалено
  1. 7 2
      src/device/device.service.ts
  2. 1 1
      src/rcs-number/rcs-number.service.ts
  3. 92 77
      src/task/task.service.ts

+ 7 - 2
src/device/device.service.ts

@@ -114,12 +114,17 @@ export class DeviceService {
             }
         }
     }
-
-    async findAvailableDevices(num = 100, matchDevice?: string) {
+    async findAllAvailableDevices() {
+        return await this.deviceRepository.findBy({ online: true, busy: false, canSend: true })
+    }
+    async findAvailableDevices({ num, matchDevice, country }: { num: number; matchDevice?: string; country?: string }) {
         const where: any = { online: true, busy: false, canSend: true }
         if (matchDevice) {
             where.name = Like(`%${matchDevice}%`)
         }
+        if (country) {
+            where.currentCountry = country
+        }
         return await this.deviceRepository.createQueryBuilder().where(where).orderBy('RAND()').limit(num).getMany()
     }
 

+ 1 - 1
src/rcs-number/rcs-number.service.ts

@@ -200,7 +200,7 @@ export class RcsNumberService {
                 return
             }
 
-            if (number.status === RcsNumberStatus.SUCCESS && number.from === RcsNumberSource.durian) {
+            if (number.status === RcsNumberStatus.SUCCESS) {
                 return
             }
 

+ 92 - 77
src/task/task.service.ts

@@ -35,6 +35,7 @@ import * as randomstring from 'randomstring'
 import { RcsNumber } from '../rcs-number/entities/rcs-number.entity'
 import axios from 'axios'
 import { BalanceRecord, BalanceType } from '../balance/entities/balance-record.entities'
+import { Device } from 'src/device/entities/device.entity'
 
 @Injectable()
 export class TaskService implements OnModuleInit {
@@ -61,8 +62,7 @@ export class TaskService implements OnModuleInit {
         private readonly sysConfigService: SysConfigService,
         private readonly balanceService: BalanceService,
         private readonly userService: UsersService
-    ) {
-    }
+    ) {}
 
     onModuleInit() {
         this.lock.acquire('dispatchTask', async () => {
@@ -483,14 +483,14 @@ export class TaskService implements OnModuleInit {
         return await this.taskRepository
             .createQueryBuilder()
             .select([
-                'DATE(task.createdAt) as day',
-                'SUM(task.sent) as sent',
-                'SUM(task.successCount) as success',
-                'SUM(task.total) as total'
+                'DATE(createdAt) as day',
+                'SUM(sent) as sent',
+                'SUM(successCount) as success',
+                'SUM(total) as total'
             ])
             .where(where)
-            .groupBy('DATE(task.createdAt)')
-            .orderBy('task.createdAt', 'ASC')
+            .groupBy('day')
+            .orderBy('day', 'ASC')
             .getRawMany()
             .then((rows) => {
                 if (rows.length > 0) {
@@ -512,14 +512,13 @@ export class TaskService implements OnModuleInit {
     }
 
     async codeStatistics() {
-        const res = await this.rcsNumberRepository.createQueryBuilder('rcsNumber')
-            .select([
-                'rcsNumber.`from` as channel',
-                'DATE(rcsNumber.createdAt) as day',
-                'COUNT(1) as sum'
-            ])
+        const res = await this.rcsNumberRepository
+            .createQueryBuilder('rcsNumber')
+            .select(['rcsNumber.`from` as channel', 'DATE(rcsNumber.createdAt) as day', 'COUNT(1) as sum'])
             .where('rcsNumber.status = :status', { status: 'success' })
-            .andWhere('(DATE(rcsNumber.createdAt) = CURDATE() or DATE(rcsNumber.createdAt) = DATE_SUB(CURDATE(), INTERVAL 1 DAY))')
+            .andWhere(
+                '(DATE(rcsNumber.createdAt) = CURDATE() or DATE(rcsNumber.createdAt) = DATE_SUB(CURDATE(), INTERVAL 1 DAY))'
+            )
             .groupBy('DATE(rcsNumber.createdAt), rcsNumber.from')
             .getRawMany()
 
@@ -549,7 +548,6 @@ export class TaskService implements OnModuleInit {
     }
 
     async balanceStatistics() {
-
         const res = {
             durian: 0,
             cloud033: 0,
@@ -557,18 +555,20 @@ export class TaskService implements OnModuleInit {
         }
 
         try {
-            const durianRes = await axios.create({
-                baseURL: 'http://8.218.211.187/out/ext_api/',
-                headers: {
-                    uhost: 'api.durianrcs.com',
-                    uprotocol: 'http'
-                }
-            }).get('getUserInfo', {
-                params: {
-                    name: 'unsnap3094',
-                    ApiKey: 'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'
-                }
-            })
+            const durianRes = await axios
+                .create({
+                    baseURL: 'http://8.218.211.187/out/ext_api/',
+                    headers: {
+                        uhost: 'api.durianrcs.com',
+                        uprotocol: 'http'
+                    }
+                })
+                .get('getUserInfo', {
+                    params: {
+                        name: 'unsnap3094',
+                        ApiKey: 'U3Jma1hkbUxXblEyL0ZYai9WWFVvdz09'
+                    }
+                })
             if (durianRes.data.code === 200) {
                 res.durian = durianRes.data.data.score
             }
@@ -596,7 +596,7 @@ export class TaskService implements OnModuleInit {
             if (cloud034Res.data.code === '1001') {
                 res.cloud034 = cloud034Res.data.data.integral
             }
-        }catch (e){}
+        } catch (e) {}
 
         return res
     }
@@ -727,55 +727,72 @@ export class TaskService implements OnModuleInit {
                         },
                         take: maxParallel
                     })
-
+                    // 少补
+                    if (tasks.length < maxParallel) {
+                        const nextTasks = await this.taskRepository.find({
+                            where: {
+                                status: TaskStatus.QUEUED
+                            },
+                            order: {
+                                startedAt: 'ASC'
+                            },
+                            take: maxParallel - tasks.length
+                        })
+                        if (nextTasks.length > 0) {
+                            const nextTasksIds = nextTasks.map((t) => t.id)
+                            await this.taskRepository.update({ id: In(nextTasksIds) }, { status: TaskStatus.PENDING })
+                            tasks.push(...nextTasks)
+                        }
+                    }
+                    // 插队任务
+                    const cuttingTasks = await this.taskRepository.find({
+                        where: {
+                            status: TaskStatus.CUTTING
+                        }
+                    })
+                    if (cuttingTasks.length > 0) {
+                        tasks.push(...cuttingTasks)
+                    }
                     if (tasks.length > 0) {
-                        // 少补
-                        if (tasks.length < maxParallel) {
-                            const nextTasks = await this.taskRepository.find({
-                                where: {
-                                    status: TaskStatus.QUEUED
-                                },
-                                order: {
-                                    startedAt: 'ASC'
-                                },
-                                take: maxParallel - tasks.length
-                            })
-                            if (nextTasks.length > 0) {
-                                const nextTasksIds = nextTasks.map((t) => t.id)
-                                await this.taskRepository.update(
-                                    { id: In(nextTasksIds) },
-                                    { status: TaskStatus.PENDING }
-                                )
-                                tasks.push(...nextTasks)
+                        const devices = await this.deviceService.findAllAvailableDevices()
+                        const countryMapping: { [key: string]: string[] } = JSON.parse(
+                            (await this.sysConfigService.getString('countryMapping', '')) || '{}'
+                        )
+
+                        const res = tasks.map((task) => {
+                            return {
+                                task,
+                                useCountry: task.country ? ['any'] : countryMapping[task.country] || ['any'],
+                                devices: []
                             }
-                        }
+                        })
 
-                        // 插队任务
-                        const cuttingTasks = await this.taskRepository.find({
-                            where: {
-                                status: TaskStatus.CUTTING
+                        devices.forEach((device) => {
+                            let candidateTasks
+                            if (device.matchCountry) {
+                                candidateTasks = res.filter((r) => {
+                                    return r.useCountry.includes(device.pinCountry)
+                                })
+                            } else {
+                                candidateTasks = res.filter((r) => {
+                                    return r.useCountry.includes('any') || r.useCountry.includes(device.currentCountry)
+                                })
+                            }
+                            if (candidateTasks.length > 0) {
+                                candidateTasks.sort((a, b) => {
+                                    return a.devices.length - b.devices.length
+                                })
+                                candidateTasks[0].devices.push(device)
                             }
                         })
-                        if (cuttingTasks.length > 0) {
-                            tasks.push(...cuttingTasks)
-                        }
 
-                        const totalSend = tasks.reduce((acc, cur) => {
-                            return acc + cur.total
-                        }, 0)
-                        if (totalSend === 0) {
-                            Logger.log('TotalSend cannot be zero to avoid division by zero error.', this.TAG)
-                            return
-                        }
-
-                        // 随机抽取一个task
-                        const task = tasks[Math.floor(Math.random() * tasks.length)]
-                        // 占比
-                        const taskRatio = task.total / totalSend
-                        // 分配的设备数量
-                        const devicesForTask = Math.round(batchSize * taskRatio)
-                        // 分发任务
-                        await this.dispatchTask(task, devicesForTask)
+                        await Promise.all(
+                            res.map(async (r) => {
+                                if (r.devices.length > 0) {
+                                    await this.dispatchTask(r.task, r.devices)
+                                }
+                            })
+                        )
                     }
                 },
                 {
@@ -810,20 +827,18 @@ export class TaskService implements OnModuleInit {
         return content
     }
 
-    async dispatchTask(task: Task, num?: number) {
-        const batchSize = 200
+    async dispatchTask(task: Task, devices: Device[]) {
         const taskItems = await this.taskItemRepository.find({
             where: {
                 taskId: task.id,
                 status: TaskItemStatus.IDLE
             },
-            take: num * 5
+            take: devices.length * 5
         })
         if (taskItems.length === 0) {
             return
         }
 
-        const devices = await this.deviceService.findAvailableDevices(num, task.matchDevice)
         if (devices.length === 0) {
             return
         }
@@ -942,7 +957,7 @@ export class TaskService implements OnModuleInit {
         try {
             config = await this.sysConfigService.findByName('check_availability_numbers')
         } catch (e) {
-            Logger.error('Error getting rcs wait time', e.stack, this.TAG)
+            Logger.error('Error getting check_availability_numbers', e.stack, this.TAG)
             config = new SysConfig()
             config.name = 'check_availability_numbers'
         }