Simple examples of Node.js Stream Module
Stream Data with Node.js Stream Module
Data Stream are memory structure that readable , writable or both. A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. All streams are instances of EventEmitter
Readable streams are designed to provide a mechanism to easily read data coming into an application from another source.
Examples of readable streams include:
http responses, on the client
http requests, on the server
child process stdout and stderr
Readable streams provide the read([ size]) method to read data, where size specifies the number of bytes to read from the stream. read() can return a String object, Buffer object or null. Readable
To implement your own custom Readable stream object, you need to first inherit the functionality for Readable streams.
var stream = require('stream');
var util= require('util');
util.inherits (Answers, stream.Readable);
function Answers (opt) {
stream.Readable.call(this, opt);
this.quotes = ["yes","no","maybe"];
this._index = 0;
}
Answers.prototype._read = function() {
if (this._index > this.quotes.length) {
this.push(null);
} else{
this.push(this.quotes[this._index]);
this._index += 1;
};
};
var r = new Answers();
console.log("Direct read: " + r.read().toString());
r.on('data',function(data) {
console.log("Callback read: " + data.toString());
});
r.on('end',function (data) {
console.log("No more answers");
});
Writable streams provide the write( chunk, [encoding], [callback]) method to write data into the stream, where chunk contains the data to write; encoding specifies the string encoding, if necessary; and callback specifies a callback function to execute when the data has been fully flushed. The write() function returns true if the data was written successfully.
console.log("=========================Writable ");
util.inherits(Writer,stream.Writable);
function Writer (opt) {
stream.Writable.call(this,opt);
this.data = new Array();
}
Writer.prototype._write = function (data, encoding, callback) {
this.data.push(data.toString('utf8'));
console.log("Adding: " + data);
callback();
}
var w = new Writer();
for (var i = 0; i < 5 ;i++) {
w.write("Item" + i ,'utf8');
}
w.end("ItemLast");
console.log(w.data);
A Duplex stream is a stream that combines Readable and Writable functionality.You need to implement both a _read( size) and _write( data, encoding, callback) method when prototyping your Duplex class.
console.log('=======================Duplex');
util.inherits(Duplexer,stream.Duplex);
function Duplexer (opt) {
stream.Duplex.call(this,opt);
this.data = [];
}
Duplexer.prototype._read=function readItem (size) {
var chunk = this.data.shift();
if (chunk == "stop") {
this.push(null);
} else{
if(chunk){
this.push(chunk);
}else {
setTimeout(readItem.bind(this), 500, size);
}
}
};
Duplexer.prototype._write = function (data, encoding, callback) {
this.data.push(data);
callback();
};
var d = new Duplexer();
d.on('data', function(chunk) {
console.log('read : ',chunk.toString());
});
d.on('end', function() {
console.log('Message Complete');
});
d.write("I think, ");
d.write("therefore ");
d.write("I am. ");
d.write("Rene Descartes ");
d.write("stop");
A Transform stream extends the Duplex stream but modifies the data between the Writable stream and the Readable stream.
A major difference between the Duplex and the Transform streams is that for Transform streams, you do not need to implement the _read() and _write() prototype methods. These are provided as pass-through functions. Instead, you implement the _transform( chunk, encoding, callback) and _flush( callback) methods. The _transform() method should accept the data from write() requests, modify it, and push out the modified data.
console.log("================== Transform Stream ");
util.inherits(JSONObjectStream, stream.Transform);
function JSONObjectStream (opt) {
stream.Transform.call(this,opt);
};
JSONObjectStream.prototype._transform = function(data,encoding,callback) {
object = data ? JSON.parse(data.toString()) : "";
this.emit("object",object);
object.handled = true ;
this.push(JSON.stringify(object));
callback();
};
JSONObjectStream.prototype._flush = function(obj) {
cb();
};
var tc = new JSONObjectStream();
tc.on("object",function(object) {
console.log("Name: %s",object.name);
console.log("Color: %s",object.color);
});
tc.on("data",function(data) {
console.log("Data : %s",data.toString());
});
tc.write('{"name":"Jose","color":"Green"}');
tc.write('{"name":"Solarius","color":"Blue"}');
tc.write('{"name":"Lo Tae Zhao","color":"Black"}');
tc.write('{"name":"Ommadon","color":"Brown"}');
PIPIING READABLE STREAMS TO WRITABLE STREAMS
One of the coolest things you can do with stream objects is chain Readable streams to Writable streams by using the pipe( writableStream, [options]) function. This does exactly what the name implies: It inputs the output from the Readable stream directly into the Writable stream. The options parameter accepts an object with the end property set to true or false. When end is true, the Writable stream ends when the Readable stream ends.
console.log("=====================Piping Readable Streams to Writable Streams ");
util.inherits(PipeReader, stream.Readable);
util.inherits(PipeWriter, stream.Writable);
function PipeReader (opt) {
stream.Readable.call(this,opt);
this._index = 1;
}
PipeReader.prototype._read = function(size) {
var i = this._index++;
if (i>10) {
this.push(null);
} else{
this.push("Item " + i.toString());
}
};
function PipeWriter (opt) {
stream.Writable.call(this,opt);
this._index =1 ;
}
PipeWriter.prototype._write = function (data,encoding,callback) {
console.log(data.toString());
callback();
};
var r = new PipeReader();
var w = new PipeWriter();
r.pipe(w);
COMPRESSING AND DECOMPRESSING DATA
Node.js provides an excellent library in the Zlib module that allows you to compress and decompress data in buffers very easily and efficiently.
The Zlib module provides several helper functions that make it easy to compress/ decompress data buffers. They all use the same basic format of function( buffer, callback), where function is the compression/decompression method, buffer is the buffer to be compressed/decompressed, and callback is the callback function that is executed after the compression/ decompression occurs.
console.log ( " ============================= COMPRESSING AND DECOMPRESSING BUFFERS ");
var zlib = require("zlib");
var input = ' .......................text..................';
zlib.deflate(input, function(err, buffer) {
if (!err) {
console.log("deflate (%s):",buffer.length,buffer.toString('base64'));
zlib.inflate(buffer,function(err,buffer) {
if (!err) {
console.log("inflate (%s) :",buffer.length,buffer.toString());
}
});
}
});
zlib.deflateRaw(input,function (err, buffer) {
if(!err){
console.log("deflateRaw (%s): ",buffer.length,buffer.toString('base64'));
zlib.inflateRaw(buffer,function(err,buffer) {
if(!err){
console.log("inflateRaw (%s) : ", buffer.length,buffer.toString());
}
});
}
});
zlib.gzip(input, function (err, buffer) {
if(!err){
console.log("gzip (%s) : ",buffer.length,buffer.toString('base64'));
zlib.gunzip(buffer,function(err,buffer) {
if (!err) {
console.log("gunzip (%s) : ",buffer.length,buffer.toString());
}
});
zlib.unzip(buffer,function(err,buffer) {
if (!err) {
console.log("unzip gzip (%s) : ",buffer.length,buffer.toString());
}
});
}
});
COMPRESSING & DECOMPRESSING STREAMS
Compressing/decompressing streams using Zlib is slightly different from compressing/ decompressing buffers. Instead, you use the pipe() function to pipe the data from one stream through the compression/decompression object into another stream. This can apply to compressing any Readable streams into Writable streams.
console.log("===================== Compressing / DECOMPRESSING STREAMS ");
// var zlib = require("zlib");
var gzip = zlib.createGzip();
var fs = require('fs');
var inFile = fs.createReadStream('test.txt');
var outFile = fs.createWriteStream('test.gz');
inFile.pipe(gzip).pipe(outFile);
setTimeout(function() {
var gunzip= zlib.createUnzip({flush:zlib.Z_FULL_FLUSH});
var inFile = fs.createReadStream('test.gz');
var outFile = fs.createWriteStream('test.unzipped');
inFile.pipe(gunzip).pipe(outFile);
},3000);
For more information you can review the official documentation http://nodejs.org/api/stream.html