Procházet zdrojové kódy

refactor(rcs-number): 优化号码获取逻辑和缓存策略- 修改号码获取逻辑,增加对存储号码渠道的判断
- 在缓存号码时添加过期时间设置
- 优化清理过期号码的逻辑,增加任务执行判断

wuyi před 1 rokem
rodič
revize
949bc2be33
2 změnil soubory, kde provedl 48 přidání a 26 odebrání
  1. 21 15
      src/rcs-number/rcs-number.service.ts
  2. 27 11
      src/task/task.service.ts

+ 21 - 15
src/rcs-number/rcs-number.service.ts

@@ -205,10 +205,10 @@ export class RcsNumberService {
                 const newVar = await this.redis.rpop(operatorConfig.country)
                 res = newVar ? JSON.parse(newVar) : null
             }
-            if (res === null) {
+            if (res === null || (store && storeNumberChannels.includes(res.source.toString()))) {
                 res = await numberService.getNumber(operatorConfig.country)
             } else {
-                Logger.log(`${operatorConfig.country} - ${res.source}\tGet the number from the cache`, 'GetNumber')
+                Logger.log(`${operatorConfig.country} - ${res.source}\t Get the number from the cache`, 'GetNumber')
             }
         } catch (e) {
             Logger.error(`${operatorConfig.country} - ${numberService.source}\t ${e.message}`, 'GetNumber')
@@ -429,6 +429,7 @@ export class RcsNumberService {
                         res.forEach(data => {
                             pipe.lpush(config.countryCode, JSON.stringify(data))
                         })
+                        pipe.expire(config.countryCode, 300)
                         await pipe.exec()
                         sum += res.length
                         Logger.log(`${config.channelSource}: ${res.length} numbers cached for ${config.countryCode}`, 'CacheNumber')
@@ -442,21 +443,26 @@ export class RcsNumberService {
     }
 
     async cleanUpExpiredNumbers(listName: string) {
-        const allKeys = await this.redis.keys('*')
-
-        for (const key of allKeys) {
-            const type = await this.redis.type(key)
-            if (type === 'list') {
-                const items = await this.redis.lrange(key, 0, -1)
-                const pipe = this.redis.pipeline()
-                for (const item of items) {
-                    const parsedItem = JSON.parse(item)
-                    if (parsedItem.expireAt <= Date.now()) {
-                        pipe.lrem(key, 1, item)
+        // 判断是否有任务执行
+        const number = await this.taskService.checkPendingTaskNum()
+        Logger.log(`任务执行数量: ${number}`, 'CleanUpExpiredNumbers')
+        if (number === 0) {
+            const allKeys = await this.redis.keys('*')
+
+            for (const key of allKeys) {
+                const type = await this.redis.type(key)
+                if (type === 'list') {
+                    const items = await this.redis.lrange(key, 0, -1)
+                    const pipe = this.redis.pipeline()
+                    for (const item of items) {
+                        const parsedItem = JSON.parse(item)
+                        if (parsedItem.expireAt <= Date.now()) {
+                            pipe.lrem(key, 1, item)
+                        }
                     }
+                    await pipe.exec()
+                    Logger.log(`Cleaned up expired numbers from ${key}`, 'CleanUpExpiredNumbers')
                 }
-                await pipe.exec()
-                Logger.log(`Cleaned up expired numbers from ${key}`, 'CleanUpExpiredNumbers')
             }
         }
     }

+ 27 - 11
src/task/task.service.ts

@@ -67,7 +67,8 @@ export class TaskService implements OnModuleInit {
         private readonly userService: UsersService,
         private readonly operatorConfigService: OperatorConfigService,
         private readonly countryConfigService: CountryConfigService
-    ) {}
+    ) {
+    }
 
     onModuleInit() {
         this.lock.acquire('dispatchTask', async () => {
@@ -699,7 +700,8 @@ export class TaskService implements OnModuleInit {
                     if (durianRes.data.code === 200) {
                         res.durian = durianRes.data.data.score
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -720,7 +722,8 @@ export class TaskService implements OnModuleInit {
                     if (cowboyRes.data.code === 200) {
                         res.cowboy = cowboyRes.data.data.score
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -734,7 +737,8 @@ export class TaskService implements OnModuleInit {
                     if (parts[0] === '0') {
                         res.xyz = parts[1] < 0 ? 0 : parts[1]
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -747,7 +751,8 @@ export class TaskService implements OnModuleInit {
                     if (cloud033Res.data.code === '1001') {
                         res.cloud033 = cloud033Res.data.data.integral
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -760,7 +765,8 @@ export class TaskService implements OnModuleInit {
                     if (cloud034Res.data.code === '1001') {
                         res.cloud034 = cloud034Res.data.data.integral
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -773,7 +779,8 @@ export class TaskService implements OnModuleInit {
                     if (cloud037Res.data.code === '1001') {
                         res.cloud037 = cloud037Res.data.data.integral
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -786,7 +793,8 @@ export class TaskService implements OnModuleInit {
                     if (cloud041Res.data.code === '1001') {
                         res.cloud041 = cloud041Res.data.data.integral
                     }
-                } catch (e) {}
+                } catch (e) {
+                }
             })()
         ])
         return res
@@ -798,7 +806,7 @@ export class TaskService implements OnModuleInit {
 
         return await this.taskItemRepository
             .createQueryBuilder()
-            .select(['COUNT(*) AS sent', "DATE_FORMAT(sendAt, '%Y-%m-%d %H:00:00') AS hour"])
+            .select(['COUNT(*) AS sent', 'DATE_FORMAT(sendAt, \'%Y-%m-%d %H:00:00\') AS hour'])
             .where('sendAt BETWEEN :start AND :end', { start: twelveHoursAgo, end: new Date() })
             .groupBy('hour')
             .orderBy('hour', 'DESC')
@@ -823,7 +831,8 @@ export class TaskService implements OnModuleInit {
                         .andWhere('task.startedAt < CURDATE()')
                         .getRawOne()
                     res.orderCountYesterday = yesterdayOrderCount.sum
-                } catch (e) {}
+                } catch (e) {
+                }
             })(),
             (async () => {
                 try {
@@ -835,7 +844,8 @@ export class TaskService implements OnModuleInit {
                         .andWhere('task.startedAt < CURDATE() + INTERVAL 1 DAY')
                         .getRawOne()
                     res.orderCountToday = todayOrderCount.sum
-                } catch (e) {}
+                } catch (e) {
+                }
             })()
         ])
 
@@ -1260,6 +1270,12 @@ export class TaskService implements OnModuleInit {
         })
     }
 
+    async checkPendingTaskNum() {
+        return await this.taskRepository.countBy({
+            status: TaskStatus.PENDING
+        })
+    }
+
     @Interval(10000)
     async fixDeadTask() {
         const tasks = await this.taskRepository.findBy({