Broker¶
A broker produce and fetch tasks. It rely on a queue and a cache.
Queue available:
rabbitmq
dummy (Storage in RAM - not persistant)
Write your own!
Cache available:
redis
dummy (Storage in RAM - not persistant)
Write your own!
A worker has to be setup to consume tasks. You may encounter errors if you try to create tasks from the broker without having setup a worker first.
With many workers, tasks are load balanced accross workers. Load balacing algorithm is Round Robin.
Setup a broker¶
from aio_task import Broker
broker = await Broker.create(
queue_type="rabbitmq", # desired type of queue
queue_conf={"url": "amqp://guest:guest@localhost:5672",
"routing_key": "tasks_queue"}, # dict with the queue configuration - depends on the queue type.
cache_type="redis", # desired type of cache
cache_conf={"address": "redis://localhost"}, # dict with the cache configuration - depends on the cache type.
)
These params should match with the worker ones.
Produce a task¶
task_id = await broker.create_task("my_task", {"arg1": 1, "arg2": 2})
If task “my_task” was never registred by a worker before, it will produce an error.
If a task “my_task” was previously register by a worker then task is pushed to the queue even if no worker is currently up.
The task_id is a str, generated with uuid.uuid4().