The following notes apply when using Celery with the RabbitMQ broker.
Celery Settings
task_acks_late
The Celery setting task_acks_late (disabled by default), if set, defers the message ACK to RabbitMQ until after the task has finished running, instead of sending it just before the task starts.
If it is enabled, and the Celery task takes too long (see consumer_timeout below), the message will be requeued and redelivered to the next worker. This can cause the message to be processed multiple times.
If it is disabled, then Celery will ACK the message to RabbitMQ as soon as the task starts. This tells RabbitMQ that the message was “delivered and processed,” and RabbitMQ will delete the message from the queue. The message is then lost if the Celery task is interrupted before it finishes.
task_acks_on_failure_or_timeout
Then there is task_acks_on_failure_or_timeout (enabled by default). According to the docs, it only applies when task_acks_late is enabled, and it will ACK the message even if the task failed or timed out. This may or may not be the correct choice depending on the task.
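To make the two settings concrete, here is a minimal sketch of a Celery configuration module. The module name celeryconfig.py is my own choice, not something Celery requires, and the values are only an example; the setting names are the ones discussed above.

```python
# celeryconfig.py (hypothetical module name)
# Could be loaded with app.config_from_object("celeryconfig").

# ACK only after the task has run, so that an interrupted task
# can be redelivered instead of being lost.
task_acks_late = True

# Default value, spelled out: with late ACKs on, failed or timed-out
# tasks are still ACKed. Whether that is right depends on the task.
task_acks_on_failure_or_timeout = True
```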
RabbitMQ Settings
consumer_timeout
The RabbitMQ consumer_timeout configuration (30 minutes by default) sets the maximum time a consumer may hold a delivered message without ACKing it. If a client (a Celery task in this case) does not ACK the message before this timeout expires, RabbitMQ closes the channel with a PRECONDITION_FAILED error, and the message is requeued and redelivered.
TTL / message TTL
Furthermore, the message TTL configuration (unset by default, so messages do not expire unless a TTL is configured) determines how long a message will stay in RabbitMQ before being discarded. If a message stays in a queue for longer than this time, it is removed from the queue.
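Both RabbitMQ values are expressed in milliseconds, which is easy to get wrong. As a rough sketch, the helper below converts a timedelta into the x-message-ttl queue argument. The function name and the seven-day figure are made up for illustration; in a Celery setup the resulting dict would typically be passed as queue_arguments to a kombu Queue.

```python
from datetime import timedelta


def message_ttl_arguments(ttl: timedelta) -> dict[str, int]:
    """Build the queue arguments that set a per-queue message TTL."""
    milliseconds = int(ttl.total_seconds() * 1000)
    if milliseconds < 0:
        raise ValueError("TTL must not be negative")
    return {"x-message-ttl": milliseconds}


# Hypothetical choice: keep unconsumed messages for up to seven days.
queue_arguments = message_ttl_arguments(timedelta(days=7))
print(queue_arguments)
```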
Implications
To increase the chance that tasks are not aborted/lost due to restarts (e.g. deployments):
Design each task to finish as soon as possible, spawning additional smaller tasks if necessary.
Design the entire task to survive a partial completion state, so that it can restart from scratch without ending up in a bad state (e.g. duplicate records created, abandoned/orphaned/partial updates).
Enable task_acks_late so that RabbitMQ ACKs are not sent until tasks are finished.
Extend the TTL / message TTL for RabbitMQ so that we have enough time to consume messages (in case the Celery task needs to be paused for some time to fix bugs).
Out of the box, failed tasks are not retried; a retry has to be requested explicitly in the task. In fact, unless there is a Result Backend set up, there won’t even be a good way to audit which tasks failed (other than application logs).
I’m getting into the fray that is Generative AI since, according to some, my job as a programmer will soon be taken over by some literal code cranking machine.
There are a few things to set up:
conda: (optional) “an environment and package manager” according to the Anaconda documentation. If it is wanted, it should be installed before installing PyTorch (below). Supposedly pip will work fine too.
PyTorch: the tool (one of them, anyway) used for learning Generative AI. See the “Start Locally” page on the PyTorch site.
Like most tools and libraries, the instructions assume that I have nothing else happening on my machine, so I should just install Anaconda globally according to their simplistic assumptions. All the other dependencies, even the Python version on my machine, can go to hell.
Dockerizing the environment
Well. Until I have enough $$$ to buy a new machine for each new tool I want to try out, I’ll be using Docker (thank goodness I don’t need an actual VM) to isolate an environment to play with.
After some trial and error, this is a template Dockerfile and docker-compose.yml I’m using:
Dockerfile
FROM python:3.11-bookworm
ENV PYTHONUNBUFFERED=1
WORKDIR /code
RUN apt update && apt install -y \
    vim
# Install Miniconda non-interactively (-b) into /root/miniconda3
RUN curl -O https://repo.anaconda.com/miniconda/Miniconda3-py311_24.1.2-0-Linux-x86_64.sh
RUN sh Miniconda3-py311_24.1.2-0-Linux-x86_64.sh -b
ENV PATH="$PATH:/root/miniconda3/bin"
RUN conda install -y pytorch torchvision torchaudio pytorch-cuda=12.1 -c pytorch -c nvidia
RUN conda init
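Once the image is built, a quick sanity check from a shell inside the container tells me whether PyTorch is importable at all. This is only a sketch: describe_torch is a name I made up, and whether CUDA shows up as available depends on the host and on how the container is started.

```python
import importlib.util


def describe_torch() -> str:
    """Report whether PyTorch is importable and whether it can see CUDA."""
    if importlib.util.find_spec("torch") is None:
        return "torch is not installed in this environment"
    import torch  # imported lazily so the check itself never crashes

    return f"torch {torch.__version__}, CUDA available: {torch.cuda.is_available()}"


print(describe_torch())
```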
I use the “3-management” tag of the RabbitMQ image so that it includes the management plugin (accessible at http://localhost:15672/). However, simpler tags (e.g. “3” or “latest”) can be used if the management UI is not needed.
Rebuild the container so that it includes the newly added Celery packages.
Add a couple of files to set up Celery in our Django project
The Celery stuff will be added into the myapp Django app.
myapp/celery.py
from celery import Celery
app = Celery(
    'celery_app',
    broker='amqp://rabbitmq',
    # backend='rpc://',
    # This should include modules/files that define tasks. This is a list of strs
    # to be evaluated later in order to get around circular dependencies, I suspect.
    include=[
        'myapp.tasks',  # This is our file containing our task
    ]
)
# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600)
if __name__ == '__main__':
    app.start()
myapp/tasks.py
import logging
from myapp.celery import app
logger = logging.getLogger(__name__)
# This is where Celery tasks are defined
@app.task
def add(x: int, y: int) -> int:
    logger.info(f"add({x}, {y}) called.")
    return x + y
Add a Celery service to docker-compose.yml
Modify docker-compose.yml again to add a Celery service. This can be done together with the RabbitMQ service above, but it is shown here separately for readability.
A bunch of things to highlight to show where the connection points are:
The broker URL when instantiating the Celery app is amqp://rabbitmq (not amqp://localhost) because that’s how networking in Docker works. The “rabbitmq” in this case is the name of the service we use for the RabbitMQ container. So if a different service name is used, this AMQP URL needs to use that corresponding name.
The Celery app parameter (-A myapp.celery) is the path to the myapp/celery.py file where the Celery app (app = Celery('celery_app', ...) ) is created.
Speaking of which, when defining the Celery app, its include=[ ... ] should include str values that point to files where Celery tasks are defined.
And the task files that define the Celery tasks need to import the Celery app and use its @app.task decorator for the task functions.
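Since the broker hostname has to track the service name in docker-compose.yml, one option is to stop hard-coding it. The sketch below reads the host from an environment variable; RABBITMQ_HOST is a variable name I invented for this example, and the default matches the “rabbitmq” service name used above.

```python
import os


def broker_url(default_host: str = "rabbitmq") -> str:
    """Build the AMQP broker URL from the (hypothetical) RABBITMQ_HOST variable."""
    host = os.environ.get("RABBITMQ_HOST", default_host)
    return f"amqp://{host}"


print(broker_url())
```

The result could then be passed as the broker argument when creating the Celery app, with the variable set in the Celery service’s environment section.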
Sentry is one of those things that most companies use, but most people (or maybe just me) don’t really know how to use. The way I learn these things is to set them up on my own.
Fortunately, they have a way for anyone to start with a free account to play with.
Register for an account
Just go to https://sentry.io and click their “GET STARTED” button. Fill out the info and create an Account. By default I got a free account this way. I don’t know what limitations it has, but just to test things, I don’t care.
Create a Project
Once the account is set up, log in and create a new Project. There is a collection of SDKs to select from. Fortunately, there is one for Django.
Selecting it and giving the project a name (replacing the default “python-django” name) brings me to a very useful page where I can copy the blurb to paste into my Django settings.py file:
The “dsn” URL is specific to the project and account. Think of that as the “API key” to the project. That’s why I redacted mine from the pic.
As seen in that “Verify” section, this will hook an exception handler into the app to catch any uncaught exceptions and log them to Sentry.
What about Raven?
The code from work also installed the raven package. However, looking at the docs, it seems that raven is deprecated and has been replaced by sentry-sdk.
However, if Raven is used in a Django project, then a logger handler can be connected to handle logs.
Hooking Up Sentry to Error Logger
My workplace, for instance, has Raven installed and set up so that any errors logged will also go into Sentry via a handler from the Raven package. This is done by configuring the LOGGING setting:
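As a minimal sketch (not the exact configuration from my workplace), a LOGGING setting along these lines routes errors to Sentry. The handler class path is the one that Raven’s Django integration provides; the handler names and the levels are my assumptions.

```python
# settings.py (fragment)
LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {"class": "logging.StreamHandler"},
        # Handler shipped with Raven's Django integration.
        "sentry": {
            "level": "ERROR",  # assumption: only errors and above go to Sentry
            "class": "raven.contrib.django.raven_compat.handlers.SentryHandler",
        },
    },
    "root": {
        "level": "INFO",
        "handlers": ["console", "sentry"],
    },
}
```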
In a log call that passes extra values, such as prop1, prop2, and prop3 here, the properties prop1 and prop2 are normal strings. prop3 is an object, which is serialized (bonus: it works with datetime instances, unlike json.dumps()).
One limitation of these extra values is that they are not searchable.
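For reference, extra values are attached with the standard extra argument of the logging calls. The sketch below uses only the standard library: a throwaway in-memory handler stands in for the Sentry handler to show that prop1, prop2, and prop3 end up as attributes on the log record. The values and the logger name are made up.

```python
import logging
from datetime import datetime


class ListHandler(logging.Handler):
    """Collects records in memory so the extra values can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list[logging.LogRecord] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("myapp.example")  # hypothetical logger name
logger.propagate = False  # keep the demo output quiet
handler = ListHandler()
logger.addHandler(handler)

logger.error(
    "Test Error",
    extra={
        "prop1": "value1",
        "prop2": "value2",
        "prop3": {"when": datetime(2024, 5, 1, 9, 0), "count": 3},
    },
)

record = handler.records[0]
print(record.prop1, record.prop2, record.prop3["count"])
```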
Searching for Events: Custom Tags
The Sentry console allows some searching with a list of default searchable properties. It seems the only way to search on our own data is to use Custom Tags. For Python/Django, the easiest way to do that is to set up a scope, then set the tag(s) before calling Sentry:
import logging
from sentry_sdk import push_scope, set_tag
...
logger = logging.getLogger(__name__)
...
with push_scope() as scope:
    set_tag("VanTag", "Hello1")  # This will add a VanTag:Hello1 to the event
    logger.error("Test Error")
With the above, the event raised will now have a VanTag tag of Hello1.
As a result, you can now search for issues/events with this tag:
Grouping Control: Custom Fingerprints
An issue that probably annoys every Sentry user at one time or another is how it groups (or ungroups) issues. The documentation says a lot of stuff, but the summary is pretty much, “We have a super algorithm. Trust us.”
Well. When it does not do what we want, one way to influence the grouping is to set the scope fingerprint before calling Sentry.
with push_scope() as scope:
    scope.fingerprint = ["Hello1"]  # the list of strings makes up the fingerprint
    logger.error("Appointment Creation Error")
When the event is sent to Sentry, it will be grouped by the fingerprint (list of strings).
Note: PyCharm mistakenly says that scope.fingerprint is a read-only attribute. It’s not; the code above will work as expected.