Celery/RabbitMQ Notes on Task Timings

celery, django, rabbitmq

The following notes apply when using Celery with the RabbitMQ broker.

Celery Settings

task_acks_late

The Celery setting task_acks_late (by default disabled), if set, will defer message ACK with RabbitMQ until the task completes successfully.

  • If it is enabled, and the Celery task takes too long (see consumer_timeout below), the message will be requeued and redelivered to the next worker. This will cause the message to be processed multiple times.
  • If it is disabled, then Celery will ACK the message to RabbitMQ as soon as the task starts. This tells RabbitMQ that the message was “delivered and processed,” and RabbitMQ will delete this message from the queue. This will cause the message to be lost if the Celery task was interrupted before it finishes.

task_acks_on_failure_or_timeout

Then there is task_acks_on_failure_or_timeout (by default enabled). According to the doc, this will ACK the message even if the task failed or timed out. This may or may not be the correct choice depending on the task.

RabbitMQ Settings

consumer_timeout

The RabbitMQ consumer_timeout configuration (by default 30 minutes) gives a task a maximum timeout before requeuing the message. If a client (a Celery task in this case) does not ACK the message before this timeout expires, the message will be requeued and redelivered.

TTL / message TTL

Furthermore, the TTL / message TTL configuration (by default ??) determines how long a message will stay in RabbitMQ before being discarded. If a message stays in a queue for longer than this time, then it is removed from the queue.

Implications

To increase the chance that tasks are not aborted/lost due to restarts (e.g. deployments):

  • Design each task to finish as soon as possible. Spawning additional smaller tasks if necessary.
  • The entire task is designed to survive a partial completion state and is able to completely restart without ending up in a bad state (e.g. duplicate records created, abandoned/orphaned/partial updates).
  • Enable task_acks_late so that RabbitMQ ACKs are not sent until tasks are finished.
  • Extend the TTL / message TTL for RabbitMQ so that we have enough time to consume messages (in case the Celery task needs to be paused for some time to fix bugs).

There is no support to retry failed tasks. In fact, unless there is a Result Backend set up, there won’t even be a good way to audit which tasks failed (other than application logs).

Celery with RabbitMQ on Docker

celery, django, docker, programming, Python

Picking up from Django app template w/ Docker, here are the steps to add Celery to the Django app.

Add RabbitMQ as the message queue

Modify docker-compose.yml to include:

services:
  ...
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
    expose:
      - "15672"

