经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » JS/JS库/框架 » Node.js » 查看文章
深入浅出了解Node.js Streams
来源:jb51  时间:2019/5/28 8:54:56  对本文有异议

什么是流(steams)

流(stream)是 Node.js 中处理流式数据的抽象接口。

Streams 不是 Node.js 独有的概念。它们是几十年前在 Unix 操作系统中引入的。

它们能够以一种有效的方式来处理文件的读、写,网络通信或任何类型的端到端信息交换。
例如,当你编写了一段程序用来读取文件时,传统的方法是将文件从头到尾读入内存,然后再进行处理。而使用流的话,你就可以逐块读取它,处理其内容而不将其全部保存在内存中。
以如下代码为例

  1. const fs = require('fs');
  2. const rs = fs.createReadStream('test.md');
  3. let data = '';
  4. rs.on("data", function (chunk) {
  5. data += chunk;
  6. });
  7. rs.on("end", function() {
  8. console.log(data);
  9. });

利用 createReadStream 创建一个读取数据的流,来读取 test.md 文件的内容,此时监听 data 事件,它是在当流将数据块传送给消费者后触发。并在对应的 eventHandler 中,拼接 chunk。在 end 事件中,打印到终端上。
之前说流,可以逐块读取文件内容,那么这个块,也就是 chunk 是什么?
一般情况下是 Buffer,修改 data 事件的 eventHandler 来验证下

  1. rs.on("data", function (chunk) {
  2. console.log("chunk", Buffer.isBuffer(chunk)) // log true
  3. data += chunk;
  4. });

流的工作方式可以具体的表述为,在内存中准备一段 Buffer,然后在 fs.read() 读取时逐步从磁盘中将字节复制到 Buffer 中。

为什么要使用 Stream

利用 Stream 来处理数据,主要是因为它的两个优点:

内存效率:在够处理数据之前,不需要占用大量内存;

时间效率:处理数据花费的时间更少,因为流是逐块来处理数据,而不是等到整个数据有效负载才启动。

首先内存效率,与 fs.readFile 这种会缓冲整个文件相比,流式传输充分地利用 Buffer (超过 8kb)不受 V8 内存控制的特点,利用堆外内存完成高效地传输。相关验证可以参考这篇博文,地址。
时间效率,与 fs.FileSync 相比,有些优势,但是与异步的 fs.readFile 相比,优势不大。

Node.js 中 Stream 的使用

首先用一张图来了解下 Node.js 中有哪些内置的 Stream 接口

图中提供了一些 Node.js 原生的流的示例,有些是可读、写的流。 也有一些是可读写的流,如 TCP sockets、zlib 以及 crypto。

特别注意: 流的读、写与环境是密切相关的。例如 HTTP 响应在客户端上的可读流,但它是服务器上的可写流。同时还需要注意,stdio streams(stdin,stdout,stderr) 在子进程上是相反的流。

使用一个例子来展示流的使用

首先利用如下脚本创建一个比较大的文件(大概 430 MB)

  1. const fs = require('fs');
  2. const file = fs.createWriteStream('test.md');
  3. for(let i=0; i<= 1e6; i++) {
  4. file.write('hello world.\n');
  5. }
  6. file.end();

在当前目录下,启动 http 服务

  1. const http = require('http')
  2. const fs = require('fs')
  3. const server = http.createServer(function (req, res) {
  4. fs.readFile(__dirname + '/test.md', (err, data) => {
  5. res.end(data)
  6. })
  7. })
  8. server.listen(3000)
  9.  

得到的结果,如图

  1. const http = require('http')
  2. const fs = require('fs')
  3. const server = http.createServer((req, res) => {
  4. const stream = fs.createReadStream(__dirname + '/test.md')
  5. stream.pipe(res)
  6. })
  7. server.listen(3000)

时间减少了 2s 多。这可以解释为,在读取文件内容,并且不需要改变内容的场景下,流能够完成只读取 buffer,然后直接传输,不做额外的转换,避免损耗,提高性能。
上述代码中,应用了 stream.pipe(...) 。它主要是对流进行链式地管道操作,例如

  1. src.pipe(dest1).pipe(dest2)

这样数据流会被自动管理。

如果可读流发生错误,目标可写流不会自动关闭,需要手动关闭所有流以避免内存泄漏。

通常,当你使用 pipe 方法时,就不需要使用事件,但如果场景需要以更灵活、自定义的方式使用流,那么就要考虑事件。

Stream events

在上述例子中,我们使用了可读流的 data 、end 事件来控制文件的读取,它本质上与 pipe 方法相同,例如

  1. # readable.pipe(writable)
  2. readable.on('data', (chunk) => {
  3. writable.write(chunk);
  4. });
  5. readable.on('end', () => {
  6. writable.end();
  7. });

只不过,使用 event 会更加灵活,可控。

图中简单罗列了可读流、可写流的相关事件、方法,其中最重要的是

可读流:

  • data 事件:每当流将一大块数据传递时,就会触发;
  • end 事件:当没有更多数据要从流发出时,就会触发。

可写流:

  • drain 事件:当可以继续写入数据到流时会触发事件;
  • finish 事件:处理完全部数据块之后触发。

流的不同类型

除了上面涉及到的可读、写流之后,还有 Duplex、Transform 两类:

  • Readable :可以接收数据,但不能向其发送数据。当你将数据推送到可读流中时,它会被缓冲,直到消费者开始读取数据;
  • writable :可以发送数据,但不能从中接收;
  • Duplex :即可读也可写;
  • Tranform :与 Duplex 一样是可写又可读的,但它的输出与输入是相关联的。

如何创建一个可读流

这里只做简单介绍,具体见 stream module

  1. const Stream = require('stream')
  2. const readableStream = new Stream.Readable()
  3. readableStream._read = (size) => {
  4. console.log('read', size)
  5. }

利用 Stream 模块初始化一个可读流,然后向其中发送数据

  1. readableStream.push('hi!')
  2. readableStream.push('ho!')

如何创建一个可写流

为了创建可写流,需要扩展了基本的 Writable 对象,并实现了它的 _write 方法。

  1. const Stream = require('stream')
  2. const writableStream = new Stream.Writable()

实现 _write 方法:

  1. writableStream._write = (chunk, encoding, next) => {
  2. console.log(chunk.toString())
  3. next()
  4. }

结合上述例子实现

利用 readableStream 读入数据,并输出到 writableStream

  1. const Stream = require('stream')
  2. const readableStream = new Stream.Readable()
  3. readableStream._read = (size) => {
  4. console.log('read', size)
  5. }
  6. const writableStream = new Stream.Writable()
  7. writableStream._write = (chunk, encoding, next) => {
  8. console.log('write', chunk.toString())
  9. next()
  10. }
  11. readableStream.pipe(writableStream)
  12. readableStream.push('hi!')
  13. readableStream.push('ho!')
  14. /*
  15. log:
  16. read 16384
  17. write hi!
  18. write ho!
  19. */
  20.  

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持w3xue。

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号