Celery/RabbitMQ Notes on Task Timings

celery, django, rabbitmq

The following notes apply when using Celery with the RabbitMQ broker.

Celery Settings

task_acks_late

The Celery setting task_acks_late (disabled by default) defers the message ACK to RabbitMQ until after the task has finished executing, rather than sending it just before the task starts.

  • If it is enabled and the Celery task takes too long (see consumer_timeout below), the message will be requeued and redelivered to the next available worker. The message can then be processed more than once.
  • If it is disabled, Celery will ACK the message to RabbitMQ just before the task starts executing. This tells RabbitMQ that the message was “delivered and processed,” and RabbitMQ deletes it from the queue. The message is lost if the Celery task is interrupted before it finishes.

task_acks_on_failure_or_timeout

Then there is task_acks_on_failure_or_timeout (enabled by default). According to the Celery docs, when it is enabled the message is ACKed even if the task fails or times out. It only applies when task_acks_late is enabled. Whether this is the correct choice depends on the task.
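As a rough sketch, the two settings above could live in a plain Python settings module. The module name celeryconfig.py is an assumption for illustration; Celery can load such a module with app.config_from_object("celeryconfig"). The values shown are one possible choice, not a recommendation.

```python
# celeryconfig.py (hypothetical module name): plain module-level settings.

# ACK after the task has run rather than just before it starts.
task_acks_late = True

# Only relevant when task_acks_late is enabled. True is the default:
# messages for failed or timed-out tasks are ACKed as well.
task_acks_on_failure_or_timeout = True
```

Because the module holds nothing but assignments, it can be imported and inspected without a running broker.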

RabbitMQ Settings

consumer_timeout

The RabbitMQ consumer_timeout setting (30 minutes by default) limits how long a consumer may hold a delivered message without ACKing it. If a client (a Celery worker in this case) does not ACK the message before this timeout expires, RabbitMQ closes the consumer's channel, and the message is requeued and redelivered.
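One way to reason about this limit is to compare it with the longest time a task is allowed to run (for example, the value used for Celery's task_time_limit, which is in seconds). The sketch below is only an illustration: the function names and the 60-second safety margin are made up for this example. It also ignores any time a message spends waiting in a worker's prefetch buffer, which counts against the timeout as well.

```python
# Default RabbitMQ consumer_timeout per the notes above: 30 minutes, in ms.
DEFAULT_CONSUMER_TIMEOUT_MS = 30 * 60 * 1000


def fits_consumer_timeout(task_time_limit_s,
                          consumer_timeout_ms=DEFAULT_CONSUMER_TIMEOUT_MS,
                          margin_s=60):
    """Return True if a task capped at task_time_limit_s seconds ends at
    least margin_s seconds before the RabbitMQ consumer timeout."""
    return (task_time_limit_s + margin_s) * 1000 <= consumer_timeout_ms


def rabbitmq_conf_line(consumer_timeout_ms):
    """Render a rabbitmq.conf entry; the value is in milliseconds."""
    return f"consumer_timeout = {consumer_timeout_ms}"


print(fits_consumer_timeout(20 * 60))      # 20-minute limit vs. the default
print(fits_consumer_timeout(30 * 60))      # 30-minute limit vs. the default
print(rabbitmq_conf_line(60 * 60 * 1000))  # raise the timeout to one hour
```

If the check fails, either the task needs to be shortened or split, or the timeout raised on the RabbitMQ side.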

TTL / message TTL

Furthermore, the message TTL (time to live) determines how long a message may stay in a RabbitMQ queue before being discarded. No TTL is set by default, so messages do not expire unless one is configured (per queue, per message, or through a policy). If a message stays in a queue longer than the configured TTL, it is removed from the queue.
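For a per-queue TTL, RabbitMQ reads the value from the x-message-ttl queue argument, in milliseconds. The helper below is a small sketch that builds that argument from a timedelta; the function name and the two-day value are assumptions for illustration. With Celery, such a dict could be passed as queue_arguments when declaring a kombu Queue. Note that an existing queue generally cannot be redeclared with different arguments, so a RabbitMQ policy may be the easier route for queues that already exist.

```python
from datetime import timedelta


def message_ttl_arguments(ttl):
    """Build queue arguments that set a per-queue message TTL.

    RabbitMQ expects "x-message-ttl" as a non-negative number of
    milliseconds.
    """
    ttl_ms = int(ttl.total_seconds() * 1000)
    if ttl_ms < 0:
        raise ValueError("TTL must not be negative")
    return {"x-message-ttl": ttl_ms}


# Keep unconsumed messages for two days (illustrative value).
queue_arguments = message_ttl_arguments(timedelta(days=2))
print(queue_arguments)
```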

Implications

To increase the chance that tasks are not aborted/lost due to restarts (e.g. deployments):

  • Design each task to finish as quickly as possible, spawning additional smaller tasks if necessary.
  • Design each task to survive partial completion, so that it can be restarted from the beginning without ending up in a bad state (e.g. duplicate records, or abandoned/orphaned/partial updates).
  • Enable task_acks_late so that ACKs are not sent to RabbitMQ until tasks are finished.
  • If a message TTL is configured in RabbitMQ, make it long enough to leave time to consume the messages (in case Celery workers need to be paused for some time to fix bugs).
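The "safe to restart" point in the list above can be sketched as an idempotent task body: the work is keyed by an identifier carried in the message, so a redelivered message does not create a duplicate record. Everything here is hypothetical (record_order, order_id, and the in-memory dict that stands in for a database table with a unique key); in a real app the function would be registered as a Celery task and the check would be enforced by the database.

```python
# In-memory stand-in for a database table with a unique key on order_id.
processed_orders = {}


def record_order(order_id, amount):
    """Idempotent task body: safe to run again if the message is redelivered.

    Returns True if this call did the work, False if it was already done.
    """
    if order_id in processed_orders:
        return False  # duplicate delivery; nothing to redo
    # Write the finished record in a single step, so an interruption
    # before this line leaves no partial state behind.
    processed_orders[order_id] = {"amount": amount, "status": "recorded"}
    return True


first = record_order("A1", 10)
second = record_order("A1", 10)  # simulated redelivery of the same message
print(first, second, len(processed_orders))
```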

Failed tasks are not retried automatically; a task has to opt in to retries explicitly (e.g. with autoretry_for or self.retry()). In fact, unless a result backend is set up, there won’t even be a good way to audit which tasks failed (other than application logs).

Setting up Git repo on Dreamhost

programming, Uncategorized

Create an empty repo

Create an empty repo under /home/username, where username is your user name.
The following creates an empty bare repo named “myrepo” in the subdirectory /home/username/myrepo.git:

cd ~
git init --bare myrepo.git

Adding content

Go to where the source code is and initialize a git repository. Then add and commit the files as necessary:

git init
git add ...
git commit ...

Configure a remote that points to the repo created earlier, so that content can be pushed to it:

git remote add dreamhost ssh://username@server.dreamhost.com/home/username/myrepo.git

The above sets up a remote called “dreamhost” that points to the repo created above. The URL component ssh://username@server.dreamhost.com indicates how to access the server where the repo is. The user name and server name can be found by following the docs from Dreamhost:

https://help.dreamhost.com/hc/en-us/articles/115000675027-FTP-overview-and-credentials#Finding_your_FTP_server_hostname

Finally, push the change up:

git push -u dreamhost master

Pulling content

Most likely on a different machine, use git clone to pull content and start tracking changes:

git clone ssh://username@server.dreamhost.com/home/username/myrepo.git
Cloning into 'myrepo'...
username@server.dreamhost.com's password: xxxxxxxxx
remote: Counting objects: xx, done.
remote: Compressing objects: 100% (10/10), done.
remote: Total xx (delta 1), reused 0 (delta 0)
Receiving objects: 100% (xx/xx), 3.28 KiB | 3.28 MiB/s, done.
Resolving deltas: 100% (1/1), done.

As before, the URL component ssh://username@server.dreamhost.com indicates how to access the server where the repo is. The path /home/username/myrepo.git is the path to the remote repo created in the first step above.

Now you can use git add, git commit, and git push to add content:

git add ...
...
git commit
...
git push origin master

Or, for a specific branch (mybranch in this example):

git checkout -b mybranch
git add ...
...
git commit
...
git push origin mybranch

Spring Boot on Scala

programming, Scala, Spring Boot, Uncategorized

Using Spring Boot on Scala

Assumption

This example uses a project generated/seeded via Activator using the “minimal-scala” template. However, any project using SBT will probably work just as well.

Add Spring Boot to dependencies

Add Spring Boot dependencies to build.sbt:

libraryDependencies += "org.springframework.boot" % "spring-boot-starter" % "1.3.2.RELEASE"
libraryDependencies += "org.springframework.boot" % "spring-boot-starter-test" % "1.3.2.RELEASE"

Create a SpringBootApplication and run it

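package com.example

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication

@SpringBootApplication
class HelloApp {

}

object Hello {
  def main(args: Array[String]): Unit = {
    // Pass the command-line arguments through to Spring Boot.
    SpringApplication.run(classOf[HelloApp], args: _*)
  }
}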

The HelloApp class serves as a package “anchor” for component scanning. The Hello singleton provides the equivalent of a “public static void main(String args[])” entry point.

At this point, “sbt run” will bring up the app, but it will quit soon afterward.

Time to decide if this will be a Web app or a Command line app.

Branch to Web or Command line (console)

Web app

To create a Web app, add the spring-boot-starter-web dependency. Add this to build.sbt:

libraryDependencies += "org.springframework.boot" % "spring-boot-starter-web" % "1.3.2.RELEASE"

That’s it! Now running “sbt run” will bring up the Web app listening on port 8080.

Command line app

To create a command line app (or to add a command line runner to a Web app such as above), modify the main app class:

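import org.springframework.boot.CommandLineRunner
import org.springframework.boot.autoconfigure.SpringBootApplication

@SpringBootApplication
class HelloApp extends CommandLineRunner {
  // Called once by Spring Boot after the application context has started.
  override def run(args: String*): Unit = {
    println("Hello from Command line runner!")
  }
}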

Extending CommandLineRunner in Scala is the same as implementing CommandLineRunner in Java. So just implement the run method.

Try Autowiring something

Assuming the app class is com.example.HelloApp, component scanning starts at com.example. So create a service class under that package (or a subpackage thereof). I will create one under com.example.services:

package com.example.services

import org.springframework.stereotype.Service

@Service
class HelloService {
  def hello(): String = {
    "Hello from HelloService!"
  }
}

Autowire into a bean using the service (HelloApp in this example):

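import com.example.services.HelloService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.CommandLineRunner
import org.springframework.boot.autoconfigure.SpringBootApplication

@SpringBootApplication
class HelloApp extends CommandLineRunner {

  // Field injection: Spring sets this after constructing the bean.
  @Autowired
  var helloSvc: HelloService = null

  override def run(args: String*): Unit = {
    println(helloSvc.hello())
  }
}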

Configuration properties

Configuration properties can go into src/main/resources/application.properties, the same as in the Java world:

titlemsg=Hello App

Accessing properties is also pretty much the same (e.g. shown here using @Value):

@SpringBootApplication
class HelloApp extends CommandLineRunner {
  ...

  @Value("${titlemsg}")
  var titleMsg: String = null

  override def run(args: String*): Unit = {
    println(titleMsg)
  }
}

Several libraries provide an annotation named @Value, so be sure to use the one from Spring:

org.springframework.beans.factory.annotation.Value

Logging

Finally, logging. Adding src/main/resources/logback.xml allows tweaking the logging behavior:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <logger name="com.example" level="DEBUG"/>
</configuration>

I use Log4j, so here’s how to add a logger to the class:

import org.apache.log4j.Logger // assuming the Log4j 1.x API
import org.springframework.boot.CommandLineRunner

class HelloApp extends CommandLineRunner {

  val logger = Logger.getLogger(classOf[HelloApp])

  override def run(args: String*): Unit = {
    logger.info("Hello using logger")
  }
}