// chat.service.ts

  import { ForbiddenException, Injectable, InternalServerErrorException, Logger, OnModuleDestroy } from '@nestjs/common'
  import { Observable, interval } from 'rxjs'
  import { ChatGPTAPI, ChatMessage } from '../chatapi'
  import type { RequestProps } from './types'
  import { chatReplyProcess } from './chatgpt'
  import { Repository, MoreThanOrEqual, LessThanOrEqual, And } from 'typeorm'
  import { ChatHistory } from './entities/chat.entity'
  import { InjectRepository } from '@nestjs/typeorm'
  import { TokenUsage } from './entities/token-usage.entity'
  import { format } from 'date-fns'
  import { MembershipService } from '../membership/membership.service'
  import { MemberType } from '../membership/entities/membership.entity'
  import { get_encoding } from '@dqbd/tiktoken'
  import { fetchSSE } from '../chatapi/fetch-sse'
  import { HttpService } from '@nestjs/axios'
  import * as types from '../chatapi/types'
  import { SysConfigService } from '../sys-config/sys-config.service'
  import { ChatPdfService } from "../chat-pdf/chat-pdf.service";
  /**
   * Service behind the chat endpoints.
   *
   * It streams model replies produced by `chatReplyProcess`, proxies raw chat-completion
   * requests to an Azure OpenAI deployment configured through `AZURE_OPENAI_*` environment
   * variables, stores chat history rows, and keeps per-user, per-day token usage counters.
   */
  @Injectable()
  export class ChatService implements OnModuleDestroy {
    /**
     * `cl100k_base` encoder created once per service instance and reused by
     * {@link tiktokenAndSave}. Released in {@link onModuleDestroy}.
     */
    tokenizer = get_encoding('cl100k_base')

    constructor(
      @InjectRepository(ChatHistory)
      private readonly chatHistoryRepository: Repository<ChatHistory>,
      @InjectRepository(TokenUsage)
      private readonly tokenUsageRepository: Repository<TokenUsage>,
      private readonly membershipService: MembershipService,
      private readonly httpService: HttpService,
      private readonly sysConfigService: SysConfigService,
      private readonly chatPdfService: ChatPdfService
    ) {}

    /** Releases the encoder held in `tokenizer` when the owning module is torn down. */
    onModuleDestroy(): void {
      this.tokenizer.free()
    }

    /**
     * Streams a chat reply as an Observable of JSON strings.
     *
     * Each emission is one serialized `ChatMessage`; every emission after the first is
     * prefixed with `\n`, so the concatenated stream is newline-delimited JSON.
     *
     * @param req request whose `body` is read as `RequestProps`
     * @param res response; only its `Content-Type` header is set here
     * @returns an Observable that completes when `chatReplyProcess` resolves and errors when it rejects
     */
    public chat(req, res): Observable<any> {
      res.setHeader('Content-Type', 'application/octet-stream')
      return new Observable((observer) => {
        const { prompt, options = {}, systemMessage, temperature, top_p } = req.body as RequestProps
        let firstChunk = true
        chatReplyProcess({
          message: prompt,
          lastContext: options,
          process: (chat: ChatMessage) => {
            observer.next(firstChunk ? JSON.stringify(chat) : `\n${JSON.stringify(chat)}`)
            firstChunk = false
          },
          systemMessage,
          temperature,
          top_p
        })
          .then(() => observer.complete())
          .catch((error) => observer.error(error))
      })
    }

    /**
     * Streams a chat reply straight onto `res`, then records history and token usage.
     *
     * When the request carries no `code`, the caller's membership is validated first.
     * When it does carry a `code`, the system message comes from
     * `chatPdfService.getSystemConfig` instead of the request.
     *
     * Errors raised after streaming has started are logged and written to the response as
     * one more newline-separated JSON frame; the response is always ended.
     *
     * @param req request providing `user.id` and a `RequestProps` body
     * @param res response that receives the newline-delimited JSON frames
     * @throws ForbiddenException when the membership is missing, expired, or a trial with no tokens left
     */
    public async chat1(req, res) {
      res.setHeader('Content-type', 'application/octet-stream')
      const defSysMsg = (await this.sysConfigService.findByName('system_message'))?.value
      const membership = await this.membershipService.getMembership(req.user.id)
      const { prompt, options = {}, systemMessage, code, temperature, top_p } = req.body as RequestProps

      if (!code) {
        if (!membership) {
          throw new ForbiddenException('请先成为会员')
        }
        // NOTE(review): loose equality kept on purpose in case memberType is hydrated as a string.
        if (membership.memberType == MemberType.Trial && membership.tokenLeft <= 0) {
          throw new ForbiddenException('您的试用额度已用完,请升级会员')
        }
        if (membership.isExpired) {
          throw new ForbiddenException('您的会员已过期,请即时续费')
        }
      }

      // Every frame after the first is preceded by "\n" so frames stay separable,
      // including an error frame written after some chunks were already sent.
      let firstChunk = true
      const writeFrame = (payload: string): void => {
        res.write(firstChunk ? payload : `\n${payload}`)
        firstChunk = false
      }

      try {
        const promptTime = new Date()
        // NOTE(review): `code` is passed for both trailing arguments — confirm against ChatPdfService.
        const content = code ? await this.chatPdfService.getSystemConfig(prompt, code, code) : systemMessage
        const result = await chatReplyProcess({
          message: prompt,
          lastContext: options,
          process: (chat: ChatMessage) => writeFrame(JSON.stringify(chat)),
          // `||` is intentional: an empty system message falls back to the configured default.
          systemMessage: content || defSysMsg,
          temperature,
          top_p
        })
        const chatMessage = result.data as ChatMessage

        this.persistHistory(
          new ChatHistory({
            messageId: chatMessage.parentMessageId,
            parentMessageId: options.parentMessageId,
            userId: req.user.id,
            message: prompt,
            role: 'user',
            token: chatMessage.detail.usage.prompt_tokens,
            time: promptTime
          })
        )
        this.persistHistory(
          new ChatHistory({
            messageId: chatMessage.id,
            parentMessageId: chatMessage.parentMessageId,
            userId: req.user.id,
            message: chatMessage.text,
            role: 'assistant',
            token: chatMessage.detail.usage.completion_tokens,
            time: new Date()
          })
        )
        this.saveUsageInBackground(req.user.id, chatMessage.detail.usage.total_tokens)
      } catch (error) {
        Logger.error(error, 'CHAT')
        writeFrame(ChatService.serializeError(error))
      } finally {
        res.end()
      }
    }

    /**
     * Sends a single prompt with the given system message and returns the reply text.
     *
     * The system message and the reply are stored as history rows with `userId` 0.
     *
     * @param prompt user prompt forwarded to `chatReplyProcess`
     * @param message system message used for the request
     * @returns the reply text, or `undefined` when the request fails (the error is logged)
     */
    public async sendMessage(prompt: string, message: string): Promise<string | undefined> {
      try {
        const result = await chatReplyProcess({
          message: prompt,
          systemMessage: message,
          process: (chat: ChatMessage) => {}
        })
        const chatMessage = result.data as ChatMessage

        this.persistHistory(
          new ChatHistory({
            messageId: chatMessage.parentMessageId,
            parentMessageId: null,
            userId: 0,
            message: message,
            role: 'system',
            token: chatMessage.detail.usage.prompt_tokens,
            time: new Date()
          })
        )
        this.persistHistory(
          new ChatHistory({
            messageId: chatMessage.id,
            parentMessageId: chatMessage.parentMessageId,
            userId: 0,
            message: chatMessage.text,
            role: 'assistant',
            token: chatMessage.detail.usage.completion_tokens,
            time: new Date()
          })
        )
        Logger.log(`机器人回答:${chatMessage.text}`, 'SendMessage')
        return chatMessage.text
      } catch (error) {
        Logger.error(error, 'SendMessage')
        return undefined
      }
    }

    /**
     * Forwards the request body to the Azure OpenAI chat-completions endpoint (non-streaming)
     * and records the reported token usage without charging the membership.
     *
     * @param req request providing `user.id` and the body to forward; `body.stream` is forced to `false`
     * @returns the upstream response body
     * @throws InternalServerErrorException carrying the upstream error body, or the error message when there is none
     */
    public async chatProxy(req) {
      const url = ChatService.azureChatCompletionsUrl()
      req.body.stream = false
      try {
        const { data } = await this.httpService.axiosRef.post(url, req.body, {
          headers: ChatService.azureHeaders()
        })
        // Usage accounting must not turn a successful upstream reply into a failure.
        const totalTokens = data?.usage?.total_tokens
        if (typeof totalTokens === 'number') {
          this.saveUsageInBackground(req.user.id, totalTokens, false)
        }
        return data
      } catch (e) {
        throw new InternalServerErrorException(ChatService.upstreamErrorBody(e))
      }
    }

    /**
     * Forwards the request body to the Azure OpenAI chat-completions endpoint in streaming mode.
     *
     * Every server-sent message is re-emitted unchanged, including the final `[DONE]` marker.
     * On `[DONE]` the prompt messages plus the accumulated reply text are token-counted and
     * recorded as usage.
     *
     * @param req request providing `user.id` and the body to forward; `body.stream` is forced to `true`
     * @returns an Observable of raw SSE message payloads
     */
    public streamChatProxy(req) {
      const url = ChatService.azureChatCompletionsUrl()
      req.body.stream = true
      return new Observable((subscriber) => {
        let text = ''
        fetchSSE(url, {
          body: JSON.stringify(req.body),
          headers: ChatService.azureHeaders(),
          method: 'POST',
          onMessage: (msg: string) => {
            subscriber.next(msg)
            if ('[DONE]' === msg) {
              Logger.log('done', 'CHAT PROXY')
              const transcript =
                req.body.messages.map((message) => `${message.role}:\n${message.content}`).join('\n\n') +
                '\n\nassistant:\n' +
                text
              this.tiktokenAndSave(req.user.id, transcript).catch((e) => {
                Logger.error(e, 'CHAT PROXY')
              })
              return subscriber.complete()
            }
            const response: types.openai.CreateChatCompletionDeltaResponse = JSON.parse(msg)
            if (response.choices?.length) {
              const delta = response.choices[0].delta
              if (delta?.content) text += delta.content
            }
          },
          onError: (err) => {
            Logger.error(err, 'CHAT PROXY')
            subscriber.error(err)
          }
        }).catch((e) => {
          Logger.error(e, 'CHAT PROXY')
          subscriber.error(e)
        })
      })
    }

    /**
     * Counts the `cl100k_base` tokens in `text` and records them as non-member usage.
     *
     * @param userId user the usage is attributed to
     * @param text text to tokenize; `<|endoftext|>` markers are stripped before encoding
     */
    public async tiktokenAndSave(userId: number, text: string) {
      // TODO: use a better fix in the tokenizer
      const sanitized = text.replace(/<\|endoftext\|>/g, '')
      // Reuse the shared encoder: building one per call allocates a new encoder that is never freed.
      const token = this.tokenizer.encode(sanitized).length
      await this.saveUsage(userId, token, false)
    }

    /**
     * Adds `usage` tokens to the user's counter for today (`yyyy-MM-dd`), creating the row if needed.
     *
     * @param userId user the usage is attributed to
     * @param usage number of tokens to add
     * @param member when true (default), the usage is also passed to `membershipService.saveUsage`
     */
    public async saveUsage(userId: number, usage: number, member = true) {
      const date = format(new Date(), 'yyyy-MM-dd')
      const tokenUsage = await this.tokenUsageRepository.findOneBy({
        userId,
        date
      })
      if (tokenUsage) {
        tokenUsage.usage += usage
        await this.tokenUsageRepository.save(tokenUsage)
      } else {
        await this.tokenUsageRepository.save(new TokenUsage({ userId, usage, date }))
      }
      if (member) {
        // Awaited so a failure reaches the caller instead of becoming an unhandled rejection.
        await this.membershipService.saveUsage(userId, usage)
      }
    }

    /**
     * Returns the user's daily usage rows within an inclusive date range.
     *
     * @param userId user whose usage is queried
     * @param start first day (`yyyy-MM-dd`); falls back to `2023-04-01` when missing or empty
     * @param end last day (`yyyy-MM-dd`); falls back to today when missing or empty
     */
    public async getUsage(userId: number, start?: string, end?: string): Promise<TokenUsage[]> {
      const today = format(new Date(), 'yyyy-MM-dd')
      const from = start || '2023-04-01'
      const to = end || today
      return await this.tokenUsageRepository.findBy({
        userId,
        date: And(MoreThanOrEqual(from), LessThanOrEqual(to))
      })
    }

    /** Saves one history row without blocking the caller; failures are logged, not thrown. */
    private persistHistory(entry: ChatHistory): void {
      this.chatHistoryRepository.save(entry).catch((e) => {
        Logger.error(e, 'SAVE CHAT HISTORY')
      })
    }

    /** Runs {@link saveUsage} without blocking the caller; failures are logged, not thrown. */
    private saveUsageInBackground(userId: number, usage: number, member = true): void {
      this.saveUsage(userId, usage, member).catch((e) => {
        Logger.error(e, 'SAVE USAGE')
      })
    }

    /** Builds the chat-completions URL from the `AZURE_OPENAI_*` environment variables. */
    private static azureChatCompletionsUrl(): string {
      return `${process.env.AZURE_OPENAI_ENDPOINT}/openai/deployments/${process.env.AZURE_OPENAI_DEPLOYMENT}/chat/completions?api-version=${process.env.AZURE_OPENAI_VERSION}`
    }

    /** Headers sent with every Azure OpenAI request. */
    private static azureHeaders(): Record<string, string> {
      return {
        'Content-Type': 'application/json',
        'api-key': `${process.env.AZURE_OPENAI_KEY}`
      }
    }

    /**
     * Serializes a caught value for the response stream.
     *
     * `JSON.stringify` yields `{}` for a plain `Error` because its fields are not enumerable,
     * so that case (and values that cannot be serialized at all) falls back to `{ message }`.
     */
    private static serializeError(error: unknown): string {
      let serialized: string | undefined
      try {
        serialized = JSON.stringify(error)
      } catch {
        // e.g. circular structure
        serialized = undefined
      }
      const lostErrorDetails = error instanceof Error && serialized === '{}'
      if (serialized !== undefined && !lostErrorDetails) {
        return serialized
      }
      return JSON.stringify({ message: error instanceof Error ? error.message : String(error) })
    }

    /**
     * Picks what to report for a failed upstream call: the HTTP error body when the
     * error carries `response.data`, otherwise the error message.
     */
    private static upstreamErrorBody(error: unknown): unknown {
      if (typeof error === 'object' && error !== null && 'response' in error) {
        const data = (error as { response?: { data?: unknown } }).response?.data
        if (data !== undefined && data !== null) {
          return data
        }
      }
      return error instanceof Error ? error.message : String(error)
    }
  }