Background jobs

CKAN allows you to create jobs that run in the ‘background’, i.e. asynchronously and without blocking the main application. Such jobs can be created in Extensions or in core CKAN.

Background jobs can be essential to providing certain kinds of functionality, for example:

  • Creating web-hooks that notify other services when certain changes occur (for example a dataset is updated)

  • Performing processing or validation or on data (as done by the Archiver and DataStorer Extensions)

Basically, any piece of work that takes too long to perform while the main application is waiting is a good candidate for a background job.

Note

The current background job system is based on RQ and was introduced in CKAN 2.7. See Migrating from CKAN’s previous background job system for details on how to migrate your jobs from the previous system introduced in CKAN 1.5.

Writing and enqueuing background jobs

Note

This section is only relevant for developers working on CKAN or an extension.

The core of a background job is a regular Python function. For example, here’s a very simply job function that logs a message:

import logging

def log_job(msg, level=logging.INFO, logger=u'ckan'):
    u'''
    Background job to log a message.
    '''
    logger = logging.getLogger(logger)
    logger.log(level, msg)

And that’s it. Your job function can use all the usual Python features. Just keep in mind that your function will be run in a separate process by a worker, so your function should not depend on the current state of global variables, etc. Ideally your job function should receive all the information it needs via its arguments.

In addition, the module that contains your job function must be importable by the worker, which must also be able to get the function from its module. This means that nested functions, lambdas and instance methods cannot be used as job functions. While class methods of top-level classes can be used it’s best to stick to ordinary module-level functions.

Note

Background jobs do not support return values (since they run asynchronously there is no place to return those values to). If your job function produces a result then it needs to store that result, for example in a file or in CKAN’s database.

Once you have a job function, all you need to do is to use ckan.lib.jobs.enqueue to create an actual job out of it:

import ckan.lib.jobs as jobs

jobs.enqueue(log_job, [u'My log message'])

This will place a job on the job queue where it can be picked up and executed by a worker.

Note

Extensions should use ckan.plugins.toolkit.enqueue_job() instead. It’s the same function but accessing it via ckan.plugins.toolkit decouples your code from CKAN’s internal structure.

The first argument to enqueue is the job function to use. The second is a list of the arguments which should be passed to the function. You can omit it in which case no arguments will be passed. You can also pass keyword arguments in a dict as the third argument:

jobs.enqueue(log_job, [u'My log message'], {u'logger': u'ckanext.foo'})

You can also give the job a title which can be useful for identifying it when managing the job queue:

jobs.enqueue(log_job, [u'My log message'], title=u'My log job')

A timeout can also be set on a job iwth the timeout keyword argument:

jobs.enqueue(log_job, [u'My log message'], rq_kwargs={"timeout": 3600})

The default background job timeout is 180 seconds. This is set in the ckan config .ini file under the ckan.jobs.timeout item.

Accessing the database from background jobs

Code running in a background job can access the CKAN database like any other CKAN code.

In particular, using the action functions to modify the database from within a background job is perfectly fine. Just keep in mind that while your job is running in the background, the CKAN main process or other background jobs may also modify the database. Hence a single call to an action function is atomic from your job’s view point, but between multiple calls there may be foreign changes to the database.

Special care has to be taken if your background job needs low-level access to the database, for example to modify SQLAlchemy model instances directly without going through an action function. Each background job runs in a separate process and therefore has its own SQLAlchemy session. Your code has to make sure that the changes it makes are properly contained in transactions and that you refresh your view of the database to receive updates where necessary. For these (and other) reasons it is recommended to use the action functions to interact with the database.

Running background jobs

Jobs are placed on the job queue, from which they can be retrieved and executed. Since jobs are designed to run asynchronously that happens in a separate process called a worker.

After it has been started, a worker listens on the queue until a job is enqueued. The worker then removes the job from the queue and executes it. Afterwards the worker waits again for the next job to be enqueued.

Note

Executed jobs are discarded. In particular, no information about past jobs is kept.

Workers can be started using the Run a background job worker command:

ckan -c /etc/ckan/default/ckan.ini jobs worker

The worker process will run indefinitely (you can stop it using CTRL+C).

Note

You can run multiple workers if your setup uses many or particularly long background jobs.

Using Supervisor

In a production setting, the worker should be run in a more robust way. One possibility is to use Supervisor.

First install Supervisor:

sudo apt-get install supervisor

Next copy the configuration file template:

sudo cp /usr/lib/ckan/default/src/ckan/ckan/config/supervisor-ckan-worker.conf /etc/supervisor/conf.d

Next make sure the /var/log/ckan/ directory exists, if not then it needs to be created:

sudo mkdir /var/log/ckan

Open /etc/supervisor/conf.d/supervisor-ckan-worker.conf in your favourite text editor and make sure all the settings suit your needs. If you installed CKAN in a non-default location (somewhere other than /usr/lib/ckan/default) then you will need to update the paths in the config file (see the comments in the file for details).

Restart Supervisor:

sudo service supervisor restart

The worker should now be running. To check its status, use

sudo supervisorctl status

You can restart the worker via

sudo supervisorctl restart ckan-worker:*

To test that background jobs are processed correctly you can enqueue a test job via

ckan -c |ckan.ini| jobs test

