فهرست منبع

优化短信任务调度器的日志信息,调整状态标识,修复报告获取逻辑

wuyi 4 روز پیش
والد
کامیت
470487ef0a
4فایلهای تغییر یافته به همراه160 افزوده شده و 101 حذف شده
  1. 1 1
      src/schedulers/base.scheduler.ts
  2. 40 52
      src/schedulers/sms-task.scheduler.ts
  3. 1 1
      src/services/sms-task.service.ts
  4. 118 47
      src/services/sms/xins.service.ts

+ 1 - 1
src/schedulers/base.scheduler.ts

@@ -24,7 +24,7 @@ export abstract class BaseScheduler {
       return
     }
 
-    this.app.log.info(`启动 ${this.constructor.name} 定时调度器,间隔: ${this.interval}ms`)
+    this.app.log.info(`${this.constructor.name} 启动成功,轮询间隔: ${this.interval}ms`)
     this.scheduleInterval = setInterval(() => {
       this.execute()
     }, this.interval)

+ 40 - 52
src/schedulers/sms-task.scheduler.ts

@@ -5,7 +5,6 @@ import { SmsTask } from '../entities/sms-task.entity'
 import { TaskItemStatus, TaskStatus } from '../enum/task.enum'
 import { SmsTaskItem } from '../entities/sms-task-item.entity'
 import { xinsService } from '../services/sms/xins.service'
