Celery Architecture


Introduction

Celery is a distributed task queue for asynchronous execution. It does not make your code itself run faster, but it improves the throughput of the system by making better use of the machine power you have.

Throughput, in simple terms, is a measure of how much work can be done in a specific period of time.

For example, if one person can do X amount of work in 1 hour, the throughput is X per hour. If you add one more person to do the work, Y amount of work can be done in 1 hour. Y may be less than or greater than X, but either way, the amount of work completed in that hour is the throughput.
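The analogy can be made concrete with a small Python sketch. It uses a standard-library thread pool as a stand-in for Celery workers (an assumption for illustration; Celery itself is not involved), and the task duration and task count are made-up values. Each simulated task takes the same time no matter how many workers there are, yet more tasks finish per second when workers are added.

```python
import time
from concurrent.futures import ThreadPoolExecutor

TASK_SECONDS = 0.05  # hypothetical duration of one I/O-bound task
N_TASKS = 8


def io_task(_):
    # Simulates waiting on I/O; adding workers never makes this call faster.
    time.sleep(TASK_SECONDS)
    return 1


def throughput(workers):
    """Run N_TASKS tasks on `workers` threads and return tasks completed per second."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        done = sum(pool.map(io_task, range(N_TASKS)))
    elapsed = time.perf_counter() - start
    return done / elapsed


serial = throughput(1)
parallel = throughput(4)
print(f"1 worker:  {serial:.1f} tasks/s")
print(f"4 workers: {parallel:.1f} tasks/s")
```

The individual task is not any faster; only the amount of work completed per unit of time goes up.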

Celery executes tasks by passing messages from clients to workers. For messaging, Celery uses the Kombu library, which takes care of producer and consumer communication with the message server following the AMQP specification.

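RabbitMQ and Redis are message servers that provide the message queuing and routing functions described by the AMQP specification. RabbitMQ implements AMQP natively, while for Redis, Kombu emulates the AMQP exchange and queue model. That is why most of the terms used in Celery come from the AMQP specification.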

Out of the box, Celery also supports task scheduling, but that is not discussed in this post.

Celery Terms

Before getting into how Celery executes a task, let's get familiar with the terms used in Celery so that you can follow any Celery-related discussion.

Task

Tasks are the building blocks of Celery and are executed by the worker processes. Even a simple add function can be converted into a Celery task with the task decorator.

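from celery import Celery
app = Celery('celery_demo', broker='redis://localhost:6379/0')  # app name and broker URL are assumptions

@app.task
def add(x, y):
    return x + y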

When creating a task, one important property to understand is idempotence, so that you can decide whether the task can safely be re-run with retry options for known application errors.

An idempotent function can be called any number of times and the outcome will be the same. For example, the add function above returns x + y = 4 for x=2 and y=2, and it returns 4 for those inputs every time it is called.
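A plain-Python sketch can make the difference between idempotent and non-idempotent work concrete. The account dictionary and the set_balance and deposit functions are hypothetical task bodies, not Celery code; calling each one twice simulates a retry.

```python
# Hypothetical task bodies written as plain functions for illustration.
def add(x, y):
    # Pure function: same inputs always give the same result, so a retry is harmless.
    return x + y


account = {'balance': 100}


def set_balance(amount):
    # Idempotent side effect: running it twice leaves the same final state.
    account['balance'] = amount


def deposit(amount):
    # NOT idempotent: every re-run changes the state again.
    account['balance'] += amount


print(add(2, 2), add(2, 2))  # 4 4

set_balance(150)
set_balance(150)           # simulated retry
print(account['balance'])  # 150

deposit(50)
deposit(50)                # simulated retry
print(account['balance'])  # 250
```

set_balance is safe to retry, while a retried deposit credits the account twice, so a task like deposit should not simply be retried.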

Producer

A producer is simply the client that requests task execution. When your main process calls the add function to be executed by a Celery worker, as below, that process is the producer.

result = add.delay(4, 4)  # send message to exchange for execution

Consumer (Worker)

The consumer is the worker in Celery. The worker does not execute the task itself; it hands the task to a worker process in its execution pool, which executes it. This will become clearer when you look at the architecture diagram below.

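You can start a Celery worker as below. Here the worker is created with some additional parameters: -l info sets the log level, -Q demo makes it consume only from the demo queue, -n worker1@%h sets the node name (%h expands to the host name), and --concurrency and --autoscale control the number of processes in the execution pool.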

celery -A celery_demo worker -l info -Q demo --concurrency=3 -n worker1@%h --autoscale=3,1

Message

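The task message is passed from the producer to the broker, which queues the task and delivers it to a worker according to the queues that worker consumes. A message has two parts: the header carries metadata such as the content type (serialization format), and the body carries the task name, task id and arguments. The body below follows Celery's original message format (protocol version 1); since Celery 4.0, the default message protocol (version 2) moves the task name and id into the message headers and keeps the arguments in the body.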

{
    'task': 'myapp.tasks.add',
    'id': '54086c5e-6193-4575-8308-dbab76798756',
    'args': [4, 4],
    'kwargs': {}
}
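The following standard-library sketch shows how a body like this can be assembled and read back. build_task_message is a hypothetical helper that mirrors the fields above, not part of Celery's API (Celery builds and serializes its messages internally through Kombu), and the properties dictionary is a simplified stand-in for the message header.

```python
import json
import uuid


def build_task_message(task_name, args=(), kwargs=None):
    """Hypothetical helper that mimics a protocol-1 style task message."""
    properties = {'content_type': 'application/json', 'content_encoding': 'utf-8'}
    body = {
        'task': task_name,
        'id': str(uuid.uuid4()),  # unique task id
        'args': list(args),
        'kwargs': kwargs or {},
    }
    return properties, json.dumps(body).encode('utf-8')


properties, raw = build_task_message('myapp.tasks.add', args=(4, 4))

# The consumer reads the content type/encoding first, then decodes the body with them.
decoded = json.loads(raw.decode(properties['content_encoding']))
print(decoded['task'], decoded['args'])  # myapp.tasks.add [4, 4]
```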

Broker

The broker is the message server that routes messages between producers and workers. It implements the AMQP message queuing and routing functions. RabbitMQ and Redis can be used as message brokers in Celery.

Exchange

An exchange accepts messages from a producer and routes them to one or more queues according to the routing key and the queue bindings. An exchange is a matching and routing engine; it basically does the job of a post office.

Direct is the default exchange type used in Celery. Each exchange type implements a specific routing algorithm; fanout, topic and headers are the other exchange types that can be used.

from kombu import Exchange
Exchange(name='demo', type='direct')

Routing key

The routing key is the address on a message; the key used when a queue is bound to an exchange is called the binding key. The exchange uses the routing key to route the task message to one or more queues. Exchange types such as fanout and headers ignore the routing key.

Queue

A queue holds the messages received from the exchange until they are processed by a worker. A queue is tied to an exchange with a routing_key; this association is called a binding.

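from kombu import Exchange, Queue
demo_exchange = Exchange(name='ex_demo', type='direct')
# Bind the 'demo' queue to the 'ex_demo' exchange with the routing key 'demo'
Queue('demo', demo_exchange, routing_key='demo')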

Result Backend

The result backend is used to store task states and results. Celery supports SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc) and Redis, among others, as result backends.

Content type

The content type identifies the serialization format used to encode and decode messages in Celery. Celery supports JSON, pickle, YAML and msgpack, but only JSON is enabled by default.
You can enable other serialization types in the Celery application. Only messages with an accepted content type are passed to the consumer. Because JSON cannot encode arbitrary Python objects, you cannot send objects such as a Python Enum in a JSON message; to send such a type, you would need to enable the pickle serializer.
Pickle has been disabled by default since Celery 4.0 for security reasons, because unpickling untrusted data can execute arbitrary code. If your application generates messages from web application requests and you want to send Enum-like objects, it is advisable to create a custom serializer and register it in the configuration instead.

# Celery 4+ lowercase setting names
app.conf.task_serializer = 'pickle'
app.conf.accept_content = ['json', 'pickle']

Acknowledgement

By default, the worker acknowledges the message to the broker just before executing the task, not after it completes. This is called early acknowledgement.
There are times when you need the acknowledgement to happen after the task has executed, so that the message is not lost if the worker crashes mid-task. This late acknowledgement can be enabled per task with the acks_late parameter, or for all tasks with the task_acks_late setting.

# Task level
@app.task(acks_late=True)
def add(x, y):
    return x + y

# Application level: applies to every task
app.conf.task_acks_late = True
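The difference between early and late acknowledgement can be modelled with a simplified, standard-library sketch. deliver is a hypothetical function representing one delivery from the broker's point of view: a message acknowledged before the worker died is gone, while an unacknowledged one goes back on the queue. In real Celery, whether a late-acknowledged task is redelivered when only a pool child process dies also depends on the task_reject_on_worker_lost setting.

```python
from collections import deque


def deliver(queue, acks_late, crash):
    """Simplified model of one delivery; returns the task result, or None on a crash."""
    message = queue.popleft()  # broker hands the message to the worker
    if crash:
        # The worker dies mid-task. With early ack the broker already forgot the
        # message; with late ack it is still unacknowledged and gets redelivered.
        if acks_late:
            queue.appendleft(message)
        return None
    # Task body (add); with late ack, the acknowledgement is sent after this.
    return message['x'] + message['y']


early = deque([{'x': 4, 'y': 4}])
deliver(early, acks_late=False, crash=True)
print(len(early))  # 0, the message is lost

late = deque([{'x': 4, 'y': 4}])
deliver(late, acks_late=True, crash=True)
print(len(late))   # 1, the message is back on the queue
retried = deliver(late, acks_late=True, crash=False)
print(retried)     # 8
```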

Execution Pool

The execution pool is the pool implementation the worker uses to execute tasks. By default, Celery uses prefork (multiprocessing via the billiard package); it also supports gevent, eventlet and solo pools.
For I/O-bound applications, eventlet and gevent are appropriate. For CPU-bound applications, prefork (multiprocessing) is appropriate. At the time of writing, asyncio is not supported natively by Celery, although support was expected in a future major release.

Celery Architecture

Now that we know the terms used in Celery, it is easier to understand how those components are laid out to perform parallel execution.


Message flow

The numbers in the diagram correspond to the steps below, which describe how a message flows from the producer to the worker.

  1. Producer sends the task message to the exchange.
  2. Exchange examines the task message for routing information and routes it to the desired queue.
  3. Consumer takes the message from the queue for processing.
  4. Consumer sends the acknowledgement to the broker.
  5. Broker deletes the message from the queue.
  6. Consumer completes the task and sends the result to the result backend.
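To tie the steps together, here is a toy, single-process model of the flow. Every component is a hypothetical stand-in (a dictionary for the exchange bindings, a deque for the queue, a dictionary for the result backend); it only mirrors the order of the numbered steps, not how Celery or a real broker is implemented.

```python
from collections import deque

# Toy stand-ins: a direct exchange maps each routing key to one bound queue.
exchange_bindings = {'demo': deque()}
result_backend = {}


def produce(task_id, args):
    message = {'id': task_id, 'task': 'add', 'args': args, 'routing_key': 'demo'}
    # Steps 1-2: send to the exchange, which routes by routing key to a queue.
    exchange_bindings[message['routing_key']].append(message)


def consume(queue):
    # Step 3: the consumer takes the next message from the queue.
    message = queue[0]
    # Steps 4-5: early acknowledgement, after which the broker deletes the message.
    queue.popleft()
    # Step 6: run the task (add) and store the result in the result backend.
    result_backend[message['id']] = sum(message['args'])


produce('task-1', [4, 4])
consume(exchange_bindings['demo'])
print(result_backend)                  # {'task-1': 8}
print(len(exchange_bindings['demo']))  # 0
```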

Hope this helps you understand the execution flow in Celery.


Feel free to leave a comment if you want more explanation on the topic or have any other questions. I am happy to help. Happy coding!!
