NodeJS的数据流stream之高级用法

在node中,只要涉及到文件IO的场景一般都会涉及到一个类-Stream。Stream是对IO设备的抽象表示,其在JAVA中也有涉及,主要体现在四个类-InputStream、Reader、OutputStream、Writer,其中InputStream和OutputStream类针对字节数据进行读写;Reader和Writer针对字符数据读写。同时Java中有多种针对这四种类型的扩展类,如节点流、缓冲流和转换流等。比较而言,node中Stream类型也和Java中的类似,同样提供了支持字节和字符读写的Readable和Writeable类,也存在转换流Transform类,本文主要分析node中Readable和Writeable的实现机制,从底层的角度更好的理解Readable和Writeable实现机制,解读在读写过程中发生的一些重要事件。

Readable类

Readable对应于Java中的InputStream和Reader两个类,针对Readable设置encode编码可完成内部数据由Buffer到字符的转换。Readable Stream有两种模式,即flowing和paused模式。这两种模式对于用户而言区别在于是否需要手动调用Readable.prototype.read(n),读取缓冲区的数据。查询node API文档可知触发flowing模式有三种方式:

  • 侦听data事件
  • readable.resume()
  • readable.pipe()
    而触发paused模式同样有几种方式:
  • 移除data事件
  • readable.pause()
  • readable.unpipe()
    可能这样讲解大家仍不明白Readable Stream这两种模式的区别,那么下文从更深层次分析两种模式的机制。
深入Readable的实现

Readable继承EventEmitter,大家也都知道。但是相信大家应该不怎么熟悉Readable的实例属性**_readableState**。该属性是一个ReadableState类型的对象,保存了Readable实例的重要信息,如读取模式(是否为对象模式)、highWaterMask(缓冲区存放的最大字节数)、缓冲区、flowing模式等。在Readable的实现中,处处使用ReadableState对象记录当前读取状态,并设置缓冲区保证读操作的顺利进行。

~~~

1. 读写文件 #

  • readFile和readFileSync是先将文件完整地读入缓存区,再从该缓存区中读取文件
  • writeFile和writeSync时都是将文件完整地读入缓存区,然后再从缓存区中一次性写入到文件中

node把读写的文件视为一个整体,为其分配缓存区并一次性将文件内容读取到缓存区中,在此之间node不能做任何工作。
如果文件特别大很容易带来内存溢出的问题

var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
    fs.readFile('./data.txt', function (err, data) {
        res.end(data);
    });
});
server.listen(8080);

2. 流 #

在有些时候我们并不关心文件内容,只是关注是否读到数据,以及读到数据时的处理,此时可以用流
流是一组有序的,有起点和终点的字节数据传输手段
网络中传输数据时,总是先将对象包含的数据转成流数据也就是字节数据,再通过流的传输,到达目的地时再转换为原始的数据

2.1 可读流 #

2.1.1 stream.Readable #

用了stream.Readable接口的对象来将对象数据读取为流数据

  • 这些对象都继承了EventEmitter的实例对象,在读取数据的过程中,将可能触发各种事件
  • 可以使用flowing和非flowing两种模式来读取数据,当使用flowing模式时,将使用操作系统内部IO机制来读取数据,速度最快
  • 当使用非flowing模式时,你必须显式调用对象的read方法来读取数据

2.1.2 触发的事件 #

readable可以从流中读取数据时触发。如果指定了回调函数,将迫使操作系统先读入缓存区,再从缓存区中读取数据
当缓存区中的对象全部读出时,且可以继续读取数据时,触发一个新的readable事件

  • data 参数值为已读取到的一个buffer对象或字符串,如果指定data事件回调,使用flowing模式来读取数据,速度快。
  • end 读完所有的数据时触发,不再触发data事件
  • error 产生错误时触发
  • close 读取流对象关闭触发

2.1.3 读流的方法 #

  • read 读取数据
  • setEncoidng 提定编码
  • pause 停止触发data事件
  • resume 恢复触发data事件
  • pipe设置数据通道,取出流中的数据并输出到另一端指向的目标对象
  • unpipe 取消在pipe中设置的通道
  • unshift

2.2 可写流 #

2.2.1 stream.Writable #

2.2.2 事件 #

  • drain 当写入数据write返回 false之后触发,表示操作系统缓存区中的数据已经输出到目标对象中,可以继续向缓存区写入数据
  • finish 当end方法被调用且数据被全部写入操作系统缓存区时触发
  • pipe 读取数据对象的pipe方法被调用时触发
  • unpipe 读取对象的unpipe
  • error

