Understanding node.js streams (what I learned fixing an AWS SDK bug)

Node.js stream

Node.js provides asynchronous I/O based on an event loop. When reading from or writing to the filesystem, or when sending an HTTP request, Node.js can process other events while waiting for the response; this is what we call non-blocking I/O. Streams extend this concept: they provide an event-based I/O interface that saves memory and bandwidth.

Event-Based I/O

When reading from the filesystem, node provides a non-blocking method with a callback:

var fs = require('fs');
fs.readFile('./test.json', function(err, data) {
  if (err) {
    return console.log(err);
  }

  console.log('test file is loaded:\n', data);
});

However, for a large file we may want to do something before the file is completely loaded, to save memory. This is where streams come in:

var fs = require('fs');
var stream = fs.createReadStream('./test.mp4');

stream.on('data', function(data) {
  console.log('loaded part of the file');
});

stream.on('end', function() {
  console.log('all parts are loaded');
});

stream.on('error', function(err) {
  console.log('something is wrong :( ');
});

Basically, a read stream is an EventEmitter with 'data', 'end' and 'error' events:

the 'data' event returns a chunk of the file,
the 'end' event fires when the read has finished,
and the 'error' event fires when an error happens.

So we can write or process each part of the file without waiting for the whole file to load. For example, when we request a file from the internet:

var fs = require('fs');
var request = require('request');

var stream = request('http://i.imgur.com/dmetFjf.jpg');
var writeStream = fs.createWriteStream('test.jpg');

stream.on('data', function(data) {
  writeStream.write(data);
});

stream.on('end', function() {
  writeStream.end();
});

stream.on('error', function(err) {
  console.log('something is wrong :( ');
  writeStream.close();
});

This writes each part of the data to the file as it is received.

Pipe

Pipe is another concept that lets you redirect input to output. The file-download code above can be rewritten with pipe:

var fs = require('fs');
var request = require('request');

var stream = request('http://i.imgur.com/dmetFjf.jpg');
var writeStream = fs.createWriteStream('./testimg.jpg');

stream.pipe(writeStream);

What the pipe function does is connect the read and write events between the streams, and it returns the destination stream, so we can even chain multiple pipes together:

var fs = require('fs');
var request = require('request');
var gzip = require('zlib').createGzip();

var stream = request('http://i.imgur.com/dmetFjf.jpg');
var writeStream = fs.createWriteStream('./testimg.jpg');

// write gzipped image file
stream.pipe(gzip).pipe(writeStream);
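
One caveat worth noting: pipe does not forward 'error' events from one stream to the next, so each stream in the chain needs its own handler. A minimal sketch of the same chain with error handling (onError is just an illustrative name):

var fs = require('fs');
var request = require('request');
var gzip = require('zlib').createGzip();

var stream = request('http://i.imgur.com/dmetFjf.jpg');
var writeStream = fs.createWriteStream('./testimg.jpg');

// 'error' does not travel through pipe, so every stream
// in the chain gets its own handler
var onError = function(err) {
  console.log('pipe chain failed:', err);
};

stream.on('error', onError);
gzip.on('error', onError);
writeStream.on('error', onError);

stream.pipe(gzip).pipe(writeStream);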

Stream2 (Readable and Writable streams)

One problem with the 'data' event based stream is that the consumer can't control the timing of reads or how much data is read each time. When a 'data' event is triggered, the handler function has to store the data in a buffer or write it to disk right away. That becomes a problem when write I/O is slow or limited.

Therefore, node.js v0.10 introduced a new stream interface, called stream2.

It provides two new stream classes:

Readable Stream

The Readable stream extends the old stream interface with a new 'readable' event, which lets the consumer control the timing of reads and how many bytes to read.

// node.js >= v0.10
var fs = require('fs');
var stream = fs.createReadStream('./testimg.jpg');
var writeStream = fs.createWriteStream('./output.jpg');

stream.on('readable', function() {
  // stream is ready to read; read() returns null
  // when nothing is buffered yet
  var data = stream.read();
  if (data !== null) {
    writeStream.write(data);
  }
});

stream.on('end', function() {
  writeStream.end();
});

So when the 'readable' event is triggered, the consumer controls when to call stream.read() to consume the data. If the data is not read, the 'readable' event is put back on the event loop and will be triggered again later.
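
Since the consumer drives the reads, it can also ask for a specific number of bytes by passing a size to read(). A small sketch, using an arbitrary 64-byte chunk size:

// node.js >= v0.10
var fs = require('fs');
var stream = fs.createReadStream('./testimg.jpg');

stream.on('readable', function() {
  var chunk;
  // read(64) returns null until 64 bytes are buffered
  // (or the stream has ended), so drain what is available
  while ((chunk = stream.read(64)) !== null) {
    console.log('got %d bytes', chunk.length);
  }
});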

The Readable stream is also backward compatible: when a 'data' event listener is attached, the stream will not use the 'readable' event but downgrades to the old stream behavior.
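
For example, attaching a 'data' listener is enough to switch the same stream back into the old push-style behavior:

var fs = require('fs');
var stream = fs.createReadStream('./testimg.jpg');

// the 'data' listener switches the stream into the old
// flowing mode; no 'readable' handler or read() call needed
stream.on('data', function(chunk) {
  console.log('received %d bytes', chunk.length);
});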

Writable Stream

Writable streams add a new 'drain' event, which is triggered when all data in the buffer has been written. This lets us time our writes so we only write when the buffer is empty.

// node.js >= v0.10
var fs = require('fs');

var stream = fs.createReadStream('./input.mp4');
var writeStream = fs.createWriteStream('./output.mp4');

var writable = true;
var doRead = function() {
  var data = stream.read();
  if (data === null) { return; }
  // write() returns false when the buffer is full
  writable = writeStream.write(data);
};

stream.on('readable', function() {
  if (writable) {
    doRead();
  } else {
    // wait for the drain event if the write buffer is full
    writeStream.removeAllListeners('drain');
    writeStream.once('drain', doRead);
  }
});

stream.on('end', function() {
  writeStream.end();
});

AWS SDK bug (how to implement a readable stream)

issue link

When I was using the AWS SDK to download an image from S3 with a stream, I found that it only downloaded part of the image.

After digging into the source of the readable stream, I figured out how the readable stream works and what happened during the download.

To implement a Readable stream, we can override the _read method of the Readable class. As the spec says, the _read method should read data from the source and push it into the read buffer.
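
As a minimal sketch of that contract (a toy stream that produces three made-up chunks, just for illustration):

var Readable = require('stream').Readable;

var stream = new Readable();
var count = 0;

// _read is called whenever the stream wants more data:
// push(chunk) fills the read buffer, push(null) signals the end
stream._read = function() {
  count += 1;
  if (count <= 3) {
    stream.push('chunk ' + count + '\n');
  } else {
    stream.push(null);
  }
};

stream.pipe(process.stdout);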

In the AWS js sdk:

stream._read = function() {
  var data;
  while (data = httpStream.read()) {
    stream.push(data);
  }
};

It looks fine: the _read function consumes data from the HTTP stream and pushes it back into the stream. However, in the source of the readable stream, it is actually implemented with an event-pull mechanism: the stream tries to read data first, and if the read returns null, it puts the read event back on the event loop to be triggered again later. But when the _read method is called, the stream sets a reading flag to block further read events and avoid a race condition, and a call to push sets the reading flag back to false, unblocking reads.

Therefore, when httpStream.read() returns null, stream.push is never called, which blocks all following read events.
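
A heavily simplified sketch of that mechanism (illustrative pseudo-code, not the actual node.js source; state and the buffer handling are stand-ins):

// what the stream does internally, roughly
var state = { reading: false };

function read(stream) {
  // while a _read is in flight, further reads are blocked
  if (state.reading) { return null; }
  state.reading = true;
  stream._read();
  // ...return whatever is in the read buffer...
}

function push(chunk) {
  // push clears the flag and unblocks the next read;
  // if _read never calls push, reads stay blocked forever
  state.reading = false;
  // ...append chunk to the read buffer...
}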

The solution is to remember to unblock the stream whenever _read is called:

stream._read = function() {
  var data = httpStream.read();
  // push at least once, even when read() returned null,
  // so the reading flag is cleared and later read events
  // are not blocked
  do {
    stream.push(data);
  } while (data = httpStream.read());
};