I use the “3-management” tag so that it includes a management plugin (accessible at http://localhost:15672/). However, simpler tags (e.g. “3” or “latest“) can be used if the management UI is not needed.

Install celery to the Django project

docker-compose run --rm app /bin/bash
...
pip install celery
pip freeze -r requirements.txt > requirements.txt
exit

Rebuild the container with the new packages added by celery.

Add a couple of files to set up Celery in our Django project

The Celery stuff will be added into the myapp Django app.

myapp/celery.py

from celery import Celery


app = Celery(
    'celery_app',
    broker='amqp://rabbitmq',
    # backend='rpc://',

    # This should include modules/files that define tasks. This is a list of strs 
    # to be evaluated later in order to get around circular dependencies, I suspect.
    include=[  
        'myapp.tasks',  # This is our file containing our task
    ]
)

# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600)


if __name__ == '__main__':
    app.start()

myapp/tasks.py

import logging

from myapp.celery import app


logger = logging.getLogger(__name__)


# This is where Celery tasks are defined


@app.task
def add(x: int, y: int) -> int:
    logger.info(f"add({x}, {y}) called.")
    return x + y

Add a Celery service to docker-compose.yml

Modify docker-compose.yml again to add a Celery service. This can be done together with the RabbitMQ service above, but it is shown here separately for readability.

services:
  ...
  rabbitmq:
    ...
  app-celery:
    build: .
    environment:
    - DJANGO_SETTINGS_MODULE=myapp.settings
    command: >
      sh -c "celery -A myapp.celery worker --loglevel=INFO"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started

Things to watch out for

A bunch of things to highlight to show where the connection points are:

  • The broker URL when instantiating the Celery app is amqp://rabbitmq (not amqp://localhost) because that’s how networking in Docker works. The “rabbitmq” in this case the name of the service we use for the RabbitMQ container. So if a different container name is used, this AMQP URL needs to use that corresponding name.
  • The Celery app parameter (-A myapp.celery) is the path to the myapp/celery.py file where the Celery app (app = Celery('celery_app', ...) ) is created.
  • Speaking of which, when defining the Celery app, its include=[ ... ] should include str values that point to files where Celery tasks are defined.
  • And the task files that define the Celery tasks need to import the Celery app and use its @app.task decorator for the task functions.

Complete docker-compose.yml

The entire file looks like:

services:
  app:
    build: .
    command: >
      sh -c "python manage.py migrate &&
        python manage.py runserver 0.0.0.0:8000"
    ports:
      - "8000:8000"
    expose:
      - "8000"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started
    tty: true
    stdin_open: true
  app-celery:
    build: .
    command: >
      sh -c "celery -A myapp.celery worker --loglevel=INFO"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
    expose:
      - "15672"

Sentry on Django

django, programming, Python, sentry

Sentry is one of those things that most companies use, but most people (or maybe just I) don’t really know how to use it. The way I learn these things is to set it up on my own.

Fortunately, they have a way for anyone to start with a free account to play with.

Register for an account

Just go to the https://sentry.io and click their “GET STARTED” button. Fill out the info and create an Account. By default I got a free account this way. I don’t know what limitations it has, but to just test things, I don’t care.

Create a Project

Once the account is set up, log in and create a new Project. There is a collection of SDKs to select from. Fortunately, there is one for Django.

Selecting it and giving the project a name (replacing the default “python-django” name) brings me to a very useful page where I can copy and paste the blurb to paste into my Django settings.py file:

The “dsn” URL is specific to the project and account. Think of that as the “API key” to the project. That’s why I redacted mine from the pic.

As seen in that “Verify” section, this will hook in an exception handler into the app to catch and log to Sentry any uncaught exceptions.

What about Raven?

The code from work also installed the raven package. However, looking at the docs, it seems that the raven is deprecated and replaced by sentry-sdk.

However, if Raven is used in a Django project, then a logger handler can be connected to handle logs.

Hooking Up Sentry to Error Logger

My workplace, for instance, has Raven installed and set it up so that any errors logged will also go into Sentry via a handler from the Raven package. This is done by configuring the LOGGING setting:

LOGGING = {
    ...
    "handlers": {
        ...
        "sentry": {
            "class": "raven.contrib.django.raven_compat.handlers.SentryHandler",
            "level": "ERROR",
            ...
        },
    },
    ...
}

Just make sure that the “sentryhandler is invoked by a logger.

NOTE: This is not necessary when using sentry-sdk.

Extra Data

There is a section in the event where additional “extra” data can be added. The values can be:

  • simple strings
  • “object” values

Adding extra data to logging or errors and exceptions is simple: just include the “extra” property:

import logging

logger = logging.getLogger(__name__)
...

    logger.error(
        "Hello", extra={
            "prop1": "property one",
            "prop2": "property two",
            "prop3": {
                "prop4": "property four",
                "now": datetime.utcnow(),
            }
        }
    )

From the above, the properties prop1 and prop2 are normal strings. prop3 is an object which is serialized (bonus: it works with datetime instances, unlike json.puts()).

One limitation of these extra values is that they are not searchable.

Searching for Events: Custom Tags

The Sentry console allows some searching with a list of default searchable properties. It seems like the only way to work with search for our own data is to use Custom Tags. For Python/Django, the easiest way to do that is to set up a scope, then set the tag(s) before calling Sentry:

import logging

from sentry_sdk import push_scope, set_tag
...

logger = logging.getLogger(__name__)

...

with push_scope() as scope:
    set_tag("VanTag", "Hello1")  # This will add a VanTag:Hello1 to the event 
    logger.error("Test Error")

With the above, the event raised will now have a VanTag tag of Hello1.

As a result, you can now search for issues/events with this tag:

Grouping Control: Custom Fingerprints

An issue that annoys probably every Sentry user one time or another is how it groups (or ungroups) issues. The documentation says a lot of stuff, but the summary is pretty much, “We have a super algorithm. Trust us.”

Well. When it does not do what we want. One way to influence the grouping is to set the scope fingerprints before calling Sentry.

        with push_scope() as scope:

            scope.fingerprint = ["Hello1"]  # list of strings make the fingerprint
            logger.error("Appointment Creation Error")

When the event is sent to Sentry, it will be grouped by the fingerprint (list of strings).

  • Note: PyCharm mistakenly says that scope.fingerprint is a read-only attribute. It’s not; the code above will work as expected.