What are streams?
Streams are sequences of characters, numbers, or objects: a collection of data. They provide an efficient way to handle any kind of end-to-end information exchange. In Node.js, streams are primarily used for handling large amounts of data (even more than 10 GB).
“Streams are Node’s best and most misunderstood idea.”
— Dominic Tarr
Are streams unique to Node.js?
No. Streams were introduced in the Unix operating system by Dennis Ritchie many decades ago.
Programs in Unix can interact with each other using the pipe operator (|). Put simply, the output of one process can be given as input to another process.
Advantages of streams
Memory efficiency: Data is handled in chunks, so there is no need to load a huge payload into memory before processing starts.
Time efficiency: Processing starts as soon as the first chunk is available, instead of waiting for the whole payload.
Backpressure: If one step takes longer to complete, the flow of data is slowed down at the source. Node.js handles this itself when streams are connected with pipe().
Use case:
Read data -> Process data -> Write data
Writing to a disk is slower than reading from a disk. Instead of pushing every chunk into the write queue, Node.js slows down the read stream until the writer catches up.
Simplified processing: It is simple to add, delete, or modify a stream processing step.
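To make the backpressure point concrete, here is a minimal sketch of doing by hand what pipe() does for you. It is an illustration, not Node's internal implementation: copyWithBackpressure is a hypothetical helper name, and the slow destination is simulated with a timer. The source is paused whenever write() returns false and resumed once the destination emits 'drain'.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical helper: copies source into destination while honouring backpressure
function copyWithBackpressure(source, destination) {
  return new Promise((resolve, reject) => {
    source.on('data', (chunk) => {
      // write() returns false once the destination's internal buffer is full
      if (!destination.write(chunk)) {
        source.pause(); // stop reading for now
        destination.once('drain', () => source.resume()); // continue once flushed
      }
    });
    source.on('end', () => destination.end());
    source.on('error', reject);
    destination.on('error', reject);
    destination.on('finish', resolve);
  });
}

// Simulated slow destination (an assumption for this sketch)
const received = [];
const slowWriter = new Writable({
  highWaterMark: 1, // tiny buffer so backpressure kicks in immediately
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 5); // pretend each write takes a while
  }
});

const done = copyWithBackpressure(Readable.from(['a', 'b', 'c', 'd']), slowWriter);
```

The returned promise resolves once every chunk has been written, so the reader never runs far ahead of the writer.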
How to connect two processes in streams?
Using pipe(): we can connect any number of steps.
pipe() takes the source stream and pipes it into the destination stream. It returns the destination stream, so multiple pipe() calls can be chained.
inputStream
  .pipe(modifyStream1)
  .pipe(modifyStream2)
  .pipe(desiredOutput);
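One caveat with chained pipe() calls is that an error in one stream is not forwarded to the next one. Node's built-in stream.pipeline() is a common alternative that reports an error from any stage through a single callback. A minimal sketch follows; the in-memory source, the upper-casing step, and the collecting sink are assumptions chosen for illustration, standing in for real streams such as files.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Illustrative in-memory source
const source = Readable.from(['hello ', 'streams']);

// Illustrative processing step: upper-cases every chunk
const upperCase = new Transform({
  transform(chunk, encoding, done) {
    done(null, chunk.toString().toUpperCase());
  }
});

// Illustrative sink that collects the output in an array
const collected = [];
const sink = new Writable({
  write(chunk, encoding, done) {
    collected.push(chunk.toString());
    done();
  }
});

const finished = new Promise((resolve, reject) => {
  // pipeline() connects the streams like pipe() and invokes the callback
  // with an error from any stage, or with no error on completion
  pipeline(source, upperCase, sink, (error) => (error ? reject(error) : resolve()));
});

finished
  .then(() => console.log(collected.join('')))
  .catch((error) => console.error('Pipeline failed:', error));
```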
Can we directly modify the stream data?
How to process stream data?
We can process data using the stream.Transform class (or a subclass of it). It has a method called transform, which reads each chunk and pushes the transformed data. The transform method takes three arguments:
Chunk: the output of the previous step
Encoding: the encoding type of the chunk
Callback: after modifying the data, return it using callback(error, modifiedData)
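With ES classes, subclassing means extending Transform and implementing _transform with the same three arguments. This is a minimal sketch; the class name UpperCaseTransform is hypothetical and the upper-casing is just a stand-in for real processing.

```javascript
const { Transform } = require('stream');

// Hypothetical subclass for illustration
class UpperCaseTransform extends Transform {
  // chunk: data from the previous step (a Buffer unless object mode is on)
  // encoding: the string encoding if chunk is a string, otherwise 'buffer'
  // callback: call with (error, transformedData) once the chunk is handled
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
}

const upper = new UpperCaseTransform();
const output = [];
upper.on('data', (data) => output.push(data.toString()));
upper.write('abc');
upper.end('def');
```

Passing a transform function to the Transform constructor, as the example in this post does, is equivalent and shorter for one-off steps.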
Example
Here we read a file line by line, transform each line, and write the result to another file using streams.
const fs = require('fs');
const { Transform } = require('stream');
// Third-party package: npm install readline-transform
const ReadlineTransform = require('readline-transform');

// Splits the incoming data into one chunk per line
const toLines = new ReadlineTransform({});
const fileData = fs.createReadStream('inputFile.txt');
const writeFile = fs.createWriteStream('outputFile.txt');

// Appends " number" and a newline to every line
const modifyStream = new Transform({
  transform: (chunk, encoding, done) => {
    const modifiedString = `${chunk} number\n`;
    done(null, modifiedString);
  }
});

fileData.pipe(toLines).pipe(modifyStream).pipe(writeFile);
How to create your own Stream?
Note: this example is not a memory-efficient stream, because all the data is pushed up front. Streaming data from a database to the browser would be a better example.
const { Readable, Transform } = require('stream');
const _ = require('lodash');

// A no-op read() is provided because the data is pushed manually below
const inputStream = new Readable({ objectMode: true, read() {} });
const inputs = [
  'Lorem ipsum dolor sit amet',
  'Lorem ipsum dolor sit amet',
  'Lorem ipsum dolor sit amet'
];

_.each(inputs, (input) => {
  inputStream.push(input);
});
// Pushing null signals the end of the stream
inputStream.push(null);

// Converts every chunk to upper case
const modifyStream = new Transform({
  transform: (chunk, encoding, done) => {
    const modifiedString = `${chunk}`.toUpperCase();
    done(null, modifiedString);
  }
});

inputStream
  .pipe(modifyStream)
  .pipe(process.stdout)
  .on('error', (error) => {
    console.log(error);
  });
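A more memory-friendly way to build your own stream is to produce data only when the consumer asks for it, by implementing read(). The following is a minimal sketch: the counter source and the name createCounterStream are assumptions for illustration, standing in for a real source such as a database cursor.

```javascript
const { Readable } = require('stream');

// Hypothetical factory: emits the numbers 1..limit, one per read() call
function createCounterStream(limit) {
  let current = 0;
  return new Readable({
    read() {
      current += 1;
      if (current > limit) {
        this.push(null); // no more data: end the stream
      } else {
        this.push(`${current}\n`); // produced only when requested
      }
    }
  });
}

const lines = [];
const counter = createCounterStream(3);
counter.on('data', (chunk) => lines.push(chunk.toString()));
counter.on('end', () => console.log(lines.join('')));
```

Because read() is only called when the internal buffer has room, the stream never holds more than a few chunks in memory, no matter how large the limit is.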
That’s all I have for this topic. Thanks for reading! Until next time!
Read more about streams