Welcome to aio-task

Asynchronous task manager for asyncio and Python.

Source code

The project is hosted on GitLab: https://gitlab.com/cdlr75/aio-task

Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.

Key Features

  • A simple worker interface to register coroutines as tasks.

  • A simple broker interface to produce and fetch tasks.

  • Broker and worker(s) can be set up in a single program, avoiding external service dependencies (by using the dummy queue and cache).

  • A task is not lost if a worker crashes while processing it: it is kept in the queue and re-processed until a worker acknowledges it.

  • Task exceptions are not lost: you will retrieve them in the task’s result.

  • Works out of the box with RabbitMQ and Redis.

  • Easily hackable to add new queuing/caching services.
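
The acknowledgement and exception-handling behaviour described in the list above can be illustrated with a small in-memory model. This is only a sketch of the general pattern, not aio-task's implementation: ToyQueue, process_one, demo, and the layout of the result dictionary are hypothetical names invented for this illustration, and no aio-task API is used.

```python
import asyncio


class ToyQueue:
    """Hypothetical in-memory queue that keeps a task until it is acknowledged."""

    def __init__(self):
        self._ready = asyncio.Queue()
        self._unacked = {}

    async def publish(self, task_id, name, kwargs):
        await self._ready.put((task_id, name, kwargs))

    async def consume(self):
        message = await self._ready.get()
        self._unacked[message[0]] = message  # remembered until ack
        return message

    def ack(self, task_id):
        self._unacked.pop(task_id)

    async def requeue_unacked(self):
        # Models redelivery after a consumer goes away without acknowledging.
        for message in list(self._unacked.values()):
            await self._ready.put(message)
        self._unacked.clear()


async def process_one(queue, cache, handlers, crash=False):
    """Consume one task, store its outcome in the cache, then acknowledge it."""
    task_id, name, kwargs = await queue.consume()
    if crash:
        raise RuntimeError("simulated worker crash before ack")
    try:
        cache[task_id] = {"ok": True, "value": await handlers[name](**kwargs)}
    except Exception as exc:
        # The exception is stored as the task's result instead of being lost.
        cache[task_id] = {"ok": False, "value": repr(exc)}
    queue.ack(task_id)


async def addition(a, b):
    return a + b


async def division(a, b):
    return a / b


async def demo():
    queue, cache = ToyQueue(), {}
    handlers = {"addition": addition, "division": division}
    await queue.publish("t1", "addition", {"a": 1, "b": 2})
    await queue.publish("t2", "division", {"a": 1, "b": 0})

    # The first worker crashes while holding t1: no ack, so t1 is requeued.
    try:
        await process_one(queue, cache, handlers, crash=True)
    except RuntimeError:
        await queue.requeue_unacked()

    # A healthy worker then processes both remaining tasks.
    await process_one(queue, cache, handlers)
    await process_one(queue, cache, handlers)
    return cache
```

Running demo() on an event loop (for example with loop.run_until_complete(demo())) returns a cache in which t1 has a successful result despite the simulated crash, and t2 holds a description of the ZeroDivisionError rather than nothing.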

Library Installation

$ pip install aio-task

Getting Started

See examples here: https://gitlab.com/cdlr75/aio-task/-/tree/master/examples

Worker (a.k.a. tasks consumer) example:

import asyncio
from aio_task import Worker

async def addition(a, b):
    """ Task example. """
    return a + b

async def start_worker():
    # set up the worker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    worker = await Worker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)

    # register your tasks...
    worker.register_handler(addition)

    # start to consume tasks
    await worker.start()

    return worker

loop = asyncio.get_event_loop()
worker = loop.run_until_complete(start_worker())

try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(worker.close())  # graceful shutdown

loop.close()

Broker (a.k.a. tasks producer) example:

import asyncio
from aio_task import Broker

async def sample_addition():
    # set up the broker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    broker = await Broker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)
    # produce task
    task_id = await broker.create_task("addition", {"a": 1, "b": 2})
    await asyncio.sleep(0.1)
    # fetch task
    task = await broker.get_task(task_id)
    print(task)
    await broker.close()  # graceful shutdown

loop = asyncio.get_event_loop()
loop.run_until_complete(sample_addition())
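
The fixed asyncio.sleep(0.1) in the broker example assumes the worker finishes within 100 ms. A polling helper is one possible alternative. The sketch below relies only on broker.get_task from the example; wait_for_task and the is_done predicate are hypothetical, since the shape of the returned task object is not described here, so the caller has to supply the completion check.

```python
import asyncio


async def wait_for_task(broker, task_id, is_done, timeout=5.0, interval=0.1):
    """Poll broker.get_task until is_done(task) is true or the timeout expires.

    is_done is a caller-supplied predicate (an assumption of this sketch):
    it receives whatever get_task returns and reports whether the task finished.
    """
    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while True:
        task = await broker.get_task(task_id)
        if is_done(task):
            return task
        if loop.time() >= deadline:
            raise asyncio.TimeoutError("task %r not finished in time" % (task_id,))
        await asyncio.sleep(interval)
```

Inside sample_addition, the sleep and fetch steps could then be replaced by a single call such as task = await wait_for_task(broker, task_id, is_done), with is_done written against the actual task object.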

Dependencies

  • Python 3.6+

Contributing

Please feel free to fork the project and make a pull request.

Authors and License

The aio-task package is written mostly by Constantin De La Roche.

It’s Apache 2 licensed and freely available.

Feel free to improve this package and send a pull request to GitLab.