通过异步迭代简化 Node.js 流
使用异步迭代操作 Node.js 流相当有意思。这篇博客文章探讨如何做到这些。
目录:
[[toc]]
概括:异步迭代与异步生成器
异步迭代是异步检索数据内容的协议(意味着在取得项目前可能暂停当前任务)。
异步迭代器用于强化异步迭代。举例来说,下面是一个异步迭代器函数:
/**
* @returns an asynchronous iterable
*/
async function* asyncGenerator(asyncIterable) {
for await (const item of asyncIterable) { // input
if (···) {
yield '> ' + item; // output
}
}
}
for-await-of
循环迭代输入asyncIterable
。该循环也作用于通常的异步函数。yield
提供由该生成器返回的异步迭代值。
在文章接下来的部分,不再说明函数是异步函数还是异步生成器函数:
/** @returns a Promise */
async function asyncFunction() {
/* ··· */
}
/** @returns an async iterable */
async function* asyncGeneratorFunction() {
/* ··· */
}
流
流的核心思想是一种“分隔并攻克”大量数据的模式:当我们将大量数据分割为一些小的部分并一次处理一部分,我们可以处理它。
Node.js 提供多种类的流,举例来说:
- readable streams(可读流) 是可以读取数据的流。换句话说,它们是数据的来源。一个例子是可读文件流,可以让我们读取文件的内容。
- writable streams(可写流) 是可以写数据的流。换句话说,它们是数据的水池。一个例子是可写文件流,可以让我们向文件写数据。
- transform stream(转换流) 是同时可读和可写的流。作为可写流,它提取数据,并转换(改变或丢弃)它们,然后作为可读流输出它们。
流程
为了处理数据流可分为多个步骤,我们可以管道化(连接)流:
- 通过可读流收集输入。
- 通过转换流处理每一步。
- 在最后一步,我们有两个选项:
- 在最新的可读流中写入信息到一个可写流。于是可写流是管道步骤的最后一步。
- 可以使用其它方法在最新的可读流中提取数据。
第(2)步可选。
文本编码
当创建文本流,最好总是使用同一种编码:
- Node.js 文档有一份支持的编码和它们的默认拼写的列表,举例来说:
'utf8'
'utf16le'
'base64'
- 同时也允许使用一些不同拼写方式。可以使用 Buffer.isEncoding() 来检查哪一种是支持的:
buffer.Buffer.isEncoding('utf8') // true buffer.Buffer.isEncoding('utf-8') // true buffer.Buffer.isEncoding('UTF-8') // true buffer.Buffer.isEncoding('UTF:8') // false
编码的默认值是 null
,这等价于 'utf8'
。
帮助函数:readableToString()
有时候我们使用下面的帮助函数。不需要理解它如何工作,仅仅知道它做了什么。
import * as stream from 'stream'
/**
* Reads all the text in a readable stream
* and returns it as a string,
* via a Promise.
* @param {stream.Readable} readable
*/
function readableToString(readable) {
return new Promise((resolve, reject) => {
let data = ''
readable.on('data', (chunk) => {
data += chunk
})
readable.on('end', () => {
resolve(data)
})
readable.on('error', (err) => {
reject(err)
})
})
}
该函数是通过基础事件 API 实现的。我们将在后面使用一个简单方式实现这些,通过异步迭代。
一些前置设定
- 在这篇博客中,我们只使用文本流。
- 在例子中,我们有时候会使用顶层
await
,在这种情况下,我们假设我们是在一个模块的内部或者async
函数的内部。 - 无论哪一种换行符,我们两种都支持:
- Unix:
'\n'
(LF) - Windows:
'\r\n'
(CRLF) 当前平台的换行符可以通过os
模块的 the constant EOL 访问到。
- Unix:
可读流
创建可读流
从文件创建可读流
可以使用 fs.createReadStream() 创建可读流:
import * as fs from 'fs'
const readableStream = fs.createReadStream('tmp/test.txt', {
encoding: 'utf8',
})
console(await readableToString(readableStream))
// 'This is a test!\n'
Readable.from()
:从迭代器创建可读流
静态方法 Readable.from(iterable, options?) 创建可读流,它保存数据在 iterable. iterable
的内容可以是同步迭代或异步迭代。参数 options
是可选的,可以使用标准的文本编码。
import * as stream from 'stream'
function* gen() {
yield 'One line\n'
yield 'Another line\n'
}
const readableStream = stream.Readable.from(gen(), { encoding: 'utf8' })
assert.equal(await readableToString(readableStream), 'One line\nAnother line\n')
从字符串创建可读流
Readable.from()
接受任何迭代,并且除此外也可以转换字符串为流:
import { Readable } from 'stream'
const str = 'Some text!'
const readable = Readable.from(str, { encoding: 'utf8' })
assert.equal(await readableToString(readable), 'Some text!')
在 moment,Readable.from()
将字符串与其它任何可迭代对象一样对待,因此在其代码点上进行迭代。从性能角度来看,这不是理想的选择,但是对于大多数用例来说应该可以。我希望 Readable.from()
经常与字符串一起使用,因此将来可能会进行优化。
通过 for-await-of
读取可读流
每一个可读流都是异步可迭代的,这意味着我们可以使用 for-await-of
循环读取它的内容:
import * as fs from 'fs'
async function logChunks(readable) {
for await (const chunk of readable) {
console.log(chunk)
}
}
const readable = fs.createReadStream('tmp/test.txt', { encoding: 'utf8' })
logChunks(readable)
// Output:
// 'This is a test!\n'
收集可读流的内容为字符串
下面的函数是一个简单的实现,我们可以在这篇博客文章的开头看到它:
import { Readable } from 'stream'
async function readableToString2(readable) {
let result = ''
for await (const chunk of readable) {
result += chunk
}
return result
}
const readable = Readable.from('Good morning!', { encoding: 'utf8' })
assert.equal(await readableToString2(readable), 'Good morning!')
注意,在这种情况下,我们不得不使用 async
函数,因为我们想要返回 Promise
。
通过 async generators
(异步生成器)转换可读流
异步迭代提供一种优雅的替代方案,可以转换流以分多个步骤处理流数据:
- 输入一个可读流。
- 第一个转换由异步生成器执行,该异步生成器遍历可读流并按其认为合适的方式产生。
- 可选的是,我们可以通过使用更多的异步生成器来进一步转换。
- 最后,我们有几个选项来处理最后一个生成器返回的异步可迭代:
- 我们可以通过
Readable.from()
将其转换为可读流(以后可以通过管道将其写入可写流)。 - 我们可以使用异步函数来处理它。
- 其它。
- 我们可以通过
总之,这些处理管道包括几部分:
可读的
→ 第一个异步生成器[→ … → 最后一个异步生成器]
→ 可读或者异步方法\
在下面的例子中,最后一步是执行异步函数 logLines()
,该函数在迭代器中输出项目到控制台。
import { Readable } from 'stream'
/**
* @param chunkIterable An asynchronous or synchronous iterable
* over “chunks” (arbitrary strings)
* @returns An asynchronous iterable over “lines”
* (strings with at most one newline that always appears at the end)
*/
async function* chunksToLines(chunkIterable) {
let previous = ''
for await (const chunk of chunkIterable) {
previous += chunk
while (true) {
const eolIndex = previous.indexOf('\n')
if (eolIndex < 0) break
// line includes the EOL
const line = previous.slice(0, eolIndex + 1)
yield line
previous = previous.slice(eolIndex + 1)
}
}
if (previous.length > 0) {
yield previous
}
}
async function* numberLines(lineIterable) {
let lineNumber = 1
for await (const line of lineIterable) {
yield `${lineNumber} ${line}`
lineNumber++
}
}
async function logLines(lineIterable) {
for await (const line of lineIterable) {
console.log(line)
}
}
const chunks = Readable.from('Text with\nmultiple\nlines.\n', {
encoding: 'utf8',
})
logLines(numberLines(chunksToLines(chunks)))
// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'
可写流
为文件创建可写流
我们使用 fs.createWriteStream() 创建可写流:
const writableStream = fs.createWriteStream('tmp/log.txt', {
encoding: 'utf8',
})
向可写流写数据
在这一节,我们了解三种向可写流写数据的方法:
- 通过可写流的方法
.write()
直接地向可写流写数据。 - 使用可读流的方法
.pipe
连接到可写流。 - 使用
stream
模块函数pipeline()
连接可读流到可写流。
为了验证这些方法,我们以三种不同的方式实现相同功能函数 writeIterableToFile()
。
在异步函数中向可写流写数据
import { once } from 'events'
import * as fs from 'fs'
import * as stream from 'stream'
import * as util from 'util'
const finished = util.promisify(stream.finished) // (A)
async function writeIterableToFile(iterable, filePath) {
const writable = fs.createWriteStream(filePath, { encoding: 'utf8' })
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
// (B)
// Handle backpressure
await once(writable, 'drain')
}
}
writable.end() // (C)
// Wait until done. Throws if there are errors.
await finished(writable)
}
await writeIterableToFile(['One', ' line of text.\n'], 'tmp/log.txt')
assert.equal(
fs.readFileSync('tmp/log.txt', { encoding: 'utf8' }),
'One line of text.\n'
)
stream.finished()
的默认版本是基于回调函数的,但是该函数可以通过 util.promisify()
转换为 Promise 版本(行 A)。
我们可以使用下面两种模式:
- 当处理反面时(行 B)写入可写流:
if (!writable.write(chunk)) { await once(writable, 'drain') }
- 关闭可写流并等待写入结束(行 C):
writable.end() await finished(writable)
pipeline(readable, writable)
import * as fs from 'fs'
import * as stream from 'stream'
const pipeline = util.promisify(stream.pipeline)
async function writeIterableToFile(iterable, filePath) {
const readable = stream.Readable.from(iterable, { encoding: 'utf8' })
const writable = fs.createWriteStream(filePath)
await pipeline(readable, writable) // (A)
}
await writeIterableToFile(['One', ' line of text.\n'], 'tmp/log.txt')
// ···
我们使用下面的模式(行 A):
await pipeline(readable, writable)
也可以使用 Readable.prototype.pipe()
,但该方法有一个风险(如果可读流抛出错误,可写流不会默认关闭)。方法 stream.pipeline()
没有这个风险。
流相关功能
os
模块:
-
const EOL: string
(Node.js 0.7.8+)用于获取当前操作系统平台行结束分隔符
-
Buffer.isEncoding(encoding: string): boolean
(Node.js 0.9.1+)如果
encoding
编码名称是 Node.js 支持的文本编码格式,该方法返回true
。支持的编码格式有:'utf8'
'utf16le'
'ascii'
'latin1
'base64'
'hex'
(每个字节为两个十六进制字符)
stream
模块:
-
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any>
(Nodejs 10.0.0+)可读流是异步迭代。举例来说,可以在 async 函数和异步迭代器中使用
for-await-of
循环迭代它们。 -
finished(stream: ReadableStream | WritableStream | ReadWriteStream, callback: (err?: ErrnoException | null) => void): () => Promise<void>
(Node.js 10.0.0+)该方法在可读流/可写流结束或发生错误时返回 Promise。 这个 Promise 版本是这样创建的:
const finished = util.promisify(stream.finished)
-
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>
(Node.js 10.0.0+)流之间的管道。当管道完成或发生错误时,将返回 Promise。 此 Promise 版本是这样创建:
const pipeline = util.promisify(stream.pipeline)
-
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable
(Node.js 12.3.0+)转换迭代为可读流。
interface ReadableOptions { highWaterMark?: number encoding?: string objectMode?: boolean read?: (this: Readable, size: number) => void destroy?: ( this: Readable, error: Error | null, callback: (error: Error | null) => void ) => void autoDestroy?: boolean }
这些选项与
Readable
构造函数效果相同,更多介绍文档。
fs
模块:
-
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream
(Node.js 2.3.0+)创建可读流。可以接受更多参数。
-
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream
(Node.js 2.3.0+)使用参数
.flags
可以指定是要写入还是要追加,以及如果文件存在或不存在会发生什么。可以接受更多参数。
本章节的静态类型信息基于 Definitely Typed
扩展阅读
- Node.js 文档章节 “Streams Compatibility with Async Generators and Async Iterators”
- “JavaScript for impatient programmers” 章节 “Async functions”
- “JavaScript for impatient programmers” 章节 “Asynchronous iteration”
本文由 吳文俊 翻译,原文地址 Easier Node.js streams via async iteration