Duplex streams are data streams that are both readable and writable. Many languages that have some kind of stream concept build two-way network connections as an object with an input-stream property and an output-stream property. An example of this can be seen in node.js (`process.stdin` is the input and `process.stdout` is the output). Many node.js users choose instead to have the item itself be both a readable and a writable stream (e.g. substack's dnode). This is the classic `server.pipe(client).pipe(server)`.
The other way that people use duplex streams is for transformations. You can pipe raw network data into the stream and receive parsed output on the other side, or you can pipe structured data in and get raw bytes out (that you can send over a network or write to a file). This is the classic `inputfile.pipe(parser).pipe(update).pipe(stringify).pipe(outputFile)`.
Error Handling
The problem with these two different concepts being represented by exactly the same programming concept is that they have different desirable error handling characteristics.
For the client/server architecture you want your errors thrown immediately (crashing the application) if they are not handled. From the point of view of handling, you want the consumers of the API to handle errors as close as possible to where they were thrown. That means you do no automatic forwarding and require users to listen for the `'error'` event immediately. This is how node currently works.
The rest of this post represents how I feel streams should work; it may not always reflect the views of the node community at large (that's up to you to decide).
For the transformation streams, the desirable error handling behavior is totally different. When readable streams are piped, you want their errors to go with the data. Consider the following function to return a parsed stream of a file:
```js
function read(path) {
  return fs.createReadStream(path)
    .pipe(parseRawData())
}
```
If there is an error parsing the data, the returned stream emits an error. This is what you want. If there is an error reading the file (e.g. the file does not exist), the error will crash the application. This is not what you want. The behavior you would want is something like:
```js
function read(path) {
  var src = fs.createReadStream(path)
  var dest = src.pipe(parseRawData())
  src.on('error', dest.emit.bind(dest, 'error'))
  return dest
}
```
That's a far too convoluted way to do what is almost always what you want with transform streams. Until node.js has something like this built in natively, I'm going to use my extension library barrage. It adds a new method called `syphon`, which acts like `pipe` except that it also forwards errors. This lets me re-write the fixed version of `read` as:
```js
function read(path) {
  return barrage(fs.createReadStream(path))
    .syphon(parseRawData())
}
```
This is way closer to the original method we wanted to write, but it handles errors properly. I'd really like `syphon` to be added to node natively :)
The Kind of Obvious
Once you've fixed the error forwarding so you can choose between `pipe` and `syphon` depending on whether the errors need to be forwarded or thrown, there are a few other things that really should be more convenient:
- It should be easier to buffer the output of a readable stream when you no longer need streaming to maintain low memory usage. It's surprisingly tricky to get back to a proper callback API.
- When you’re writing to something there isn’t anything to buffer, but it still needs to be easier to get back to a nice callback API.
I solve both of these problems in barrage as `barrage(readable).buffer(callback)` and `barrage(writable).wait(callback)`.
Both methods guarantee that the callback will never be called more than once, and both return promises if you omit the callback, because that's my personal preference.