sms-task.service.ts 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import { Repository, Like } from 'typeorm'
  2. import { FastifyInstance } from 'fastify'
  3. import { SmsTask } from '../entities/sms-task.entity'
  4. import { SmsTaskItem } from '../entities/sms-task-item.entity'
  5. import { PaginationResponse } from '../dto/common.dto'
  6. import { UpdateSmsTaskBody, ListSmsTaskQuery, ListSmsTaskItemQuery } from '../dto/sms-task.dto'
  7. import { TaskStatus, TaskItemStatus } from '../enum/task.enum'
  8. export class SmsTaskService {
  9. private smsTaskRepository: Repository<SmsTask>
  10. private smsTaskItemRepository: Repository<SmsTaskItem>
  11. private app: FastifyInstance
  12. private taskItemInsertChunkSize: number = 1000
  13. constructor(app: FastifyInstance) {
  14. this.app = app
  15. this.smsTaskRepository = app.dataSource.getRepository(SmsTask)
  16. this.smsTaskItemRepository = app.dataSource.getRepository(SmsTaskItem)
  17. }
  18. async create(data: {
  19. userId: number
  20. name: string
  21. message: string
  22. buffer: Buffer
  23. remark?: string
  24. }): Promise<SmsTask> {
  25. const task = this.smsTaskRepository.create({
  26. userId: data.userId,
  27. name: data.name,
  28. message: data.message,
  29. remark: data.remark,
  30. delFlag: false
  31. })
  32. const savedTask = await this.smsTaskRepository.save(task)
  33. const total = await this.createTaskItemByBuffer({ taskId: savedTask.id, buffer: data.buffer })
  34. await this.smsTaskRepository.update(savedTask.id, { total })
  35. return await this.smsTaskRepository.findOneOrFail({ where: { id: savedTask.id } })
  36. }
  37. private async createTaskItemByBuffer(data: { taskId: number; buffer: Buffer }): Promise<number> {
  38. const content = data.buffer.toString('utf-8')
  39. const lines = content.split(/\r?\n/).filter(line => line.trim())
  40. if (lines.length === 0) {
  41. return 0
  42. }
  43. const values = lines.map(line => ({
  44. taskId: data.taskId,
  45. target: line.trim(),
  46. status: TaskItemStatus.PENDING as TaskItemStatus
  47. }))
  48. await this.app.dataSource.transaction(async manager => {
  49. const repo = manager.getRepository(SmsTaskItem)
  50. for (let i = 0; i < values.length; i += this.taskItemInsertChunkSize) {
  51. const chunk = values.slice(i, i + this.taskItemInsertChunkSize)
  52. await repo.insert(chunk)
  53. }
  54. })
  55. return values.length
  56. }
  57. async findById(id: number): Promise<SmsTask> {
  58. return this.smsTaskRepository.findOneOrFail({
  59. where: { id, delFlag: false }
  60. })
  61. }
  62. async findAll(query: ListSmsTaskQuery): Promise<PaginationResponse<SmsTask>> {
  63. const { page = 0, size = 20, userId, status, name } = query
  64. const where: any = {
  65. delFlag: false
  66. }
  67. if (userId) {
  68. where.userId = userId
  69. }
  70. if (status) {
  71. where.status = status
  72. }
  73. if (name) {
  74. where.name = Like(`%${name}%`)
  75. }
  76. const [tasks, total] = await this.smsTaskRepository.findAndCount({
  77. where,
  78. skip: Number(page) * Number(size),
  79. take: Number(size),
  80. order: {
  81. createdAt: 'DESC'
  82. }
  83. })
  84. return {
  85. content: tasks,
  86. metadata: {
  87. total: Number(total),
  88. page: Number(page),
  89. size: Number(size)
  90. }
  91. }
  92. }
  93. async update(id: number, data: UpdateSmsTaskBody): Promise<SmsTask> {
  94. const updateData: Partial<SmsTask> = {}
  95. if (data.name !== undefined) updateData.name = data.name
  96. if (data.message !== undefined) updateData.message = data.message
  97. if (data.status !== undefined) updateData.status = data.status
  98. if (data.processed !== undefined) updateData.processed = data.processed
  99. if (data.successed !== undefined) updateData.successed = data.successed
  100. if (data.total !== undefined) updateData.total = data.total
  101. if (data.startedAt !== undefined) updateData.startedAt = data.startedAt
  102. if (data.remark !== undefined) updateData.remark = data.remark
  103. await this.smsTaskRepository.update(id, updateData)
  104. return this.findById(id)
  105. }
  106. async delete(id: number): Promise<void> {
  107. const task = await this.findById(id)
  108. if (!task) {
  109. throw new Error('当前任务不存在')
  110. }
  111. if (task.status !== TaskStatus.IDLE) {
  112. throw new Error('当前任务状态无法删除')
  113. }
  114. await this.smsTaskRepository.update(id, { delFlag: true })
  115. }
  116. async findAllTaskItems(query: ListSmsTaskItemQuery): Promise<PaginationResponse<SmsTaskItem>> {
  117. const { page = 0, size = 20, taskId, status } = query
  118. const where: any = {}
  119. if (taskId) {
  120. where.taskId = taskId
  121. }
  122. if (status) {
  123. where.status = status as TaskItemStatus
  124. }
  125. const [items, total] = await this.smsTaskItemRepository.findAndCount({
  126. where,
  127. skip: Number(page) * Number(size),
  128. take: Number(size),
  129. order: {
  130. id: 'DESC'
  131. }
  132. })
  133. return {
  134. content: items,
  135. metadata: {
  136. total: Number(total),
  137. page: Number(page),
  138. size: Number(size)
  139. }
  140. }
  141. }
  142. async start(id: number): Promise<void> {
  143. const task = await this.findById(id)
  144. if (task.status !== TaskStatus.IDLE && task.status !== TaskStatus.PAUSED && task.status !== TaskStatus.SCHEDULED) {
  145. throw new Error('当前任务状态无法开始任务')
  146. }
  147. const num = await this.smsTaskRepository.count({
  148. where: {
  149. status: TaskStatus.PENDING,
  150. delFlag: false
  151. }
  152. })
  153. const newStatus = num > 0 ? TaskStatus.QUEUED : TaskStatus.PENDING
  154. await this.smsTaskRepository.update(id, {
  155. status: newStatus,
  156. startedAt: new Date()
  157. })
  158. }
  159. async pause(id: number): Promise<void> {
  160. const task = await this.findById(id)
  161. if (
  162. task.status !== TaskStatus.PENDING &&
  163. task.status !== TaskStatus.RUNNING &&
  164. task.status !== TaskStatus.CUTTING &&
  165. task.status !== TaskStatus.QUEUED &&
  166. task.status !== TaskStatus.SCHEDULED
  167. ) {
  168. throw new Error('当前任务状态无法暂停任务')
  169. }
  170. await this.smsTaskRepository.update(id, {
  171. status: TaskStatus.PAUSED
  172. })
  173. }
  174. }