Duplex Streams

A Duplex stream is a stream that combines Readable and Writable functionality. A good example of a Duplex stream is a TCP socket connection. You can read and write from the socket connection after it has been created.

To implement your own custom Duplex stream object, you need to first inherit the functionality for Duplex streams. The simplest way to do this is to use the following code, which uses the util module’s inherits() method:

var util = require('util'),
util.inherits(MyDuplexStream, stream.Duplex);

Then you create an instance of the object call:

stream.Duplex.call(this, opt);

The opt parameter with creation of a Duplex stream accepts an object with the property allowHalfOpen set to true or false. If this option is true, then the readable side stays open even after the writable side has ended and vice versa. If this option is set to false, then ending the writable side also ends the readable side and vice versa.

When you implement a Duplex stream, you need to implement both a _read(size) and _write(data, encoding, callback) method when prototyping your Duplex class.

The code in Listing 5.8 illustrates the basics of implementing, writing to, and reading from a Duplex stream. The example is very basic but shows the main concepts. The Duplexer() class inherits from the Duplex stream and implements a rudimentary _write() function that stores data in an array in the object. The _read() function uses shift() to get the first item in the array and then pushes null if it is equal to "stop", pushes it if there is a value, or sets a timeout timer to call back to the _read() function if there is no value.

Figure 5.8 shows the output for Listing 5.8. Notice that the first two writes ("I think, " and "therefore") are read together. This is because both were pushed to Readable before the data event was triggered and printed the “read: I think, therefore” line to the console output.

Listing 5.8 stream_duplex.js: Implementing a Duplex stream object


01 var stream = require('stream'),
02 var util = require('util'),
03 util.inherits(Duplexer, stream.Duplex);
04 function Duplexer(opt) {
05   stream.Duplex.call(this, opt);
06   this.data = [];
07 }
08 Duplexer.prototype._read = function readItem(size) {
09   var chunk = this.data.shift();
10   if (chunk == "stop"){
11     this.push(null);
12   } else{
13     if(chunk){
14       this.push(chunk);
15     } else {
16       setTimeout(readItem.bind(this), 500, size);
17     }
18   }
19 };
20 Duplexer.prototype._write = function(data, encoding, callback) {
21   this.data.push(data);
22   callback();
23 };
24 var d = new Duplexer();
25 d.on('data', function(chunk){
26   console.log('read: ', chunk.toString());
27 });
28 d.on('end', function(){
29   console.log('Message Complete'),
30 });
31 d.write("I think, ");
32 d.write("therefore ");
33 d.write("I am.");
34 d.write("Rene Descartes");
35 d.write("stop");


Image

Figure 5.8 Output of stream_duplex.js, implementing a custom Duplex object.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset