本文介绍了如何使用 JavaScript 的 Streams API 中的 TransformStream 将 ReadableStream 对象分割成行。通过创建一个自定义的 LineSplitter 类,该类继承自 TransformStream,可以有效地处理跨越多个数据块的行,并确保每一行都完整地传递给下游的消费者。该方法避免了简单地按块分割可能导致的不完整行的问题,提供了一种可靠的按行读取 ReadableStream 的解决方案。
使用 TransformStream 分割 ReadableStream 为行的详细教程
ReadableStream 是一种强大的 API,用于处理流式数据,例如从网络请求返回的数据。然而,直接按块读取数据可能不方便,特别是当需要按行处理数据时。本教程将演示如何使用 TransformStream 将 ReadableStream 分割成行,确保每一行都完整地传递给下游的消费者。
理解 TransformStream
TransformStream 允许我们在读取流的过程中转换数据。它接受一个 transform 函数和一个 flush 函数作为参数。
- transform 函数: 接收每个数据块,并决定如何处理它。它可以将数据块转换为新的数据块,并将其传递给下游。
- flush 函数: 在流结束时调用,允许我们处理任何剩余的数据。
实现 LineSplitter 类
以下是一个自定义的 LineSplitter 类,它继承自 TransformStream,用于将 ReadableStream 分割成行。
function concatArrayBuffers(chunks: Uint8Array[]): Uint8Array { const result = new Uint8Array(chunks.reduce((a, c) => a + c.length, 0)); let offset = 0; for (const chunk of chunks) { result.set(chunk, offset); offset += chunk.length; } return result; } class LineSplitter extends TransformStream<Uint8Array, Uint8Array> { protected _buffer: Uint8Array[] = []; constructor() { super({ transform: (chunk, controller) => { let index; let rest = chunk; while ((index = rest.indexOf(0x0a)) !== -1) { controller.enqueue(concatArrayBuffers([...this._buffer, rest.slice(0, index + 1)])); rest = rest.slice(index + 1); this._buffer = []; } if (rest.length > 0) { this._buffer.push(rest); } }, flush: (controller) => { if (this._buffer.length > 0) { controller.enqueue(concatArrayBuffers(this._buffer)); } } }); } }
代码解释:
-
concatArrayBuffers 函数: 该函数用于将多个 Uint8Array 合并成一个 Uint8Array。这是因为我们需要将缓冲区中的数据块和当前块的一部分合并成一行。
-
LineSplitter 类:
- _buffer 属性: 用于存储尚未完成的行的 Uint8Array 块。
- transform 方法:
- 在每个块中查找换行符 (0x0a) 的索引。
- 如果找到换行符,则将缓冲区中的所有块与当前块中换行符之前的部分合并,并将其添加到控制器中。
- 更新剩余的块。
- 如果当前块中还有剩余的数据,则将其添加到缓冲区中。
- flush 方法:
- 在流结束时调用,将缓冲区中剩余的数据添加到控制器中。
使用 LineSplitter
以下是如何使用 LineSplitter 的示例:
const linesStream = (await fetch('http://example.com')).body .pipeThrough(new LineSplitter()); for await (const line of linesStream.pipeThrough(new TextDecoderStream())) { console.log(line); }
代码解释:
- fetch(‘http://example.com’): 发起一个网络请求,获取 ReadableStream 对象。
- .pipeThrough(new LineSplitter()): 将 ReadableStream 通过 LineSplitter 进行转换,将其分割成行的 ReadableStream。
- .pipeThrough(new StringDecoderStream()): 使用 TextDecoderStream 将 Uint8Array 转换为字符串。
- for await (const line of linesStream): 循环遍历行的 ReadableStream,并打印每一行。
注意事项
- 此示例假设行尾使用换行符 (n, 0x0a) 作为分隔符。如果需要支持其他换行符,例如回车符 (r, 0x0d) 或回车换行符 (rn),则需要修改 LineSplitter 类中的逻辑。
- 此示例使用 TextDecoderStream 将 Uint8Array 转换为字符串。可以根据需要使用其他解码器。
总结
通过使用 TransformStream,我们可以轻松地将 ReadableStream 分割成行,从而更方便地处理流式数据。LineSplitter 类提供了一个可重用的解决方案,可以用于处理各种类型的 ReadableStream,并确保每一行都完整地传递给下游的消费者。这种方法避免了简单地按块分割可能导致的不完整行的问题,提供了一种可靠的按行读取 ReadableStream 的解决方案。
javascript java ai red JavaScript for const 字符串 循环 继承 对象 transform http