Asynchronous Tasks with Celery

What is Celery

Django is designed for short-lived requests. Consequently, if you need to perform a long-running operation, you should perform it outside of the request-response cycle.

The following are some examples of long-running operations:

  • sending emails or mass emails
  • transcoding media files
  • image processing
  • generating reports

and so on.

Trying to perform these operations within the request-response cycle will significantly increase the time users have to wait for an HTTP response, which could eventually drive them away from the site. Furthermore, these operations can fail, so we may also want a retry policy.
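Here is a minimal plain-Python sketch of a retry policy, with no Celery involved. The call_with_retry() helper and the flaky_send() function are hypothetical. Celery has its own retry support (for example, calling self.retry() inside a bound task), so treat this only as an illustration of exponential backoff, where the wait doubles after every failed attempt.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(func: Callable[[], T], max_retries: int = 3,
                    base_delay: float = 1.0,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Call func(), retrying on any exception with exponential backoff.

    Hypothetical helper: the wait before retry n (n = 0, 1, 2, ...) is
    base_delay * 2**n. The sleep function is injectable so it can be faked.
    """
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: let the caller see the error
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Usage: a hypothetical flaky operation that fails twice before succeeding.
attempts = {"count": 0}


def flaky_send() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("SMTP server unavailable")
    return "sent"


delays = []
result = call_with_retry(flaky_send, sleep=delays.append)  # record waits instead of sleeping
print(result, delays)  # sent [1.0, 2.0]
```

Injecting sleep keeps the example instant to run and makes the backoff sequence easy to inspect.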

The solution is to maintain a queue of long-running operations and launch them asynchronously.

Fortunately, there is a great package called Celery that does all the heavy lifting for us.

Quoting from the Celery documentation:

Celery is an asynchronous task queue based on distributed message passing. It is focused on real-time operation but supports scheduling as well.

In addition to running asynchronous tasks, Celery also gives you the option to execute tasks periodically.

To install Celery, execute the following command:

Next, you need a message broker. A broker is where your tasks are stored as a queue. The broker delivers these messages to Celery workers, which then execute the tasks and provide a response. By default, Celery uses RabbitMQ as its message broker. To install RabbitMQ, execute the following command:
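The producer/broker/worker flow can be modelled in a few lines of standard-library Python. This is an analogy under stated assumptions: a queue.Queue stands in for RabbitMQ, a thread stands in for a Celery worker, the JSON message format is made up, and the add() task is hypothetical. It does show the essential points. The caller only serializes a message and puts it on the queue. The worker takes messages off in FIFO order, executes them, and records the result.

```python
import json
import queue
import threading


def add(x, y):
    """Hypothetical task; stands in for something like send_activation_mail."""
    return x + y


TASKS = {"add": add}      # task implementations the worker knows, keyed by name
broker = queue.Queue()    # stands in for RabbitMQ: holds messages in FIFO order
results = {}              # stands in for a place where task results are stored


def send_task(task_id, name, *args):
    """Producer side: serialize the call and put it on the queue."""
    broker.put(json.dumps({"id": task_id, "task": name, "args": list(args)}))


def worker():
    """Consumer side: take messages off the queue and execute them."""
    while True:
        message = broker.get()
        if message is None:          # sentinel value: shut the worker down
            broker.task_done()
            break
        payload = json.loads(message)
        func = TASKS[payload["task"]]
        results[payload["id"]] = func(*payload["args"])
        broker.task_done()


t = threading.Thread(target=worker)
t.start()
send_task(1, "add", 2, 3)
send_task(2, "add", 10, 20)
broker.put(None)   # ask the worker to stop once the queued messages are done
t.join()
print(results)  # {1: 5, 2: 30}
```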

After installation, rabbitmq-server starts automatically. You can check the status of rabbitmq-server using the following command:

If for some reason rabbitmq-server doesn’t start automatically, type the following command in the shell:

To stop the rabbitmq-server use the following command:

Integrating Celery with Django

To add Celery to the Django project, we first have to create an instance of the Celery app along with some configuration.

Create a new file named celery.py next to settings.py in the Django configuration directory and add the following code to it:

djangobin/django_project/django_project/celery.py

In lines 1-3, we import the necessary functions and modules.

In line 6, we set the DJANGO_SETTINGS_MODULE environment variable for the celery command-line program.

In line 8, we create an instance of Celery by passing it the name of the Django application.

In line 14, we load any custom configuration from the project’s settings.py file.

Finally, in line 17, we tell Celery to auto-discover tasks from the applications listed in the INSTALLED_APPS setting. With this line in place, Celery will look for a module named tasks.py in every installed app and load the tasks defined in it.

Next, we have to load the Celery instance every time Django starts. To do so, add the following lines to the __init__.py file in the Django configuration directory.

djangobin/django_project/django_project/__init__.py

Now, we are ready to define some tasks.

Creating Asynchronous Tasks

In this section, we are going to create asynchronous tasks to send emails.

As things stand, there are three points in our application where we are sending emails.

  • Contact Form
  • Password Reset
  • Account Activation

The Contact form and Account Activation use the mail_admins() and send_mail() functions, respectively, to send mail. Both are synchronous functions, which means they block the execution of the program until they finish.

However, if you have used the application, you may have noticed that there is no delay at all. This is because we are currently using the console backend. If we switch to the SMTP backend, the delay can be significant.

The Password Reset mechanism also uses the send_mail() function to send the password reset link, but to keep things simple and easy to follow, we will not change it.

Let’s define some tasks now.

Create a new file named tasks.py in the djangobin app directory and add the following code to it:

djangobin/django_project/djangobin/tasks.py

A task is just an ordinary function decorated with the @task decorator. The send_activation_mail() function accepts user_id and context arguments and sends an email using the send_mail() function.
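Conceptually, a task decorator registers a function under a name so that a worker can later look it up from a message. The sketch below is a simplified, hypothetical registry, not Celery's implementation. It uses the same "<module>.<function>" naming scheme that Celery applies by default, which is why a task defined in djangobin/tasks.py ends up with a name like djangobin.tasks.send_activation_mail.

```python
from typing import Callable, Dict

# Hypothetical registry mapping task names to functions (not Celery's own).
registry: Dict[str, Callable] = {}


def task(func: Callable) -> Callable:
    """Register func under '<module>.<function name>' and return it unchanged."""
    name = f"{func.__module__}.{func.__name__}"
    registry[name] = func
    func.name = name  # remember the registered name on the function itself
    return func


@task
def send_activation_mail(user_id, context):
    # Placeholder body: the real task would look up the user and send an email.
    return f"activation mail for user {user_id}"


print(send_activation_mail.name)      # e.g. __main__.send_activation_mail when run as a script
print(send_activation_mail(7, {}))    # activation mail for user 7
```

A worker that receives a message naming the task only needs the registry lookup to find the function to call.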

The arguments you pass to task functions are serialized and stored in the broker. Newer versions of Celery (4.0 and later) use JSON as the default serialization method. The JSON serializer can only serialize simple objects such as ints, strings, lists, and dictionaries. It can’t serialize complex objects such as Model instances, HttpRequest instances, byte strings, and so on.

This is why we pass user_id to the send_activation_mail() function instead of the User instance. For the same reason, we create half of the context data inside send_activation_mail() instead of passing it directly from the save() method of the CreateUserForm form.
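The following standard-library sketch shows both halves of this argument. The User dataclass and the in-memory USERS dictionary are hypothetical stand-ins for the User model and the database, and the context key "domain" is made up. json.dumps() rejects the object itself. The primary key, by contrast, travels without trouble, and the task re-fetches the object when it runs.

```python
import json
from dataclasses import dataclass


@dataclass
class User:  # hypothetical stand-in for a Django model instance
    id: int
    username: str


# Hypothetical in-memory "database" the task can query.
USERS = {42: User(id=42, username="alice")}
user = USERS[42]

# Passing the object itself fails: the JSON encoder does not know this type.
try:
    json.dumps({"user": user})
except TypeError as exc:
    print("cannot serialize:", exc)

# Passing the primary key works, because an int is JSON-serializable.
message = json.dumps({"user_id": user.id, "context": {"domain": "localhost:8000"}})


def send_activation_mail(user_id, context):
    user = USERS[user_id]  # re-fetch the object inside the task
    return f"Hi {user.username}, activate at {context['domain']}"


payload = json.loads(message)
print(send_activation_mail(payload["user_id"], payload["context"]))
# Hi alice, activate at localhost:8000
```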

Note: We can pass complex objects to task functions by changing the default serializer to pickle. In fact, in older versions of Celery (before 4.0), pickle was the default serializer. However, unpickling data from an untrusted source can execute arbitrary code, so pickle is not recommended.
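The security problem with pickle is easy to demonstrate. An object can define __reduce__() to tell pickle which callable to invoke when the data is loaded, so whoever controls the pickled bytes controls what runs. This harmless sketch uses len, and the Payload class is made up for illustration. An attacker could substitute something far more dangerous.

```python
import pickle


class Payload:
    """Hypothetical object whose unpickling calls an arbitrary function."""

    def __reduce__(self):
        # pickle records "call len('pwned')" instead of the object's state.
        # An attacker could name os.system or any other callable here.
        return (len, ("pwned",))


data = pickle.dumps(Payload())
print(pickle.loads(data))  # 5, because loading the bytes called len("pwned")
```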

The next step is to call this task, which we do using the delay() method. Here is the updated version of the save() method.

djangobin/django_project/djangobin/forms.py

The delay() method only puts the task in the queue. It is the Celery worker’s job to execute it as soon as possible.
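The non-blocking behaviour of delay() can be imitated with a thread. In this sketch the delay() helper is hypothetical and much simpler than Celery's Task.delay(), which sends a message to the broker and returns an AsyncResult. Here a background thread plays the part of the worker. The point is that the caller gets control back immediately while the slow work finishes later.

```python
import threading
import time


def slow_send_mail(to, done):
    time.sleep(0.5)  # pretend that talking to an SMTP server takes a while
    done.append(to)


def delay(func, *args):
    """Hypothetical stand-in for Task.delay(): hand the work off and return at once."""
    worker = threading.Thread(target=func, args=args)
    worker.start()
    return worker


sent = []
start = time.perf_counter()
handle = delay(slow_send_mail, "user@example.com", sent)
elapsed = time.perf_counter() - start
print(f"delay() returned after {elapsed:.3f}s; sent so far: {sent}")

handle.join()  # a real view would not wait; we wait here only to show the result
print("sent:", sent)
```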

Everything is now in place. All we need to do is start the Celery worker.

In the terminal, make sure your current working directory is the project root directory, and then start the Celery worker by typing:

The output will be something like this:

The Celery worker is now up and running. Start the Django development server if it is not already running, and create a new user by visiting http://localhost:8000/signup/.

Enter data in all the fields and hit submit. In the shell running the Celery worker, you will see output like this:

Our asynchronous task has been executed. To activate the user, copy the activation URL from the output and paste it into the browser’s address bar.

Let’s create another task to send feedback emails to the site admins. Open tasks.py and add the send_feedback_mail() task just below the send_activation_mail() task as follows:

djangobin/django_project/djangobin/tasks.py

Next, modify the contact() view to use the send_feedback_mail() task as follows:

djangobin/django_project/djangobin/views.py

Restart the Celery worker for the changes to take effect and visit http://localhost:8000/contact/.

Fill in the form and hit submit. This time, in the shell running the Celery worker, you will see output like this:

Removing Snippets via Periodic Tasks

In this section, we will add a periodic task to delete the expired snippets from the database.

Open tasks.py and add the remove_snippets() function to the end of the file as follows:

  1. The function starts with a for loop which iterates over snippets whose expiration attribute contains a value other than never.
  2. In lines 19-26, we use an if-elif statement to create a datetime.timedelta object. A datetime.timedelta object is used to perform basic arithmetic on datetime.datetime and datetime.date objects. If we add a datetime.timedelta object to a datetime.datetime object, the result is a new datetime.datetime object. On the other hand, if we subtract two datetime.datetime objects, the result is a datetime.timedelta object.
  3. In line 29, we calculate the snippet’s expiration time by adding the timedelta to the snippet’s creation time.
  4. In line 32, we use the now() method of the datetime.datetime class to get the current date and time.
  5. In line 35, we calculate the difference between deletion_time and now. The difference is a new datetime.timedelta object. If its days attribute is 0 or negative, it’s time to delete the snippet, and this is exactly what the if statement in line 37 checks.
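The expiry logic described above can be tried out on its own with the datetime module. This is a sketch under assumptions. The expiration choices and their durations are hypothetical, and the real Snippet model may define different ones. A dictionary lookup also replaces the if-elif chain. Finally, the sketch compares the remaining timedelta against zero rather than checking only its days attribute. The days attribute is 0 for any positive gap shorter than a day, so a days-based check can treat a snippet as expired up to a day early.

```python
import datetime

# Hypothetical expiration choices; the real Snippet model may use different values.
EXPIRATION_DELTAS = {
    "1 week": datetime.timedelta(weeks=1),
    "1 month": datetime.timedelta(days=30),
    "6 months": datetime.timedelta(days=180),
    "1 year": datetime.timedelta(days=365),
}


def is_expired(created_on, expiration, now):
    """Return True if a snippet created at created_on has reached its deletion time."""
    if expiration == "never":
        return False
    deletion_time = created_on + EXPIRATION_DELTAS[expiration]  # datetime + timedelta -> datetime
    remaining = deletion_time - now                              # datetime - datetime -> timedelta
    return remaining <= datetime.timedelta(0)


created = datetime.datetime(2024, 1, 1, 12, 0)
print(is_expired(created, "1 week", datetime.datetime(2024, 1, 8, 11, 0)))  # False (1 hour left)
print(is_expired(created, "1 week", datetime.datetime(2024, 1, 8, 12, 0)))  # True
print(is_expired(created, "never", datetime.datetime(2030, 1, 1)))          # False

# One hour before the deadline, the days attribute is already 0.
left = created + datetime.timedelta(weeks=1) - datetime.datetime(2024, 1, 8, 11, 0)
print(left, left.days)  # 1:00:00 0
```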

We have defined the task, but something still has to trigger it periodically. For that, we use Celery beat, a scheduler that kicks off tasks at regular intervals so that the workers can execute them.

Celery beat reads periodic tasks from the beat_schedule setting. Open celery.py and define the beat_schedule setting as follows:

The task field refers to the name of the task to execute, and the schedule field refers to the frequency of execution. The crontab(minute=0, hour=0) schedule indicates that the remove_snippets task will run daily at midnight. The crontab class accepts many other arguments, and the periodic tasks page of the Celery documentation shows some examples.
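To see what crontab(minute=0, hour=0) means in practice, the hypothetical helper below computes the next time a daily job at a given hour and minute would fire. It is a plain-Python illustration only. Celery computes schedules itself and also takes its configured time zone into account, which this sketch ignores.

```python
import datetime


def next_daily_run(now, hour=0, minute=0):
    """Next datetime strictly after now at hour:minute, like a daily crontab entry."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += datetime.timedelta(days=1)  # today's slot has passed: use tomorrow's
    return candidate


now = datetime.datetime(2024, 3, 15, 14, 30)
print(next_daily_run(now))           # 2024-03-16 00:00:00
print(next_daily_run(now, hour=18))  # 2024-03-15 18:00:00
```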

To start the Celery beat scheduler, type the following command:

The output will be something like this:

It is important to note that this command only starts the scheduler. To execute the tasks, you have to start the Celery worker in a separate terminal:

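You can also start the Celery worker along with beat using the -B option. This is handy during development, though the Celery documentation advises against it in production: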

Now the expired snippets will be deleted automatically every day at midnight.

Monitoring Tasks using Flower

Flower is a web-based tool for monitoring and administering Celery. To install Flower, type the following command:

To launch the web-based tool, type the following:

By default, this command starts a web server on port 5555. To change the port, use the --port option.

Open your browser and navigate to http://localhost:5555/. You should see a dashboard like this:
