Node.js Stream Introduction
Some definitions
From https://nodejs.org/api/stream.html:
“A stream is an abstract interface for working with streaming data”.
“There are many stream objects provided by Node.js. For instance, a request to an HTTP server and process.stdout are both stream instances”.
Streams are an interface used to process data, one chunk at a time.
They are especially useful when:
- You don’t need to access your data all at once.
- The size of your data is unknown, or large enough to increase RAM usage significantly (in a single instance, or because of concurrent processing).
Such scenarios include: user input, a file you need to read from, or the result of a database query (fetching a million objects for analytics, exports, etc.).
In those cases, not storing everything in memory at once is beneficial.
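For example, copying a large file with fs.readFile keeps the whole file in memory, while the streaming equivalent only ever holds one chunk at a time. A minimal sketch, where 'big-file.txt' and 'copy.txt' are placeholder names:

const fs = require('fs')

// Buffered approach: the entire file is loaded into RAM before being written back.
fs.readFile('big-file.txt', (err, data) => {
  if (err) throw err
  fs.writeFile('copy.txt', data, err => {
    if (err) throw err
  })
})

// Streaming approach: only one small chunk is held in RAM at any given time.
fs.createReadStream('big-file.txt')
  .pipe(fs.createWriteStream('copy.txt'))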
A Stream example
Let’s create the simplest Stream example possible: reading from a hello.txt file that contains the string “Hello World”.
As Streams are instances of EventEmitter (Node.js' pub/sub interface), we will need to attach a callback to an event.
const fs = require('fs')
const myStream = fs.createReadStream('hello.txt')
myStream.on('data', console.log)
Executing this code will output:
<Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>
To display the content as UTF-8 text rather than in binary format, call toString on the Buffer object.
Replace the last line with:
myStream.on('data', chunk => console.log(chunk.toString()))
Finally, your text content is displayed:
Hello World
In this example the string "Hello World" fits in a single Buffer of the default size. But the whole point of streams is to have your 'data' callback called multiple times, letting the Node.js garbage collector free up the RAM used by your chunks of data between callback executions.
To have our 'data' callback invoked multiple times, we can:
- paste more text into our file so its length exceeds 64 KB,
- or simply split "Hello World" into chunks by changing our stream buffer size, overriding the default of 64 KB.
const myStream = fs.createReadStream('hello.txt', {
  highWaterMark: 4 // Change our stream buffer's length to 4 bytes
})
myStream.on('data', chunk => console.log(chunk.toString()))
This code will call the 'data' callback three times with three Buffer objects and will display:
Hell
o Wo
rld
Async iteration
In the previous example, I used callbacks to consume a stream, but since Node.js 10 there is another way to do so.
Streams are async iterables you can consume with a "for await...of" loop.
Instead of consuming your stream with:
myStream.on('data', …
You can do this:
for await (const chunk of myStream) {
  console.log(chunk.toString())
}
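Note that for await...of is only valid inside an async function (or in an ES module with top-level await). A minimal self-contained sketch, reusing the same hello.txt file:

const fs = require('fs')

// Wrap the loop in an async IIFE so `for await` is valid in CommonJS.
;(async () => {
  const myStream = fs.createReadStream('hello.txt', { highWaterMark: 4 })
  for await (const chunk of myStream) {
    console.log(chunk.toString())
  }
})()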
Types of Streams
The four types of streams provided by the Stream API:
- Readable stream (streams from which data can be read)
- Writable stream (streams to which we can write data)
- Duplex stream (streams that are both Readable and Writable)
- Transform stream (a type of Duplex stream that can modify or transform the data as it is written and read)
If you try to read from a Writable stream, or write to a Readable stream, your process will throw an error such as:
TypeError: dest.write is not a function
or
Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable
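To make the Readable and Writable types more concrete, here is a minimal sketch of a hand-made pair of streams, using the simplified construction covered below; the names source and sink are illustrative only.

const { Readable, Writable } = require('stream')

// A Readable that produces two chunks, then signals the end of data.
const source = new Readable({
  read() {
    this.push('Hello ')
    this.push('World')
    this.push(null) // null means "no more data"
  }
})

// A Writable that simply logs every chunk it receives.
const sink = new Writable({
  write(chunk, encoding, done) {
    console.log('received:', chunk.toString())
    done()
  }
})

source.pipe(sink)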
Piping Streams
One of the key features of Node.js Streams is the ability to chain streams together.
Here is a full example of three streams chained together.
It uses the streams simplified construction API: https://nodejs.org/api/stream.html#stream_simplified_construction
const { Transform } = require('stream')
const fs = require('fs')

const myTransformStream = new Transform({
  transform(chunk, encoding, done) {
    const str = chunk.toString() + "!\n"
    this.push(str)
    done()
  }
})

const myStream = fs.createReadStream('hello.txt', {
  highWaterMark: 4
})

myStream.pipe(myTransformStream).pipe(process.stdout)
The result:
Hell!
o Wo!
rld!
You might want to look on npm for existing transform stream packages for parsing, batching, etc.
(such as https://www.npmjs.com/package/stream-json)
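For instance, a hand-rolled batching Transform could look like the sketch below; createBatcher and batchSize are illustrative names, not an existing package API.

const { Transform, Readable } = require('stream')

// Groups incoming objects into arrays of `batchSize` items (object mode).
function createBatcher(batchSize) {
  let buffer = []
  return new Transform({
    objectMode: true,
    transform(item, encoding, done) {
      buffer.push(item)
      if (buffer.length >= batchSize) {
        this.push(buffer)
        buffer = []
      }
      done()
    },
    flush(done) {
      if (buffer.length > 0) this.push(buffer) // emit the last, partial batch
      done()
    }
  })
}

// Usage: logs [ 1, 2 ], [ 3, 4 ] and finally [ 5 ].
Readable.from([1, 2, 3, 4, 5]).pipe(createBatcher(2)).on('data', console.log)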
Error Handling
When using streams, you need to handle their 'error' events, or your process will throw with:
Unhandled 'error' event
Note that .pipe() does not propagate errors.
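A quick way to see both problems at once, assuming a non-existent file named missing.txt: the ENOENT error is emitted by the source read stream, so a handler attached after .pipe (that is, on the destination) never sees it, and the process crashes.

const fs = require('fs')

// 'missing.txt' is a hypothetical, non-existent file used to trigger ENOENT.
fs.createReadStream('missing.txt')
  .pipe(process.stdout)
  // This handler is attached to process.stdout, the destination.
  // The source's ENOENT error is not propagated to it, so the process
  // still crashes with an "Unhandled 'error' event".
  .on('error', err => console.error('never reached:', err.message))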
There are two ways to handle stream errors:
- The first is to add an 'error' handler to each stream (here, attached directly in the pipe chain).
const { Transform } = require('stream')
const fs = require('fs')

const myTransformStream = new Transform({
  transform(chunk, encoding, done) {
    const str = chunk.toString() + " !"
    this.push(str)
    done()
  }
})

fs.createReadStream('hello.txt')
  .on('error', console.log)
  .pipe(myTransformStream)
  .on('error', console.log)
  .pipe(process.stdout)
  .on('error', console.log)
- The second is to use the pipeline function to replace .pipe.
Either with a callback:
const { pipeline } = require('stream')

pipeline(
  fs.createReadStream('hello.txt'),
  myTransformStream,
  process.stdout,
  myErrorHandler // callback invoked once the pipeline has finished or failed
)
Or with a promise (since Node.js 15.0.0):
const { pipeline } = require('stream/promises')

// `await` must be used inside an async function (or an ES module with top-level await)
await pipeline(
  fs.createReadStream('hello.txt'),
  myTransformStream,
  process.stdout,
)
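Since this version returns a promise that rejects when any stream in the chain fails, errors can be handled with a regular try/catch. A minimal sketch, reusing myTransformStream from the previous example:

const { pipeline } = require('stream/promises')
const fs = require('fs')

// myTransformStream comes from the earlier Transform example.
async function run() {
  try {
    await pipeline(
      fs.createReadStream('hello.txt'),
      myTransformStream,
      process.stdout,
    )
    console.log('pipeline done')
  } catch (err) {
    // Errors from any of the three streams end up here
    console.error('pipeline failed:', err)
  }
}

run()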
Hope this was useful!
About
Simon Dutertre has been Lead Backend Developer at La Javaness since 2019.