Node.js Flow (part 1) - Callback Hell vs. Async vs. Highland

(Part 2 of this series can be found here)

In this post i want to compare some different approaches to application flow in a JavaScript/node.js application:

Here's a (poor) example of an express route:

  • reading from a file
  • doing some processing (in 3 steps)
    • process* is just some arbitrary async operation that calls back with extended data
  • writing the result to a file
  • responding to the request with either a success or error message

Approach 1 - Continuation Passing

var express = require('express');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  fs.readFile(inputFile, function(err, data) {
    if (err) return res.status(500).send(err);

    process1(data, function(err, data) {
      if (err) return res.status(500).send(err);

      process2(data, function(err, data) {
        if (err) return res.status(500).send(err);

        process3(data, function(err, data) {
          if (err) return res.status(500).send(err);

          fs.writeFile(outputFile, data, function(err) {
            if (err) return res.status(500).send(err);

            res.status(200).send('processed successfully using callback hell');
          });
        });
      });
    });
  });
});

Already with just these (relatively) simple steps in our route we are starting to have issues with readability and ease of reasoning (at least I am..). It is not immediately apparent what our route does.

If we were to increase the number of processing steps it would get increasingly worse, and our application would get very hard to understand, maintain or extend.

In addition it doesn't feel DRY, as we are handling errors in each step of the flow, using the same line of code.

Approach 2 - Named Continuation Passing

This example was provided by pmuellr - @pmuellr.

var express = require('express')  
var fs = require('fs')  
var app = express()

app.post('/process-file', onProcessFile)

function onProcessFile(req, res) {  
  var inputFile = 'input.txt'
  var outputFile = 'output.txt'

  fs.readFile(inputFile, onReadFile);

  function onReadFile(err, data) {
    if (err) return res.status(500).send(err)
    process1(data, onProcess1)
  }  

  function onProcess1(err, data) {
    if (err) return res.status(500).send(err)
    process2(data, onProcess2)
  }  

  function onProcess2(err, data) {
    if (err) return res.status(500).send(err)
    process3(data, onProcess3)
  }  

  function onProcess3(err, data) {
    if (err) return res.status(500).send(err)
    fs.writeFile(outputFile, data, onWriteFile)
  }

  function onWriteFile(err) {
    if (err) return res.status(500).send(err)
    res.status(200).send('processed successfully using callback hell')
  }
}

At least with this approach you no longer nest the continuations deeply, but there are still some issues here with non-DRY handling of errors in each step of the flow. You might also have a difficult time following your flow as your number of steps grow larger.

Approach 3 - Using async.js

Here's the same route using the async library and the waterfall method:

var express = require('express');  
var async = require('async');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  async.waterfall([
    function(callback) {
      fs.readFile(inputFile, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      process1(data, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      process1(data, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      process1(data, function(err, data) {
        callback(err, data);
      });
    },
    function(data, callback) {
      fs.writeFile(outputFile, data, function(err) {
        callback(err, data);
      });
    }
  ], function(err, result) {
    if (err) return res.status(500).send(err);
    res.status(200).send('processed successfully using async lib');
  });
});

This approach makes it easier to see what our route is doing, and at least now we avoid repeating ourselves in the error handling.

It could also be using the processing functions directly in the flow (since the arguments are the same as for the flow function) and create a named callback for the final handling:

var express = require('express');  
var async = require('async');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  var done = function(err, result) {
    if (err) return res.status(500).send(err);
    res.status(200).send('processed successfully with async');
  };

  async.waterfall([
    fs.readFile.bind(fs, inputFile),
    process1,
    process2,
    process3,
    fs.writeFile.bind(fs, outputFile)
  ], done);
});

A lot better than the callback hell at least, but still it doesn't quite feel right.

Approach 4 - Using highland.js (streams)

Recently I've taken the liking of @caolan's highland library, which is a high level streams library that aims to be fully compatible with native node.js streams. I won't be going in to the library workings in this post, but here's the same route using highland streams:

var express = require('express');  
var _ = require('highland');  
var fs = require('fs');  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  // create a highland stream
  var data = _([inputFile]);

  data
    .flatMap(_.wrapCallback(fs.readFile))
    .flatMap(_.wrapCallback(process1))
    .flatMap(_.wrapCallback(process2))
    .flatMap(_.wrapCallback(process3))
    .flatMap(_.wrapCallback(fs.writeFile.bind(fs, outputFile)))
    .stopOnError(function(err) {
      res.status(500).send(err);
    })
    .apply(function(data) {
      res.status(200).send('processed successfully using highland streams')
    });
});

I find this approach readable and extensible. This is a functional approach that makes your application more maintainable and easy to reason about. Even if we added 50 more steps, it would still be easy to get a quick grip of the application flow. I highly encourage anyone who creates JavaScript applications to take a look at this library, and node.js streams in general.

Approach 5 - Promises (bluebird)

Credit to Esailija - @PetkaAntonov (the maker of bluebird) for chiming in with an example using promises.

var express = require('express');  
var Promise = require("bluebird");  
var fs = Promise.promisifyAll(require('fs'));  
var app = express();

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  fs.readFileAsync(inputFile)
    .then(Promise.promisify(process1))
    .then(Promise.promisify(process2))
    .then(Promise.promisify(process3))
    .then(fs.writeFileAsync.bind(fs, outputFile))
    .then(function(data) {
      res.status(200).send('processed successfully using bluebird promises');
    })
    .catch(function(err) {
      res.status(500).send(err);
    });
});

As we can see from this example - promises is very much like streams, except they only emit a single value. You can think of the .then() method of promises as an equivalent of .pipe() for node streams.

As a result of this I've spent the last week implementing promises (A+) from scratch just for learning, and it has been really fun and useful.

Bonus example using highland and .reduce()

Credit to Lewis Ellis - @LewisJEllis (one of the contributors of highland) for this clever example.

var express = require('express');  
var _ = require('highland');  
var fs = require('fs');  
var app = express();

function chain(s, f) {  
  return s.flatMap(_.wrapCallback(f))
}

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  var data = _([inputFile]);

  _([
    fs.readFile,
    process1,
    process2,
    process3,
    writeToFileName(outputFile)
  ]).reduce(data, chain).flatten()
    .stopOnError(function (err) {
      return res.status(500).send(err);
    }).apply(function (data) {
      return res.status(200).send('processed');
    });
});

Bonus example 2 using highland and async.compose

var express = require('express');  
var _ = require('highland');  
var async = require('async');  
var fs = require('fs');  
var app = express();

function chain(s, f) {  
  return s.flatMap(_.wrapCallback(f))
}

app.post('/process-file', function(req, res) {  
  var inputFile = 'input.txt';
  var outputFile = 'output.txt';

  var data = _([inputFile]);

  data
    .flatMap(_.wrapCallback(
      async.compose(
        fs.writeFile.bind(fs, outputFile),
        process3,
        process2,
        process1,
        fs.readFile
      )
    ))
    .errors(function(err) {
      res.status(500).send(err);
    })
    .each(function(data) {
      res.status(200).send('processed successfully using highland streams and async.compose');
    });

Check out part 2 here for more async flow examples using generators and fibers.

comments powered by Disqus