The worker’s log files (/var/log/ckan/ckan-worker.stdout.log and/or /var/log/ckan/ckan-worker.stderr.log) should then show how the job was processed by the worker.

In case you run into problems, make sure to check the logs of Supervisor and the worker:

cat /var/log/supervisor/supervisord.log
cat /var/log/ckan/ckan-worker.stdout.log
cat /var/log/ckan/ckan-worker.sterr.log

Managing background jobs

Once they are enqueued, background jobs can be managed via the ckan command and the web API.

List enqueues jobs

Show details about a job

Cancel a job

A job that hasn’t been processed yet can be canceled via

Clear all enqueued jobs

Logging

Information about enqueued and processed background jobs is automatically logged to the CKAN logs. You may need to update your logging configuration to record messages at the INFO level for the messages to be stored.

Background job queues

By default, all functionality related to background jobs uses a single job queue that is specific to the current CKAN instance. However, in some situations it is useful to have more than one queue. For example, you might want to distinguish between short, urgent jobs and longer, less urgent ones. The urgent jobs should be processed even if a long and less urgent job is already running.

For such scenarios, the job system supports multiple queues. To use a different queue, all you have to do is pass the (arbitrary) queue name. For example, to enqueue a job at a non-default queue:

jobs.enqueue(log_job, [u"I'm from a different queue!"],
             queue=u'my-own-queue')

Similarly, to start a worker that only listens to the queue you just posted a job to:

ckan -c |ckan.ini| jobs worker my-own-queue

See the documentation of the various functions and commands for details on how to use non-standard queues.

Note

If you create a custom queue in your extension then you should prefix the queue name using your extension’s name. See Avoid name clashes.

Queue names are internally automatically prefixed with the CKAN site ID, so multiple parallel CKAN instances are not a problem.

Testing code that uses background jobs

Due to the asynchronous nature of background jobs, code that uses them needs to be handled specially when writing tests.

A common approach is to use the mock package to replace the ckan.plugins.toolkit.enqueue_job function with a mock that executes jobs synchronously instead of asynchronously:

import unittest.mock as mock

from ckan.tests import helpers


def synchronous_enqueue_job(job_func, args=None, kwargs=None, title=None):
    '''
    Synchronous mock for ``ckan.plugins.toolkit.enqueue_job``.
    '''
    args = args or []
    kwargs = kwargs or {}
    job_func(*args, **kwargs)


class TestSomethingWithBackgroundJobs(helpers.FunctionalTestBase):

    @mock.patch('ckan.plugins.toolkit.enqueue_job',
                side_effect=synchronous_enqueue_job)
    def test_something(self, enqueue_job_mock):
        some_function_that_enqueues_a_background_job()
        assert something

Depending on how the function under test calls enqueue_job you might need to adapt where the mock is installed. See mock’s documentation for details.

Migrating from CKAN’s previous background job system

Before version 2.7 (starting from 1.5), CKAN offered a different background job system built around Celery. As of CKAN 2.8, that system is no longer available. You should therefore update your code to use the new system described above.

Migrating existing job functions is easy. In the old system, a job function would look like this:

@celery.task(name=u'my_extension.echofunction')
def echo(message):
    print message

As described above, under the new system the same function would be simply written as

def echo(message):
    print message

There is no need for a special decorator. In the new system there is also no need for registering your tasks via setup.py.

Migrating the code that enqueues a task is also easy. Previously it would look like this:

celery.send_task(u'my_extension.echofunction', args=[u'Hello World'],
                 task_id=str(uuid.uuid4()))

With the new system, it looks as follows:

import ckan.lib.jobs as jobs

jobs.enqueue(ckanext.my_extension.plugin.echo, [u'Hello World'])

As you can see, the new system does not use strings to identify job functions but uses the functions directly instead. There is also no need for creating a job ID, that will be done automatically for you.

Supporting both systems at once

Not all CKAN installations will immediately update to CKAN 2.7. It might therefore make sense for you to support both the new and the old job system. That way you are ready when the old system is removed but can continue to support older CKAN installations.

The easiest way to do that is to use ckanext-rq, which provides a back-port of the new system to older CKAN versions.

If you are unable to use ckanext-rq then you will need to write your code in such a way that it works on both systems. This could looks as follows. First split your Celery-based job functions into the job itself and its Celery handler. That is, change

@celery.task(name=u'my_extension.echofunction')
def echo(message):
    print message

to

def echo(message):
    print message

@celery.task(name=u'my_extension.echofunction')
def echo_celery(*args, **kwargs):
  echo(*args, **kwargs)

That way, you can call echo using the new system and use the name for Celery.

Then use the new system if it is available and fall back to Celery otherwise:

def compat_enqueue(name, fn, args=None):
    u'''
    Enqueue a background job using Celery or RQ.
    '''
    try:
        # Try to use RQ
        from ckan.plugins.toolkit import enqueue_job
        enqueue_job(fn, args=args)
    except ImportError:
        # Fallback to Celery
        import uuid
        from ckan.lib.celery_app import celery
        celery.send_task(name, args=args, task_id=str(uuid.uuid4()))

Use that function as follows for enqueuing a job:

compat_enqueue(u'my_extension.echofunction',
               ckanext.my_extension.plugin.echo,
               [u'Hello World'])