Welcome to aio-task

Asynchronous task manager for asyncio and Python.

Key Features

  • A simple worker interface to register coroutines as tasks.

  • A simple broker interface to produce and fetch tasks.

  • Broker and worker(s) can be setup in a single program avoiding external service dependencies (by using dummies queue and cache).

  • Task is not lost if worker crash during processing it, it’s keep in queue and re-processed until a worker ack.

  • Task exceptions are not lost: you will retreive them in the task’s result.

  • Works out of the box with rabbitmq and redis.

  • Easily hackable to add new queuing/caching services

Library Installation

$ pip install aio-task

Getting Started

Worker (a.k.a. tasks consumer) example:

import asyncio
from aio_task import Worker

async def addition(a, b):
    """ Task example. """
    return a + b

async def start_worker():
    # setup worker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    worker = await Worker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)

    # register your tasks...
    worker.register_handler(addition)

    # start to consume tasks
    await worker.start()

    return worker

loop = asyncio.get_event_loop()
worker = loop.run_until_complete(start_worker())

try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(worker.close())  # gracefull shutdown

loop.close()

Broker (a.k.a. tasks producer) example:

import asyncio
from aio_task import Broker

async def sample_addition():
    # setup broker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    broker = await Broker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)
    # produce task
    task_id = await broker.create_task("addition", {"a": 1, "b": 2})
    await asyncio.sleep(0.1)
    # fetch task
    task = await broker.get_task(task_id)
    print(task)
    await broker.close()  # gracefull shutdown

loop = asyncio.get_event_loop()
loop.run_until_complete(sample_addition())
loop.run_until_complete(broker.close())

Source code

The project is hosted on GitLab

Please feel free to file an issue on the bug tracker if you have found a bug or have some suggestion in order to improve the library.

Dependencies

  • Python 3.6+

Contributing

Please fell free to fork the project and make a Pull Request.

Authors and License

The aio-task package is written mostly by Constantin De La Roche.

It’s Apache 2 licensed and freely available.

Feel free to improve this package and send a pull request to GitLab.