Forwarding a ReadableStream in a Node.js cloud function, adding an extra chunk before the end

lmyy7pcs · posted 2023-05-22 in Node.js

I have the following function, which is supposed to:

  • Take a ReadableStream
  • Read it chunk by chunk
  • Before the last chunk (signaled by OpenAI with the "[DONE]" string), add an extra chunk carrying my extension payload.

The problem is that the data of the last chunk from the original/OpenAI stream and my extension data end up merged into a single chunk. To handle them on the client, however, I need them to arrive as separate chunks.

// Minimal JSON type for the payload, defined here so the snippet is self-contained
type JSONValue =
  | string
  | number
  | boolean
  | null
  | { [key: string]: JSONValue }
  | JSONValue[]

export async function extendOpenAIStream(
  openaiStream: ReadableStream<Uint8Array>,
  extensionPayload: JSONValue
) {
  const encoder = new TextEncoder()
  const decoder = new TextDecoder()

  const reader = openaiStream.getReader()

  const stream = new ReadableStream({
    cancel() {
      reader.cancel()
    },
    async start(controller) {
      while (true) {
        const { done, value } = await reader.read()
        const dataString = decoder.decode(value)

        // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
        if (done || dataString.includes('[DONE]')) {
          // Enqueue our extension payload as its own chunk
          const extendedValue = encoder.encode(
            `data: ${JSON.stringify(extensionPayload)}\n\n`
          )
          controller.enqueue(extendedValue)

          // Enqueue the original chunk (when the source is done, value is
          // undefined, so guard against enqueueing an empty read)
          if (value) controller.enqueue(value)

          // Close the stream
          controller.close()
          break
        }
        controller.enqueue(value)
      }
    },
  })

  return stream
}
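
For context, this function would typically be called from the cloud function's request handler, piping the OpenAI response body through it. A rough sketch of such a handler, assuming a fetch-based runtime; the endpoint, headers and request body below are placeholders, not code from the question:

// Hypothetical handler shape for an edge/cloud function runtime.
export default async function handler(req: Request): Promise<Response> {
  const openaiResponse = await fetch('https://api.openai.com/v1/chat/completions', {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      'Content-Type': 'application/json',
    },
    // Placeholder request body
    body: JSON.stringify({ model: 'gpt-3.5-turbo', stream: true, messages: [] }),
  })

  const stream = await extendOpenAIStream(openaiResponse.body!, {
    extensionPayload: { /* ... */ },
  })

  // Forward the extended stream as server-sent events
  return new Response(stream, {
    headers: { 'Content-Type': 'text/event-stream' },
  })
}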

Expected chunks (each SSE event arrives as a separate chunk):

data: {"extensionPayload": {...}}

data: {"id":"...,"object":"chat.completion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: [DONE]

Actual chunks (everything below arrives merged in a single chunk):

data: {"extensionPayload": {...}}

data: {"id":"...,"object":"chat.completion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: [DONE]
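
To see why the merged variant is a problem, consider a naive client loop that assumes exactly one SSE event per read. This is a hypothetical sketch, not code from the question, but it shows how JSON.parse fails as soon as two events share one chunk:

async function naiveClientLoop(response: Response) {
  const reader = response.body!.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const { done, value } = await reader.read()
    if (done) break

    // Assumes each read contains exactly one "data: ...\n\n" event --
    // the assumption that a merged chunk violates
    const data = decoder.decode(value).replace(/^data: /, '').trim()
    if (data === '[DONE]') break

    // Throws when two events arrive in a single chunk, e.g.
    // "data: {...}\n\ndata: [DONE]\n\n"
    console.log(JSON.parse(data))
  }
}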

1rhkuytd1#

In case anyone needs to do something similar, I finally have a solution. It turns out that, since I am using eventsource-parser, whether the chunks are split up does not actually matter. The real problem was where the chunks get fragmented; according to Vercel this is expected behavior (see the link in the snippet below). So I now use eventsource-parser in the cloud function as well, to make sure the chunks I forward are complete:

import { createParser } from 'eventsource-parser'

// encoder, decoder and reader are set up the same way as in the original function
const stream = new ReadableStream({
  cancel() {
    reader.cancel()
  },
  async start(controller) {
    // Chunks might get fragmented, so we use eventsource-parser to ensure the events we forward are complete
    // See: https://vercel.com/docs/concepts/functions/edge-functions/streaming#caveats
    const parser = createParser((e) => {
      if (e.type !== 'event') return
      controller.enqueue(encoder.encode(`data: ${e.data}\n\n`))
    })

    while (true) {
      const { done, value } = await reader.read()
      // { stream: true } keeps multi-byte characters intact across reads
      const dataString = decoder.decode(value, { stream: true })

      // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
      if (done || dataString.includes('[DONE]')) {
        // Enqueue our extension payload as its own SSE event
        const extendedValue = encoder.encode(
          `data: ${JSON.stringify(extensionPayload)}\n\n`
        )
        controller.enqueue(extendedValue)

        // Close the stream
        controller.close()
        break
      }
      parser.feed(dataString)
    }
  },
})
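
On the client side, the same library can be used to consume the extended stream. A minimal consumption sketch, assuming eventsource-parser v1 and that the injected chunk is recognizable by its extensionPayload key (as in the expected output above):

import { createParser } from 'eventsource-parser'

async function consumeExtendedStream(response: Response) {
  const reader = response.body!.getReader()
  const decoder = new TextDecoder()

  const parser = createParser((e) => {
    if (e.type !== 'event' || e.data === '[DONE]') return
    const payload = JSON.parse(e.data)
    if ('extensionPayload' in payload) {
      // The extra chunk injected by the cloud function
      console.log('extension:', payload.extensionPayload)
    } else {
      // A regular chat.completion.chunk from OpenAI
      console.log('delta:', payload.choices?.[0]?.delta)
    }
  })

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    parser.feed(decoder.decode(value, { stream: true }))
  }
}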
