import { Logger } from '@nestjs/common'
import { OpenAIEmbeddings } from 'langchain/embeddings/openai'
import * as queue from 'fastq'
import { setTimeout } from 'timers/promises'
- const chunkArray = (arr, chunkSize) =>
- arr.reduce((chunks, elem, index) => {
- const chunkIndex = Math.floor(index / chunkSize)
- const chunk = chunks[chunkIndex] || []
- // eslint-disable-next-line no-param-reassign
- chunks[chunkIndex] = chunk.concat([elem])
- return chunks
- }, [])
- export class OpenAIParallelEmbeddings extends OpenAIEmbeddings {
- override async embedDocuments(texts: string[]) {
- const subPrompts = chunkArray(
- this.stripNewLines ? texts.map((t) => t.replace(/\n/g, ' ')) : texts,
- this.batchSize
- )
- const embeddings = []
- const self = this
- async function worker({ i, input }) {
- try {
- const { data } = await self['embeddingWithRetry']({
- model: self.modelName,
- input
- })
- embeddings[i] = data.data[0].embedding
- Logger.log(`create embedding for ${i + 1}/${subPrompts.length}`)
- } catch (error) {
- Logger.error(error)
- }
- }
- const q = queue.promise(worker, 8)
- subPrompts.forEach((item, index) => {
- q.push({
- input: item,
- i: index
- })
- })
- await q.drained()
- return embeddings
- }
- }
|