Welcome to aio-task
Asynchronous task manager for asyncio and Python.
Source code
The project is hosted on GitLab.
Please feel free to file an issue on the bug tracker if you have found a bug or have a suggestion for improving the library.
Key Features
A simple worker interface to register coroutines as tasks.
A simple broker interface to produce and fetch tasks.
Broker and worker(s) can be set up in a single program, avoiding external service dependencies (by using the dummy queue and cache).
A task is not lost if a worker crashes while processing it: it is kept in the queue and re-processed until a worker acknowledges it.
Task exceptions are not lost: you will retrieve them in the task’s result.
Works out of the box with RabbitMQ and Redis.
Easily hackable to add new queuing/caching services.
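The acknowledgement and exception-handling behaviour listed above can be illustrated with a small in-memory sketch. This is not aio-task's implementation and uses none of its API: `run_worker`, the `crash_on_first` flag, and the `"status"`/`"result"` keys are hypothetical names chosen for illustration, and `asyncio.run` assumes Python 3.7+.

```python
import asyncio


async def addition(a, b):
    return a + b


async def division(a, b):
    return a / b


async def run_worker(queue, results, handlers, crash_on_first=False):
    """Drain the queue; a message only leaves it for good once a result is stored."""
    crashed = False
    while not queue.empty():
        task_id, name, kwargs = queue.get_nowait()
        if crash_on_first and not crashed:
            # Simulated worker crash: no ack was sent, so the message is requeued.
            crashed = True
            queue.put_nowait((task_id, name, kwargs))
            continue
        try:
            value = await handlers[name](**kwargs)
            results[task_id] = {"status": "done", "result": value}
        except Exception as exc:
            # A failing task is still acknowledged; its exception becomes the result.
            results[task_id] = {"status": "error", "result": repr(exc)}
        # Reaching this point plays the role of the ack: nothing is requeued.


async def demo():
    queue = asyncio.Queue()
    queue.put_nowait(("t1", "addition", {"a": 1, "b": 2}))
    queue.put_nowait(("t2", "division", {"a": 1, "b": 0}))
    results = {}
    handlers = {"addition": addition, "division": division}
    await run_worker(queue, results, handlers, crash_on_first=True)
    return results


results = asyncio.run(demo())
print(results)
```

In this sketch the first delivery of `t1` is dropped by the simulated crash and processed again later, while `t2` ends with its `ZeroDivisionError` recorded rather than lost.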
Library Installation
$ pip install aio-task
Getting Started
See examples here: https://gitlab.com/cdlr75/aio-task/-/tree/master/examples
Worker (a.k.a. tasks consumer) example:
import asyncio

from aio_task import Worker


async def addition(a, b):
    """ Task example. """
    return a + b


async def start_worker():
    # setup worker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    worker = await Worker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)
    # register your tasks...
    worker.register_handler(addition)
    # start to consume tasks
    await worker.start()
    return worker


loop = asyncio.get_event_loop()
worker = loop.run_until_complete(start_worker())
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(worker.close())  # graceful shutdown
loop.close()
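Because a handler such as `addition` is an ordinary coroutine, it can be exercised on its own, without RabbitMQ or Redis running. The sketch below assumes that the worker passes the task parameters to the handler as keyword arguments, which the broker example suggests but this page does not state; `call_like_a_task` is a hypothetical helper, not part of aio-task, and `asyncio.run` requires Python 3.7+.

```python
import asyncio


async def addition(a, b):
    """ Task example. """
    return a + b


def call_like_a_task(handler, params):
    """Run a handler to completion, passing `params` as keyword arguments.

    Hypothetical test helper: it only mimics how a worker might invoke the handler.
    """
    return asyncio.run(handler(**params))


total = call_like_a_task(addition, {"a": 1, "b": 2})
print(total)
```

Checking handlers this way keeps unit tests independent of the queue and cache services.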
Broker (a.k.a. tasks producer) example:
import asyncio

from aio_task import Broker


async def sample_addition():
    # setup broker
    rabbitmq_config = {"url": "amqp://guest:guest@localhost:5672",
                       "routing_key": "tasks_queue"}
    redis_config = {"address": "redis://localhost"}
    broker = await Broker.create("rabbitmq", rabbitmq_config,
                                 "redis", redis_config)
    # produce task
    task_id = await broker.create_task("addition", {"a": 1, "b": 2})
    await asyncio.sleep(0.1)
    # fetch task
    task = await broker.get_task(task_id)
    print(task)
    await broker.close()  # graceful shutdown


loop = asyncio.get_event_loop()
loop.run_until_complete(sample_addition())
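The fixed `asyncio.sleep(0.1)` in the broker example may be too short if the worker is busy. One possible alternative is to poll until the task is ready or a timeout expires. The helper below is a generic sketch, not part of aio-task: `poll_until`, `fake_fetch`, and the `"ready"` key are hypothetical, since the structure of the value returned by `broker.get_task` is not documented on this page.

```python
import asyncio
import time


async def poll_until(fetch, is_ready, timeout=5.0, interval=0.1):
    """Await `fetch()` repeatedly until `is_ready(value)` is true or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        value = await fetch()
        if is_ready(value):
            return value
        if time.monotonic() >= deadline:
            raise asyncio.TimeoutError("task did not complete in time")
        await asyncio.sleep(interval)


async def demo():
    calls = 0

    async def fake_fetch():
        # Stand-in for `broker.get_task(task_id)`; reports ready on the third call.
        nonlocal calls
        calls += 1
        return {"ready": calls >= 3, "result": 3}

    task = await poll_until(fake_fetch, lambda t: t["ready"],
                            timeout=1.0, interval=0.01)
    return task, calls


task, calls = asyncio.run(demo())
print(task, calls)
```

With a real broker, `fetch` could be `lambda: broker.get_task(task_id)`, and `is_ready` would need to inspect whatever the returned task actually exposes.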
Dependencies
Python 3.6+
Contributing
Please feel free to fork the project and make a Pull Request.
Authors and License
The aio-task package is written mostly by Constantin De La Roche.
It’s Apache 2 licensed and freely available.
Feel free to improve this package and send a pull request to GitLab.