Python celery

From ArchWiki

Quoting authors of the project:

Celery is "an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well. (...) Tasks can execute asynchronously (in the background) or synchronously (wait until ready)."

Installation

Install the python-celery package. As with most Python-based packages, it is built for Python 3.x.

Quoting the Celery documentation: "Celery requires a solution to send and receive messages". One of the options is rabbitmq, which can also be installed from the official repositories.

Configuration

Celery

For configuration files, the directory /etc/celery/ needs to be created with a configuration file named app.conf where app is the name of your application. An example configuration file is provided within Celery documentation.

Start/enable the celery@app.service.

To run celery in a virtualenv, make a copy of celery@.service in /etc/systemd/system so you can customize it, and change the paths of the celery binary to the copy in your virtualenv.

RabbitMQ

RabbitMQ stores its configuration within /etc/rabbitmq/rabbitmq-env.conf

/etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@rakieta
NODE_IP_ADDRESS=0.0.0.0
NODE_PORT=5672
    
LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia

You probably want to replace 0.0.0.0 with 127.0.0.1 so that RabbitMQ only listens on the loopback interface. RabbitMQ does not support Unix sockets.

For simple configurations, you may also want to add HOME=/var/lib/rabbitmq. Read more about environment variables in the RabbitMQ docs.

Start/enable rabbitmq.service.

Note: rabbitmq.service is started as the rabbitmq user, whose home folder is /var/lib/rabbitmq. Make sure the rabbitmq user owns this folder and all of its subfolders.

Follow RabbitMQ documentation and add your user and virtual host:

$ cd /var/lib/rabbitmq
$ su rabbitmq -c 'rabbitmqctl add_user myuser mypassword'
$ su rabbitmq -c 'rabbitmqctl add_vhost myvhost'
$ su rabbitmq -c 'rabbitmqctl set_user_tags myuser mytag'
$ su rabbitmq -c 'rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"'

Read the RabbitMQ admin guide to understand the above.

If issuing su rabbitmq -c "rabbitmqctl status" results in badrpc,nodedown, visit this blog post for more information on how to fix the problem.

Note: You may also want to run su rabbitmq -c "erl"; you should get an Erlang prompt with no errors.
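Once the node is up, a quick way to confirm that the broker accepts TCP connections on the address and port set in rabbitmq-env.conf is a plain socket check. The following is a minimal sketch using only the Python standard library; the function name broker_reachable is hypothetical, and the defaults assume the 127.0.0.1 address and port 5672 discussed above. It only tests that the port is open, not that the AMQP credentials are valid.

```python
import socket


def broker_reachable(host="127.0.0.1", port=5672, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (e.g. connection refused, timeout)
        # when nothing is listening on the given address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example use (assumes RabbitMQ listens on the loopback interface):
#     print(broker_reachable())
```

If this returns False while rabbitmq.service is reported as running, the NODE_IP_ADDRESS and NODE_PORT values are a reasonable first thing to check.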

Security

You may want to read the security section of the Celery documentation.

Example task

Celery application

Follow the Celery documentation to create a sample Python task:

test.py
from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

@app.task
def add(x, y):
    return x + y

amqp://myuser:mypassword@localhost:5672/myvhost

Use the same credentials/vhost you have created when configuring RabbitMQ

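backend='amqp' - this parameter is optional. It selects the result backend, which is only needed if you want to keep track of task state and return values. It is independent of the broker, for which RabbitMQ (amqp) is the default used by Celery.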
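The broker URL above follows the pattern amqp://user:password@host:port/vhost. If the password or vhost contains characters such as @ or /, they need to be percent-encoded before being placed in the URL. The sketch below builds the URL from its parts with the standard library; the helper name broker_url is hypothetical and not part of Celery, and it assumes the broker library decodes percent-encoded credentials when parsing the URL.

```python
from urllib.parse import quote


def broker_url(user, password, vhost, host="localhost", port=5672):
    """Build an amqp:// URL, percent-encoding user, password and vhost."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),      # safe="" also encodes "/" and "@"
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


# Same values as used when configuring RabbitMQ above
print(broker_url("myuser", "mypassword", "myvhost"))
```

The resulting string can be passed as the broker argument when creating the Celery application.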

Test run

While in the same directory as your test.py you can run:

$ celery -A test worker --loglevel=info

Then from another console (but within same directory) create:

call.py
from test import add

add.delay(4, 4)

Run it:

$ python call.py

The first console, where the worker is running, should log some information showing that the worker received and executed the task:

Received task: test.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e]
Task test.add[f4aff99a-7477-44db-9f6e-7e0f9342cd4e] succeeded in 0.0007182330009527504s: 8

Prepare module for Celery service

The procedure below differs slightly from what you will find in the Celery documentation.

To create the test_task module, first create, as root, the /lib/python3.5/site-packages/test_task directory and a blank /lib/python3.5/site-packages/test_task/__init__.py, then create the following files inside that directory:

/lib/python3.5/site-packages/test_task/celery.py
from __future__ import absolute_import

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://myuser:mypassword@localhost:5672/myvhost')

if __name__ == '__main__':
    app.start()

/lib/python3.5/site-packages/test_task/test_task.py
from __future__ import absolute_import

from test_task.celery import app

@app.task
def add(x, y):
    return x + y

At this point, if you start python in your console, you should be able to run the following without any error:

>>> from test_task import celery
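The same check can be scripted, for instance to run it before restarting the service. The following is a minimal sketch using importlib from the standard library; the helper name module_available is hypothetical. It only verifies that Python can locate the module on its search path, which is a slightly weaker test than actually importing it.

```python
import importlib.util


def module_available(name):
    """Return True if Python can locate the named module or package."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised for dotted names when the parent package is missing.
        return False


# Expected to print True once the test_task directory is in place
print(module_available("test_task"))
```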

In /etc/celery/celery.conf replace:

CELERY_APP="proj"

with the following line:

CELERY_APP="test_task"

Restart the celery@celery.service.

Run tasks periodically

Tasks can be run periodically through Celery Beat. The basic setup is described in the relevant Celery documentation pages.

If you want to specify CELERYBEAT_SCHEDULE within your celery.py, you need to add the app.conf prefix (that is, set it on the application's configuration object) so that Celery recognises your scheduled tasks. You then need to add the --beat --schedule=/var/lib/celery/celerybeat-schedule parameters when you start the celery daemon. In addition, the /var/lib/celery directory must exist within the celery-relevant environment and be owned by the user that runs celery.
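As an illustration of the app.conf prefix, the sketch below defines a schedule and attaches it to an application object. The entry name add-every-30-seconds, the 30 second interval, the arguments and the helper configure_beat are all hypothetical; the task name test_task.test_task.add assumes the module layout from the previous section and Celery's automatic task naming. Newer Celery releases call this setting beat_schedule, so treat the upper-case name as matching the version this page was written for.

```python
from datetime import timedelta

# Hypothetical schedule: call add(16, 16) every 30 seconds
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'test_task.test_task.add',   # assumed automatic task name
        'schedule': timedelta(seconds=30),
        'args': (16, 16),
    },
}


def configure_beat(app):
    """Attach the schedule to the application's configuration (app.conf)."""
    app.conf.CELERYBEAT_SCHEDULE = CELERYBEAT_SCHEDULE
    return app


# In celery.py, after the app object has been created:
#     configure_beat(app)
```

Keeping the schedule in a plain dictionary makes it easy to inspect without a running broker; the assignment to app.conf is the part Celery Beat actually reads.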