A worker consumes tasks. It registers coroutines as tasks under a task_name. By default the task_name is the coroutine's name, but be careful with decorators: a decorator can change the __name__ attribute.
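To illustrate the decorator caveat, here is a minimal sketch using only the standard library. The decorators and task names (plain_decorator, wrapping_decorator, send_email, resize_image) are hypothetical and not part of aio_task. It shows that a decorator that does not use functools.wraps replaces the coroutine's __name__, which would change the default task_name.

```python
import functools


def plain_decorator(func):
    # No functools.wraps: the returned coroutine is named "wrapper".
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    return wrapper


def wrapping_decorator(func):
    # functools.wraps copies __name__ (and other metadata) from func.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await func(*args, **kwargs)
    return wrapper


@plain_decorator
async def send_email(**kwargs):  # hypothetical task
    return "sent"


@wrapping_decorator
async def resize_image(**kwargs):  # hypothetical task
    return "resized"


print(send_email.__name__)    # wrapper
print(resize_image.__name__)  # resize_image
```

If you cannot change the decorator, passing an explicit task_name when registering the task avoids relying on __name__.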

Like the broker, the worker relies on a queue and a cache. See the broker page for the available queues and caches.

It’s better to set up the worker first (before the broker), so you can be sure the queue is set up.

Set up a worker

from aio_task import Worker

worker = await Worker.create(
    queue_type="rabbitmq",  # desired type of queue
    queue_conf={"url": "amqp://guest:guest@localhost:5672",
                "routing_key": "tasks_queue"},  # dict with the queue configuration - depends on the queue type.
    cache_type="redis",  # desired type of cache
    cache_conf={"address": "redis://localhost"},  # dict with the cache configuration - depends on the cache type.
)

These parameters should match the ones used by the broker.
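One way to keep both sides in sync is to define the settings once and reuse them. The sketch below is only an illustration: TASK_BACKEND and backend_kwargs are hypothetical names, and it assumes the broker accepts the same keyword arguments as Worker.create (check the broker page before relying on that).

```python
import copy

# Hypothetical shared settings module, imported by both the worker
# process and the broker process.
TASK_BACKEND = {
    "queue_type": "rabbitmq",
    "queue_conf": {"url": "amqp://guest:guest@localhost:5672",
                   "routing_key": "tasks_queue"},
    "cache_type": "redis",
    "cache_conf": {"address": "redis://localhost"},
}


def backend_kwargs():
    """Return a deep copy so one side cannot mutate the other's settings."""
    return copy.deepcopy(TASK_BACKEND)


# Worker side:
#     worker = await Worker.create(**backend_kwargs())
# Broker side (assumption: same keyword arguments as the worker):
#     broker = await Broker.create(**backend_kwargs())
```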

Register tasks

async def coro(**kwargs):  # task example
    pass  # task body goes here

async def coro2(**kwargs):
    pass  # task body goes here

worker.register_handler(
    coro=coro,  # the coroutine corresponding to the task.
    task_name="coro"  # optional, defaults to coro.__name__
)
worker.register_handler(coro2)  # register another task
worker.register_handler(coro2, task_name="task1")  # register task under another name

When you have registered all your tasks, start the worker to consume tasks.

await worker.start()

Once the worker is started, you cannot register other tasks.
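The registration rules can be pictured with a toy registry. This is an illustrative stand-in, not aio_task's Worker: ToyRegistry is a hypothetical class, and raising RuntimeError on late registration is an assumption made for the sketch (the real worker may react differently).

```python
import asyncio


class ToyRegistry:
    """Illustrative stand-in for the naming rules, not aio_task's Worker."""

    def __init__(self):
        self.handlers = {}
        self.started = False

    def register_handler(self, coro, task_name=None):
        if self.started:
            # Assumption for this sketch: late registration is rejected.
            raise RuntimeError("cannot register a task after start")
        # Fall back to the coroutine's __name__ when no task_name is given.
        self.handlers[task_name or coro.__name__] = coro

    async def start(self):
        self.started = True


async def coro2(**kwargs):
    return kwargs


async def demo():
    registry = ToyRegistry()
    registry.register_handler(coro2)                     # stored as "coro2"
    registry.register_handler(coro2, task_name="task1")  # stored as "task1"
    await registry.start()
    try:
        registry.register_handler(coro2, task_name="late")
    except RuntimeError:
        late_rejected = True
    else:
        late_rejected = False
    return sorted(registry.handlers), late_rejected


names, late_rejected = asyncio.run(demo())
print(names)          # ['coro2', 'task1']
print(late_rejected)  # True
```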

Error management

If an exception occurs while processing a task, the worker logs the exception and sets it as the task’s result.
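As a rough sketch of this behaviour, the helper below turns an exception into a dict holding its class name, its args and its string form. exception_to_result and run_task are hypothetical names written for this illustration, using only the standard library. They are not the worker's actual code.

```python
import asyncio
import logging

logger = logging.getLogger("toy_worker")  # hypothetical logger name


def exception_to_result(exc):
    # Build a result dict from the exception's class name, args and message.
    return {"exception": {"class": type(exc).__name__,
                          "args": exc.args,
                          "str": str(exc)}}


async def run_task(coro, **kwargs):
    # Run the task; on failure, log the error and return it as the result.
    try:
        return await coro(**kwargs)
    except Exception as exc:
        logger.exception("task %s failed", coro.__name__)
        return exception_to_result(exc)


async def failing_task():
    raise ValueError("Oops")


result = asyncio.run(run_task(failing_task))
print(result)
```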

import asyncio

async def coro():  # example of task giving an error
    raise ValueError("Oops")

# set up and start the worker
await worker.start()
# then, from the broker's perspective
task_id = await broker.create_task("coro")
await asyncio.sleep(0.1)  # time for task to be processed
task = await broker.get_task(task_id)  # fetch task from the cache

assert task.result == {"exception": {"class": "ValueError",
                                     "args": ("Oops",),
                                     "str": "Oops"}}

The task result must be serializable (with json.dumps); otherwise the worker logs an error and sets that error as the task’s result.

# task result if the result is not serializable.
{'exception': {'args': ('Task result is not serializable',),
               'class': 'ValueError',
               'str': 'Task result is not serializable'}}
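One way to avoid this error is to check a result with json.dumps before returning it from the task. The helper below is a minimal sketch. is_json_serializable is a hypothetical name, not part of aio_task. It also shows a common fix: converting a datetime to an ISO 8601 string.

```python
import json
from datetime import datetime, timezone


def is_json_serializable(value):
    # json.dumps raises TypeError for unsupported types and
    # ValueError for circular references.
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


finished_at = datetime(2024, 1, 1, tzinfo=timezone.utc)

bad_result = {"finished_at": finished_at}               # datetime object
good_result = {"finished_at": finished_at.isoformat()}  # plain string

print(is_json_serializable(bad_result))   # False
print(is_json_serializable(good_result))  # True
```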


Close the worker

await worker.close()

That’s it!