
Python Celery for first time users

Wojtek Kulikowski
November 30th, 2019 · 4 min read

This is the blog post I would have loved to read a year ago, when I was first asked to integrate Stripe into our Django app. My introduction to the asynchronous world of Python was rough, but yours doesn’t have to be. Enjoy the ride! 🏄🏻‍♂️

What is Celery and why do I need it?

The formal definition states that Celery is a distributed task queue, focused on real-time processing, while also supporting task scheduling. 1

But what does it really mean? Let’s break it down:

  • distributed - the system can run on a remote machine, independently from the client requesting tasks
  • task - an instruction (in the form of a Python function) delegated from the client to Celery
  • queue - tasks requested by the client are stored in a queue and executed by workers that pick them up

Essentially, Celery is used to offload the WSGI server, which should handle HTTP requests as quickly as possible. Every compute- or time-heavy action should be put on the queue instead of being handled in the main process. If the user doesn’t need to get their response immediately, the action is a perfect candidate for a task on the Celery queue. Some standard examples include:

  • sending an email
  • making a request to an external API like Stripe or AWS
  • performing a compute heavy database query

But why can’t we just use our powerful multi-threaded CPU to handle these tasks? Where is the event loop when we need it? The answer is simple - Python is not really an async-friendly language, and the most popular web frameworks, like Django and Flask, still have a long way to go to reach their full asynchronous potential.

Why isn’t Python async friendly?

The short answer is: it is now. The longer answer is: it wasn’t always like that.

The first proposal for integrating asyncio dates back to 2012, over 20 years after the language’s first release! Since the introduction of the event loop in Python 3.5 it has been possible to handle incoming HTTP requests asynchronously, but not all web frameworks are ready to implement async / await in their middlewares.
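To make the idea concrete, here is a minimal sketch of what async / await looks like; the handler names are made up for illustration, with asyncio.sleep standing in for real I/O:

```python
import asyncio

# A hypothetical I/O-bound "request handler": it awaits instead of blocking.
async def handle_request(n):
    await asyncio.sleep(0)   # yield control back to the event loop
    return f"response {n}"

async def main():
    # The event loop interleaves the three handlers instead of
    # running them strictly one after another.
    return await asyncio.gather(*(handle_request(i) for i in range(3)))

results = asyncio.run(main())
print(results)  # ['response 0', 'response 1', 'response 2']
```

Note that this only helps with I/O-bound work - waiting on sockets, databases, external APIs - which is exactly the kind of work the event loop can interleave.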

Django officially includes ASGI support with the release of 3.0, but that came only in December 2019, and there is still much to be done before Django apps handle asynchronous requests out of the box.

Another problem with Python, although not one that necessarily breaks the async features, is the limitation of running only one thread at a time. One of the assumptions behind CPython (the most widely used implementation of Python) is that it should be easily extensible with C libraries, even ones that are not thread-safe 2. To achieve that, CPython makes use of the GIL - the Global Interpreter Lock. In simple words, it makes Python execute only one thread at a time, by design.

Passing tasks from the server to workers using message queues

When you want to delegate a task to a worker, you need to put it on a task queue first. Usually it is as simple as calling the task function with .delay() appended:

from celery import Celery

app = Celery()

@app.task
def send_tweet(username, content):
    raise NotImplementedError

if __name__ == '__main__':
    send_tweet.delay("wkulikowski1", "Nice haircut! 🔥")

This is the basic example. So what happens now?

The function and its arguments are serialized to a form understandable for the queue backend. Celery defaults to JSON serialization, so arguments need to consist of lists, dicts or primitives. Passing a Python object will result in an exception, unless you change the serialization method to something that supports it, like pickle.
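You can see the constraint with plain json - the sketch below mimics, in a simplified form, what happens to a task call before it lands on the queue (the message layout here is illustrative, not Celery's actual wire format):

```python
import json

# Arguments must survive a round trip through JSON.
args = ["wkulikowski1", "Nice haircut!"]
message = json.dumps({"task": "send_tweet", "args": args})
assert json.loads(message)["args"] == args

# A plain Python object does not survive that round trip:
class User:
    pass

failed = False
try:
    json.dumps({"task": "charge_user", "args": [User()]})
except TypeError:
    failed = True

print(failed)  # True - pass user.id instead of the User object
```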

Then the tasks are stored in the queue. Your backend stores them and exposes them to workers ready to execute.

Now the sweet automation kicks in and workers will process tasks in the predefined order. You can learn more about priorities here

Once a task is done, the worker can pass the result to the result store.

The most common backends for storing tasks and results are RabbitMQ and Redis. While they differ in certain aspects, that is out of scope for this post, and you are probably fine going with either.
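Wiring one up is a matter of pointing Celery at it. A minimal configuration sketch, assuming a Redis instance running locally (the URLs are placeholders for your own setup):

```python
from celery import Celery

# broker: where tasks wait for workers to pick them up.
# backend: where finished task results are stored.
# Hypothetical local-Redis URLs - substitute your own.
app = Celery(
    "project_name",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)
```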

Celery in Django examples

Single task delegation

Let’s analyze the simplest example first. Let’s assume we run a book store that charges $1 every time a user opens a book. We pride ourselves on immediate delivery, so we can’t wait for the successful charge confirmation. Let’s implement this in Django.

from rest_framework import mixins
from rest_framework import viewsets

from .models import Book
from .serializers import BookSerializer
from .payments import charge_user

class BookViewset(mixins.ListModelMixin,
                  mixins.RetrieveModelMixin,
                  viewsets.GenericViewSet):

    queryset = Book.objects.all()
    serializer_class = BookSerializer

    def retrieve(self, request, *args, **kwargs):
        # Pass the id, not the User object - task arguments
        # must be JSON-serializable.
        charge_user.delay(request.user.id)
        return super().retrieve(request, *args, **kwargs)

Done!

Project configuration

It is easy to overthink where to put your config files. There are so many options, and while you probably can put them anywhere, the practice I have seen most often is creating a separate Django app called taskapp. In project_name/taskapp/celery.py you can then put code like this:

import os

from django.apps import AppConfig, apps
from django.conf import settings

from celery import Celery

if not settings.configured:
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault(
        "DJANGO_SETTINGS_MODULE", "config.settings.local"
    )


app = Celery()


class CeleryConfig(AppConfig):
    name = "project_name.taskapp"
    verbose_name = "Celery Config"

    def ready(self):
        app.config_from_object("django.conf:settings", namespace="CELERY")
        installed_apps = [app_config.name for app_config in apps.get_app_configs()]
        app.autodiscover_tasks(lambda: installed_apps, force=True)

The line app.autodiscover_tasks(lambda: installed_apps, force=True) ensures that all Celery tasks located in files called tasks.py across all your apps are discovered.
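For instance, a hypothetical books app could keep its tasks in project_name/books/tasks.py, where autodiscovery will find them without any extra registration:

```python
# project_name/books/tasks.py
from project_name.taskapp.celery import app


@app.task
def charge_user(user_id):
    # Look the user up by id here - the id, not the object,
    # is what was serialized onto the queue.
    ...
```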

Celery beat

If you want to run periodic tasks (like sending a weekly report every Monday), you can take advantage of Celery beat - a scheduler included in the Celery package which kicks off tasks at regular intervals. You have plenty of options for defining schedules: cron expressions, specific timestamps or fixed intervals, among others.

You can define such a schedule by setting app.conf.beat_schedule inside the ready function:

from celery.schedules import crontab

app.conf.beat_schedule = {
    "send_conversations_details": {
        "task": "project_name.report.tasks.send_weekly_report",
        "schedule": crontab(day_of_week="monday", hour=0, minute=0),
    }
}

Race conditions and other threads

The most common problem with developing concurrent programs is the risk of running into race conditions. What are those?

A race condition is the condition of a system where the system’s substantive behavior is dependent on the sequence or timing of other uncontrollable events. It becomes a bug when one or more of the possible behaviors is undesirable. — Wikipedia 3

Essentially, it happens when 2 processes rely on each other and they are not coordinated well. Here is an example which actually happened to me once (and hopefully will never happen again 😉):

@app.task
def respond_to_messenger(chatbot_message_id, conversation_id):
    conversation = Conversation.objects.get(id=conversation_id)

    send_fb_seen()
    send_fb_writing_bubbles()
    send_fb_text_message(chatbot_message_id)


conversation = Conversation.objects.create()
respond_to_messenger.delay(chatbot_message.id, conversation.id)

Can you spot the bug? First we create the object in the database and then try to access it immediately in the Celery task. On the system we use, the object has not yet been committed to the database when the worker picks up the task, so accessing it triggers a DoesNotExist exception.
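In Django the idiomatic fix is to enqueue the task from transaction.on_commit, so the worker can only see committed data. The pattern can be sketched in plain Python with a toy transaction class standing in for the database (everything here is a simplified simulation, not Django's actual machinery):

```python
# Defer enqueueing until the data is actually committed.
class Transaction:
    """Minimal stand-in for a database transaction with commit hooks."""

    def __init__(self):
        self.committed_rows = {}
        self._pending = {}
        self._on_commit = []

    def create(self, obj_id, data):
        self._pending[obj_id] = data          # not visible to workers yet

    def on_commit(self, callback):
        self._on_commit.append(callback)      # runs only after commit

    def commit(self):
        self.committed_rows.update(self._pending)
        for callback in self._on_commit:
            callback()


queue = []
tx = Transaction()
tx.create(1, {"text": "hello"})

# Wrong: queue.append(...) right here - the worker could grab the task
# before the row exists. Right: enqueue from the commit hook instead.
tx.on_commit(lambda: queue.append(("respond_to_messenger", 1)))

assert queue == []                            # nothing enqueued before commit
tx.commit()
assert queue == [("respond_to_messenger", 1)] # task visible only afterwards
```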

Monitoring your Celery workers

Celery has a built-in monitoring tool that you definitely should use. It is called Flower, and it lets you discover how many tasks are being processed, what workers are currently doing and whether there are failures in execution.

Have a look!

[Flower dashboard screenshot]

Horizontal scaling

Scaling up with Celery is dramatically easy. If there are more tasks in the queue than one worker is able to handle, you can just add workers and forget about your problems! Now the main limitation is the power of your machine, but that can be solved as easily as picking a more powerful EC2 instance.
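Assuming your app module is project_name.taskapp (the module name here is a placeholder for your own project), adding capacity is just a matter of starting more worker processes:

```shell
# One worker process with 8 concurrent child processes:
celery -A project_name.taskapp worker --concurrency=8

# Or several named workers on the same machine:
celery multi start w1 w2 w3 -A project_name.taskapp
```

Because all workers pull from the same broker, new ones start draining the queue the moment they come online - no coordination needed on your side.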

Summary

This Celery thing is going places, not gonna lie. The summary is yet to be finished, but so far so good. If you’re reading this, I probably asked you to read a draft of my work, so I will be flattered if you text me some feedback 🔥

Do you want to get notified about the next article? In May I will write about functional programming and its use cases across the industry.

Once a month I send a newsletter with a personal note, my current research areas and Twitter accounts I found lately. Sign up and grow with me 🌱

© 2019–2020 Software Engineering by Wojtek