I have the function below, which is supposed to do the following:
- take a ReadableStream
- read it chunk by chunk
- just before the last chunk (which OpenAI signals with the string "[DONE]"), enqueue an extra chunk carrying my extension payload.
The problem is that the data from the last chunk of the original OpenAI stream and my extension data get merged into a single chunk. But to process these chunks on the client, I need them to arrive as separate chunks.
export async function extendOpenAIStream(
  openaiStream: ReadableStream<Uint8Array>,
  extensionPayload: JSONValue
) {
  const encoder = new TextEncoder()
  const decoder = new TextDecoder()
  const reader = openaiStream.getReader()
  const stream = new ReadableStream({
    cancel() {
      reader.cancel()
    },
    async start(controller) {
      while (true) {
        const { done, value } = await reader.read()
        const dataString = decoder.decode(value, { stream: true })
        // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
        if (done || dataString.includes('[DONE]')) {
          // Enqueue our extension payload as its own chunk
          const extendedValue = encoder.encode(
            `data: ${JSON.stringify(extensionPayload)}\n\n`
          )
          controller.enqueue(extendedValue)
          // Enqueue the original final chunk, if there is one
          if (value) controller.enqueue(value)
          // Close the stream
          controller.close()
          break
        }
        controller.enqueue(value)
      }
    },
  })
  return stream
}
Expected chunks (separate chunks):
data: {"extensionPayload": {...}}
data: {"id":"...,"object":"chat.completion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}
data: [DONE]
Actual chunks (merged into one chunk):
data: {"extensionPayload": {...}}
data: {"id":"...,"object":"chat.completion.chunk","created":1684486791,"model":"gpt-3.5-turbo-0301","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}
data: [DONE]
1 Answer
In case anyone needs to do something similar, I finally found a solution. It turns out that since I was using eventsource-parser on the client, it didn't actually matter whether the chunks were split. The real problem was where the chunks got fragmented. According to Vercel, this is expected behavior (see here). I now also use eventsource-parser in the cloud function to make sure events are not split across chunks:
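The answer's original snippet is not included above. As a rough illustration of the same idea, here is a minimal sketch that re-chunks an SSE byte stream so each emitted chunk is exactly one complete `data: ...\n\n` event, using only web-standard APIs (TransformStream, TextDecoder/TextEncoder) rather than eventsource-parser; the helper name `rechunkSSE` is my own, not from the answer:

```typescript
// Sketch: buffer incoming bytes and re-emit them split on the SSE event
// terminator (a blank line), so downstream consumers always receive
// whole events regardless of how the network fragmented the bytes.
export function rechunkSSE(): TransformStream<Uint8Array, Uint8Array> {
  const decoder = new TextDecoder()
  const encoder = new TextEncoder()
  let buffer = ''
  return new TransformStream({
    transform(chunk, controller) {
      buffer += decoder.decode(chunk, { stream: true })
      let index: number
      // Each complete SSE event ends with "\n\n".
      while ((index = buffer.indexOf('\n\n')) !== -1) {
        const event = buffer.slice(0, index + 2)
        buffer = buffer.slice(index + 2)
        controller.enqueue(encoder.encode(event))
      }
    },
    flush(controller) {
      // Emit any trailing partial data rather than silently dropping it.
      if (buffer.length > 0) controller.enqueue(encoder.encode(buffer))
    },
  })
}
```

Usage would be `openaiStream.pipeThrough(rechunkSSE())` before appending the extension payload, so the `[DONE]` check always sees a whole event.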