Celery¶
Note
The following documentation is deprecated. The approved installation is via Docker.
Celery is a task queue powered by RabbitMQ. You can use it for anything that doesn’t need to complete in the current request-response cycle. Or use it wherever Les tells you to use it.
For example, each addon has a current_version
cached property. This query
on initial run causes strain on our database. We can create a denormalized
database field called current_version
on the addons
table.
We’ll need to populate regularly so it has fairly up-to-date data. We can do this in a process outside the request-response cycle. This is where Celery comes in.
Installation¶
RabbitMQ¶
Celery depends on RabbitMQ. If you use homebrew
you can install this:
brew install rabbitmq
Setting up rabbitmq invovles some configuration. You may want to define the following
# On a Mac, you can find this in System Preferences > Sharing
export HOSTNAME='<laptop name>.local'
Then run the following commands:
# Set your host up so it's semi-permanent
sudo scutil --set HostName $HOSTNAME
# Update your hosts by either:
# 1) Manually editing /etc/hosts
# 2) `echo 127.0.0.1 $HOSTNAME >> /etc/hosts`
# RabbitMQ insists on writing to /var
sudo rabbitmq-server -detached
# Setup rabitty things (sudo is required to read the cookie file)
sudo rabbitmqctl add_user olympia olympia
sudo rabbitmqctl add_vhost olympia
sudo rabbitmqctl set_permissions -p olympia olympia ".*" ".*" ".*"
Back in safe and happy django-land you should be able to run:
celery -A olympia worker -E
Celery understands python and any tasks that you have defined in your app are now runnable asynchronously.
Celery Tasks¶
Any python function can be set as a celery task. For example, let’s say we want
to update our current_version
but we don’t care how quickly it happens, just
that it happens. We can define it like so:
@task(rate_limit='2/m')
def _update_addons_current_version(data, **kw):
task_log.debug("[%s@%s] Updating addons current_versions." %
(len(data), _update_addons_current_version.rate_limit))
for pk in data:
try:
addon = Addon.objects.get(pk=pk)
addon.update_version()
except Addon.DoesNotExist:
task_log.debug("Missing addon: %d" % pk)
@task
is a decorator for Celery to find our tasks. We can specify a
rate_limit
like 2/m
which means celery
will only run this command
2 times a minute at most. This keeps write-heavy tasks from killing your
database.
If we run this command like so:
from celery.task.sets import TaskSet
all_pks = Addon.objects.all().values_list('pk', flat=True)
ts = [_update_addons_current_version.subtask(args=[pks])
for pks in amo.utils.chunked(all_pks, 300)]
TaskSet(ts).apply_async()
All the Addons with ids in pks
will (eventually) have their
current_versions
updated.
Cron Jobs¶
This is all good, but let’s automate this. In Olympia we can create cron jobs like so:
@cronjobs.register
def update_addons_current_version():
"""Update the current_version field of the addons."""
d = Addon.objects.valid().exclude(
type=amo.ADDON_PERSONA).values_list('id', flat=True)
with establish_connection() as conn:
for chunk in chunked(d, 1000):
print chunk
_update_addons_current_version.apply_async(args=[chunk],
connection=conn)
This job will hit all the addons and run the task we defined in small batches of 1000.
We’ll need to add this to both the prod
and preview
crontabs so that
they can be run in production.
Better than Cron¶
Of course, cron is old school. We want to do better than cron, or at least not rely on brute force tactics.
For a surgical strike, we can call _update_addons_current_version
any time
we add a new version to that addon. Celery will execute it at the prescribed
rate, and your data will be updated … eventually.
During Development¶
celery
only knows about code as it was defined at instantiation time. If
you change your @task
function, you’ll need to HUP
the process.
However, if you’ve got the @task
running perfectly you can tweak all the
code, including cron jobs that call it without need of restart.