xiongzhu 1 年間 前
コミット
8a0133eb12

+ 5 - 3
src/device/device.module.ts

@@ -3,12 +3,13 @@ import { DeviceService } from './device.service'
 import { DeviceController } from './device.controller'
 import { TypeOrmModule } from '@nestjs/typeorm'
 import { Device } from './entities/device.entity'
-import { TaskModule } from 'src/task/task.module'
+import { TaskModule } from '../task/task.module'
 import { OperaterConfigModule } from '../operator_config/operator_config.module'
 import { EventsModule } from '../events/events.module'
 import { DeviceTask } from './entities/device-task.entity'
 import { DeviceTaskItem } from './entities/device-task-item.entity'
-import { SysConfigModule } from 'src/sys-config/sys-config.module'
+import { SysConfigModule } from '../sys-config/sys-config.module'
+import { CountryConfigModule } from '../country-config/country-config.module'
 
 @Module({
     imports: [
@@ -16,7 +17,8 @@ import { SysConfigModule } from 'src/sys-config/sys-config.module'
         forwardRef(() => TaskModule),
         OperaterConfigModule,
         forwardRef(() => EventsModule),
-        SysConfigModule
+        SysConfigModule,
+        CountryConfigModule
     ],
     providers: [DeviceService],
     controllers: [DeviceController],

+ 101 - 2
src/device/device.service.ts

@@ -12,7 +12,7 @@ import { PageRequest } from '../common/dto/page-request'
 import { Device } from './entities/device.entity'
 import { Pagination, paginate } from 'nestjs-typeorm-paginate'
 import { InjectRepository } from '@nestjs/typeorm'
-import { Equal, In, IsNull, Like, Not, Or, Repository } from 'typeorm'
+import { And, Equal, In, IsNull, Like, Not, Or, Repository } from 'typeorm'
 import { IpmoyuProvider } from '../proxy-provider/ipmoyu-provider'
 import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'
 import { TaskService } from '../task/task.service'
@@ -28,6 +28,9 @@ import { SendMessageDto } from './dtos/send-message.dto'
 import { SysConfigService } from 'src/sys-config/sys-config.service'
 import * as AsyncLock from 'async-lock'
 import { isAfter, isBefore, parse } from 'date-fns'
+import { Task } from 'src/task/entities/task.entity'
+import { CountryConfigService } from '../country-config/country-config.service'
+import { DestConfig } from 'src/country-config/entities/dest-config.entity'
 
 @Injectable()
 export class DeviceService implements OnModuleInit {
@@ -45,7 +48,8 @@ export class DeviceService implements OnModuleInit {
         private deviceTaskRepository: Repository<DeviceTask>,
         @InjectRepository(DeviceTaskItem)
         private deviceTaskItemRepository: Repository<DeviceTaskItem>,
-        private sysConfigService: SysConfigService
+        private sysConfigService: SysConfigService,
+        private readonly countryConfigService: CountryConfigService
     ) {}
 
     async onModuleInit() {
@@ -178,6 +182,12 @@ export class DeviceService implements OnModuleInit {
         return await this.deviceRepository.createQueryBuilder().where(where).orderBy('RAND()').limit(num).getMany()
     }
 
+    async findByTaskId(taskId: number) {
+        return await this.deviceRepository.findBy({
+            pinTask: taskId
+        })
+    }
+
     async setBusy(id: string, busy: boolean) {
         const device = await this.deviceRepository.findOneBy({ id })
         if (device) {
@@ -500,4 +510,93 @@ export class DeviceService implements OnModuleInit {
             }
         )
     }
+
+    async removePinTask(deviceId: string) {
+        await this.deviceRepository.update(deviceId, { pinTask: 0 })
+    }
+
+    matchesCountry(device: Device, useCountry: string[], exclude: string[]) {
+        if (device.matchCountry) {
+            if (!device.pinCountry) return false
+            return useCountry.includes(device.pinCountry)
+        } else {
+            return (
+                (useCountry.includes(device.currentCountry.toUpperCase()) || useCountry.includes('any')) &&
+                !exclude.includes(device.currentCountry.toUpperCase())
+            )
+        }
+    }
+
+    @Interval(3 * 1000)
+    async schedulePinTask() {
+        await this.deviceRepository.update({ online: false, pinTask: Not(0) }, { pinTask: 0 })
+        await this.deviceRepository.update({ canSend: false, pinTask: Not(0) }, { pinTask: 0 })
+        const pendingTasks = (await this.taskService.findPendingTasks()).sort(() => Math.random() - 0.5)
+        if (pendingTasks.length === 0) {
+            await this.deviceRepository.update({ pinTask: Not(0) }, { pinTask: 0 })
+        } else {
+            await this.deviceRepository.update(
+                { pinTask: And(Not(0), Not(In(pendingTasks.map((i) => i.id)))) },
+                { pinTask: 0 }
+            )
+        }
+
+        let devices = (
+            await this.deviceRepository.findBy({
+                online: true,
+                canSend: true
+            })
+        ).sort(() => Math.random() - 0.5)
+        if (pendingTasks.length === 0) return
+        if (devices.length === 0) return
+        const countryMapping = await this.countryConfigService.getAllDestConfig(pendingTasks.map((t) => t.country))
+
+        const res = pendingTasks.map((task) => {
+            return {
+                task,
+                useCountry: countryMapping.find((c) => c.id === task.country)?.useCountry || ['any'],
+                exclude: countryMapping.find((c) => c.id === task.country)?.exclude || [],
+                devices: devices.filter((d) => d.pinTask === task.id),
+                maxDevices: Math.max(Math.ceil((task.total - task.sent) / 60), 5)
+            }
+        })
+
+        for (const device of devices) {
+            if (device.pinTask !== 0) {
+                const task = res.find((r) => r.task.id === device.pinTask)
+                if (task && this.matchesCountry(device, task.useCountry, task.exclude)) {
+                    continue
+                } else {
+                    Logger.log(`Unpin task ${device.pinTask} from ${device.name}`, 'PinTask')
+                    device.pinTask = 0
+                    await this.removePinTask(device.id)
+                }
+            }
+            let candidateTasks = res.filter((r) => Math.ceil((r.task.total - r.task.sent) / 5) > r.devices.length)
+            if (device.matchCountry && device.pinCountry) {
+                candidateTasks = candidateTasks.filter((r) => {
+                    return (
+                        r.useCountry.includes(device.pinCountry.toUpperCase()) &&
+                        !r.exclude.includes(device.pinCountry.toUpperCase())
+                    )
+                })
+            } else {
+                candidateTasks = candidateTasks.filter((r) => {
+                    return (
+                        (r.useCountry.includes('any') || r.useCountry.includes(device.currentCountry?.toUpperCase())) &&
+                        !r.exclude.includes(device.currentCountry?.toUpperCase())
+                    )
+                })
+            }
+            candidateTasks = candidateTasks.filter((r) => r.devices.length < r.maxDevices)
+            if (candidateTasks.length > 0) {
+                candidateTasks.sort((a, b) => {
+                    return a.devices.length - b.devices.length
+                })
+                candidateTasks[0].devices.push(device)
+                Logger.log(`Pin task ${candidateTasks[0].task.id} to ${device.name}`, 'PinTask')
+                await this.deviceRepository.update(device.id, { pinTask: candidateTasks[0].task.id })
+            }
+        }
+    }
 }

+ 3 - 0
src/device/entities/device.entity.ts

@@ -56,4 +56,7 @@ export class Device {
 
     @Column({ default: false })
     storing: Boolean
+
+    @Column({ nullable: false, default: 0 })
+    pinTask: number
 }

+ 1 - 3
src/rcs-number/rcs-number.service.ts

@@ -90,9 +90,6 @@ export class RcsNumberService {
         this.sms7979 = new sms7979('VmXjb6DYFpXyC9ZD6sOEQIdRUCMTcg', 40, RcsNumberSource.sms7979)
         this.sms797902 = new sms7979('zKZVl49nNGgtRWe6JI4y9LmQCQPPm6', 40, RcsNumberSource.sms797902)
         this.redis = this.redisService.getOrThrow()
-        this.redis.get('hello').then((res) => {
-            console.log(res)
-        })
     }
 
     async findAll(req: PageRequest<RcsNumber>): Promise<Pagination<RcsNumber>> {
@@ -261,6 +258,7 @@ export class RcsNumberService {
 
         if (deviceId) {
             await this.deviceService.changeProfile(deviceId)
+            await this.deviceService.removePinTask(deviceId)
         }
 
         return number

+ 129 - 56
src/task/task.service.ts

@@ -1190,6 +1190,127 @@ export class TaskService implements OnModuleInit {
         return new Decimal(number).mul(parseInt(multiplier))
     }
 
+    // @Interval(2000)
+    // async scheduleTask() {
+    //     this.lock
+    //         .acquire(
+    //             'dispatchTask',
+    //             async () => {
+    //                 const maxParallel = await this.getConfig('max_parallel', 1)
+    //                 const batchSize = 200
+
+    //                 let tasks = await this.taskRepository.find({
+    //                     where: {
+    //                         status: TaskStatus.PENDING
+    //                     },
+    //                     order: {
+    //                         startedAt: 'ASC'
+    //                     },
+    //                     take: maxParallel
+    //                 })
+    //                 // 少补
+    //                 if (tasks.length < maxParallel) {
+    //                     const nextTasks = await this.taskRepository.find({
+    //                         where: {
+    //                             status: TaskStatus.QUEUED
+    //                         },
+    //                         order: {
+    //                             startedAt: 'ASC',
+    //                             id: 'ASC'
+    //                         }
+    //                     })
+
+    //                     if (nextTasks.length > 0) {
+    //                         const userIdMap = new Map()
+    //                         tasks.forEach((task) => {
+    //                             userIdMap.set(task.userId, (userIdMap.get(task.userId) || 0) + 1)
+    //                         })
+
+    //                         // nextTasks筛选,从排队任务中筛选出最多2个同用户下的任务
+    //                         let filteredTasks = []
+    //                         const userIds = {}
+    //                         const limit = maxParallel - tasks.length
+    //                         for (const task of nextTasks) {
+    //                             if (!userIds[task.userId]) {
+    //                                 userIds[task.userId] = 0
+    //                             }
+    //                             if ((userIdMap.get(task.userId) || 0) + userIds[task.userId] < 2) {
+    //                                 filteredTasks.push(task)
+    //                                 userIds[task.userId]++
+    //                             }
+    //                             if (filteredTasks.length >= limit) {
+    //                                 break
+    //                             }
+    //                         }
+
+    //                         const nextTasksIds = filteredTasks.map((t) => t.id)
+    //                         if (nextTasksIds.length === 0) {
+    //                             nextTasksIds.push(...nextTasks.map((t) => t.id).slice(0, limit))
+    //                         }
+    //                         await this.taskRepository.update({ id: In(nextTasksIds) }, { status: TaskStatus.PENDING })
+    //                         tasks.push(...filteredTasks)
+    //                     }
+    //                 }
+
+    //                 const pendingTasks = (await this.findPendingTasks()).sort(() => Math.random() - 0.5)
+    //                 if (pendingTasks.length === 0) return
+    //                 const devices = await this.deviceService.findAllAvailableDevices()
+    //                 if (devices.length === 0) return
+    //                 const countryMapping = await this.countryConfigService.getAllDestConfig(
+    //                     pendingTasks.map((t) => t.country)
+    //                 )
+
+    //                 const res = pendingTasks.map((task) => {
+    //                     return {
+    //                         task,
+    //                         useCountry: countryMapping.find((c) => c.id === task.country)?.useCountry || ['any'],
+    //                         exclude: countryMapping.find((c) => c.id === task.country)?.exclude || [],
+    //                         devices: []
+    //                     }
+    //                 })
+
+    //                 devices.forEach((device) => {
+    //                     let candidateTasks = res.filter(
+    //                         (r) => Math.ceil((r.task.total - r.task.sent) / 5) > r.devices.length
+    //                     )
+    //                     if (device.matchCountry && device.pinCountry) {
+    //                         candidateTasks = candidateTasks.filter((r) => {
+    //                             return (
+    //                                 r.useCountry.includes(device.pinCountry.toUpperCase()) &&
+    //                                 !r.exclude.includes(device.pinCountry.toUpperCase())
+    //                             )
+    //                         })
+    //                     } else {
+    //                         candidateTasks = candidateTasks.filter((r) => {
+    //                             return (
+    //                                 (r.useCountry.includes('any') ||
+    //                                     r.useCountry.includes(device.currentCountry?.toUpperCase())) &&
+    //                                 !r.exclude.includes(device.currentCountry?.toUpperCase())
+    //                             )
+    //                         })
+    //                     }
+    //                     if (candidateTasks.length > 0) {
+    //                         candidateTasks.sort((a, b) => {
+    //                             return a.devices.length - b.devices.length
+    //                         })
+    //                         candidateTasks[0].devices.push(device)
+    //                     }
+    //                 })
+    //                 for (let r of res) {
+    //                     if (r.devices.length > 0) {
+    //                         await this.dispatchTask(r.task, r.devices)
+    //                     }
+    //                 }
+    //             },
+    //             {
+    //                 timeout: 1
+    //             }
+    //         )
+    //         .catch((e) => {
+    //             if (e.message.includes('timed out')) return
+    //             Logger.error('Error dispatchTask', e.stack, this.TAG)
+    //         })
+    // }
     @Interval(2000)
     async scheduleTask() {
         this.lock
@@ -1197,7 +1318,6 @@ export class TaskService implements OnModuleInit {
                 'dispatchTask',
                 async () => {
                     const maxParallel = await this.getConfig('max_parallel', 1)
-                    const batchSize = 200
 
                     let tasks = await this.taskRepository.find({
                         where: {
@@ -1251,62 +1371,15 @@ export class TaskService implements OnModuleInit {
                             tasks.push(...filteredTasks)
                         }
                     }
-                    // 专线任务,插队任务
-                    const cuttingTasks = await this.taskRepository.find({
-                        where: {
-                            status: In([TaskStatus.CUTTING, TaskStatus.VIP])
-                        }
-                    })
-                    if (cuttingTasks.length > 0) {
-                        tasks.push(...cuttingTasks)
-                    }
-                    if (tasks.length === 0) return
-                    tasks = tasks.sort(() => Math.random() - 0.5)
-                    const devices = await this.deviceService.findAllAvailableDevices()
-                    if (devices.length === 0) return
-                    const countryMapping = await this.countryConfigService.getAllDestConfig(tasks.map((t) => t.country))
-
-                    const res = tasks.map((task) => {
-                        return {
-                            task,
-                            useCountry: countryMapping.find((c) => c.id === task.country)?.useCountry || ['any'],
-                            exclude: countryMapping.find((c) => c.id === task.country)?.exclude || [],
-                            devices: []
-                        }
-                    })
 
-                    devices.forEach((device) => {
-                        let candidateTasks = res.filter(
-                            (r) => Math.ceil((r.task.total - r.task.sent) / 5) > r.devices.length
-                        )
-                        if (device.matchCountry && device.pinCountry) {
-                            candidateTasks = candidateTasks.filter((r) => {
-                                return (
-                                    r.useCountry.includes(device.pinCountry.toUpperCase()) &&
-                                    !r.exclude.includes(device.pinCountry.toUpperCase())
-                                )
-                            })
-                        } else {
-                            candidateTasks = candidateTasks.filter((r) => {
-                                return (
-                                    (r.useCountry.includes('any') ||
-                                        r.useCountry.includes(device.currentCountry?.toUpperCase())) &&
-                                    !r.exclude.includes(device.currentCountry?.toUpperCase())
-                                )
-                            })
-                        }
-                        if (candidateTasks.length > 0) {
-                            candidateTasks.sort((a, b) => {
-                                return a.devices.length - b.devices.length
-                            })
-                            candidateTasks[0].devices.push(device)
-                        }
-                    })
-                    for (let r of res) {
-                        if (r.devices.length > 0) {
-                            await this.dispatchTask(r.task, r.devices)
-                        }
-                    }
+                    const pendingTasks = (await this.findPendingTasks()).sort(() => Math.random() - 0.5)
+                    await Promise.all(
+                        pendingTasks.map(async (task) => {
+                            const devices = await this.deviceService.findByTaskId(task.id)
+                            if (devices.length === 0) return
+                            await this.dispatchTask(task, devices)
+                        })
+                    )
                 },
                 {
                     timeout: 1