Queues
Queues are generally used to provide some sort of buffer between producers and consumers. They can be thought of as a place to keep messages between the two groups, so that one side doesn't overload the other and so that connectivity issues between the two sets don't become a problem
They also, generally, provide durability, a single point of connectivity, scaling, and other useful traits like priorities, acknowledgements, and retries
Queues are useful when you have producers and consumers with different workloads, where you might want to scale the consumers to keep up with new messages coming in from producers - the most typical example I see is an API fielding requests and sending messages to workers that run complicated ML pipelines and models
Example:
- Message comes in with a JSON field holding a document, or an S3 URL that points to a large text-based document
- API fields the request, checks priority, and sends it to a queue
- To ensure timely responses, the consumer pool may scale to 2-3x the size of the producer pool, since each request is light on the producer and very heavy on the consumer
- Consumer may need to run multiple NLP pipelines such as Entity Extraction, Sentiment Analysis, and/or Categorization
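As a rough sketch of the producer side of this example - assuming boto3, a hypothetical SQS queue URL, and priority carried as a plain message field - the API handler might do something like:

import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/nlp-jobs"  # hypothetical queue

def enqueue_document(document_s3_url: str, priority: str = "normal"):
    # The API stays light: validate, then hand the heavy NLP work to the consumers
    return sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps({"s3_url": document_s3_url, "priority": priority}),
    )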
Queues In The Wild
Some examples below of queues typically used "in the wild"
SQS
Redis
RabbitMQ
Celery
Celery is somewhere between a queue, a message broker, and a serializable function passer - Celery, IMO, is not actually a queue, but it's often described as one
At its core, Celery is probably best described as a Distributed Producer-Consumer Architecture SDK, or something in that vein
Celery helps developers write tightly coupled producer.py and consumer.py scripts, along with celeryconfig.py files, which let producers push to a backend message queue (typically Redis or RabbitMQ) that consumers then pull from
Celery sits at the application plane, and its "scaling" is done on Celery Workers, which are tightly coupled to the compute / infra plane
Celery cannot ask your infra plane to stand up more computes; it can simply scale up more processes on each of those computes
Celery Structure
Celery has a few core components that make it work
- Brokers - The message queue that Celery uses to pass messages between producers and consumers - typically Redis or RabbitMQ
- Producers - The code that produces tasks and sends them to the broker
- Consumers - The code that consumes tasks from the broker
- Consumers are the actual workers that process the tasks, and they typically run in separate processes or containers
- You do not specifically create a consumer.py file; your producers call tasks.add.delay() and eventually get a returned result - Therefore, our producers call a worker and then consume the output
- Workers - The processes (actual computes) that run the consumer code and execute the tasks
- Tasks - The functions that are executed by the workers (and passed between producers and consumers)
These components work together to create a distributed task queue system that allows for asynchronous task execution and scaling, BUT they are also tightly coupled in terms of repo setup, and code structure
In a typical setup, there are 3 main components
Tasks / App
tasks.py
- This file contains the Celery app configuration and the task definitions. Both producers and consumers will import this file to access the Celery app and tasks
- This file is the "glue" that holds the producer and consumer code together, and so it tightly couples your sender (producer) and receiver (consumer)
- A number of implementations will make this a proper separate package that both producer and consumer repos can import
The Celery() instance below is commonly referred to as the "Celery app"
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
- After this, you're able to run Celery via celery -A tasks worker --loglevel=INFO
- Form: celery -A <module_name> <type> --loglevel=INFO
- A crap load of other CLI options for running as a daemon, setting up worker pools, etc...
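The worker-pool options tie back to the scaling point above: Celery scales processes on a box, and you tell it how many. A couple of commonly used flags, shown as a sketch (check your Celery version's docs for exact behavior):

celery -A tasks worker --loglevel=INFO --concurrency=8
celery -A tasks worker --loglevel=INFO --autoscale=10,3   # scale between 3 and 10 processes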
You can even have multiple Celery Apps and Brokers inside of a Project
from celery import Celery

app1 = Celery('app1', broker='redis://localhost:6379/0')
app2 = Celery('app2', broker='amqp://guest@localhost//')

@app1.task
def task_a():
    pass

@app2.task
def task_b():
    pass
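When more than one app lives in a module, the -A flag can point at a specific app using the module:attribute form - assuming the file above is named apps.py, that would look like:

celery -A apps:app1 worker --loglevel=INFO
celery -A apps:app2 worker --loglevel=INFO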
Calling The Task
Calling an actual task simply involves importing the task from the tasks.py module and running the .delay() method
In our tasks module, since there's a broker involved, what ends up happening is that this task gets scheduled on our broker (queue), and the worker pool will pick it up for execution
from tasks import add
result = add.delay(4, 4)
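.delay() is the shorthand form. If you need more control, .apply_async() takes the same arguments plus routing and scheduling options - a sketch below, where the queue name is just an assumption chosen to match the routing config shown later:

from tasks import add

# Same call, but routed to a specific queue and delayed by 10 seconds
result = add.apply_async((4, 4), queue='low-priority', countdown=10)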
Results
The results backend stores the state (and return values) of running tasks
You can use Mongo, SQLAlchemy or Django databases, Redis, or even RPC, which sends results back as messages through the broker to the caller
From above, we can then call something like
result.ready() # True or False
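Note that result.ready() only works when a result backend is configured - the minimal app earlier only set a broker. A minimal sketch, assuming a local Redis instance for results:

from celery import Celery

app = Celery(
    'tasks',
    broker='pyamqp://guest@localhost//',
    backend='redis://localhost:6379/1',  # assumption: Redis as the result backend
)

# Then, on the caller side (`add` being the task defined earlier on this app):
result = add.delay(4, 4)
result.ready()          # True / False
result.get(timeout=10)  # blocks for up to 10s waiting on the return value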
Configuration
From the Celery Docs: "The input must be connected to a broker, and the output can be optionally connected to a result backend."
Things are mostly this simple
Then it says "However, if you look closely at the back, there’s a lid revealing loads of sliders, dials, and buttons: this is the configuration."
And that's because Celery has a ton of stuff to tweak to make things actually run in parallel with proper CPU saturation - are your tasks I/O bound or CPU bound? Are there certain datasets and volumes that need to be attached? How does everything interact with the result backend? How are retries done?! Etc
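A taste of those dials at the task level - the option names below are real Celery task options, but the values (and the task itself) are just a sketch:

# using the `app` from the earlier tasks.py example
@app.task(
    bind=True,
    autoretry_for=(ConnectionError,),  # retry automatically on these exceptions
    retry_backoff=True,                # exponential backoff between retries
    max_retries=3,
    acks_late=True,                    # ack the message only after the task finishes
    rate_limit='10/m',
)
def flaky_io_task(self, payload):
    ...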
In large scale use cases, there are entire configuration modules with routing + task definitions
celeryconfig.py
broker_url = 'pyamqp://'
result_backend = 'rpc://'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
task_routes = {
    'tasks.add': 'low-priority',
}
task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}
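The app then loads this module explicitly - config_from_object is the standard Celery mechanism for module-based config:

app.config_from_object('celeryconfig')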
Examples
I/O Bound Task
Let's say we have a setup where an API request comes in, say a user registration, and the API needs to call a handle() function that:
- Creates a new user
- Creates their profile and settings configuration
- Queues a welcome email in SQS
- And finally, after ensuring all of these are done, returns a response to the user
In this scenario, we can have a lightweight FastAPI app in front that accepts new requests and calls a tasks.handle() function that does everything
The actual work is I/O bound and runs on the Celery worker pool, so how does everything look?
src/
├── tasks.py
└── main.py
# tasks.py
import asyncio

from celery import Celery

# Result backend added so the API can check task state later (rpc:// matches the config example above)
app = Celery('tasks', broker='pyamqp://guest@localhost//', backend='rpc://')

@app.task
def handle(user_id):
    # Celery tasks are plain callables, so run the async I/O steps inside the worker
    asyncio.run(_handle(user_id))

async def _handle(user_id):
    await new_user(user_id)
    await new_profile(user_id)
    await new_settings(user_id)

# call_database and the CREATE_* constants are assumed to exist elsewhere in the project
async def new_user(user_id):
    await call_database(user_id, CREATE_USER)

async def new_profile(user_id):
    await call_database(user_id, CREATE_PROFILE)

async def new_settings(user_id):
    await call_database(user_id, CREATE_SETTINGS)
# main.py
from fastapi import FastAPI

from tasks import handle

app = FastAPI()

@app.post("/create")
def create_add_task(user_id: int):
    result = handle.delay(user_id)
    return {"task_id": result.id}
From here, any API call to /create will enqueue a new task for the user creation workflow. The FastAPI application immediately responds with the task ID, allowing the client to check the status of the task later if needed
Some people will do the following:
@app.post("/create")
async def create_add_task(user_id: int):
result = await handle.delay(user_id)
return {"task_id": result.id}
The intent is to wait until the task is finished and return the result directly to the client. That approach is generally not recommended for long-running tasks, as it defeats the purpose of using a task queue; it's better to return the task ID immediately and let the client poll for the result later.
It also doesn't work as written: delay() isn't an async function and doesn't return an awaitable, so you can't use await with it - it returns an AsyncResult immediately.
AsyncResult.get() is the call that actually waits for the result, and it's a blocking, synchronous call - most documentation warns against using it in the request path because "the task queue could take a long time to return a result, and you don't want to block your application while waiting for it."
Even if you keep the event loop free (for example, by running result.get() in a thread pool so FastAPI can handle other requests), the user's request is still "pending" until the task is done. The main benefit of a task queue is to decouple the request/response cycle from the background work, allowing you to return immediately and let the client poll for the result
Therefore, if you're truly using Celery for I/O bound work, you will most likely have to return the task_id to the client and let them poll for the result.
@app.post("/create")
async def create_add_task(user_id: int):
result = handle.delay(user_id)
return {"task_id": result.id}
@app.post("/result/{task_id}")
async def get_task_result(task_id: str):
result = AsyncResult(task_id)
if result.ready():
return {"status": "completed", "result": result.get()}
else:
return {"status": "pending"}