瀏覽代碼

短信任务优化

wuyi 5 天之前
父節點
當前提交
30602e7261

+ 88 - 8
src/controllers/sms-task.controller.ts

@@ -1,6 +1,6 @@
 import { FastifyRequest, FastifyReply, FastifyInstance } from 'fastify'
 import { SmsTaskService } from '../services/sms-task.service'
-import { CreateSmsTaskBody, UpdateSmsTaskBody, ListSmsTaskQuery, GetSmsTaskParams } from '../dto/sms-task.dto'
+import { UpdateSmsTaskBody, ListSmsTaskQuery, GetSmsTaskParams, ListSmsTaskItemQuery } from '../dto/sms-task.dto'
 
 export class SmsTaskController {
   private smsTaskService: SmsTaskService
@@ -9,9 +9,49 @@ export class SmsTaskController {
     this.smsTaskService = new SmsTaskService(app)
   }
 
-  async create(request: FastifyRequest<{ Body: CreateSmsTaskBody }>, reply: FastifyReply) {
+  async create(request: FastifyRequest, reply: FastifyReply) {
     try {
-      const task = await this.smsTaskService.create(request.body)
+      const userId = request.user.id
+
+      const data = await request.file()
+      if (!data) {
+        return reply.code(400).send({ message: '请选择要上传的文件' })
+      }
+
+      if (!data.filename || !data.filename.toLowerCase().endsWith('.txt')) {
+        return reply.code(400).send({ message: '必须上传 txt 格式的文件' })
+      }
+
+      const nameField = data.fields?.['name']
+      const messageField = data.fields?.['message']
+      const remarkField = data.fields?.['remark']
+
+      const name =
+        nameField && !Array.isArray(nameField) && 'value' in nameField ? (nameField.value as string) : undefined
+      const message =
+        messageField && !Array.isArray(messageField) && 'value' in messageField
+          ? (messageField.value as string)
+          : undefined
+      const remark =
+        remarkField && !Array.isArray(remarkField) && 'value' in remarkField ? (remarkField.value as string) : undefined
+
+      if (!name) {
+        return reply.code(400).send({ message: '任务名称不能为空' })
+      }
+      if (!message) {
+        return reply.code(400).send({ message: 'message 不能为空' })
+      }
+
+      const buffer = await data.toBuffer()
+
+      const task = await this.smsTaskService.create({
+        userId,
+        name,
+        message,
+        buffer,
+        remark
+      })
+
       return reply.code(201).send({
         message: '创建成功',
         data: task
@@ -89,15 +129,55 @@ export class SmsTaskController {
         message: '删除成功'
       })
     } catch (error) {
-      if (error instanceof Error && error.message.includes('Could not find')) {
-        return reply.code(404).send({
-          message: '任务不存在'
-        })
-      }
       return reply.code(500).send({
         message: '删除失败',
         error: error instanceof Error ? error.message : String(error)
       })
     }
   }
+
+  async listSmsTaskItems(request: FastifyRequest<{ Querystring: ListSmsTaskItemQuery }>, reply: FastifyReply) {
+    try {
+      const result = await this.smsTaskService.findAllTaskItems(request.query)
+      return reply.send({
+        message: '查询成功',
+        ...result
+      })
+    } catch (error) {
+      return reply.code(500).send({
+        message: '查询失败',
+        error: error instanceof Error ? error.message : String(error)
+      })
+    }
+  }
+
+  async start(request: FastifyRequest<{ Params: GetSmsTaskParams }>, reply: FastifyReply) {
+    try {
+      const { id } = request.params
+      await this.smsTaskService.start(id)
+      return reply.send({
+        message: '任务已开始'
+      })
+    } catch (error) {
+      return reply.code(500).send({
+        message: '开始任务失败',
+        error: error instanceof Error ? error.message : String(error)
+      })
+    }
+  }
+
+  async pause(request: FastifyRequest<{ Params: GetSmsTaskParams }>, reply: FastifyReply) {
+    try {
+      const { id } = request.params
+      await this.smsTaskService.pause(id)
+      return reply.send({
+        message: '任务已暂停'
+      })
+    } catch (error) {
+      return reply.code(500).send({
+        message: '暂停任务失败',
+        error: error instanceof Error ? error.message : String(error)
+      })
+    }
+  }
 }

+ 5 - 0
src/dto/sms-task.dto.ts

@@ -29,3 +29,8 @@ export interface ListSmsTaskQuery extends Pagination {
 export interface GetSmsTaskParams {
   id: number
 }
+
+export interface ListSmsTaskItemQuery extends Pagination {
+  taskId?: number
+  status?: string
+}

+ 2 - 0
src/entities/sms-task-item.entity.ts

@@ -2,6 +2,8 @@ import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, Update
 import { TaskItemStatus } from '../enum/task.enum'
 
 @Entity()
+@Index('idx_taskId_status_id', ['taskId', 'status', 'id'])
+@Index('idx_taskId_createdAt', ['taskId', 'createdAt'])
 export class SmsTaskItem {
   @PrimaryGeneratedColumn()
   id: number

+ 3 - 1
src/entities/sms-task.entity.ts

@@ -1,7 +1,9 @@
-import { Column, CreateDateColumn, Entity, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'
+import { Column, CreateDateColumn, Entity, Index, PrimaryGeneratedColumn, UpdateDateColumn } from 'typeorm'
 import { TaskStatus } from '../enum/task.enum'
 
 @Entity()
+@Index('idx_status_delFlag_startedAt', ['status', 'delFlag', 'startedAt'])
+@Index('idx_userId_delFlag_createdAt', ['userId', 'delFlag', 'createdAt'])
 export class SmsTask {
   @PrimaryGeneratedColumn()
   id: number

+ 23 - 6
src/routes/sms-task.routes.ts

@@ -1,18 +1,14 @@
 import { FastifyInstance } from 'fastify'
 import { SmsTaskController } from '../controllers/sms-task.controller'
 import { authenticate, hasRole } from '../middlewares/auth.middleware'
-import { CreateSmsTaskBody, UpdateSmsTaskBody, ListSmsTaskQuery, GetSmsTaskParams } from '../dto/sms-task.dto'
+import { ListSmsTaskItemQuery, UpdateSmsTaskBody, ListSmsTaskQuery, GetSmsTaskParams } from '../dto/sms-task.dto'
 import { UserRole } from '../entities/user.entity'
 
 export default async function smsTaskRoutes(fastify: FastifyInstance) {
   const smsTaskController = new SmsTaskController(fastify)
 
   // 创建任务
-  fastify.post<{ Body: CreateSmsTaskBody }>(
-    '/create',
-    { onRequest: [authenticate] },
-    smsTaskController.create.bind(smsTaskController)
-  )
+  fastify.post('/create', { onRequest: [authenticate] }, smsTaskController.create.bind(smsTaskController))
 
   // 查询任务列表
   fastify.get<{ Querystring: ListSmsTaskQuery }>(
@@ -41,4 +37,25 @@ export default async function smsTaskRoutes(fastify: FastifyInstance) {
     { onRequest: [hasRole(UserRole.ADMIN)] },
     smsTaskController.delete.bind(smsTaskController)
   )
+
+  // 查询任务项列表
+  fastify.get<{ Querystring: ListSmsTaskItemQuery }>(
+    '/items',
+    { onRequest: [hasRole(UserRole.ADMIN)] },
+    smsTaskController.listSmsTaskItems.bind(smsTaskController)
+  )
+
+  // 开始任务
+  fastify.post<{ Params: GetSmsTaskParams }>(
+    '/:id/start',
+    { onRequest: [authenticate] },
+    smsTaskController.start.bind(smsTaskController)
+  )
+
+  // 暂停任务
+  fastify.post<{ Params: GetSmsTaskParams }>(
+    '/:id/pause',
+    { onRequest: [authenticate] },
+    smsTaskController.pause.bind(smsTaskController)
+  )
 }

+ 122 - 6
src/services/sms-task.service.ts

@@ -1,18 +1,30 @@
 import { Repository, Like } from 'typeorm'
 import { FastifyInstance } from 'fastify'
 import { SmsTask } from '../entities/sms-task.entity'
+import { SmsTaskItem } from '../entities/sms-task-item.entity'
 import { PaginationResponse } from '../dto/common.dto'
-import { CreateSmsTaskBody, UpdateSmsTaskBody, ListSmsTaskQuery } from '../dto/sms-task.dto'
-import { TaskStatus } from '../enum/task.enum'
+import { UpdateSmsTaskBody, ListSmsTaskQuery, ListSmsTaskItemQuery } from '../dto/sms-task.dto'
+import { TaskStatus, TaskItemStatus } from '../enum/task.enum'
 
 export class SmsTaskService {
   private smsTaskRepository: Repository<SmsTask>
+  private smsTaskItemRepository: Repository<SmsTaskItem>
+  private app: FastifyInstance
+  private taskItemInsertChunkSize: number = 1000
 
   constructor(app: FastifyInstance) {
+    this.app = app
     this.smsTaskRepository = app.dataSource.getRepository(SmsTask)
+    this.smsTaskItemRepository = app.dataSource.getRepository(SmsTaskItem)
   }
 
-  async create(data: CreateSmsTaskBody): Promise<SmsTask> {
+  async create(data: {
+    userId: number
+    name: string
+    message: string
+    buffer: Buffer
+    remark?: string
+  }): Promise<SmsTask> {
     const task = this.smsTaskRepository.create({
       userId: data.userId,
       name: data.name,
@@ -20,7 +32,35 @@ export class SmsTaskService {
       remark: data.remark,
       delFlag: false
     })
-    return this.smsTaskRepository.save(task)
+    const savedTask = await this.smsTaskRepository.save(task)
+
+    const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
+    await this.smsTaskRepository.update(savedTask.id, { total })
+    return await this.smsTaskRepository.findOneOrFail({ where: { id: savedTask.id } })
+  }
+
+  private async createTaskItemByBuffer(data: { taskId: number; buffer: Buffer }): Promise<number> {
+    const content = data.buffer.toString('utf-8')
+    const lines = content.split(/\r?\n/).filter(line => line.trim())
+    if (lines.length === 0) {
+      return 0
+    }
+
+    const values = lines.map(line => ({
+      taskId: data.taskId,
+      target: line.trim(),
+      status: TaskItemStatus.PENDING as TaskItemStatus
+    }))
+
+    await this.app.dataSource.transaction(async manager => {
+      const repo = manager.getRepository(SmsTaskItem)
+      for (let i = 0; i < values.length; i += this.taskItemInsertChunkSize) {
+        const chunk = values.slice(i, i + this.taskItemInsertChunkSize)
+        await repo.insert(chunk)
+      }
+    })
+
+    return values.length
   }
 
   async findById(id: number): Promise<SmsTask> {
@@ -68,8 +108,6 @@ export class SmsTaskService {
   }
 
   async update(id: number, data: UpdateSmsTaskBody): Promise<SmsTask> {
-    const task = await this.findById(id)
-
     const updateData: Partial<SmsTask> = {}
     if (data.name !== undefined) updateData.name = data.name
     if (data.message !== undefined) updateData.message = data.message
@@ -86,6 +124,84 @@ export class SmsTaskService {
 
   async delete(id: number): Promise<void> {
     const task = await this.findById(id)
+    if (!task) {
+      throw new Error('当前任务不存在')
+    }
+    if (task.status !== TaskStatus.IDLE) {
+      throw new Error('当前任务状态无法删除')
+    }
     await this.smsTaskRepository.update(id, { delFlag: true })
   }
+
+  async findAllTaskItems(query: ListSmsTaskItemQuery): Promise<PaginationResponse<SmsTaskItem>> {
+    const { page = 0, size = 20, taskId, status } = query
+
+    const where: any = {}
+
+    if (taskId) {
+      where.taskId = taskId
+    }
+
+    if (status) {
+      where.status = status as TaskItemStatus
+    }
+
+    const [items, total] = await this.smsTaskItemRepository.findAndCount({
+      where,
+      skip: Number(page) * Number(size),
+      take: Number(size),
+      order: {
+        id: 'DESC'
+      }
+    })
+
+    return {
+      content: items,
+      metadata: {
+        total: Number(total),
+        page: Number(page),
+        size: Number(size)
+      }
+    }
+  }
+
+  async start(id: number): Promise<void> {
+    const task = await this.findById(id)
+
+    if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSED && task.status !== TaskStatus.SCHEDULED) {
+      throw new Error('当前任务状态无法开始任务')
+    }
+
+    const num = await this.smsTaskRepository.count({
+      where: {
+        status: TaskStatus.PENDING,
+        delFlag: false
+      }
+    })
+
+    const newStatus = num > 0 ? TaskStatus.QUEUED : TaskStatus.PENDING
+
+    await this.smsTaskRepository.update(id, {
+      status: newStatus,
+      startedAt: new Date()
+    })
+  }
+
+  async pause(id: number): Promise<void> {
+    const task = await this.findById(id)
+
+    if (
+      task.status !== TaskStatus.PENDING &&
+      task.status !== TaskStatus.RUNNING &&
+      task.status !== TaskStatus.CUTTING &&
+      task.status !== TaskStatus.QUEUED &&
+      task.status !== TaskStatus.SCHEDULED
+    ) {
+      throw new Error('当前任务状态无法暂停任务')
+    }
+
+    await this.smsTaskRepository.update(id, {
+      status: TaskStatus.PAUSED
+    })
+  }
 }

+ 15 - 0
src/types/fastify.d.ts

@@ -34,6 +34,21 @@ declare module 'fastify' {
       filename: string
       mimetype: string
       toBuffer(): Promise<Buffer>
+      fields?: {
+        [key: string]: {
+          value: string
+        } | Array<{
+          value: string
+        }>
+      }
     } | null>
+    parts(): AsyncIterableIterator<{
+      type: 'file' | 'field'
+      fieldname: string
+      filename?: string
+      mimetype?: string
+      value?: string
+      toBuffer?(): Promise<Buffer>
+    }>
   }
 }