2.2.3 方法 #

  • write 写入数据
  • end 没有数据写入流时触发,迫使操作系统将剩余的数据立即写入目标对象。调用后不能再写入

3. createReadStream #

使用readstream对象读取文件

  • fs.createReadStream(path,[options])
    • flags 默认为 r
    • encoding 编码,默认为null
    • autoClose是否读取完毕或出错时关闭文件描述符,默认为true.否则 必须调用close来关闭
    • start 整数值,指定文件的开始读取位置
    • end 整数值,指定文件的结束读取位置

流动模式

var file = fs.createReadStream('./msg.txt',{start:3,end:11});//要注意end是包括这个位置的,
file.on('open',function(){
    console.log('文件打开了');
})

file.on('data',function(data){
    console.log('数据拿到了'+data.length);
})
file.on('end',function(){
    console.log('全部读取完毕');
})

file.on('close',function(){
    console.log('文件关闭了');
})

file.on('error',function(err){
    console.log('出错了'+err);
})

//使用pause方法暂停data事件的触发,这个意味着停止文件的读取,而已经被读取到操作系统缓存区中的数据也将被
//保存在操作系统缓存区。

var readStream = fs.createReadStream('./msg.txt');
readStream.pause();
readStream.on('data',function(data){
    console.log(data);
})
setTimeout(function(){
    readStream.resume();
},5000)

非流动式读取

var rs = fs.createReadStream('./128.txt');//要注意end是包括这个位置的,
var arr = [];
rs.on('readable',function(){
    var data;
    while(null != (data = rs.read())){
        //console.log('read:%d',data.length);
        arr.push(data);
    }
}).on('end',function(){
    var b = Buffer.concat(arr);
    //console.log(b.toString());
})

4. writeStream #

使用writeStream对象写入文件,可以创建一个将流数据写入文件中的writestream对象

  • fs.createWriteStream(path,[options]);

    • path将要写入的文件路径
    • flags 指定对文件采取什么操作 ‘w’
    • encoding 指定编码
    • start 使用整数值来指定文件开始写入的位置,如果要追加数据,需要将flags = ‘a’

    writeable对象有一个write方法,用于将流数据写入到目标对象中。
    write(chunk,[encoding],callback());

      chunk要写入的数据
      encoding 编写格式
      callback指定当数据写入完毕后的回调函数
    

    它有一个返回值,操作系统缓存区已满时返回false,未满时返回true
    它还有一个end方法,当没有数据再被写入时可调用该方法关闭文件, 这将迫使操作系统缓存区中的剩余数据立即被写入
    到文件中。可以在回调中指定关闭之前的处理。

writeable.end(chunk,[encoding],[callback])
writeStream还有一个byteswritten属性,表示已经在文件中写入的数据的字节数

var fs = require('fs');
var src = fs.createReadStream('./msg.txt');
var desc = fs.createWriteStream('./anotherMsg.txt');
desc.on('open',function(){
    console.log('需要写入的文件已经被打开');
});
src.on('data',function(data){
    desc.write(data);
})

src.on('end',function(){
    desc.end('再见',function(){
        console.log('文件全部写入完毕');
        console.log('共写入%d字节数据',desc.bytesWritten);
    });
})
var stream = require('stream');
var util = require('util');
var fs= require('fs');
//构建一个自定义的读流
function ZfReadStream(){
    stream.Readable.call(this);
}
util.inherits(ZfRadStream,stream.Readable);
//实现_read方法
ZfRadStream.prototype._read = function(){
    var content = fs.readFileSync('./qqq.txt')
    this.push(content);
    this.push(null);//表示结束
}

function Zf1RadStream(){
    stream.Readable.call(this);
}
util.inherits(Zf1RadStream,stream.Readable);

Zf1RadStream.prototype._read = function(){
    var content = fs.readFileSync('./qqq.txt')
    this.push(content);
    this.push(null);
}

var zfRadStream = new ZfRadStream();
zfRadStream.on('data',function(data){
    fs.writeFile('./ttt.jpg',data);
}).on('end',function(){
    console.log('end');
});
var stream = require('stream');
var util = require('util');
var fs= require('fs');
//自定义的写流
function ZfWriteStream(){
    stream.Writable.call(this);
}
util.inherits(ZfWriteStream,stream.Writable);

ZfWriteStream.prototype._write = function(data,encoding,callback){
 console.log('zf1'+data.toString('utf8'));
    callback();
}

