Broker

A broker produce and fetch tasks. It rely on a queue and a cache.

Queue available:

  • rabbitmq

  • dummy (Storage in RAM - not persistant)

  • Write your own!

Cache available:

  • redis

  • dummy (Storage in RAM - not persistant)

  • Write your own!

A worker has to be setup to consume tasks. You may encounter errors if you try to create tasks from the broker without having setup a worker first.

With many workers, tasks are load balanced accross workers. Load balacing algorithm is Round Robin.

Setup a broker

from aio_task import Broker

broker = await Broker.create(
    queue_type="rabbitmq",  # desired type of queue
    queue_conf={"url": "amqp://guest:guest@localhost:5672",
                "routing_key": "tasks_queue"},  # dict with the queue configuration - depends on the queue type.
    cache_type="redis",  # desired type of cache
    cache_conf={"address": "redis://localhost"},  # dict with the cache configuration - depends on the cache type.
)

These params should match with the worker ones.

Produce a task

task_id = await broker.create_task("my_task", {"arg1": 1, "arg2": 2})
  • If task “my_task” was never registred by a worker before, it will produce an error.

  • If a task “my_task” was previously register by a worker then task is pushed to the queue even if no worker is currently up.

The task_id is a str, generated with uuid.uuid4().

Fetch a task

task = await broker.get_task(task_id)

Tasks are stored in a cache that have a TTL (default 86400 seconds, i.e. 1 day). You can change this value by setting ttl in cache_conf.

If task is not in Cache, this raise a ValueError.

Otherwise, it returns a Task object.

Shutdown

await broker.close()

That’s it!