Celery/RabbitMQ Notes on Task Timings

celery, django, rabbitmq

The following notes apply when using Celery with the RabbitMQ broker.

Celery Settings

task_acks_late

The Celery setting task_acks_late (disabled by default), if set, defers the message ACK to RabbitMQ until after the task has run, instead of sending it just before the task starts.

  • If it is enabled, and the Celery task takes too long (see consumer_timeout below), RabbitMQ will requeue the message and redeliver it to another worker. The task may therefore run more than once for the same message.
  • If it is disabled, then Celery will ACK the message to RabbitMQ just before the task starts. This tells RabbitMQ that the message was “delivered and processed,” and RabbitMQ will delete this message from the queue. The message is then lost if the Celery task is interrupted before it finishes.
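To make the trade-off concrete, here is a toy model in plain Python. It does not use Celery or RabbitMQ at all: the queue is a deque, the run function is made up for this sketch, and the "crash" is simply the first worker giving up halfway through the task.

```python
from collections import deque


def run(acks_late: bool) -> tuple:
    """Toy model: one message, and the first worker dies in the middle of the task.

    Returns (times the task was started, times the task finished).
    """
    queue = deque(["msg-1"])
    started, finished = [], []
    crashed_once = False
    while queue:
        msg = queue.popleft()          # the broker delivers the message
        started.append(msg)
        if not crashed_once:
            crashed_once = True        # the first worker is killed mid-task
            if acks_late:
                queue.append(msg)      # never ACKed, so the broker requeues it
            continue                   # ACKed early: the broker already deleted it
        finished.append(msg)
    return len(started), len(finished)


early = run(acks_late=False)   # started once, never finished: the message is lost
late = run(acks_late=True)     # started twice, finished once: work is repeated
```

With early ACKs the work is lost; with late ACKs it is repeated, which is why the task itself has to tolerate being run again.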

task_acks_on_failure_or_timeout

Then there is task_acks_on_failure_or_timeout (by default enabled). According to the docs, this will ACK the message even if the task failed or timed out, and it only has an effect when task_acks_late is enabled. This may or may not be the correct choice depending on the task.
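As a sketch, the two settings could live in a plain Python configuration module. The module name celeryconfig.py is my assumption, and I am assuming it gets loaded with app.config_from_object("celeryconfig"); the setting names themselves are the ones discussed above.

```python
# celeryconfig.py (hypothetical module name)
# Assumed to be loaded with: app.config_from_object("celeryconfig")

# ACK only after the task has run, not just before it starts.
task_acks_late = True

# Celery's default, spelled out here so the choice is visible.
# Only relevant because task_acks_late is enabled above. Before flipping it
# to False, check what your Celery version does with the un-ACKed message.
task_acks_on_failure_or_timeout = True
```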

RabbitMQ Settings

consumer_timeout

The RabbitMQ consumer_timeout configuration (by default 30 minutes) limits how long a consumer may hold a delivered message without ACKing it. If a client (a Celery worker in this case) does not ACK the message before this timeout expires, RabbitMQ closes the consumer’s channel, and the message is requeued and redelivered.
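One thing that is easy to trip over: in rabbitmq.conf the value is given in milliseconds. The helper below is only an illustration (the function name is made up) of how the default and a longer limit translate into a config line.

```python
from datetime import timedelta


def consumer_timeout_line(limit: timedelta) -> str:
    """Render a rabbitmq.conf line for the given limit, converted to milliseconds."""
    millis = int(limit.total_seconds() * 1000)
    return f"consumer_timeout = {millis}"


default_line = consumer_timeout_line(timedelta(minutes=30))
two_hours_line = consumer_timeout_line(timedelta(hours=2))
```

Whatever value is chosen, a task running with task_acks_late enabled needs to finish comfortably inside it.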

TTL / message TTL

Furthermore, the TTL / message TTL configuration (unset by default, in which case messages do not expire) determines how long a message will stay in a queue before being discarded. A message that sits in a queue for longer than this time is removed from the queue.

Implications

To increase the chance that tasks are not aborted/lost due to restarts (e.g. deployments):

  • Design each task to finish as soon as possible, spawning additional smaller tasks if necessary.
  • Design each task to survive a partially completed run and to restart from scratch without ending up in a bad state (e.g. duplicate records created, abandoned/orphaned/partial updates).
  • Enable task_acks_late so that RabbitMQ ACKs are not sent until tasks are finished.
  • Extend the TTL / message TTL for RabbitMQ so that we have enough time to consume messages (in case the Celery task needs to be paused for some time to fix bugs).
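The "safe to restart" point is the one that needs the most care, so here is a minimal sketch of the idea in plain Python. The set and list are stand-ins for a database table with a unique key, and create_record and the key format are made up for illustration.

```python
processed_keys = set()   # stand-in for a unique-constrained column in the DB
records = []             # stand-in for the table the task writes to


def create_record(idempotency_key: str, payload: dict) -> bool:
    """Create a record at most once per key; return True if it was created."""
    if idempotency_key in processed_keys:
        return False                   # redelivered message: nothing left to do
    records.append(payload)
    processed_keys.add(idempotency_key)
    return True


first = create_record("order-42", {"total": 10})
again = create_record("order-42", {"total": 10})   # simulated redelivery
```

In a real task the check and the write would have to happen atomically, for example by relying on a unique constraint in the database rather than on an in-memory set.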

Failed tasks are not retried unless the task asks for it (for example with self.retry() or the autoretry_for task option). And unless there is a Result Backend set up, there won’t even be a good way to audit which tasks failed (other than application logs).

Celery with RabbitMQ on Docker

celery, django, docker, programming, Python

Picking up from Django app template w/ Docker, here are the steps to add Celery to the Django app.

Add RabbitMQ as the message queue

Modify docker-compose.yml to include:

services:
  ...
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
    expose:
      - "15672"

I use the “3-management” tag so that it includes the management plugin (accessible at http://localhost:15672/). However, simpler tags (e.g. “3” or “latest”) can be used if the management UI is not needed.

Install celery to the Django project

docker-compose run --rm app /bin/bash
...
pip install celery
pip freeze > requirements.txt
exit

Rebuild the image so that it includes Celery and the packages it pulls in.

Add a couple of files to set up Celery in our Django project

The Celery stuff will be added into the myapp Django app.

myapp/celery.py

from celery import Celery


app = Celery(
    'celery_app',
    broker='amqp://rabbitmq',
    # backend='rpc://',

    # This should include modules/files that define tasks. This is a list of strs 
    # to be evaluated later in order to get around circular dependencies, I suspect.
    include=[  
        'myapp.tasks',  # This is our file containing our task
    ]
)

# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600)


if __name__ == '__main__':
    app.start()

myapp/tasks.py

import logging

from myapp.celery import app


logger = logging.getLogger(__name__)


# This is where Celery tasks are defined


@app.task
def add(x: int, y: int) -> int:
    logger.info(f"add({x}, {y}) called.")
    return x + y

Add a Celery service to docker-compose.yml

Modify docker-compose.yml again to add a Celery service. This can be done together with the RabbitMQ service above, but it is shown here separately for readability.

services:
  ...
  rabbitmq:
    ...
  app-celery:
    build: .
    environment:
    - DJANGO_SETTINGS_MODULE=myapp.settings
    command: >
      sh -c "celery -A myapp.celery worker --loglevel=INFO"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started

Things to watch out for

A bunch of things to highlight to show where the connection points are:

  • The broker URL when instantiating the Celery app is amqp://rabbitmq (not amqp://localhost) because that’s how networking in Docker works. The “rabbitmq” in this case is the name of the service we use for the RabbitMQ container. So if a different service name is used, this AMQP URL needs to use that corresponding name.
  • The Celery app parameter (-A myapp.celery) is the dotted module path of the myapp/celery.py file where the Celery app (app = Celery('celery_app', ...) ) is created.
  • Speaking of which, when defining the Celery app, its include=[ ... ] should include str values that point to files where Celery tasks are defined.
  • And the task files that define the Celery tasks need to import the Celery app and use its @app.task decorator for the task functions.
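To keep the broker host from being hard-coded, one option is to read the URL from the environment and fall back to the Compose service name. This is only a sketch: CELERY_BROKER_URL is a variable name I picked for illustration, and resolve_broker_url is a made-up helper.

```python
import os
from urllib.parse import urlparse


def resolve_broker_url(env=os.environ) -> str:
    """Return the broker URL, defaulting to the Compose service name."""
    # CELERY_BROKER_URL is a hypothetical variable name chosen for this sketch.
    return env.get("CELERY_BROKER_URL", "amqp://rabbitmq")


broker_url = resolve_broker_url()
# The host part has to match the name of the RabbitMQ service in docker-compose.yml.
broker_host = urlparse(broker_url).hostname
```

The result could then be passed as broker=broker_url when creating the Celery app in myapp/celery.py.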

Complete docker-compose.yml

The entire file looks like:

services:
  app:
    build: .
    command: >
      sh -c "python manage.py migrate &&
        python manage.py runserver 0.0.0.0:8000"
    ports:
      - "8000:8000"
    expose:
      - "8000"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started
    tty: true
    stdin_open: true
  app-celery:
    build: .
    command: >
      sh -c "celery -A myapp.celery worker --loglevel=INFO"
    volumes:
      - ./:/code
    depends_on:
      rabbitmq:
        condition: service_started
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "15672:15672"
    expose:
      - "15672"

Sentry on Django

django, programming, Python, sentry

Sentry is one of those things that most companies use, but most people (or maybe just I) don’t really know how to use it. The way I learn these things is to set it up on my own.

Fortunately, they have a way for anyone to start with a free account to play with.

Register for an account

Just go to https://sentry.io and click their “GET STARTED” button. Fill out the info and create an account. By default I got a free account this way. I don’t know what limitations it has, but to just test things, I don’t care.

Create a Project

Once the account is set up, log in and create a new Project. There is a collection of SDKs to select from. Fortunately, there is one for Django.

Selecting it and giving the project a name (replacing the default “python-django” name) brings me to a very useful page where I can copy the blurb to paste into my Django settings.py file.

The “dsn” URL is specific to the project and account. Think of that as the “API key” to the project. That’s why I redacted mine from the pic.

As seen in that “Verify” section, this will hook an exception handler into the app to catch any uncaught exceptions and log them to Sentry.

What about Raven?

The code from work also installed the raven package. However, looking at the docs, it seems that raven is deprecated and has been replaced by sentry-sdk.

However, if Raven is used in a Django project, then a logger handler can be connected to handle logs.

Hooking Up Sentry to Error Logger

My workplace, for instance, has Raven installed and set up so that any logged errors also go to Sentry via a handler from the Raven package. This is done by configuring the LOGGING setting:

LOGGING = {
    ...
    "handlers": {
        ...
        "sentry": {
            "class": "raven.contrib.django.raven_compat.handlers.SentryHandler",
            "level": "ERROR",
            ...
        },
    },
    ...
}

Just make sure that the “sentry” handler is invoked by a logger.

NOTE: This is not necessary when using sentry-sdk.
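For the Raven setup, "invoked by a logger" means that some entry under "loggers" (or the root logger) lists the handler by name. Here is a runnable sketch of that wiring. To keep it free of third-party packages I swapped in logging.NullHandler as a stand-in for the Raven handler class, and "myapp" is just an example logger name.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "sentry": {
            # Stand-in so the sketch runs without Raven installed. The real
            # class is raven.contrib.django.raven_compat.handlers.SentryHandler.
            "class": "logging.NullHandler",
            "level": "ERROR",
        },
    },
    "loggers": {
        # Without an entry like this, the "sentry" handler is defined but never used.
        "myapp": {"handlers": ["sentry"], "level": "INFO"},
    },
}

# Django applies settings.LOGGING itself; the call is spelled out here
# so that the sketch runs outside of Django.
logging.config.dictConfig(LOGGING)
handler_names = [type(h).__name__ for h in logging.getLogger("myapp").handlers]
```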

Extra Data

There is a section in the event where additional “extra” data can be added. The values can be:

  • simple strings
  • “object” values

Adding extra data to logging or errors and exceptions is simple: just include the “extra” property:

import logging
from datetime import datetime

logger = logging.getLogger(__name__)
...

    logger.error(
        "Hello", extra={
            "prop1": "property one",
            "prop2": "property two",
            "prop3": {
                "prop4": "property four",
                "now": datetime.utcnow(),
            }
        }
    )

From the above, the properties prop1 and prop2 are normal strings. prop3 is an object which is serialized (bonus: it works with datetime instances, unlike json.dumps()).

One limitation of these extra values is that they are not searchable.
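The mechanism behind “extra” can be seen with the standard library alone: each key becomes an attribute on the LogRecord, which is what a handler then gets to work with. The sketch below uses a made-up CaptureHandler and logger name, and also shows the datetime problem with plain json.dumps().

```python
import json
import logging
from datetime import datetime, timezone


class CaptureHandler(logging.Handler):
    """Keeps emitted records in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger("extra_demo")
logger.propagate = False               # keep the demo from printing anything
capture = CaptureHandler()
logger.addHandler(capture)

logger.error("Hello", extra={
    "prop1": "property one",
    "prop3": {"now": datetime.now(timezone.utc)},
})

record = capture.records[0]
prop1 = record.prop1                   # each key of `extra` is now a record attribute

try:
    json.dumps(record.prop3)
    json_ok = True
except TypeError:
    json_ok = False                    # plain json.dumps() rejects datetime values
```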

Searching for Events: Custom Tags

The Sentry console allows some searching with a list of default searchable properties. It seems like the only way to search on our own data is to use Custom Tags. For Python/Django, the easiest way to do that is to set up a scope, then set the tag(s) before calling Sentry:

import logging

from sentry_sdk import push_scope
...

logger = logging.getLogger(__name__)

...

with push_scope() as scope:
    scope.set_tag("VanTag", "Hello1")  # This will add a VanTag:Hello1 tag to the event
    logger.error("Test Error")

With the above, the event raised will now have a VanTag tag of Hello1.

As a result, you can now search for issues/events with this tag (e.g. VanTag:Hello1).

Grouping Control: Custom Fingerprints

An issue that annoys probably every Sentry user at one time or another is how it groups (or ungroups) issues. The documentation says a lot of stuff, but the summary is pretty much, “We have a super algorithm. Trust us.”

Well, when it does not do what we want, one way to influence the grouping is to set the scope’s fingerprint before calling Sentry:

with push_scope() as scope:
    scope.fingerprint = ["Hello1"]  # the list of strings makes up the fingerprint
    logger.error("Appointment Creation Error")

When the event is sent to Sentry, it will be grouped by the fingerprint (list of strings).

  • Note: PyCharm mistakenly says that scope.fingerprint is a read-only attribute. It’s not; the code above will work as expected.

Django app template w/ Docker

django, docker, programming, Python

Revisiting https://www.pn.therealvan.com/2021/01/24/postgresql-and-mysql-docker-containers/, this post focuses on a plain Django app with minimal dependencies:

  • no pipenv
  • the default SQLite DB

Bootstrapping

Start with these files:

Dockerfile

FROM python:3
ENV PYTHONUNBUFFERED=1

WORKDIR /code
#COPY requirements.txt /code/

#RUN pip install --upgrade pip && pip install -r requirements.txt

docker-compose.yml

version: '3'
services:
  app:
    build: .
    #command: >
    #  sh -c "python manage.py migrate &&
    #    python manage.py runserver 0.0.0.0:8000"
    ports:
      - "8000:8000"
    expose:
      - "8000"
    volumes:
      - ./:/code
    tty: true
    stdin_open: true

Run these to start up a container:

docker-compose build
docker-compose run --rm app /bin/bash

Initializing a Django project

Run these inside the container:

pip install django
pip freeze > requirements.txt

django-admin startproject myproj .
django-admin startapp myapp

exit

Rebuild and start the app

Now uncomment the lines in Dockerfile and docker-compose.yml.

Build the image and restart the container:

docker-compose build

Modify myproj/settings.py to add a line to register myapp to Django:

INSTALLED_APPS = [
    ...
    'myapp.apps.MyappConfig',  # Add this line
]

Now start the app again:

docker-compose up app

This should now bring up the app, listening at http://localhost:8000/.

Wagtail in Docker

django, docker, programming, Python, Windows

Over the weekend I found out about a CMS built on top of Django called Wagtail.

The installation instructions promise an easy path. Just run:

pip install wagtail

Since I guard my global pip pretty carefully in order to reduce version collisions and whatnot, my first inclination was to see if I can test this from within a Docker container.

To start the experiment within a Docker container using Python, I start with:

mkdir wagtail
cd wagtail

docker run -it --rm \
 --mount type=bind,source="%CD%",target=/wagtail \
 python:3.7-buster /bin/bash

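I use “%CD%” because I’m on Windows. If I were on Linux/Mac, I suppose it would be something like “${PWD}” instead. Also note that in the Windows command prompt the command has to go on one line (or use ^ rather than \ to continue a line).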

So once in, I install and initialize per the instructions from the “Getting started” page of the Wagtail 3.0.1 documentation:

cd /wagtail
pip install wagtail
wagtail start hahaha 

Interestingly, that wagtail start command, in addition to generating a bunch of files typical for a Django app, also created a Dockerfile.

So maybe the installer already has some Docker support in mind?

After reading through it and spending more time than I expected to get it to work, here are the steps to:

  • Get a Wagtail project set up from scratch using Docker without having to install any Python on the host.
  • Use a volume from the host for the app AND the SQLite DB so that they can be conveniently backed up and transferred (e.g. pushed up to some code repository like GitHub or SVN).

Generate a new Wagtail project

Pretty much what I had above:

mkdir wagtail
cd wagtail

docker run -it --rm \
 --mount type=bind,source="%CD%",target=/wagtail \
 python:3.7-buster /bin/bash
cd /wagtail
pip install wagtail
wagtail start hahaha
exit

I’m using “hahaha” as my project name. Substitute as needed. Also: %CD% should be ${PWD} for Linux/Mac.
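If juggling %CD% versus ${PWD} gets annoying, one way around it is to let Python work out the current directory and build the command. This is just a sketch of that idea; docker_run_args is a made-up helper, and it assumes the directory path contains no commas (which would confuse the --mount syntax).

```python
from pathlib import Path


def docker_run_args(project_dir: Path) -> list:
    """Build the `docker run` argument list used above for a given host directory."""
    return [
        "docker", "run", "-it", "--rm",
        "--mount", f"type=bind,source={project_dir},target=/wagtail",
        "python:3.7-buster", "/bin/bash",
    ]


args = docker_run_args(Path.cwd())
# To actually start the container: subprocess.run(args, check=True)
```

Passing the arguments as a list avoids the shell entirely, so neither the variable syntax nor the line-continuation character matters.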

Build the Image

cd hahaha
docker build -t hahaha . 

Fix up file permissions (Windows only)

Since the app should be run by the user wagtail:wagtail, fix up the permissions on the files.

For some reason this is not done correctly for Windows despite the command in Dockerfile, so a manual step is required:

docker run -it --rm \
 --mount type=bind,source="%CD%",target=/app \
 --user root hahaha /bin/bash

chown -R wagtail:wagtail .
exit

Set Up the App

This is typical Django setup stuff:

docker run -it --rm \
 --mount type=bind,source="%CD%",target=/app \
 hahaha /bin/bash
python manage.py migrate
python manage.py createsuperuser
exit

The above will create the db.sqlite3 file to be used for the app and also set up an admin user to be used to sign into the app.

Run The App

Finally, to run the app:

docker run -it --rm -p 8000:8000 \
 --mount type=bind,source="%CD%",target=/app hahaha

The -it --rm arguments are optional, but they help in stopping and cleaning up the container during development.

The site can be accessed at, of course, http://localhost:8000/. To manage it, use the superuser created earlier to get into the Admin Interface.

Worker Timeout

And now to actually start playing around with Wagtail….

So far I’m seeing a lot of errors when requesting pages in the Admin site. They look like this:

[2022-08-01 01:48:15 +0000] [10] [CRITICAL] WORKER TIMEOUT (pid:12)
[2022-08-01 01:48:15 +0000] [12] [INFO] Worker exiting (pid: 12)
[2022-08-01 01:48:15 +0000] [13] [INFO] Booting worker with pid: 13
[2022-08-01 01:48:58 +0000] [10] [CRITICAL] WORKER TIMEOUT (pid:13)
[2022-08-01 01:48:58 +0000] [13] [INFO] Worker exiting (pid: 13)
[2022-08-01 01:48:58 +0000] [14] [INFO] Booting worker with pid: 14
[2022-08-01 01:49:56 +0000] [10] [CRITICAL] WORKER TIMEOUT (pid:14)
[2022-08-01 01:49:56 +0000] [14] [INFO] Worker exiting (pid: 14)
[2022-08-01 01:49:56 +0000] [15] [INFO] Booting worker with pid: 15
[2022-08-01 01:50:34 +0000] [10] [CRITICAL] WORKER TIMEOUT (pid:15)
[2022-08-01 01:50:34 +0000] [15] [INFO] Worker exiting (pid: 15)
[2022-08-01 01:50:34 +0000] [16] [INFO] Booting worker with pid: 16