Celery/RabbitMQ Notes on Task Timings

celery, django, rabbitmq

The following notes apply when using Celery with the RabbitMQ broker.

Celery Settings

task_acks_late

The Celery setting task_acks_late (disabled by default) defers the message ACK to RabbitMQ until after the task has finished executing, instead of sending it just before the task starts.

  • If it is enabled and the Celery task takes too long (see consumer_timeout below), the message will be requeued and redelivered to another worker, so the same message can be processed more than once.
  • If it is disabled, Celery will ACK the message to RabbitMQ just before the task starts executing. This tells RabbitMQ that the message was “delivered and processed,” and RabbitMQ will delete the message from the queue. The message is then lost if the Celery task is interrupted before it finishes.
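The difference between the two modes can be illustrated with a toy model. This is only a sketch: the deque stands in for the RabbitMQ queue, and run_worker, WorkerCrashed and the message name are made up for illustration. No Celery or RabbitMQ API is involved.

```python
from collections import deque


class WorkerCrashed(Exception):
    """Stands in for a worker being killed mid-task (e.g. during a deployment)."""


def run_worker(queue: deque, handler, acks_late: bool) -> None:
    """Take one message off the queue and process it with a toy ACK model."""
    message = queue.popleft()  # delivered, but not yet acknowledged
    if not acks_late:
        # Early ACK: the broker forgets the message before the task runs,
        # so a crash inside the handler loses the message.
        handler(message)
        return
    try:
        handler(message)
    except WorkerCrashed:
        # Late ACK: the message was never ACKed, so it goes back on the queue.
        queue.appendleft(message)
        raise
    # Late ACK: only at this point would the broker forget the message.


def crash(message):
    raise WorkerCrashed(message)


def survivors(acks_late: bool) -> list:
    """Return what is left in the queue after a worker crashes mid-task."""
    queue = deque(["send-invoice-42"])
    try:
        run_worker(queue, crash, acks_late)
    except WorkerCrashed:
        pass
    return list(queue)


print(survivors(acks_late=False))  # []
print(survivors(acks_late=True))   # ['send-invoice-42']
```

With the early ACK the message is gone after the crash. With the late ACK it is still there to be redelivered, which is also why the task may run more than once.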

task_acks_on_failure_or_timeout

Then there is task_acks_on_failure_or_timeout (enabled by default). According to the Celery documentation, this ACKs the message even if the task failed or timed out, and it only has an effect when task_acks_late is enabled. Whether this is the correct choice depends on the task.
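As a rough sketch, the two settings could be kept together in one mapping. The name ACK_SETTINGS is made up for illustration, and the values shown are just one possible choice, not a recommendation.

```python
# Hypothetical mapping; the keys are the Celery setting names discussed above.
ACK_SETTINGS = {
    # ACK after the task has run instead of just before it starts.
    "task_acks_late": True,
    # Do not ACK when the task failed or timed out (only relevant with late ACKs).
    "task_acks_on_failure_or_timeout": False,
}

# Assuming a Celery app object named `app`, this could be applied with:
#     app.conf.update(**ACK_SETTINGS)
print(ACK_SETTINGS)
```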

RabbitMQ Settings

consumer_timeout

The RabbitMQ consumer_timeout configuration (30 minutes by default) caps how long a delivered message may remain unacknowledged. If a client (a Celery worker running a task, in this case) does not ACK the message before this timeout expires, RabbitMQ closes the channel, and the message is requeued and redelivered.
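A small helper can make the arithmetic explicit. This is a sketch under a few assumptions: the function names and the five-minute safety margin are made up, and the rendered line assumes the timeout is set in rabbitmq.conf, where the value is given in milliseconds. The check only matters for tasks that ACK late.

```python
from datetime import timedelta

# Default consumer_timeout as stated above.
DEFAULT_CONSUMER_TIMEOUT = timedelta(minutes=30)


def rabbitmq_conf_line(timeout: timedelta) -> str:
    """Render a consumer_timeout line for rabbitmq.conf (value in milliseconds)."""
    return f"consumer_timeout = {int(timeout.total_seconds() * 1000)}"


def fits_before_timeout(
    expected_runtime: timedelta,
    timeout: timedelta = DEFAULT_CONSUMER_TIMEOUT,
    margin: timedelta = timedelta(minutes=5),  # hypothetical safety margin
) -> bool:
    """True if a late-ACKed task should finish before RabbitMQ gives up on it."""
    return expected_runtime + margin <= timeout


print(rabbitmq_conf_line(timedelta(hours=1)))      # consumer_timeout = 3600000
print(fits_before_timeout(timedelta(minutes=20)))  # True
print(fits_before_timeout(timedelta(minutes=28)))  # False
```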

TTL / message TTL

Furthermore, the TTL / message TTL configuration (not set by default, so messages do not expire) determines how long a message may stay in a RabbitMQ queue before being discarded. If a message sits in a queue for longer than this time, it is removed from the queue.
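One way to set a per-queue message TTL is the x-message-ttl queue argument, which RabbitMQ expects in milliseconds. The helper below is a sketch; its name and the one-day value are assumptions made for illustration.

```python
from datetime import timedelta


def message_ttl_arguments(ttl: timedelta) -> dict:
    """Build queue arguments that cap how long a message may wait in the queue."""
    # RabbitMQ expects x-message-ttl in milliseconds.
    return {"x-message-ttl": int(ttl.total_seconds() * 1000)}


# Hypothetical choice: keep messages for up to a day while workers are paused.
print(message_ttl_arguments(timedelta(days=1)))  # {'x-message-ttl': 86400000}
```

A dict like this could be passed as the queue_arguments of a kombu Queue listed in Celery's task_queues setting. A TTL can also be applied through a RabbitMQ policy instead of queue arguments.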

Implications

To increase the chance that tasks are not aborted/lost due to restarts (e.g. deployments):

  • Design each task to finish as soon as possible, spawning additional smaller tasks if necessary.
  • Design the entire task to survive partial completion, so that it can be restarted from the beginning without ending up in a bad state (e.g. duplicate records created, abandoned/orphaned/partial updates).
  • Enable task_acks_late so that ACKs are not sent to RabbitMQ until tasks are finished.
  • Extend the TTL / message TTL for RabbitMQ so that we have enough time to consume messages (in case the Celery task needs to be paused for some time to fix bugs).
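The "safe to restart" point above can be sketched as follows. This is a toy, in-memory illustration: records and processed_ids stand in for database tables, and create_record_once and the message IDs are made-up names.

```python
# In-memory stand-ins for a database table and a "processed messages" table.
records: list = []
processed_ids: set = set()


def create_record_once(message_id: str, payload: dict) -> bool:
    """Create a record unless this message was already handled.

    Returns True if a record was created, False if the call was a duplicate.
    """
    if message_id in processed_ids:
        return False  # redelivered message: nothing left to do
    records.append(payload)
    processed_ids.add(message_id)
    return True


# The same message delivered twice (e.g. after a requeue) creates one record.
print(create_record_once("msg-1", {"user": "alice"}))  # True
print(create_record_once("msg-1", {"user": "alice"}))  # False
print(len(records))                                     # 1
```

In a real task the record and the "already processed" marker would presumably need to be written in the same database transaction, so that an interruption cannot leave one without the other.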

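Failed tasks are not retried automatically; a retry has to be requested explicitly (e.g. with the autoretry_for task option or by calling self.retry() in the task). In fact, unless there is a Result Backend set up, there won’t even be a good way to audit which tasks failed (other than application logs).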

Celery with RabbitMQ on Docker

celery, django, docker, programming, Python

Picking up from the “Django app template w/ Docker” post, here are the steps to add Celery to the Django app.

Add RabbitMQ as the message queue

Modify docker-compose.yml to include:

services:
  ...
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
    expose:
      - "15672"

I use the “3-management” tag so that the image includes the management plugin (accessible at http://localhost:15672/). However, simpler tags (e.g. “3” or “latest”) can be used if the management UI is not needed.

Install celery to the Django project

docker-compose run --rm app /bin/bash
...
pip install celery
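# Write to a temporary file first: redirecting straight into requirements.txt
# would empty it before pip gets to read it.
pip freeze -r requirements.txt > requirements.tmp && mv requirements.tmp requirements.txt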
exit

Rebuild the image (e.g. with docker-compose build) so that it includes Celery and its dependencies.

Add a couple of files to set up Celery in our Django project

The Celery stuff will be added into the myapp Django app.

myapp/celery.py

from celery import Celery


app = Celery(
    'celery_app',
    broker='amqp://rabbitmq',
    # backend='rpc://',

    # This should include modules/files that define tasks. This is a list of strs 
    # to be evaluated later in order to get around circular dependencies, I suspect.
    include=[  
        'myapp.tasks',  # This is our file containing our task
    ]
)

# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600)


if __name__ == '__main__':
    app.start()

myapp/tasks.py

import logging

from myapp.celery import app


logger = logging.getLogger(__name__)


# This is where Celery tasks are defined


@app.task
def add(x: int, y: int) -> int:
    logger.info(f"add({x}, {y}) called.")
    return x + y

Add a Celery service to docker-compose.yml

Modify docker-compose.yml again to add a Celery service. This can be done together with the RabbitMQ service above, but it is shown here separately for readability.

services:
  ...
  rabbitmq:
    ...
  app-celery:
    build: .
    environment:
      - DJANGO_SETTINGS_MODULE=myapp.settings
    command: >
      sh -c "celery -A myapp.celery worker --loglevel=INFO"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started

Things to watch out for

A few things to highlight, to show how the pieces connect:

  • The broker URL when instantiating the Celery app is amqp://rabbitmq (not amqp://localhost) because Docker Compose lets containers on the same network reach each other by service name. The “rabbitmq” in this case is the name of the service we use for the RabbitMQ container, so if a different service name is used, this AMQP URL needs to use that corresponding name.
  • The Celery app parameter (-A myapp.celery) is the dotted module path to the myapp/celery.py file where the Celery app (app = Celery('celery_app', ...)) is created.
  • Speaking of which, when defining the Celery app, its include=[ ... ] should list the dotted module paths (as str values) of the modules where Celery tasks are defined.
  • The modules that define the Celery tasks need to import the Celery app and use its @app.task decorator on the task functions.
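The first point, that the broker hostname has to match a Compose service name, can be checked with a few lines of standard-library Python. This is only a sketch: the helper names are made up, and COMPOSE_SERVICES simply lists the service names used in this post.

```python
from typing import Optional
from urllib.parse import urlsplit

# Service names as used in the docker-compose.yml in this post.
COMPOSE_SERVICES = {"app", "app-celery", "rabbitmq"}


def broker_host(broker_url: str) -> Optional[str]:
    """Extract the hostname part of an AMQP broker URL."""
    return urlsplit(broker_url).hostname


def broker_matches_service(broker_url: str, services=COMPOSE_SERVICES) -> bool:
    """True if the broker hostname is one of the Compose service names."""
    return broker_host(broker_url) in services


print(broker_host("amqp://rabbitmq"))                   # rabbitmq
print(broker_host("amqp://guest:guest@rabbitmq:5672"))  # rabbitmq
print(broker_matches_service("amqp://rabbitmq"))        # True
print(broker_matches_service("amqp://localhost"))       # False
```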

Complete docker-compose.yml

The entire file looks like:

services:
  app:
    build: .
    command: >
      sh -c "python manage.py migrate &&
        python manage.py runserver 0.0.0.0:8000"
    ports:
      - "8000:8000"
    expose:
      - "8000"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started
    tty: true
    stdin_open: true
  app-celery:
    build: .
    environment:
      - DJANGO_SETTINGS_MODULE=myapp.settings
    command: >
      sh -c "celery -A myapp.celery worker --loglevel=INFO"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
    expose:
      - "15672"