A Duplex
stream is a stream that combines Readable
and Writable
functionality. A good example of a Duplex
stream is a TCP socket connection. You can read and write from the socket connection after it has been created.
To implement your own custom Duplex
stream object, you need to first inherit the functionality for Duplex
streams. The simplest way to do this is to use the following code, which uses the util
module’s inherits()
method:
var util = require('util'),
util.inherits(MyDuplexStream, stream.Duplex);
Then you create an instance of the object call:
stream.Duplex.call(this, opt);
The opt
parameter with creation of a Duplex
stream accepts an object with the property allowHalfOpen
set to true
or false
. If this option is true
, then the readable side stays open even after the writable side has ended and vice versa. If this option is set to false
, then ending the writable side also ends the readable side and vice versa.
When you implement a Duplex
stream, you need to implement both a _read(size)
and _write(data, encoding, callback)
method when prototyping your Duplex
class.
The code in Listing 5.8 illustrates the basics of implementing, writing to, and reading from a Duplex
stream. The example is very basic but shows the main concepts. The Duplexer()
class inherits from the Duplex
stream and implements a rudimentary _write()
function that stores data in an array in the object. The _read()
function uses shift()
to get the first item in the array and then pushes null
if it is equal to "stop"
, pushes it if there is a value, or sets a timeout timer to call back to the _read()
function if there is no value.
Figure 5.8 shows the output for Listing 5.8. Notice that the first two writes ("I think, "
and "therefore"
) are read together. This is because both were pushed to Readable
before the data
event was triggered and printed the “read: I think, therefore” line to the console output.
01 var stream = require('stream'),
02 var util = require('util'),
03 util.inherits(Duplexer, stream.Duplex);
04 function Duplexer(opt) {
05 stream.Duplex.call(this, opt);
06 this.data = [];
07 }
08 Duplexer.prototype._read = function readItem(size) {
09 var chunk = this.data.shift();
10 if (chunk == "stop"){
11 this.push(null);
12 } else{
13 if(chunk){
14 this.push(chunk);
15 } else {
16 setTimeout(readItem.bind(this), 500, size);
17 }
18 }
19 };
20 Duplexer.prototype._write = function(data, encoding, callback) {
21 this.data.push(data);
22 callback();
23 };
24 var d = new Duplexer();
25 d.on('data', function(chunk){
26 console.log('read: ', chunk.toString());
27 });
28 d.on('end', function(){
29 console.log('Message Complete'),
30 });
31 d.write("I think, ");
32 d.write("therefore ");
33 d.write("I am.");
34 d.write("Rene Descartes");
35 d.write("stop");