-import { GetReportResult } from '../services/sms/sms.types'
 
 /**
  * SMS 任务定时调度器
@@ -35,13 +34,13 @@ export class SmsTaskScheduler extends BaseScheduler {
     })
 
     if (runningTasks.length > 0) {
-      this.app.log.warn(`发现 ${runningTasks.length} 个异常的 RUNNING 状态任务,正在恢复...`)
+      this.app.log.info(`发现 ${runningTasks.length} 个异常的 RUNNING 状态任务,正在恢复...`)
       for (const task of runningTasks) {
         await this.smsTaskRepository.update(task.id, {
           status: TaskStatus.PENDING
         })
-        this.app.log.info(`任务 ${task.id} 已恢复为 PENDING 状态`)
       }
+      this.app.log.info(`所有异常任务恢复完成`)
     }
 
     super.start()
@@ -74,15 +73,17 @@ export class SmsTaskScheduler extends BaseScheduler {
       })
 
       if (queuedTask) {
+        this.app.log.info(`发现队列任务 ${queuedTask.id},升级为 PENDING 状态`)
         await this.smsTaskRepository.update(queuedTask.id, {
           status: TaskStatus.PENDING
         })
-        this.app.log.info(`任务 ${queuedTask.id} 从队列中升级为 pending 状态`)
       }
       return
     }
 
-    this.app.log.info(`开始处理 pending 任务: ${pendingTask.id}`)
+    this.app.log.info(
+      `开始处理任务: ${pendingTask.id} (${pendingTask.name}), 总数: ${pendingTask.total}`
+    )
 
     // 更新为 RUNNING 状态,并记录开始时间
     await this.smsTaskRepository.update(pendingTask.id, {
@@ -106,7 +107,7 @@ export class SmsTaskScheduler extends BaseScheduler {
       })
 
       if (!currentTask || currentTask.status !== TaskStatus.RUNNING) {
-        this.app.log.warn(`任务 ${pendingTask.id} 被中断,当前状态: ${currentTask?.status || '不存在'}`)
+        this.app.log.warn(`⚠️ 任务 ${pendingTask.id} 被中断,当前状态: ${currentTask?.status || '不存在'}`)
         return
       }
 
@@ -129,6 +130,7 @@ export class SmsTaskScheduler extends BaseScheduler {
       // 串行处理批次数据
       for (let i = 0; i < smsTaskItems.length; i += batchSize) {
         const batch = smsTaskItems.slice(i, i + batchSize)
+
         const batchSuccess = await this.sendBatch(batch, pendingTask.message)
 
         totalBatches++
@@ -140,13 +142,14 @@ export class SmsTaskScheduler extends BaseScheduler {
         const progress = await this.getTaskProgress(pendingTask.id)
         this.app.log.info(
           `任务 ${pendingTask.id} 进度: ${progress.processed}/${progress.total} (${progress.percentage}%), ` +
-            `成功率: ${progress.successRate}%, 批次: ${totalBatches}, 失败批次: ${failedBatches}`
+            `成功率: ${progress.successRate}%, 累计批次: ${totalBatches}, 累计失败批次: ${failedBatches}`
         )
 
         // 实时检查失败率,如果超过 70% 且至少处理了 3 个批次,立即停止
         const currentFailureRate = totalBatches > 0 ? (failedBatches / totalBatches) * 100 : 0
+
         if (totalBatches >= 3 && currentFailureRate > 70) {
-          this.app.log.error(`任务 ${pendingTask.id} 失败率过高 (${currentFailureRate.toFixed(2)}%),立即停止任务`)
+          this.app.log.error(`🛑 任务 ${pendingTask.id} 失败率过高 (${currentFailureRate.toFixed(2)}%),立即停止任务`)
           await this.smsTaskRepository.update(pendingTask.id, {
             status: TaskStatus.ERROR
           })
@@ -168,10 +171,11 @@ export class SmsTaskScheduler extends BaseScheduler {
       status: finalStatus
     })
 
-    this.app.log.info(
-      `任务 ${pendingTask.id} 处理完成,状态: ${finalStatus}, ` +
-        `总批次: ${totalBatches}, 失败批次: ${failedBatches}, 失败率: ${failureRate.toFixed(2)}%`
-    )
+    if (finalStatus === TaskStatus.COMPLETED) {
+      this.app.log.info(`任务 ${pendingTask.id} 完成,总批次: ${totalBatches}, 失败批次: ${failedBatches}`)
+    } else {
+      this.app.log.error(`❌ 任务 ${pendingTask.id} 失败率过高 (${failureRate.toFixed(2)}%),标记为错误状态`)
+    }
   }
 
   /**
@@ -181,7 +185,7 @@ export class SmsTaskScheduler extends BaseScheduler {
   private async sendBatch(batch: SmsTaskItem[], message: string): Promise<boolean> {
     // 边界检查
     if (!batch || batch.length === 0) {
-      this.app.log.warn('批次为空,跳过处理')
+      this.app.log.warn('⚠️ 批次为空,跳过处理')
       return true
     }
 
@@ -190,46 +194,27 @@ export class SmsTaskScheduler extends BaseScheduler {
 
       const sendResult = await this.xins.sendSms(phoneNumbers, message)
 
-      // 获取报告,重试 10 次,每次间隔 3s
-      let reportResult: GetReportResult | null = null
-      const maxRetries = 10
-      const retryInterval = 3000
-
-      for (let attempt = 1; attempt <= maxRetries; attempt++) {
-        const currentReport = await this.xins.getReport({ msgid: sendResult.msgid })
-
-        const hasResult =
-          currentReport.phoneStatusList.length > 0 &&
-          batch.some(item => currentReport.phoneStatusList.some(status => status.number === item.target))
-
-        if (hasResult) {
-          reportResult = currentReport
-          this.app.log.info(`批次报告获取成功 (尝试 ${attempt}/${maxRetries})`)
-          break
-        }
-
-        if (attempt < maxRetries) {
-          this.app.log.info(`批次报告暂无结果,等待 ${retryInterval / 1000} 秒后重试 (尝试 ${attempt}/${maxRetries})`)
-          await new Promise(resolve => setTimeout(resolve, retryInterval))
-        } else {
-          this.app.log.warn(`批次报告获取失败:${maxRetries} 次尝试后仍无结果`)
-          reportResult = null
-        }
-      }
+      // 获取报告
+      const reportResult = await this.xins.getReport({ msgid: sendResult.msgid })
 
       // 更新状态
-      const phoneStatusMap = reportResult
-        ? new Map(
-            reportResult.phoneStatusList.map(status => [
-              status.number,
-              status.status === 'success'
-                ? TaskItemStatus.SUCCESS
-                : status.status === 'waiting'
-                ? TaskItemStatus.WAITING
-                : TaskItemStatus.FAILED
-            ])
-          )
-        : null
+      const phoneStatusMap =
+        reportResult.phoneStatusList.length > 0
+          ? new Map(
+              reportResult.phoneStatusList.map(status => [
+                status.number,
+                status.status === 'success'
+                  ? TaskItemStatus.SUCCESS
+                  : status.status === 'waiting'
+                  ? TaskItemStatus.WAITING
+                  : TaskItemStatus.FAILED
+              ])
+            )
+          : null
+
+      if (!phoneStatusMap) {
+        this.app.log.warn(`⚠️ 未获取到任何报告,所有项将标记为 FAILED`)
+      }
 
       const now = new Date()
       let processed = 0
@@ -243,6 +228,7 @@ export class SmsTaskScheduler extends BaseScheduler {
           successed++
         }
       }
+
       await this.smsTaskItemRepository.save(batch)
 
       // 更新任务的发送数和成功数
@@ -254,9 +240,10 @@ export class SmsTaskScheduler extends BaseScheduler {
 
       return true
     } catch (error) {
-      this.app.log.error(`发送短信失败: ${error}`)
+      this.app.log.error(`发送短信失败: ${error}`)
 
       const batchIds = batch.map(i => i.id)
+
       const now = new Date()
       await this.smsTaskItemRepository.update(
         { id: In(batchIds) },
@@ -289,6 +276,7 @@ export class SmsTaskScheduler extends BaseScheduler {
     })
 
     if (!task) {
+      this.app.log.warn(`⚠️ 任务 ${taskId} 不存在,返回空进度`)
       return {
         processed: 0,
         total: 0,

+ 1 - 1
src/services/sms-task.service.ts

@@ -49,7 +49,7 @@ export class SmsTaskService {
     const values = lines.map(line => ({
       taskId: data.taskId,
       target: line.trim(),
-      status: TaskItemStatus.PENDING as TaskItemStatus
+      status: TaskItemStatus.IDLE
     }))
 
     await this.app.dataSource.transaction(async manager => {

+ 118 - 47
src/services/sms/xins.service.ts

@@ -14,7 +14,7 @@ const MKT_PASSWORD = 'RB7EAo3d'
 const OTP_USER_NAME = 'JIAXINOTP'
 const OTP_PASSWORD = 'OgXb8X4q'
 
-const DEFAULT_CALLER = 10123456789
+const DEFAULT_CALLER = 16787995662
 
 export class xinsService extends GetSmsService {
   private xmlParser: XMLParser
@@ -63,13 +63,12 @@ export class xinsService extends GetSmsService {
     const parsed = this.parseXml(xmlData)
 
     const status = parsed?.Message?.Head?.Status
-    if (status !== '0') {
+
+    if (status != '0') {
       const errorDesc = this.getAckErrorDescription(status)
       throw new Error(`发送短信失败 [Status:${status}]: ${errorDesc}`)
     }
 
-    this.app.log.info(`发送短信成功 : ${xmlData}`)
-
     const msgid = parsed?.Message?.Body?.MsgID
     if (!msgid) {
       throw new Error('MsgID is missing in response')
@@ -82,53 +81,49 @@ export class xinsService extends GetSmsService {
   }
 
   async getReport(params: any): Promise<GetReportResult> {
-    const response = await axiosInstance.get('5.dox', {
-      params: {
-        UserName: MKT_USER_NAME,
-        PassWord: MKT_PASSWORD
-      }
-    })
+    const msgid = params?.msgid
 
-    const xmlData = response.data
-    const parsed = this.parseXml(xmlData)
+    // 延迟 5s 后开始获取报告
+    await new Promise(resolve => setTimeout(resolve, 5000))
 
-    const status = parsed?.Message?.Head?.Status
-    if (status !== '0') {
-      const errorDesc = this.getAckErrorDescription(status)
-      throw new Error(`获取报告失败 [Status:${status}]: ${errorDesc}`)
-    }
+    // 累积所有报告结果,使用 Map 去重(号码作为 key)
+    const accumulatedStatusMap = new Map<string, PhoneStatus>()
+    let emptyResultCount = 0 // 连续空结果计数
+    const maxEmptyResults = 3 // 连续 3 次空结果则退出
+    const retryInterval = 5000 // 每次间隔 5 秒
+    let attempt = 0
 
-    this.app.log.info('获取报告成功')
+    while (emptyResultCount < maxEmptyResults) {
+      attempt++
 
-    // Report
-    const body = parsed?.Message?.Body
-    let reports: any[] = []
+      const currentReport = await this.getReportOnce(params)
 
-    if (body?.Report) {
-      if (Array.isArray(body.Report)) {
-        reports = body.Report
+      if (currentReport.phoneStatusList.length > 0) {
+        // 有结果,重置空结果计数
+        emptyResultCount = 0
+
+        // 累积结果到 Map 中(新结果会覆盖旧结果)
+        for (const status of currentReport.phoneStatusList) {
+          const existingStatus = accumulatedStatusMap.get(status.number)
+          if (!existingStatus || existingStatus.status !== status.status) {
+            accumulatedStatusMap.set(status.number, status)
+          }
+        }
       } else {
-        reports = [body.Report]
+        // 无结果,空结果计数 +1
+        emptyResultCount++
       }
-    }
 
-    // msgid过滤
-    if (params?.msgid) {
-      reports = reports.filter(report => report.MsgID === params.msgid)
+      // 如果还未达到退出条件,等待后继续
+      if (emptyResultCount < maxEmptyResults) {
+        await new Promise(resolve => setTimeout(resolve, retryInterval))
+      }
     }
 
-    const phoneStatusList: PhoneStatus[] = reports.map(report => {
-      // 000 表示成功
-      const state = report.State || ''
-      const error = report.Error || ''
-
-      const status = this.parseReportStatus(state, error)
+    this.app.log.info(`报告获取完成,查询 ${attempt} 次,获得 ${accumulatedStatusMap.size} 条状态记录`)
 
-      return {
-        number: report.Callee,
-        status
-      }
-    })
+    // 将累积的 Map 转换为 PhoneStatus 数组返回
+    const phoneStatusList = Array.from(accumulatedStatusMap.values())
 
     return {
       phoneStatusList
@@ -136,17 +131,23 @@ export class xinsService extends GetSmsService {
   }
 
   private parseReportStatus(state: string, error: string): 'success' | 'waiting' | 'fail' {
-    if (error === '000' && state === 'DELIVRD') {
+    const errorStr = String(error || '').trim()
+    const stateStr = String(state || '').trim()
+
+    // 成功条件:error 为 '000' 且 state 为 'DELIVRD'
+    if (errorStr === '000' && stateStr === 'DELIVRD') {
       return 'success'
     }
 
-    if (error !== '000' && error !== '') {
+    // 失败条件:error 不为空且不为 '000'
+    if (errorStr !== '' && errorStr !== '000') {
       return 'fail'
     }
 
-    if (state) {
-      if (state.includes(':')) {
-        const [statusType] = state.split(':')
+    // 根据 state 判断
+    if (stateStr) {
+      if (stateStr.includes(':')) {
+        const [statusType] = stateStr.split(':')
         switch (statusType) {
           case 'REJECTD':
           case 'UNDELIV':
@@ -157,11 +158,16 @@ export class xinsService extends GetSmsService {
             return 'waiting'
         }
       } else {
-        switch (state) {
+        switch (stateStr) {
           case 'DELIVRD':
-            return 'waiting'
+            return 'success'
           case 'UNKNOWN':
             return 'waiting'
+          case 'REJECTD':
+          case 'UNDELIV':
+          case 'DELETED':
+          case 'EXPIRED':
+            return 'fail'
           default:
             return 'waiting'
         }
@@ -171,6 +177,71 @@ export class xinsService extends GetSmsService {
     return 'waiting'
   }
 
+  private async getReportOnce(params: any): Promise<GetReportResult> {
+    const response = await axiosInstance.get('5.dox', {
+      params: {
+        UserName: MKT_USER_NAME,
+        PassWord: MKT_PASSWORD
+      }
+    })
+
+    const xmlData = response.data
+    const parsed = this.parseXml(xmlData)
+
+    const status = parsed?.Message?.Head?.Status
+
+    if (status != '0') {
+      const errorDesc = this.getAckErrorDescription(status)
+      throw new Error(`获取报告失败 [Status:${status}]: ${errorDesc}`)
+    }
+
+    // Report
+    const body = parsed?.Message?.Body
+    let reports: any[] = []
+
+    if (body?.Report) {
+      if (Array.isArray(body.Report)) {
+        reports = body.Report
+      } else {
+        reports = [body.Report]
+      }
+    }
+
+    // msgid过滤
+    if (params?.msgid) {
+      reports = reports.filter(report => report.MsgID === params.msgid)
+    }
+
+    // 多包去重
+    const phoneStatusMap = new Map<string, PhoneStatus>()
+
+    for (const report of reports) {
+      const state = report.State || ''
+      const error = report.Error || ''
+      const caller = String(report.Caller || '').trim()
+
+      if (!caller) continue
+
+      const status = this.parseReportStatus(state, error)
+
+      const existingStatus = phoneStatusMap.get(caller)
+      if (!existingStatus) {
+        phoneStatusMap.set(caller, { number: caller, status })
+      } else {
+        // 状态优先级:success > fail > waiting
+        if (status === 'success' || (status === 'fail' && existingStatus.status === 'waiting')) {
+          phoneStatusMap.set(caller, { number: caller, status })
+        }
+      }
+    }
+
+    const phoneStatusList = Array.from(phoneStatusMap.values())
+
+    return {
+      phoneStatusList
+    }
+  }
+
   private getAckErrorDescription(status: string): string {
     const errorMap: Record<string, string> = {
       '0': '成功',