Celery
1. What is Celery?
Celery is a distributed system that processes messages asynchronously by running tasks in the background. It uses a message broker to deliver messages between the main application and worker nodes, and it is designed to handle distributed task processing at scale.
Key Features:
- Asynchronous Task Execution: Run tasks in the background, freeing up your main application.
- Task Scheduling: Execute tasks at specific intervals or after certain delays (like cron jobs).
- Distributed Processing: Run tasks across multiple worker nodes for scalability and fault tolerance.
- Task Retrying: Tasks can be automatically retried if they fail.
2. Celery Architecture
a. Workers
Workers are background processes that execute tasks. You can have multiple workers across different machines to handle tasks in parallel.
b. Message Broker
A message broker (such as Redis, RabbitMQ, or Amazon SQS) is used to send messages from your main application to Celery workers. The broker handles task queuing and routing.
c. Result Backend
The result backend stores the results of tasks. Celery supports several backends like Redis, Django ORM, AMQP, Memcached, and SQLAlchemy.
3. Installation and Setup
a. Install Celery
To install Celery in your project, use pip:
pip install celery
b. Basic Celery Setup for a Django Project
Create a celery.py file in your Django project directory (where settings.py resides):

# project/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')

app = Celery('project')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Update __init__.py to ensure that Celery is imported when Django starts:

# project/__init__.py
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
Configure Celery in settings.py: add the configuration for the message broker (for example, Redis):

# project/settings.py

# Celery settings
CELERY_BROKER_URL = 'redis://localhost:6379/0'      # Redis as message broker
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Redis to store task results
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
c. Starting Celery Workers
Once you’ve configured Celery, you need to run the Celery workers to process the tasks. Run this command from the Django project root:
celery -A project worker --loglevel=info
This command starts a Celery worker that will process tasks defined in your project. The --loglevel=info flag ensures you see task output.
4. Defining and Executing Tasks
Tasks in Celery are just Python functions that you register as tasks using the @shared_task decorator. You can create tasks in any app within the Django project.
a. Defining a Task
# tasks.py in any Django app
from celery import shared_task
from time import sleep
@shared_task
def add(x, y):
    sleep(10)  # Simulate a time-consuming task
    return x + y
@shared_task: This decorator marks the function as a Celery task, meaning it can be executed asynchronously.
b. Calling a Task
To execute a Celery task asynchronously, you use the delay() method:
from .tasks import add
# Call the task asynchronously
result = add.delay(4, 6)
# You can also retrieve the result (if needed)
print(result.get()) # This will block until the task is done
This will queue the add task, and Celery workers will execute it in the background.
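delay() is a shortcut for the more general apply_async(), which accepts execution options such as a countdown or an explicit queue. A minimal sketch (the countdown value and the 'emails' queue name are illustrative, not part of the setup above):

from .tasks import add

# apply_async takes the positional arguments as a tuple, plus delivery options
result = add.apply_async((4, 6), countdown=10, queue='emails')
print(result.id)  # The task ID, which can be passed to AsyncResult later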
c. Task Results
You can track task execution status and retrieve the result using the task’s ID:
# Retrieve the result using the task ID
from celery.result import AsyncResult
result = AsyncResult(task_id)
if result.successful():
    print(result.result)
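AsyncResult also exposes the task's state directly, which is useful before the task has finished. A brief sketch, assuming a result backend is configured and task_id was captured when the task was queued:

from celery.result import AsyncResult

result = AsyncResult(task_id)
print(result.state)  # e.g. 'PENDING', 'STARTED', 'SUCCESS', or 'FAILURE'
if result.ready() and result.failed():
    print(result.traceback)  # Traceback of the exception the task raised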
5. Task Retries and Error Handling
Celery allows you to automatically retry tasks that fail due to transient errors.
a. Retrying a Task
To make a task retryable, use the retry() method in the task function:
@shared_task(bind=True, max_retries=3)
def example_task(self):
    try:
        # Simulate a task that may fail
        risky_operation()
    except SomeError as exc:
        raise self.retry(exc=exc, countdown=5)
- max_retries=3: The task will be retried a maximum of 3 times.
- countdown=5: The task will wait 5 seconds before retrying.
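Celery 4 and later can also retry declaratively, without an explicit self.retry() call, using autoretry_for. A sketch under that assumption (call_external_service is a hypothetical helper):

from celery import shared_task

# Retry automatically on ConnectionError, with exponential backoff between attempts
@shared_task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=3)
def fetch_remote():
    return call_external_service()  # hypothetical call that may raise ConnectionError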
b. Error Handling
If a task raises an exception, it can be caught and retried or handled with a fallback:
@shared_task(bind=True)
def risky_task(self):
    try:
        risky_operation()
    except Exception as exc:
        self.update_state(state='FAILURE', meta={'error': str(exc)})
        raise exc
self.update_state(): Updates the task's state manually. You can store custom error metadata using the meta argument.
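update_state() is also handy for progress reporting with custom states. A minimal sketch ('PROGRESS' is an arbitrary label rather than a built-in state, and process() is a hypothetical per-item function):

from celery import shared_task

@shared_task(bind=True)
def long_task(self, items):
    for i, item in enumerate(items):
        process(item)  # hypothetical per-item work
        # Callers can read this metadata via AsyncResult(task_id).info
        self.update_state(state='PROGRESS', meta={'done': i + 1, 'total': len(items)})
    return 'complete'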
6. Periodic and Scheduled Tasks
You can schedule tasks to run at specific times or intervals, similar to cron jobs.
a. Using Celery Beat
Celery Beat is a scheduler that kicks off tasks at regular intervals. It works alongside your Celery workers.
Install the necessary package:
pip install django-celery-beat
Add django_celery_beat to your INSTALLED_APPS in settings.py:

INSTALLED_APPS = [
    # other apps
    'django_celery_beat',
]
Run migrations for Celery Beat:
python manage.py migrate django_celery_beat
Configure a Periodic Task using the Django Admin or in code:
Example of a periodic task running every 10 minutes:
from django_celery_beat.models import PeriodicTask, IntervalSchedule

# Create an interval schedule (every 10 minutes)
schedule, created = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.MINUTES,
)

# Create the periodic task
PeriodicTask.objects.create(
    interval=schedule,  # Use the schedule created above
    name='My periodic task',
    task='myapp.tasks.my_task',  # The name of the task function
)
Start the Celery Beat Scheduler:
celery -A project beat --loglevel=info
This command will run the scheduler, which triggers periodic tasks based on your schedule.
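For local development, the scheduler can also be embedded in a worker process with the -B flag, though a dedicated beat process (as above) is the usual choice for production:

celery -A project worker -B --loglevel=info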
b. Using the crontab Scheduler
You can also configure tasks to run at specific times using cron-like syntax:
from celery.schedules import crontab
app.conf.beat_schedule = {
    'task_name': {
        'task': 'myapp.tasks.my_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),  # Every Monday at 7:30 AM
    },
}
7. Optimizing Celery for Production
a. Concurrency
You can control how many worker processes or threads Celery spawns using the --concurrency option when starting workers:
celery -A project worker --concurrency=4
This will start 4 worker processes.
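The pool can also be scaled dynamically with the --autoscale option, which takes a maximum and a minimum number of processes; the values below are illustrative:

celery -A project worker --autoscale=10,3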
b. Prefetch Limit
Celery workers can fetch multiple tasks in advance, but this may overload workers in some cases. You can limit the number of prefetched tasks with the worker_prefetch_multiplier setting:

app.conf.worker_prefetch_multiplier = 1  # Prefetch only 1 task at a time
c. Task Time Limits
If a task takes too long, you can set time limits to prevent worker exhaustion:
app.conf.task_soft_time_limit = 60  # Raise exception after 60 seconds
app.conf.task_time_limit = 120      # Hard limit (kill task after 120 seconds)
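When the soft limit is reached, Celery raises SoftTimeLimitExceeded inside the task, giving it a chance to clean up before the hard limit kills it. A minimal sketch (do_heavy_work and cleanup are hypothetical helpers):

from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded

@shared_task
def long_running():
    try:
        do_heavy_work()  # hypothetical long computation
    except SoftTimeLimitExceeded:
        cleanup()  # hypothetical cleanup before the hard limit kills the task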
d. Monitoring and Admin
Use Flower, a web-based tool to monitor Celery workers and tasks:
Install Flower:
pip install flower
Start Flower:
celery -A project flower
Flower provides a web interface that shows the current status of workers, tasks, and queues. You can view task progress, task failures, and retry counts.
8. Broker Options
Celery supports a wide range of message brokers, including:
- Redis: A simple key-value store that can act as a message broker.
- RabbitMQ: A robust, feature-rich message broker that supports advanced features like routing and exchanges.
- Amazon SQS: A scalable message queue service.
Redis Example:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
RabbitMQ Example:
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
9. Best Practices
Use Task Queues: Use task queues to categorize and separate different types of tasks, especially if they have different priorities or resource requirements.
Example:
celery -A project worker -Q queue_name
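A worker started with -Q consumes only the named queue; to send specific tasks there, routing can be configured on the app. A sketch (the task path and queue name are illustrative):

# Route a specific task to a dedicated queue
app.conf.task_routes = {
    'myapp.tasks.my_task': {'queue': 'queue_name'},
}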
Idempotent Tasks: Ensure tasks are idempotent, meaning they can run multiple times without unintended side effects. This is essential because Celery retries tasks after failures.
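For instance, a hypothetical sketch of an idempotent task using Django's get_or_create, so that a retry does not create duplicate rows (the Payment model is illustrative):

from celery import shared_task
from myapp.models import Payment  # hypothetical model

@shared_task
def record_payment(order_id, amount):
    # get_or_create makes re-running safe: a retry finds the existing row
    Payment.objects.get_or_create(order_id=order_id, defaults={'amount': amount})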
Error Handling: Always handle potential errors inside tasks using try/except, and consider using retry() for transient issues.
Monitor Tasks: Use monitoring tools like Flower or Prometheus to track worker health and task performance.