function Zf2WriteStream(){
    stream.Writable.call(this);
}
util.inherits(Zf2WriteStream,stream.Writable);
Zf2WriteStream.prototype = new stream.Writable();
Zf2WriteStream.prototype._write = function(data,encoding,callback){
    console.log('zf2'+data.toString('utf8'));
    callback();
}

var zfWriteStream = new ZfWriteStream();
zfWriteStream.write(new Buffer('珠峰'),function(){
    console.log('写入成功');
});
var zf2WriteStream = new Zf2WriteStream();
zf2WriteStream.write(new Buffer('珠峰'),function(){
    console.log('写入成功');
});
/**
 *双工流,能读能写
 */

var Duplex = require('stream').Duplex;
var util = require('util');
var fs = require('fs');
function Change(){
    Duplex.call(this);
}
util.inherits(Change,Duplex);
Change.prototype._read = function(){

}

Change.prototype._write = function(data,encoding,cb){
    for(var i=0;i<data.length;i++){
        data[i] = 255-data[i];
    }
    this.push(data);
    this.push(null);
}

var c = new Change();
fs.createReadStream('./qqqt.txt').pipe(c)
    .pipe(fs.createWriteStream('qqqtt.txt'));
/**
 * Stream.Transform
 */
var Transform = require('stream').Transform;
var util = require('util');
var fs = require('fs');
function Tran(){
    Transform.call(this);
}
util.inherits(Tran,Transform);
Tran.prototype._transform = function(chunk, encoding, cb){
    console.log(chunk);
    for(var i=0;i<chunk.length;i++){
        chunk[i] = 255-chunk[i];
    }
    this.push(chunk);
    this.push(null);
}
var c = new Tran();
fs.createReadStream('./qqq2.txt').pipe(c)
    .pipe(fs.createWriteStream('qqq3.txt'));

~

扩展:

二、Readable流

有的前传util模块从其他对象继承的功能的了解,Readable就很好理解了.主要它包含以下方法和事件。

1.事件:

readable:在数据块可以从流中读取的时候发出。

data:类似readable,不同之处在于,当数据的事件处理程序被连接时,流被转变成流动的模式,并且数据处理程序被连续的调用,直到所有数据都被用尽

end:当数据不再被提供时由流发出

close:当底层资源,如文件,已关闭时发出。

error:在接收数据中出错是发出。

2.方法:

read([size]):从流中读数据.数据可以是String、Buffer、null(下面代码会有),当指定size,那么只读仅限于那个字节数

setEncoding(encoding):设置read()请求读取返回String时使用的编码

pause():暂停从该对象发出的data事件

resume():恢复从该对象发出的data事件

pipe(destination,[options]):把这个流的输出传输到一个由deatination(目的地)指定的Writable流对象。options是一个js对象.例如:{end:true}当Readable结束时就结束Writable目的地。

unpipe([destination]):从Writale目的地断开这一对象。

三、Writable流

有读就会有写,毕竟是可逆的,它和readable一样也有一些事件和方法

1.方法

write(chunk,[encoding],[callback]):将数据写入流。chunk(数据块)中包含要写入的数据,encoding指定字符串的编码,callback指定当数据已经完全刷新时执行的一个回调函数。如果成功写入,write()返回true.

end([chunk],[encoding],[callback]):与write()相同,它把Writable对象设为不再接受数据的状态,并发送finish事件。

2.事件

drain:在write()调用返回false后,当准备好开始写更多数据时,发出此事件通知监视器。

finish:当end()在Writable对象上调用,所以数据被刷新,并不会有更多的数据被接受时触发

pipe:当pipe()方法在Readable流上调用,已添加此writable为目的地时发出

unpipe:当unpipe()方法被调用,以删除Writable为目的地时发出。

~

StreamNode.js中非常重要的一个模块,应用广泛。一个流是一个具备了可读、可写或既可读又可写能力的接口,通过这些接口,我们可以和磁盘文件、套接字、HTTP请求来交互,实现数据从一个地方流动到另一个地方的功能。

所有的流都实现了EventEmitter的接口,具备事件能力,通过发射事件来反馈流的状态。比如有错误发生时会发射“error”事件,有数据可被读取时发射“data”事件。这样我们就可以注册监听器来处理某个事件,达到我们的目的。

node.js定义了Readable、Writable、Duplex、Transform四种流,Node.js有各种各样的模块,分别实现了这些流,我们挑出来一一看一看他们的用法。当然,我们也可以实现自己的流,可以参考Stream的文档或我们即将提到的这些Node.js里的实现。

~

未经允许不得转载:WEB前端开发 » NodeJS的数据流stream之高级用法

赞 (1)