Forráskód Böngészése

短信发送定时任务

wuyi 5 napja
szülő
commit
15c71dd948

+ 17 - 3
src/app.ts

@@ -12,12 +12,14 @@ import userRoutes from './routes/user.routes'
 import fileRoutes from './routes/file.routes'
 import sysConfigRoutes from './routes/sys-config.routes'
 import smsTaskRoutes from './routes/sms-task.routes'
+import { SmsTaskScheduler } from './schedulers/sms-task.scheduler'
+import { BaseScheduler } from './schedulers/base.scheduler'
 
 const options: FastifyEnvOptions = {
   schema: schema,
   dotenv: {
-    debug: false,
-  },
+    debug: false
+  }
 }
 
 export const createApp = async () => {
@@ -39,7 +41,7 @@ export const createApp = async () => {
 
   app.register(cors, {
     origin: true,
-    methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'],
+    methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']
   })
 
   app.register(jwt, {
@@ -84,7 +86,19 @@ export const createApp = async () => {
   await dataSource.initialize()
   app.decorate('dataSource', dataSource)
 
+  // 定时任务
+  const schedulers: BaseScheduler[] = []
+
+  const smsTaskScheduler = new SmsTaskScheduler(app, 5000)
+  schedulers.push(smsTaskScheduler)
+  smsTaskScheduler.start()
+
+  app.decorate('schedulers', schedulers)
+
   app.addHook('onClose', async () => {
+    schedulers.forEach(scheduler => {
+      scheduler.stop()
+    })
     await dataSource.destroy()
     process.exit(0)
   })

+ 1 - 1
src/entities/sms-task-item.entity.ts

@@ -3,7 +3,7 @@ import { TaskItemStatus } from '../enum/task.enum'
 
 @Entity()
 @Index('idx_taskId_status_id', ['taskId', 'status', 'id'])
-@Index('idx_taskId_createdAt', ['taskId', 'createdAt'])
+@Index('idx_taskId_status_createdAt', ['taskId', 'status', 'createdAt'])
 export class SmsTaskItem {
   @PrimaryGeneratedColumn()
   id: number

+ 67 - 0
src/schedulers/base.scheduler.ts

@@ -0,0 +1,67 @@
+import { FastifyInstance } from 'fastify'
+import AsyncLock from 'async-lock'
+
+export abstract class BaseScheduler {
+  protected app: FastifyInstance
+  protected lock: AsyncLock
+  protected scheduleInterval: NodeJS.Timeout | null = null
+  protected interval: number
+  protected lockKey: string
+
+  constructor(app: FastifyInstance, interval: number = 5000) {
+    this.app = app
+    this.lock = new AsyncLock()
+    this.interval = interval
+    this.lockKey = `${this.constructor.name}-lock`
+  }
+
+  /**
+   * 启动定时任务调度器
+   */
+  start() {
+    if (this.scheduleInterval) {
+      this.app.log.warn(`${this.constructor.name} 定时任务已经启动,跳过重复启动`)
+      return
+    }
+
+    this.app.log.info(`启动 ${this.constructor.name} 定时调度器,间隔: ${this.interval}ms`)
+    this.scheduleInterval = setInterval(() => {
+      this.execute()
+    }, this.interval)
+  }
+
+  /**
+   * 停止定时任务调度器
+   */
+  stop() {
+    if (this.scheduleInterval) {
+      clearInterval(this.scheduleInterval)
+      this.scheduleInterval = null
+      this.app.log.info(`${this.constructor.name} 定时调度器已停止`)
+    }
+  }
+
+  /**
+   * 执行调度任务(带锁保护)
+   */
+  private execute() {
+    this.lock
+      .acquire(this.lockKey, async () => {
+        try {
+          await this.run()
+        } catch (error) {
+          this.app.log.error(
+            `${this.constructor.name} 调度任务时发生错误: ${error instanceof Error ? error.message : '未知错误'}`
+          )
+        }
+      })
+      .catch(err => {
+        this.app.log.error(`${this.constructor.name} 获取锁失败: ${err instanceof Error ? err.message : '未知错误'}`)
+      })
+  }
+
+  /**
+   * 具体调度逻辑
+   */
+  protected abstract run(): Promise<void>
+}

+ 312 - 0
src/schedulers/sms-task.scheduler.ts

@@ -0,0 +1,312 @@
+import { In, Repository } from 'typeorm'
+import { FastifyInstance } from 'fastify'
+import { BaseScheduler } from './base.scheduler'
+import { SmsTask } from '../entities/sms-task.entity'
+import { TaskItemStatus, TaskStatus } from '../enum/task.enum'
+import { SmsTaskItem } from '../entities/sms-task-item.entity'
+import { xinsService } from '../services/sms/xins.service'
+import { GetReportResult } from '../services/sms/sms.types'
+
+/**
+ * SMS 任务定时调度器
+ * 负责定时查询和处理 pending 状态的任务
+ */
+export class SmsTaskScheduler extends BaseScheduler {
+  private smsTaskRepository: Repository<SmsTask>
+  private smsTaskItemRepository: Repository<SmsTaskItem>
+  private xins: xinsService
+  constructor(app: FastifyInstance, interval: number = 5000) {
+    super(app, interval)
+    this.smsTaskRepository = app.dataSource.getRepository(SmsTask)
+    this.smsTaskItemRepository = app.dataSource.getRepository(SmsTaskItem)
+    this.xins = new xinsService(app)
+  }
+
+  /**
+   * 启动调度器时,恢复崩溃时的 RUNNING 任务
+   */
+  async start() {
+    // 恢复崩溃时的 RUNNING 任务为 PENDING 状态
+    const runningTasks = await this.smsTaskRepository.find({
+      where: {
+        status: TaskStatus.RUNNING,
+        delFlag: false
+      }
+    })
+
+    if (runningTasks.length > 0) {
+      this.app.log.warn(`发现 ${runningTasks.length} 个异常的 RUNNING 状态任务,正在恢复...`)
+      for (const task of runningTasks) {
+        await this.smsTaskRepository.update(task.id, {
+          status: TaskStatus.PENDING
+        })
+        this.app.log.info(`任务 ${task.id} 已恢复为 PENDING 状态`)
+      }
+    }
+
+    super.start()
+  }
+
+  /**
+   * 执行调度逻辑
+   */
+  protected async run(): Promise<void> {
+    const pendingTask = await this.smsTaskRepository.findOne({
+      where: {
+        status: TaskStatus.PENDING,
+        delFlag: false
+      },
+      order: {
+        startedAt: 'ASC'
+      }
+    })
+
+    // 少补
+    if (!pendingTask) {
+      const queuedTask = await this.smsTaskRepository.findOne({
+        where: {
+          status: TaskStatus.QUEUED,
+          delFlag: false
+        },
+        order: {
+          startedAt: 'ASC'
+        }
+      })
+
+      if (queuedTask) {
+        await this.smsTaskRepository.update(queuedTask.id, {
+          status: TaskStatus.PENDING
+        })
+        this.app.log.info(`任务 ${queuedTask.id} 从队列中升级为 pending 状态`)
+      }
+      return
+    }
+
+    this.app.log.info(`开始处理 pending 任务: ${pendingTask.id}`)
+
+    // 更新为 RUNNING 状态,并记录开始时间
+    await this.smsTaskRepository.update(pendingTask.id, {
+      status: TaskStatus.RUNNING,
+      startedAt: new Date()
+    })
+
+    // xins 最多支持 100 个号码
+    const batchSize = 100
+    // 每次从数据库查询的批次大小(建议为 batchSize 的倍数)
+    const queryBatchSize = 1000
+
+    let totalBatches = 0
+    let failedBatches = 0
+    let hasMore = true
+
+    while (hasMore) {
+      // 检查任务是否被中断(暂停、取消等)
+      const currentTask = await this.smsTaskRepository.findOne({
+        where: { id: pendingTask.id }
+      })
+
+      if (!currentTask || currentTask.status !== TaskStatus.RUNNING) {
+        this.app.log.warn(`任务 ${pendingTask.id} 被中断,当前状态: ${currentTask?.status || '不存在'}`)
+        return
+      }
+
+      const smsTaskItems = await this.smsTaskItemRepository.find({
+        where: {
+          taskId: pendingTask.id,
+          status: TaskItemStatus.IDLE
+        },
+        order: {
+          createdAt: 'ASC'
+        },
+        take: queryBatchSize
+      })
+
+      if (smsTaskItems.length === 0) {
+        hasMore = false
+        break
+      }
+
+      // 串行处理批次数据
+      for (let i = 0; i < smsTaskItems.length; i += batchSize) {
+        const batch = smsTaskItems.slice(i, i + batchSize)
+        const batchSuccess = await this.sendBatch(batch, pendingTask.message)
+
+        totalBatches++
+        if (!batchSuccess) {
+          failedBatches++
+        }
+
+        // 记录进度日志
+        const progress = await this.getTaskProgress(pendingTask.id)
+        this.app.log.info(
+          `任务 ${pendingTask.id} 进度: ${progress.processed}/${progress.total} (${progress.percentage}%), ` +
+            `成功率: ${progress.successRate}%, 批次: ${totalBatches}, 失败批次: ${failedBatches}`
+        )
+
+        // 实时检查失败率,如果超过 70% 且至少处理了 3 个批次,立即停止
+        const currentFailureRate = totalBatches > 0 ? (failedBatches / totalBatches) * 100 : 0
+        if (totalBatches >= 3 && currentFailureRate > 70) {
+          this.app.log.error(`任务 ${pendingTask.id} 失败率过高 (${currentFailureRate.toFixed(2)}%),立即停止任务`)
+          await this.smsTaskRepository.update(pendingTask.id, {
+            status: TaskStatus.ERROR
+          })
+          this.app.log.info(
+            `任务 ${pendingTask.id} 已标记为 ERROR,总批次: ${totalBatches}, 失败批次: ${failedBatches}`
+          )
+          return
+        }
+      }
+
+      hasMore = smsTaskItems.length === queryBatchSize
+    }
+
+    // 计算失败率,超过 70% 标记为 ERROR
+    const failureRate = totalBatches > 0 ? (failedBatches / totalBatches) * 100 : 0
+    const finalStatus = failureRate > 70 ? TaskStatus.ERROR : TaskStatus.COMPLETED
+
+    await this.smsTaskRepository.update(pendingTask.id, {
+      status: finalStatus
+    })
+
+    this.app.log.info(
+      `任务 ${pendingTask.id} 处理完成,状态: ${finalStatus}, ` +
+        `总批次: ${totalBatches}, 失败批次: ${failedBatches}, 失败率: ${failureRate.toFixed(2)}%`
+    )
+  }
+
+  /**
+   * 发送一个批次的短信
+   * @returns 是否成功
+   */
+  private async sendBatch(batch: SmsTaskItem[], message: string): Promise<boolean> {
+    // 边界检查
+    if (!batch || batch.length === 0) {
+      this.app.log.warn('批次为空,跳过处理')
+      return true
+    }
+
+    try {
+      const phoneNumbers = batch.map(item => item.target.replace('+', ''))
+
+      const sendResult = await this.xins.sendSms(phoneNumbers, message)
+
+      // 获取报告,重试 10 次,每次间隔 3s
+      let reportResult: GetReportResult | null = null
+      const maxRetries = 10
+      const retryInterval = 3000
+
+      for (let attempt = 1; attempt <= maxRetries; attempt++) {
+        const currentReport = await this.xins.getReport({ msgid: sendResult.msgid })
+
+        const hasResult =
+          currentReport.phoneStatusList.length > 0 &&
+          batch.some(item => currentReport.phoneStatusList.some(status => status.number === item.target))
+
+        if (hasResult) {
+          reportResult = currentReport
+          this.app.log.info(`批次报告获取成功 (尝试 ${attempt}/${maxRetries})`)
+          break
+        }
+
+        if (attempt < maxRetries) {
+          this.app.log.info(`批次报告暂无结果,等待 ${retryInterval / 1000} 秒后重试 (尝试 ${attempt}/${maxRetries})`)
+          await new Promise(resolve => setTimeout(resolve, retryInterval))
+        } else {
+          this.app.log.warn(`批次报告获取失败:${maxRetries} 次尝试后仍无结果`)
+          reportResult = null
+        }
+      }
+
+      // 更新状态
+      const phoneStatusMap = reportResult
+        ? new Map(
+            reportResult.phoneStatusList.map(status => [
+              status.number,
+              status.status === 'success'
+                ? TaskItemStatus.SUCCESS
+                : status.status === 'waiting'
+                ? TaskItemStatus.WAITING
+                : TaskItemStatus.FAILED
+            ])
+          )
+        : null
+
+      const now = new Date()
+      let processed = 0
+      let successed = 0
+
+      for (const smsTaskItem of batch) {
+        smsTaskItem.status = phoneStatusMap?.get(smsTaskItem.target) ?? TaskItemStatus.FAILED
+        smsTaskItem.operatingAt = now
+        processed++
+        if (smsTaskItem.status === TaskItemStatus.SUCCESS) {
+          successed++
+        }
+      }
+      await this.smsTaskItemRepository.save(batch)
+
+      // 更新任务的发送数和成功数
+      const taskId = batch[0].taskId
+      await this.smsTaskRepository.increment({ id: taskId }, 'processed', processed)
+      if (successed > 0) {
+        await this.smsTaskRepository.increment({ id: taskId }, 'successed', successed)
+      }
+
+      return true
+    } catch (error) {
+      this.app.log.error(`发送短信失败: ${error}`)
+
+      const batchIds = batch.map(i => i.id)
+      const now = new Date()
+      await this.smsTaskItemRepository.update(
+        { id: In(batchIds) },
+        {
+          status: TaskItemStatus.FAILED,
+          operatingAt: now
+        }
+      )
+
+      // 更新任务的发送数
+      const taskId = batch[0].taskId
+      await this.smsTaskRepository.increment({ id: taskId }, 'processed', batch.length)
+
+      return false
+    }
+  }
+
+  /**
+   * 获取任务进度信息
+   */
+  private async getTaskProgress(taskId: number): Promise<{
+    processed: number
+    total: number
+    successed: number
+    percentage: string
+    successRate: string
+  }> {
+    const task = await this.smsTaskRepository.findOne({
+      where: { id: taskId }
+    })
+
+    if (!task) {
+      return {
+        processed: 0,
+        total: 0,
+        successed: 0,
+        percentage: '0.00',
+        successRate: '0.00'
+      }
+    }
+
+    const percentage = task.total > 0 ? ((task.processed / task.total) * 100).toFixed(2) : '0.00'
+    const successRate = task.processed > 0 ? ((task.successed / task.processed) * 100).toFixed(2) : '0.00'
+
+    return {
+      processed: task.processed,
+      total: task.total,
+      successed: task.successed,
+      percentage,
+      successRate
+    }
+  }
+}

+ 1 - 1
src/services/sms/xins.service.ts

@@ -125,7 +125,7 @@ export class xinsService extends GetSmsService {
       const status = this.parseReportStatus(state, error)
 
       return {
-        number: `+${report.Callee}`,
+        number: report.Callee,
         status
       }
     })

+ 2 - 0
src/types/fastify.d.ts

@@ -1,5 +1,6 @@
 import 'fastify'
 import { DataSource } from 'typeorm'
+import { BaseScheduler } from '../schedulers/base.scheduler'
 
 declare module 'fastify' {
   interface FastifyInstance {
@@ -27,6 +28,7 @@ declare module 'fastify' {
       UPLOAD_FOLDER: string
     }
     dataSource: DataSource
+    schedulers?: BaseScheduler[]
   }
 
   interface FastifyRequest {