# Broker

A broker produces and fetches tasks. It relies on a [queue][] and a [cache][].

Available queues:

- **rabbitmq**
- **dummy** (storage in RAM, not persistent)
- *Write your own!*

Available caches:

- **redis**
- **dummy** (storage in RAM, not persistent)
- *Write your own!*

A [worker][] has to be set up to consume tasks. You may encounter errors if you try to create tasks from the broker without having set up a [worker] first.

With several workers, tasks are load balanced across them. The load balancing algorithm is round robin.

## Set up a broker

```python
from aio_task import Broker

# Run inside a coroutine, since `await` is used.
broker = await Broker.create(
    queue_type="rabbitmq",  # desired type of queue
    # dict with the queue configuration - depends on the queue type
    queue_conf={"url": "amqp://guest:guest@localhost:5672",
                "routing_key": "tasks_queue"},
    cache_type="redis",  # desired type of cache
    # dict with the cache configuration - depends on the cache type
    cache_conf={"address": "redis://localhost"},
)
```

**These parameters should match the [worker] ones.**

## Produce a task

```python
task_id = await broker.create_task("my_task", {"arg1": 1, "arg2": 2})
```

- If the task "my_task" was never registered by a worker, this produces an error.
- If the task "my_task" was previously registered by a worker, the task is pushed to the queue *even if no worker is currently up*.

The `task_id` is a `str`, generated with `uuid.uuid4()`.

## Fetch a task

```python
task = await broker.get_task(task_id)
```

Tasks are stored in a cache that has a TTL (default 86400 seconds, i.e. 1 day). You can change this value by setting `ttl` in `cache_conf`.

If the task is not in the cache, this raises a `ValueError`. Otherwise, it returns a [Task][] object.

## Shutdown

```python
await broker.close()
```

That's it!

[Task]:
[worker]:
[queue]:
[cache]:
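
To make the round robin load balancing mentioned above more concrete, here is a minimal, standalone sketch of the idea. It is not the library's implementation (in practice the queue backend dispatches tasks to consumers), and `assign_round_robin`, the task names, and the worker names are hypothetical, used only for illustration.

```python
from itertools import cycle


def assign_round_robin(tasks, workers):
    """Pair each task with a worker, rotating through the workers in order."""
    if not workers:
        raise ValueError("at least one worker is required")
    rotation = cycle(workers)  # endlessly repeats worker-a, worker-b, ...
    return [(task, next(rotation)) for task in tasks]


# Hypothetical task and worker names, for illustration only.
assignments = assign_round_robin(
    ["task-1", "task-2", "task-3", "task-4", "task-5"],
    ["worker-a", "worker-b"],
)
for task, worker in assignments:
    print(f"{task} -> {worker}")
```

With two workers, consecutive tasks alternate between them, so neither worker receives two tasks in a row while the other is skipped.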