Asynchronous Task Management with Celery

Patrick O'Neill
3 min read · Jun 8, 2020

Previous blog here.

After getting SQLAlchemy up and running in my project, I worked on creating the RESTful routes in the back-end and the fetches and forms that use them in the front-end.

So the next interesting, new-to-me piece of the puzzle was asynchronous task management; the reason for using it in my project is that I want to send emails.

When a user creates an account I want to send a welcome email to that new user. The whole process involves database operations to check whether the username or email already exists, creating the new user and adding them to the database, and then sending an email to the new user. Running these steps synchronously (one after the other) would take a long time (relatively speaking; people are impatient) before we could get back to the user and confirm that they are registered.

The solution I will use for this problem is to make the email part of the process asynchronous. Once the user has been created in the database I will call the email function asynchronously and, without waiting for it to finish, return a success message to the front-end saying that the user has been created.
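The flow described above can be sketched roughly as follows. This is only an illustration: the in-memory users list stands in for my database, and send_welcome_email is assumed to be a Celery task defined elsewhere.

```python
# Sketch of the registration flow (assumption: `users` is a stand-in for
# the database table; the real email sending is a Celery task).
def register_user(username, email, users):
    # Database check: does the username or email already exist?
    if any(u["username"] == username or u["email"] == email for u in users):
        return {"ok": False, "error": "username or email already taken"}
    # Create the new user and "add" them to the database.
    users.append({"username": username, "email": email})
    # Fire off the email asynchronously and return immediately,
    # e.g. send_welcome_email.delay(email) -- we don't wait for it.
    return {"ok": True}
```

The key point is the last step: the function returns to the caller as soon as the email task has been queued, not after the email has been sent.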

I will implement this by using a message queue: messages are sent to a server and stored in a queue until they can be processed, then deleted. Examples of message queue servers include RabbitMQ, Redis and Amazon SQS.
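As a rough illustration of the idea (not Celery itself), the produce-and-consume pattern can be shown with Python's standard-library queue.Queue and a worker thread:

```python
import queue
import threading

# A toy message queue: the main thread enqueues "messages" and a worker
# thread picks them up, processes them and records the results.
tasks = queue.Queue()
results = []

def worker():
    while True:
        message = tasks.get()
        if message is None:              # sentinel: no more work
            break
        results.append(message.upper())  # "process" the message
        tasks.task_done()

t = threading.Thread(target=worker)
t.start()

for msg in ["welcome email", "password reset"]:
    tasks.put(msg)                       # producer side: send to the queue
tasks.put(None)                          # tell the worker to stop
t.join()

print(results)  # ['WELCOME EMAIL', 'PASSWORD RESET']
```

Celery does the same thing at a larger scale: the queue lives on a separate server and the worker is a separate process, so the producer and consumer can even run on different machines.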

To use the message queue server I will be using Celery, a Python package for managing this whole process. I'll break it down into three stages: getting everything in place and setting up Celery, sending a task to the message queue, and picking up and executing a task from the message queue.

Celery

Setting it up…

First from the command line you will need to install the package.

pip install celery

Then in your Python program you will need to import Celery and create a Celery instance, pointing both the broker and the result backend at your Redis server.

from celery import Celery
...
celery = Celery(app.name, backend=app.config['REDIS_URL'], broker=app.config['REDIS_URL'])

To get a Redis server up and running there is an article with instructions here, or, as I am currently hosting on Heroku, I used the Heroku Redis add-on and got the server address from there.
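On Heroku, the Redis add-on exposes the server address in the REDIS_URL environment variable, so one way to pick it up is shown below (the localhost fallback for local development is my own assumption, not part of the original setup):

```python
import os

# Heroku Redis sets REDIS_URL; fall back to a local server for development.
redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
```

This keeps the server address out of the source code, which also means the same code runs unchanged locally and on Heroku.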

The next thing we need to do is tell Python which functions can be Celery tasks. We do this by using the @celery.task decorator.

@celery.task
def example_function():
    return "test"

Sending a task to the MQ…

The last thing we need to do when calling example_function is to use the .delay() method. Any arguments passed to delay will be forwarded to example_function.

example_function.delay()

At this stage we have set up Celery and sent a message to our Redis server, but it is currently just sitting there waiting and twiddling its thumbs.

Setting up the Worker…

We now need something to pick up the tasks we have sent to our message server and crunch the numbers. In Celery, the separate process that does this is called a worker, and we can start one with the command below.

celery -A app.celery worker --loglevel=info

If you now run your Python app in another terminal and call example_function, you should see the Celery terminal pick up and execute the task!

So I now had the building blocks in place, and I was able to go and implement Celery and decorate the function that sends the welcome email to the new user!

As I was hosting on Heroku I also needed to change my Procfile and then turn on a second dyno to run the Celery worker alongside the Flask application.

web: gunicorn app:app
worker: celery -A app.celery worker

If you’re interested in looking at my code click here.
