Oleg Romanovskyi


Distributed job processing with node.js

A lot of web applications start out as simple as "take data from the DB and apply some template". But more CPU- and time-consuming tasks appear over time: processing large photos for a photo gallery, sophisticated reporting, massive email sending, etc. That's where the single-threaded nature of node.js and its browser roots become a flaw.

When it matters

Here is an incomplete list of cases in which distributed job processing will be helpful:

  • Tasks are CPU bound. It's hard to get good load balancing when some requests are lightweight and some contain heavy computations. Moreover, it's easy to accidentally cause lag for other users due to the single-threaded nature of node.js (see the sketch after this list).
  • Tasks are memory intensive. One job or a couple of jobs fit within node.js memory limits, but a real workload sometimes causes out-of-memory exceptions due to the V8 engine heap limitation (~1.5 GB by default).
  • Reloading a web service is hard. If your task needs 5 minutes, then you should wait at least that long before reloading the service. Otherwise, there is a chance of breaking the current task.
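
To see why CPU-bound work hurts, here is a minimal illustrative sketch (not part of the demo application) of how a single synchronous loop blocks the event loop, so every other request has to wait:

var http = require('http');

http.createServer(function(req, res){
  if (req.url === '/heavy') {
    // A synchronous, CPU-heavy loop: nothing else is served while it runs
    var sum = 0;
    for (var i = 0; i < 1e9; i++) { sum += i; }
    res.end('heavy done: ' + sum);
  } else {
    // This "fast" response is delayed whenever /heavy is being computed
    res.end('fast response');
  }
}).listen(3000);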

From a "fire and forget" task to advanced usage

For a demo application, we'll use a fake file downloader which takes a URL and "downloads" it. In fact, it'll just use setTimeout, to strip away as much unrelated code as possible.

Prerequisites

For the job queue we'll use the Kue npm module. Kue itself requires Redis >= 2.6.12. If you are using Ubuntu, run

$ sudo apt-get install redis-server

On other operating systems, either use a Docker container or follow an OS-specific installation manual.
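
For example, a minimal sketch of running Redis in a Docker container (the container name is just an illustration; adjust the port mapping to your setup):

$ docker run -d --name kue-redis -p 6379:6379 redis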

I wrote samples as console applications to keep the code simple.

Basic job

Install Kue module

$ npm install kue

Create worker.js file

var kue = require('kue');

// We create a queue to work with.
// The prefix allows having more than one queue in a single Redis instance.
// Change the Redis connection options if Redis is not local.
var queue = kue.createQueue({
  "prefix": "q",
  "redis": {
    "host": "127.0.0.1"
  }
});

// Register a handler for the 'download' job type.
// We specify that the worker can handle up to 2 jobs of this type at the same time.
queue.process('download', 2, function(job, done){
  // Call the job implementation with the required parameters.
  downloadDocument(job.data.url, done);
});

// Stub for document downloading
function downloadDocument(url, done) {
  console.log('Got a file to download "%s"', url);
  setTimeout(function(){
    console.log('File "%s" download complete', url);
    done();
  }, 2000);
}
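
Related to the earlier point about reloads: Kue also provides queue.shutdown() to stop a worker gracefully. A minimal sketch you could add to worker.js (the 5000 ms timeout is just an example value):

// On SIGTERM, stop accepting new jobs and give active ones up to 5 seconds to finish
process.once('SIGTERM', function(){
  queue.shutdown(5000, function(err){
    console.log('Kue shutdown: ', err || '');
    process.exit(0);
  });
});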

Create client.js file

var kue = require('kue');

// The same queue as we use in the worker
var queue = kue.createQueue({
  "prefix": "q",
  "redis": {
    "host": "127.0.0.1"
  }
});

// As a sample, the client will create 10 tasks for file download
var filesToDownload = 10;

// Counter to know when all jobs are saved to Redis, so the client can exit
var enqueued = 0;

// Event handler. Called when a job is saved to Redis.
queue.on('job enqueue', function(){
  enqueued++;
  // Exit the client when everything is saved
  if(enqueued === filesToDownload){
    process.exit(0);
  }
});

// Create 10 download jobs
for(var i = 1; i <= filesToDownload; i++){
  queue.create('download', {
    url: 'http://example.org/document-' + i + '.pdf'
  })
  .removeOnComplete(true)
  .save();
}

Now you can start the client

$ node client.js

It will exit shortly after launch because we don't wait until the jobs are done; we just save them to the queue. Then you can start the worker

$ node worker.js

And it'll write progress information to the console, something like this:

Got a file to download "http://example.org/document-1.pdf"
Got a file to download "http://example.org/document-2.pdf"
File "http://example.org/document-1.pdf" download complete
File "http://example.org/document-2.pdf" download complete
Got a file to download "http://example.org/document-3.pdf"
Got a file to download "http://example.org/document-4.pdf"
File "http://example.org/document-3.pdf" download complete
File "http://example.org/document-4.pdf" download complete

With a little effort, we can now restart our client whenever needed without waiting until processing is done, and even without waiting until there is a free worker to handle a job. By adjusting the concurrency, we can prevent out-of-memory exceptions, limit the number of HTTP connections, etc.
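
For example, a minimal sketch of making the worker's concurrency configurable per process (WORKER_CONCURRENCY is a hypothetical variable name used only for illustration):

// worker.js: read the concurrency from an environment variable,
// so each worker process can be tuned without code changes
var concurrency = parseInt(process.env.WORKER_CONCURRENCY, 10) || 2;

queue.process('download', concurrency, function(job, done){
  downloadDocument(job.data.url, done);
});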

Complete source code for this tutorial can be found on GitHub.

Track how a job is going

In a real project, it's useful to report back to the user that a job is done (video is converted, import is complete, ...). Also, some information about progress helps give the user a clue about when an operation will be completed. Let's check how we can get detailed information about a job.

Kue makes this easy: we call a single method to report progress and add an event listener to receive progress/complete notifications.

First, to get progress information, the job handler has to provide it. Here is the modified version of the job handler:

function downloadDocument(job, url, done) {
  console.log('Got a file to download "%s"', url);
  var progress = 0;
  function step(){
    progress += 20;
    // Report that we have done 'progress' amount out of 100
    job.progress(progress, 100);
    if(progress >= 100){
      console.log('File "%s" download complete', url);
      return done();
    }
    setTimeout(step, 400);
  }
  step();
}
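
Since downloadDocument now takes the job as its first argument, the worker's queue.process handler has to pass it along. A minimal sketch of the updated registration, matching the signature above:

queue.process('download', 2, function(job, done){
  // Pass the job itself so the handler can report progress on it
  downloadDocument(job, job.data.url, done);
});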

Then we should listen for progress events on the client side of the queue. After adding the listeners, creating a job can look like this:

function createJob(url){
  var job = queue.create('download', {
    url: url
  });
  // We add a listener to the 'progress' event.
  job.on('progress', function(progress, data){
    console.log('file "%s" %d%% complete', job.data.url, progress);
  })
  .on('complete', function(result){
    complete++;
    console.log('File "%s" downloaded.', job.data.url);
    if(complete === filesToDownload){
      process.exit(0);
    }
  })
  .removeOnComplete(true)
  .save();
}

You can find the complete code in the steps/tracking folder of the git repository. But what if an application starts a new task and then we restart the application? In that case, we have no event listeners and can't attach them because we have no job object. The solution is to listen for events on the queue instead of on the job object. The code will look like this:

queue.on('job progress', function(id, progress){
  kue.Job.get(id, function(err, job){
    console.log('file "%s" %d%% complete', job.data.url, progress);
  });
}).on('job complete', function(id, result){
  kue.Job.get(id, function(err, job){
    complete++;
    console.log('File "%s" downloaded.', job.data.url);
    job.remove(function(err){
      if (err) throw err;
      console.log('removed completed job #%d', job.id);
    });
    if(complete === filesToDownload){
      process.exit(0);
    }
  });
});

function createJob(url){
  var job = queue.create('download', {
    url: url
  }).save();
}

Note that we removed removeOnComplete(true) to prevent automatic removal of jobs. Otherwise, we would get an error when fetching the job in the 'job complete' handler.

With this approach we can continue to track progress after a restart. But what happens with jobs which finish between restarts? There are two issues to deal with: we don't know that the job is done, so we can't report it to the user, and we have a "lost" finished job in the queue which eats Redis memory. To clean up completed tasks, we select jobs by state and do what's needed:

// Clean up jobs which finished before the client started
kue.Job.rangeByState('complete', 0, 1000, 'asc', function(err, jobs){
  jobs.forEach(function(job){
    job.remove(function(){
      console.log('File "%s" downloaded.', job.data.url);
      console.log('removed ', job.id);
    });
  });
});

So now we can listen for complete events to report progress and completion status to the user. And we can catch up if we missed something.

What to learn next

All the best games are easy to learn and difficult to master

-- Nolan Bushnell

Distributed job processing is easy to start with but has a notable number of edge cases. Here is a list of some topics not covered in this tutorial which will be useful in complex systems:

  • How to restart a task
  • Collecting and displaying statistics
  • How to take nodes out for maintenance
  • Job priorities (see the sketch below)
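
As a small taste of the last item, Kue lets you set a priority when creating a job; a minimal sketch (the URL is just an example, and 'high' is one of Kue's predefined priority levels):

queue.create('download', {
  url: 'http://example.org/urgent.pdf'
})
.priority('high')
.save();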

Take a look at the Kue documentation for a better understanding of what it's capable of.