| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- import { Repository, Like } from 'typeorm'
- import { FastifyInstance } from 'fastify'
- import { SmsTask } from '../entities/sms-task.entity'
- import { SmsTaskItem } from '../entities/sms-task-item.entity'
- import { PaginationResponse } from '../dto/common.dto'
- import { UpdateSmsTaskBody, ListSmsTaskQuery, ListSmsTaskItemQuery } from '../dto/sms-task.dto'
- import { TaskStatus, TaskItemStatus } from '../enum/task.enum'
- export class SmsTaskService {
- private smsTaskRepository: Repository<SmsTask>
- private smsTaskItemRepository: Repository<SmsTaskItem>
- private app: FastifyInstance
- private taskItemInsertChunkSize: number = 1000
- constructor(app: FastifyInstance) {
- this.app = app
- this.smsTaskRepository = app.dataSource.getRepository(SmsTask)
- this.smsTaskItemRepository = app.dataSource.getRepository(SmsTaskItem)
- }
- async create(data: {
- userId: number
- name: string
- message: string
- buffer: Buffer
- remark?: string
- }): Promise<SmsTask> {
- const task = this.smsTaskRepository.create({
- userId: data.userId,
- name: data.name,
- message: data.message,
- remark: data.remark,
- delFlag: false
- })
- const savedTask = await this.smsTaskRepository.save(task)
- const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
- await this.smsTaskRepository.update(savedTask.id, { total })
- return await this.smsTaskRepository.findOneOrFail({ where: { id: savedTask.id } })
- }
- private async createTaskItemByBuffer(data: { taskId: number; buffer: Buffer }): Promise<number> {
- const content = data.buffer.toString('utf-8')
- const lines = content.split(/\r?\n/).filter(line => line.trim())
- if (lines.length === 0) {
- return 0
- }
- const values = lines.map(line => ({
- taskId: data.taskId,
- target: line.trim(),
- status: TaskItemStatus.IDLE
- }))
- await this.app.dataSource.transaction(async manager => {
- const repo = manager.getRepository(SmsTaskItem)
- for (let i = 0; i < values.length; i += this.taskItemInsertChunkSize) {
- const chunk = values.slice(i, i + this.taskItemInsertChunkSize)
- await repo.insert(chunk)
- }
- })
- return values.length
- }
- async findById(id: number): Promise<SmsTask> {
- return this.smsTaskRepository.findOneOrFail({
- where: { id, delFlag: false }
- })
- }
- async findAll(query: ListSmsTaskQuery): Promise<PaginationResponse<SmsTask>> {
- const { page = 0, size = 20, userId, status, name } = query
- const where: any = {
- delFlag: false
- }
- if (userId) {
- where.userId = userId
- }
- if (status) {
- where.status = status
- }
- if (name) {
- where.name = Like(`%${name}%`)
- }
- const [tasks, total] = await this.smsTaskRepository.findAndCount({
- where,
- skip: Number(page) * Number(size),
- take: Number(size),
- order: {
- createdAt: 'DESC'
- }
- })
- return {
- content: tasks,
- metadata: {
- total: Number(total),
- page: Number(page),
- size: Number(size)
- }
- }
- }
- async update(id: number, data: UpdateSmsTaskBody): Promise<SmsTask> {
- const updateData: Partial<SmsTask> = {}
- if (data.name !== undefined) updateData.name = data.name
- if (data.message !== undefined) updateData.message = data.message
- if (data.status !== undefined) updateData.status = data.status
- if (data.processed !== undefined) updateData.processed = data.processed
- if (data.successed !== undefined) updateData.successed = data.successed
- if (data.total !== undefined) updateData.total = data.total
- if (data.startedAt !== undefined) updateData.startedAt = data.startedAt
- if (data.remark !== undefined) updateData.remark = data.remark
- await this.smsTaskRepository.update(id, updateData)
- return this.findById(id)
- }
- async delete(id: number): Promise<void> {
- const task = await this.findById(id)
- if (!task) {
- throw new Error('当前任务不存在')
- }
- if (task.status !== TaskStatus.IDLE) {
- throw new Error('当前任务状态无法删除')
- }
- await this.smsTaskRepository.update(id, { delFlag: true })
- }
- async findAllTaskItems(query: ListSmsTaskItemQuery): Promise<PaginationResponse<SmsTaskItem>> {
- const { page = 0, size = 20, taskId, status } = query
- const where: any = {}
- if (taskId) {
- where.taskId = taskId
- }
- if (status) {
- where.status = status as TaskItemStatus
- }
- const [items, total] = await this.smsTaskItemRepository.findAndCount({
- where,
- skip: Number(page) * Number(size),
- take: Number(size),
- order: {
- id: 'DESC'
- }
- })
- return {
- content: items,
- metadata: {
- total: Number(total),
- page: Number(page),
- size: Number(size)
- }
- }
- }
- async start(id: number): Promise<void> {
- const task = await this.findById(id)
- if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSED && task.status !== TaskStatus.SCHEDULED) {
- throw new Error('当前任务状态无法开始任务')
- }
- const num = await this.smsTaskRepository.count({
- where: {
- status: TaskStatus.PENDING,
- delFlag: false
- }
- })
- const newStatus = num > 0 ? TaskStatus.QUEUED : TaskStatus.PENDING
- await this.smsTaskRepository.update(id, {
- status: newStatus,
- startedAt: new Date()
- })
- }
- async pause(id: number): Promise<void> {
- const task = await this.findById(id)
- if (
- task.status !== TaskStatus.PENDING &&
- task.status !== TaskStatus.RUNNING &&
- task.status !== TaskStatus.CUTTING &&
- task.status !== TaskStatus.QUEUED &&
- task.status !== TaskStatus.SCHEDULED
- ) {
- throw new Error('当前任务状态无法暂停任务')
- }
- await this.smsTaskRepository.update(id, {
- status: TaskStatus.PAUSED
- })
- }
- }
|