Running Average with Node.js Streams

Just as a fun exercise I want to implement a running average using node.js streams.

First we'll create a stream that supplies random numbers as input:

var ReadableStream = require('stream').Readable;
var util = require('util');

util.inherits(RandomStream, ReadableStream);

function RandomStream(max, low, high) {
  if (!(this instanceof RandomStream)) return new RandomStream(max, low, high);
  if (high < low) high = low;

  ReadableStream.call(this, {
    objectMode: true
  });

  this._low = low;
  this._high = high;
  this._max = max;
  this._index = 1;
}

RandomStream.prototype._read = function() {
  var i = this._index++;
  if (i > this._max) {
    this.push(null);
  } else {
    this.push(getRandomInt(this._low, this._high + 1));
  }
};

function getRandomInt(min, max) {
  return Math.floor(Math.random() * (max - min)) + min;
}

module.exports = RandomStream;

The next step is to make a stream implementation to compare the previous value with the current:

(This stream supports both syncronous returning function and async functions that uses a callback - as input)

var util = require('util');
var TransformStream = require('stream').Transform;

util.inherits(CompareStream, TransformStream);

function CompareStream(fn) {
  if (!(this instanceof CompareStream)) return new CompareStream(fn);

  this._fn = fn;
  this._previous = null;

  TransformStream.call(this, {
    objectMode: true
  });
}

CompareStream.prototype._transform = function(chunk, encoding, done) {
  if (this._fn.length > 1) {
    this._fn(this._previous, chunk, done);
  } else {
    done(null, this._fn(this._previous, chunk));
  }
  this._previous = chunk;
};

module.exports = CompareStream;

We'll also create a transform stream that stringifies/serializes objects, so that we can pipe the output to process.stdout for printing/debugging:

var util = require('util');
var assign = require('object-assign');
var TransformStream = require('stream').Transform;

util.inherits(StringifyStream, TransformStream);

function StringifyStream(opt) {
  if (!(this instanceof StringifyStream)) return new StringifyStream(options);
  var options = assign(opt || {}, { objectMode: true });

  TransformStream.call(this, options);
}

StringifyStream.prototype._transform = function(chunk, encoding, done) {
  this.push(JSON.stringify(chunk) + '\n');
  done();
};

module.exports = StringifyStream;

Now we can use this to compare values that are passing through the stream:

var StringifyStream = require('./stringify-stream');
var CompareStream = require('./compare-stream');
var RandomStream = require('./random-stream');
var util = require('util');

var random = RandomStream(100, 0, 10);

random
  .pipe(CompareStream(runningAverage(5)))
  .pipe(StringifyStream())
  .pipe(process.stdout)

function runningAverage(len) {
  var values = [];

  function addToList(val) {
    if (!(values.length < len)) values.shift();
    values.push(val);
  }

  return function(prev, next, callback) {
    addToList(next);

    setImmediate(function() {
      callback(null, values.reduce(function(a, b) {
        return a + b;
      }, 0) / values.length);
    });
  };
}

The output will be something like:

2
1
1
1.25
2.8
4.2
4.2
4.4
5.2
5
..
..

If we add a console.log(values) in our runningAverage implementation we can see the average stabilizing:

[ 2 ]
[ 2, 0 ]
[ 2, 0, 1 ]
[ 2, 0, 1, 2 ]
[ 2, 0, 1, 2, 9 ]
[ 0, 1, 2, 9, 9 ]
[ 1, 2, 9, 9, 0 ]
[ 2, 9, 9, 0, 2 ]
[ 9, 9, 0, 2, 6 ]
[ 9, 0, 2, 6, 8 ]
..
..

Cheers!

comments powered by Disqus