Node.js Stream Introduction

La Javaness IT
Apr 29, 2021

Some definitions

From https://nodejs.org/api/stream.html:

“A stream is an abstract interface for working with streaming data”.

“There are many stream objects provided by Node.js. For instance, a request to an HTTP server and process.stdout are both stream instances”.

Streams are an interface for processing data one chunk at a time.
They are especially useful when:

  • You don’t need to access all of your data at once.
  • The size of your data is unknown, or large enough to increase RAM usage significantly (in a single instance, or because of concurrent processing).

Such scenarios include user input, files you need to read from, and the results of database queries (fetching a million objects for analytics, exports, etc.).

In those cases, not storing everything in memory at once is beneficial.

A Stream example

Let’s create the simplest Stream example possible: reading from a hello.txt file that contains the string “Hello World”.

As Streams are instances of EventEmitter (the Node.js pub/sub interface), we will need to attach a callback to an event.

const fs = require('fs')

const myStream = fs.createReadStream('hello.txt')
myStream.on('data', console.log)

Executing this code will output:

<Buffer 48 65 6c 6c 6f 20 57 6f 72 6c 64>

To display the content as UTF-8 text rather than raw bytes, call toString() on the Buffer object.

Replace the last line with:

myStream.on('data', chunk => console.log(chunk.toString()))

Now your text content is displayed:

Hello World

In this example the string “Hello World” fits in a single Buffer of the default size, but the whole point of streams is to have your .on('data') callback called multiple times, letting the Node.js garbage collector free the RAM used by earlier chunks between callback executions.

To have our .on('data') callback called multiple times, we can:

  • paste more text into our file so that it is larger than 64 KB,
  • or simply split “Hello World” into chunks by changing our stream’s buffer size, overriding the default of 64 KB.

const myStream = fs.createReadStream('hello.txt', {
  highWaterMark: 4 // Change our stream's buffer length to 4 bytes
})
myStream.on('data', chunk => console.log(chunk.toString()))

This code will call the .on('data') callback three times, with three Buffer objects, and will display:

Hell
o Wo
rld
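
To check the three-chunk split without relying on console output, here is a minimal sketch. The readChunks helper and the temporary file name are illustrative assumptions, not part of the original example; only fs.createReadStream and its highWaterMark option come from the text above.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical helper: reads a file as a stream and resolves with the
// list of chunks (as strings) in the order they were emitted.
function readChunks(filePath, highWaterMark) {
  return new Promise((resolve, reject) => {
    const chunks = []
    fs.createReadStream(filePath, { highWaterMark })
      .on('data', chunk => chunks.push(chunk.toString()))
      .on('error', reject)
      .on('end', () => resolve(chunks))
  })
}

// Create a throwaway stand-in for hello.txt in the OS temp directory.
const demoFile = path.join(os.tmpdir(), 'hello-stream-demo.txt')
fs.writeFileSync(demoFile, 'Hello World')

readChunks(demoFile, 4).then(chunks => console.log(chunks))
// logs [ 'Hell', 'o Wo', 'rld' ]
```

One caveat with such small buffers: a multi-byte UTF-8 character can be cut in half at a chunk boundary, so calling toString() on each chunk may garble it. Calling setEncoding('utf8') on the stream is one way to avoid that.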

Async iteration

In my previous example, I used callbacks to consume a stream, but there has been another way to do so since Node.js 10.

Readable streams are async iterables that you can consume in a “for await...of” loop.

Instead of consuming your stream with:

myStream.on('data'...

You can do this (inside an async function, or at the top level of an ES module):

for await (const chunk of myStream) {
  console.log(chunk.toString())
}
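
As a self-contained sketch of the same idea, the snippet below counts the bytes of a stream without ever holding more than one chunk. The countBytes helper is a made-up name for illustration, and Readable.from is used here only as a stand-in for fs.createReadStream('hello.txt') so that no file is needed.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: walks a readable stream chunk by chunk and
// returns the total number of bytes seen.
async function countBytes(readable) {
  let total = 0
  for await (const chunk of readable) {
    total += chunk.length
  }
  return total
}

// Three Buffers standing in for the chunks read from hello.txt
const demoStream = Readable.from([
  Buffer.from('Hell'),
  Buffer.from('o Wo'),
  Buffer.from('rld')
])

countBytes(demoStream).then(total => console.log(total)) // logs 11
```

If the stream emits an error while it is being iterated, the loop throws, so a regular try/catch around the for await...of should be enough to handle it.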

Types of Streams

The four types of streams provided by the Stream API:

  • Readable stream (streams from which data can be read)
  • Writable stream (streams to which data can be written)
  • Duplex stream (streams that are both Readable and Writable)
  • Transform stream (a type of Duplex stream that can modify or transform the data as it is written and read)
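
The examples so far only used a Readable stream, so here is a minimal sketch of a custom Writable for comparison. The collectWrites helper is a hypothetical name introduced for illustration; the Writable constructor with a write function is the standard Node.js API.

```javascript
const { Writable } = require('stream')

// Hypothetical helper: writes each input to a custom Writable and
// resolves with what the stream received once it has finished.
function collectWrites(inputs) {
  return new Promise((resolve, reject) => {
    const received = []
    const sink = new Writable({
      // Called once per written chunk; strings arrive as Buffers by default
      write(chunk, encoding, done) {
        received.push(chunk.toString())
        done()
      }
    })
    sink.on('error', reject)
    sink.on('finish', () => resolve(received))
    for (const input of inputs) sink.write(input)
    sink.end() // no more data: triggers 'finish' once everything is written
  })
}

collectWrites(['Hello ', 'World']).then(received => console.log(received))
// logs [ 'Hello ', 'World' ]
```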

If you try to read from a Writable stream, or write to a Readable stream, your process will throw an error such as:

TypeError: dest.write is not a function

or

Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable

Piping Streams

One of the key features of Node.js Streams is the ability to chain streams together.

Here is a full example of three streams chained together.

It uses the streams simplified construction: https://nodejs.org/api/stream.html#stream_simplified_construction

const { Transform } = require('stream')
const fs = require('fs')

const myTransformStream = new Transform({
  transform(chunk, encoding, done) {
    // Append "!" and a newline to every chunk
    const str = chunk.toString() + "!\n"
    this.push(str)
    done()
  }
})

const myStream = fs.createReadStream('hello.txt', {
  highWaterMark: 4
})

myStream.pipe(myTransformStream).pipe(process.stdout)

The result:

Hell!
o Wo!
rld!

You might want to search npm for existing transform stream packages for parsing, batching, etc.

(such as https://www.npmjs.com/package/stream-json)

Error Handling

When using streams you need to handle their 'error' events, or your process will throw with:

Unhandled 'error' event

Note that .pipe() does not propagate errors from one stream to the next.
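
To make this concrete, here is a small sketch in which the source of a pipe fails while both ends have an 'error' listener. The whoSeesTheError helper and the "boom" error are made up for illustration; only the source's listener is expected to fire.

```javascript
const { Readable, PassThrough } = require('stream')

// Hypothetical helper: pipes a source into a destination, makes the
// source fail, and resolves with the names of the streams whose
// 'error' listener was called.
function whoSeesTheError() {
  return new Promise(resolve => {
    const seen = []
    const source = new Readable({ read() {} }) // never produces data
    const dest = new PassThrough()

    source.on('error', () => seen.push('source'))
    dest.on('error', () => seen.push('dest'))

    source.pipe(dest)
    source.destroy(new Error('boom')) // simulate a failure on the source

    // Give the 'error' event time to be emitted before reporting
    setImmediate(() => resolve(seen))
  })
}

whoSeesTheError().then(seen => console.log(seen)) // logs [ 'source' ]
```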

There are two ways to handle stream errors:

  • The first is to add an 'error' handler to each stream (here, directly in the pipe chain):

const { Transform } = require('stream')
const fs = require('fs')

const myTransformStream = new Transform({
  transform(chunk, encoding, done) {
    const str = chunk.toString() + " !"
    this.push(str)
    done()
  }
})

fs.createReadStream('hello.txt')
  .on('error', console.log) // errors from the file stream
  .pipe(myTransformStream)
  .on('error', console.log) // errors from the transform stream
  .pipe(process.stdout)
  .on('error', console.log) // errors from stdout

  • The second is to use the “pipeline” function instead of .pipe()

Either with a callback:

const fs = require('fs')
const { pipeline } = require('stream')

pipeline(
  fs.createReadStream('hello.txt'),
  myTransformStream,
  process.stdout,
  myErrorHandler // called with the error if any stream fails
)

Or with a promise (since Node 15.0.0), from inside an async function:

const fs = require('fs')
const { pipeline } = require('stream/promises')

await pipeline(
  fs.createReadStream('hello.txt'),
  myTransformStream,
  process.stdout,
)
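
With the promise version, a failure in any stream of the chain rejects the promise, so a try/catch is enough. The sketch below assumes Node.js 15 or later; the drainFile helper, the discarding Writable, and the missing file path are all hypothetical names chosen for illustration.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')
const { Writable } = require('stream')
const { pipeline } = require('stream/promises') // Node.js 15+

// Hypothetical helper: streams a file into a sink that discards its
// input, and resolves with 'ok' or with the error code on failure.
async function drainFile(filePath) {
  const discard = new Writable({
    write(chunk, encoding, done) {
      done() // ignore the data, just acknowledge it
    }
  })
  try {
    await pipeline(fs.createReadStream(filePath), discard)
    return 'ok'
  } catch (err) {
    // An error from any stream in the chain ends up here
    return err.code
  }
}

// Assumed not to exist, so the read stream fails with ENOENT
const missingFile = path.join(os.tmpdir(), 'no-such-file-stream-demo.txt')
drainFile(missingFile).then(result => console.log(result)) // logs ENOENT
```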

Hope this was useful!

About

Simon Dutertre has been Lead Backend Developer at La Javaness since 2019.
