NodeJs中的数据了流/Stream的详解

Stream 是一个抽象接口,Node 中有很多对象实现了这个接口。例如,对http 服务器发起请求的request 对象就是一个 Stream,还有stdout(标准输出)。

Node.js,Stream 有四种流类型:

  • Readable – 可读操作。

  • Writable – 可写操作。

  • Duplex – 可读可写操作.

  • Transform – 操作被写入数据,然后读出结果。

所有的 Stream 对象都是 EventEmitter 的实例。常用的事件有:

  • data – 当有数据可读时触发。

  • end – 没有更多的数据可读时触发。

  • error – 在接收和写入过程中发生错误时触发。

  • finish – 所有数据已被写入到底层系统时触发。

本教程会为大家介绍常用的流操作。

nodejs的fs模块并没有提供一个copy的方法,但我们可以很容易的实现一个,比如:

var source = fs.readFileSync('/path/to/source', {encoding: 'utf8'});
fs.writeFileSync('/path/to/dest', source);

这种方式是把文件内容全部读入内存,然后再写入文件,对于小型的文本文件,这没有多大问题,比如grunt-file-copy就是这样实现的。但是对于体积较大的二进制文件,比如音频、视频文件,动辄几个GB大小,如果使用这种方法,很容易使内存“爆仓”。理想的方法应该是读一部分,写一部分,不管文件有多大,只要时间允许,总会处理完成,这里就需要用到流的概念。

~~~

流的概念 #

流是一组有序的,有起点和终点的字节数据传输手段
它不关心文件的整体内容,只关注是否从文件中读到了数据,以及读到数据之后的处理
流是一个抽象接口,被 Node 中的很多对象所实现。比如HTTP 服务器request和response对象都是流。

一.  为什么需要流(Stream)?

  举个例子,如果要读取一个文件,一次性读取需要占用大内存,是不可取的。因此就有了流,用流会很方便,可以帮我们避免这样的问题,调用其接口不用关心底层如何实现。

二. 什么是流(Stream)?

  流(Stream)是可读,可写或双工的。可以通过require(‘stream’)加载流的基类,其中包括四类流, Readable 流、Writable 流、Duplex 流和Transform 流的基类。

  另外如果觉得上述四类基类流不能满足需求,可以编写自己的扩充类流。像我们Team现在正做的Node项目,就重写了Transform类以供使用。

  按照官方的API文档,步骤如下:

  1. 在您的子类中扩充适合的父类。(例如util.inherits(MyTransform, Transform); )
  2. 在您的构造函数中调用父类的构造函数,以确保内部的机制被正确初始化。
  3. 实现一个或多个特定的方法,

1.可读流createReadStream #

实现了stream.Readable接口的对象,将对象数据读取为流数据,当监听data事件后,开始发射数据

fs.createReadStream = function(path, options) {
  return new ReadStream(path, options);
};
util.inherits(ReadStream, Readable);

1.1 创建可读流 #

var rs = fs.createReadStream(path,[options]);
  1. path读取文件的路径
  2. options
    • flags打开文件要做的操作,默认为’r’
    • encoding默认为null
    • start开始读取的索引位置
    • end结束读取的索引位置
    • highWaterMark读取缓存区默认的大小64kb

如果指定utf8编码highWaterMark要大于3个字节

1.1.1 监听data事件 #

流切换到流动模式,数据会被尽可能快的读出

rs.on('data', function (data) {
    console.log(data);
});

1.1.2 监听end事件 #

该事件会在读完数据后被触发

rs.on('end', function () {
    console.log('读取完成');
});

1.1.3 监听error事件 #

rs.on('error', function (err) {
    console.log(err);
});

1.1.4 设置编码 #

与指定{encoding:’utf8′}效果相同,设置编码

rs.setEncoding('utf8');

1.1.5暂停触发data恢复触发data #

通过pause()方法和resume()方法

rs.on('data', function (data) {
    rs.pause();
    console.log(data);
});
setTimeout(function () {
    rs.resume();
},2000);

2.可写流createWriteStream #

实现了stream.Writable接口的对象来将流数据写入到对象中

fs.createWriteStream = function(path, options) {
  return new WriteStream(path, options);
};

util.inherits(WriteStream, Writable);

