Just as a fun exercise I want to implement a running average using node.js streams.
First we'll create a stream that supplies random numbers as input:
var ReadableStream = require('stream').Readable;
var util = require('util');
util.inherits(RandomStream, ReadableStream);
function RandomStream(max, low, high) {
if (!(this instanceof RandomStream)) return new RandomStream(max, low, high);
if (high < low) high = low;
ReadableStream.call(this, {
objectMode: true
});
this._low = low;
this._high = high;
this._max = max;
this._index = 1;
}
RandomStream.prototype._read = function() {
var i = this._index++;
if (i > this._max) {
this.push(null);
} else {
this.push(getRandomInt(this._low, this._high + 1));
}
};
function getRandomInt(min, max) {
return Math.floor(Math.random() * (max - min)) + min;
}
module.exports = RandomStream;
The next step is to make a stream implementation to compare the previous value with the current:
(This stream supports both syncronous returning function and async functions that uses a callback - as input)
var util = require('util');
var TransformStream = require('stream').Transform;
util.inherits(CompareStream, TransformStream);
function CompareStream(fn) {
if (!(this instanceof CompareStream)) return new CompareStream(fn);
this._fn = fn;
this._previous = null;
TransformStream.call(this, {
objectMode: true
});
}
CompareStream.prototype._transform = function(chunk, encoding, done) {
if (this._fn.length > 1) {
this._fn(this._previous, chunk, done);
} else {
done(null, this._fn(this._previous, chunk));
}
this._previous = chunk;
};
module.exports = CompareStream;
We'll also create a transform stream that stringifies/serializes objects, so that we can pipe the output to process.stdout for printing/debugging:
var util = require('util');
var assign = require('object-assign');
var TransformStream = require('stream').Transform;
util.inherits(StringifyStream, TransformStream);
function StringifyStream(opt) {
if (!(this instanceof StringifyStream)) return new StringifyStream(options);
var options = assign(opt || {}, { objectMode: true });
TransformStream.call(this, options);
}
StringifyStream.prototype._transform = function(chunk, encoding, done) {
this.push(JSON.stringify(chunk) + '\n');
done();
};
module.exports = StringifyStream;
Now we can use this to compare values that are passing through the stream:
var StringifyStream = require('./stringify-stream');
var CompareStream = require('./compare-stream');
var RandomStream = require('./random-stream');
var util = require('util');
var random = RandomStream(100, 0, 10);
random
.pipe(CompareStream(runningAverage(5)))
.pipe(StringifyStream())
.pipe(process.stdout)
function runningAverage(len) {
var values = [];
function addToList(val) {
if (!(values.length < len)) values.shift();
values.push(val);
}
return function(prev, next, callback) {
addToList(next);
setImmediate(function() {
callback(null, values.reduce(function(a, b) {
return a + b;
}, 0) / values.length);
});
};
}
The output will be something like:
2
1
1
1.25
2.8
4.2
4.2
4.4
5.2
5
..
..
If we add a console.log(values)
in our runningAverage
implementation we can see the average stabilizing:
[ 2 ]
[ 2, 0 ]
[ 2, 0, 1 ]
[ 2, 0, 1, 2 ]
[ 2, 0, 1, 2, 9 ]
[ 0, 1, 2, 9, 9 ]
[ 1, 2, 9, 9, 0 ]
[ 2, 9, 9, 0, 2 ]
[ 9, 9, 0, 2, 6 ]
[ 9, 0, 2, 6, 8 ]
..
..
Cheers!