// embedding.ts
import { Logger } from '@nestjs/common'
import { OpenAIEmbeddings } from 'langchain/embeddings/openai'
import * as queue from 'fastq'
import { setTimeout } from 'timers/promises'
  5. const chunkArray = (arr, chunkSize) =>
  6. arr.reduce((chunks, elem, index) => {
  7. const chunkIndex = Math.floor(index / chunkSize)
  8. const chunk = chunks[chunkIndex] || []
  9. // eslint-disable-next-line no-param-reassign
  10. chunks[chunkIndex] = chunk.concat([elem])
  11. return chunks
  12. }, [])
  13. export class OpenAIParallelEmbeddings extends OpenAIEmbeddings {
  14. override async embedDocuments(texts: string[]) {
  15. const subPrompts = chunkArray(
  16. this.stripNewLines ? texts.map((t) => t.replace(/\n/g, ' ')) : texts,
  17. this.batchSize
  18. )
  19. const embeddings = []
  20. const self = this
  21. async function worker({ i, input }) {
  22. try {
  23. const { data } = await self['embeddingWithRetry']({
  24. model: self.modelName,
  25. input
  26. })
  27. embeddings[i] = data.data[0].embedding
  28. Logger.log(`create embedding for ${i + 1}/${subPrompts.length}`)
  29. } catch (error) {
  30. Logger.error(error)
  31. }
  32. }
  33. const q = queue.promise(worker, 8)
  34. subPrompts.forEach((item, index) => {
  35. q.push({
  36. input: item,
  37. i: index
  38. })
  39. })
  40. await q.drained()
  41. return embeddings
  42. }
  43. }