2.1 创建可写流 #

var ws = fs.createWriteStream(path,[options]);
  1. path写入的文件路径
  2. options
    • flags打开文件要做的操作,默认为’w’
    • encoding默认为utf8
    • highWaterMark写入缓存区的默认大小16kb

2.1.1 write方法 #

ws.write(chunk,[encoding],[callback]);
  1. chunk写入的数据buffer/string
  2. encoding编码格式chunk为字符串时有用,可选
  3. callback 写入成功后的回调

返回值为布尔值,系统缓存区满时为false,未满时为true

2.1.2 end方法 #

ws.end(chunk,[encoding],[callback]);

调用该方法关闭文件,迫使系统缓存区的数据立即写入文件中。不能再次写入

2.1.3 drain方法 #

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt',{highWaterMark:5});
var i = 0;
function write(){
    var flag = true;
    while (flag&&i<10){
        flag = ws.write(''+i++);
    }
}
write();
ws.on('drain', function () {
    write();
});

3.pipe方法 #

3.1 pipe方法的原理 #

var fs = require('fs');
var ws = fs.createWriteStream('./2.txt');
var rs = fs.createReadStream('./1.txt');
rs.on('data', function (data) {
    var flag = ws.write(data);
    if(!flag)
    rs.pause();
});
ws.on('drain', function () {
    rs.resume();
});
rs.on('end', function () {
    ws.end();
});

3.2 pipe用法 #

readStream.pipe(writeStream);
var from = fs.createReadStream('./1.txt');
var to = fs.createWriteStream('./2.txt');
from.pipe(to);

将数据的滞留量限制到一个可接受的水平,以使得不同速度的来源和目标不会淹没可用内存。

~~

理解网络流

一个TCP连接既是可读流,又是可写流;而Http连接则不同,一个http request对象是可读流,而http response对象则是可写流。

理解客户端缓慢的问题

首先,服务器读取文件的速度是很快的,然而客户端写入的速度确不能跟服务器读取的速度一致,这就对导致服务器

读取数据的时候无法及时传送到客户端,就会采取缓存策略。这会带来一个严重的问题,服务器内存爆满。

为了解决这个问题,我们需要监听客户端写入的事件是否正常,如果可以正常写入则服务器端则继续读取数据并传送。

在上面提到了流的传输过程是可以中断和继续的,并且有drain事件可以监听的到,我们可以利用这些特性来优化我们的

读写流的过程。一般情况下,如果我们不处理这种问题的话,代码像下面这样是很容易出现问题的:

1 require('http').createServer(function(req, res) {
2     var rs = fs.createReadStream('/path/to/big/file');
3     rs.on('data', function(data) {
4         res.write(data);
5     });
6     rs.on('end', function() {
7         res.end();
8     });
9 }).listen(8080);

代码中没有进行任何中断和持续的处理,如果写入流的过程能够正常的话,write()方法能够返回true,否则会返回false; 我们可以通过这样的API来优化我们的代码:

 1 require('http').createServer( function(req, res) {
 2     var rs = fs.createReadStream('/path/to/big/file');
 3     rs.on('data', function(data) {
 4         if (!res.write(data)) {
 5             rs.pause();
 6         }
 7     });
 8     res.on('drain', function() {
 9         rs.resume();
10     });
11     rs.on('end', function() {
12         res.end();
13     });
14 }).listen(8080);

上面的过程在读写流的时候是很常见的,因此Node给出了一个通用的方法,就是pipe();该方法解决了上述所提到的这些问题。

1 require('http').createServer(function(req, res) {
2     var rs = fs.createReadStream('/path/to/big/file');
3     rs.pipe(res);
4 }).listen(8080);

默认情况下,数据传送完成后就会调用end()方法,如果你希望调用自定义的end的话,则可以添加一个参数给他:

1 require('http').createServer(function(req, res) {
2     var rs = fs.createReadStream('/path/to/big/file');
3     rs.pipe(res, { end: false });
4     rs.on('end', function() {
5         res.write("And that's all, folks!");
6         res.end();
7     });
8 }).listen(8080);

~~

未经允许不得转载:WEB前端开发 » NodeJs中的数据了流/Stream的详解

赞 (0)