Running Average with Node.js Streams

Just as a fun exercise, I want to implement a running average using Node.js streams.

First we'll create a stream that supplies random numbers as input:

var ReadableStream = require('stream').Readable;  
var util = require('util');

/**
 * Object-mode readable stream that produces random integers.
 *
 * Each read pushes one integer obtained from `getRandomInt(low, high + 1)`;
 * after `max` values have been pushed the stream ends. When `high` is smaller
 * than `low`, `low` is used as the upper bound too.
 *
 * May be called with or without `new`.
 *
 * @param {number} max - Number of values to produce before ending the stream.
 * @param {number} low - Lower bound handed to `getRandomInt`.
 * @param {number} high - Upper bound; `high + 1` is handed to `getRandomInt`.
 */
function RandomStream(max, low, high) {
  if (!(this instanceof RandomStream)) {
    return new RandomStream(max, low, high);
  }

  var upperBound = high < low ? low : high;

  ReadableStream.call(this, { objectMode: true });

  this._low = low;
  this._high = upperBound;
  this._max = max;
  // 1-based counter of the value that the next read will produce.
  this._index = 1;
}

util.inherits(RandomStream, ReadableStream);

/**
 * Readable implementation hook: pushes a single random integer, or `null`
 * (end of stream) once more than `_max` reads have been requested.
 */
RandomStream.prototype._read = function() {
  var current = this._index;
  this._index = current + 1;

  if (current > this._max) {
    this.push(null);
    return;
  }

  this.push(getRandomInt(this._low, this._high + 1));
};

/**
 * Returns a random integer `n` with `min <= n < max`.
 *
 * @param {number} min - Inclusive lower bound.
 * @param {number} max - Exclusive upper bound.
 * @returns {number} Random integer in the half-open range [min, max).
 */
function getRandomInt(min, max) {
  var span = max - min;
  var offset = Math.floor(Math.random() * span);
  return offset + min;
}

module.exports = RandomStream;  

The next step is to make a stream implementation to compare the previous value with the current:

(This stream accepts either a synchronous function that returns its result or an asynchronous function that delivers its result through a callback.)

var util = require('util');  
var TransformStream = require('stream').Transform;

util.inherits(CompareStream, TransformStream);

/**
 * Object-mode transform stream that replaces every chunk with the result of
 * `fn(previous, current)`, where `previous` is the chunk that arrived
 * immediately before (`null` for the first chunk).
 *
 * `fn` may be either
 *   - synchronous: `function(previous, current)` returning the output value, or
 *   - asynchronous: `function(previous, current, callback)` that calls
 *     `callback(err, value)` once the output value is ready.
 * The two forms are told apart by the number of parameters `fn` declares.
 *
 * May be called with or without `new`.
 *
 * @param {Function} fn - Comparison function, see above.
 */
function CompareStream(fn) {
  if (!(this instanceof CompareStream)) return new CompareStream(fn);

  this._fn = fn;
  this._previous = null;

  TransformStream.call(this, {
    objectMode: true
  });
}

/**
 * Transform implementation hook: feeds the previous and current chunk to the
 * comparison function and forwards its result downstream.
 */
CompareStream.prototype._transform = function(chunk, encoding, done) {
  var previous = this._previous;

  // Remember the chunk before handing control to `fn`/`done`, so that a
  // re-entrant call triggered from inside either of them sees it as the
  // previous value instead of a stale one.
  this._previous = chunk;

  // Only a function that declares the third (callback) parameter is
  // asynchronous. A plain `function(previous, current)` has length 2 and its
  // return value must be forwarded, otherwise `done` would never be called.
  if (this._fn.length > 2) {
    this._fn(previous, chunk, done);
  } else {
    done(null, this._fn(previous, chunk));
  }
};

module.exports = CompareStream;  

We'll also create a transform stream that stringifies/serializes objects, so that we can pipe the output to process.stdout for printing/debugging:

var util = require('util');  
var assign = require('object-assign');  
var TransformStream = require('stream').Transform;

util.inherits(StringifyStream, TransformStream);

/**
 * Transform stream that serialises every incoming object to a single line of
 * JSON (the `JSON.stringify` result followed by `'\n'`).
 *
 * May be called with or without `new`.
 *
 * @param {Object} [opt] - Stream options forwarded to the Transform
 *   constructor. `objectMode` is always forced to `true`. The object passed
 *   in is not modified.
 */
function StringifyStream(opt) {
  // Forward the caller's `opt`: the hoisted `options` variable declared below
  // is still undefined at this point.
  if (!(this instanceof StringifyStream)) return new StringifyStream(opt);

  // Merge into a fresh object so the caller's options object stays untouched.
  var options = assign({}, opt, { objectMode: true });

  TransformStream.call(this, options);
}

/**
 * Transform implementation hook: pushes the JSON text of `chunk` plus a
 * trailing newline, then signals completion.
 */
StringifyStream.prototype._transform = function(chunk, encoding, done) {
  this.push(JSON.stringify(chunk) + '\n');
  done();
};

module.exports = StringifyStream;  

Now we can use this to compare values that are passing through the stream:

var StringifyStream = require('./stringify-stream');  
var CompareStream = require('./compare-stream');  
var RandomStream = require('./random-stream');  
var util = require('util');

// Source: 100 random integers between 0 and 10 (inclusive).
var random = RandomStream(100, 0, 10);

// Replace each sample with the average of the five most recent samples.
var averaged = random.pipe(CompareStream(runningAverage(5)));

// Serialise every average as one line of JSON and print it.
var serialized = averaged.pipe(StringifyStream());
serialized.pipe(process.stdout);

/**
 * Creates an asynchronous moving-average function over the most recent
 * `len` values.
 *
 * The returned function has the signature `function(prev, next, callback)`:
 * `next` is appended to the window (dropping the oldest value once the window
 * holds `len` values), and `callback(null, average)` is invoked on a later
 * tick via `setImmediate`. `prev` is accepted for signature compatibility but
 * not used.
 *
 * @param {number} len - Maximum number of recent values to average.
 * @returns {Function} The moving-average function described above.
 */
function runningAverage(len) {
  var values = [];

  function addToList(val) {
    // Drop the oldest sample once the window is full.
    if (!(values.length < len)) values.shift();
    values.push(val);
  }

  function add(a, b) {
    return a + b;
  }

  return function(prev, next, callback) {
    addToList(next);

    // Compute the average now rather than inside the deferred callback: by
    // the time that callback runs, later calls may already have changed the
    // window, and each call must report the average for its own sample.
    var average = values.reduce(add, 0) / values.length;

    setImmediate(function() {
      callback(null, average);
    });
  };
}

The output will be something like:

2  
1  
1  
1.25  
2.8  
4.2  
4.2  
4.4  
5.2  
5  
..
..

If we add a console.log(values) in our runningAverage implementation we can see the average stabilizing:

[ 2 ]
[ 2, 0 ]
[ 2, 0, 1 ]
[ 2, 0, 1, 2 ]
[ 2, 0, 1, 2, 9 ]
[ 0, 1, 2, 9, 9 ]
[ 1, 2, 9, 9, 0 ]
[ 2, 9, 9, 0, 2 ]
[ 9, 9, 0, 2, 6 ]
[ 9, 0, 2, 6, 8 ]
..
..

Cheers!

comments powered by Disqus