Queue API

The queue is responsible for forwarding messages between brokers and consumers.

Submodules

aio_task.queue.base module

Define Queue interface.

class aio_task.queue.base.Consumer[source]

Bases: abc.ABC

Queue interface used to consume tasks.

async ack(message)[source]

Acknowledge that a message was processed and remove it from the queue.

async close()[source]

Shutdown.

async classmethod create(queue_conf, callback)[source]

Create a new queue instance.

Parameters
  • queue_conf (dict) –

  • callback – coroutine called on new message with Message as param.

Return type

Queue

async start()[source]

Start consuming messages.
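The lifecycle described above (create, start, ack, close) can be sketched with an in-memory stand-in. This is only an illustration under stated assumptions: the `Consumer` class below mirrors the documented method names but is not the aio_task class, and `ListConsumer` and the `"messages"` key are hypothetical names invented for the example.

```python
import abc
import asyncio


class Consumer(abc.ABC):
    """Stand-in mirroring the documented interface (not the aio_task class)."""

    @classmethod
    @abc.abstractmethod
    async def create(cls, queue_conf, callback): ...

    @abc.abstractmethod
    async def start(self): ...

    @abc.abstractmethod
    async def ack(self, message): ...

    @abc.abstractmethod
    async def close(self): ...


class ListConsumer(Consumer):
    """Hypothetical consumer that reads messages from a plain list."""

    def __init__(self, pending, callback):
        self.pending = list(pending)
        self.callback = callback
        self.closed = False

    @classmethod
    async def create(cls, queue_conf, callback):
        # "messages" is an invented key used only by this sketch.
        return cls(queue_conf.get("messages", []), callback)

    async def start(self):
        # Iterate over a copy, because ack() mutates self.pending.
        for message in list(self.pending):
            await self.callback(message)

    async def ack(self, message):
        # Acknowledging removes the message from the queue.
        self.pending.remove(message)

    async def close(self):
        self.closed = True


async def demo():
    handled = []
    consumer = None

    async def on_message(message):
        # The callback receives each message and acknowledges it when done.
        handled.append(message)
        await consumer.ack(message)

    consumer = await ListConsumer.create({"messages": ["a", "b"]}, on_message)
    await consumer.start()
    await consumer.close()
    return handled, consumer.pending, consumer.closed
```

Running `asyncio.run(demo())` drives the whole cycle. In this sketch the callback acknowledges each message itself, which is one plausible division of work; the documentation does not state who is expected to call `ack()`.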

class aio_task.queue.base.Message(task)[source]

Bases: object

Items in queue.

property params

set_task_result(result)[source]

Set result for a task.

property task_name

class aio_task.queue.base.Producer[source]

Bases: abc.ABC

Queue interface used to produce tasks.

async close()[source]

Shutdown.

async classmethod create(queue_conf)[source]

Create a new queue instance.

Parameters

queue_conf (dict) –

Return type

Queue

async push(task)[source]

Add a task to the queue.

Parameters

task (Task) –
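As a rough illustration of the producer side, the sketch below pushes one task and always closes the producer afterwards. `ListProducer` is a hypothetical in-memory class, not part of aio_task, and the dict passed to `push()` is only a placeholder for a real `Task` object, whose constructor is not documented here.

```python
import asyncio


class ListProducer:
    """Hypothetical producer that appends tasks to a list (not the aio_task class)."""

    def __init__(self):
        self.tasks = []
        self.closed = False

    @classmethod
    async def create(cls, queue_conf):
        # This sketch ignores queue_conf; a real backend would read it.
        return cls()

    async def push(self, task):
        if self.closed:
            raise RuntimeError("producer is closed")
        self.tasks.append(task)

    async def close(self):
        self.closed = True


async def produce():
    producer = await ListProducer.create({})
    try:
        # Placeholder task; the real interface expects a Task instance.
        await producer.push({"task_name": "send_email", "params": {"to": "a@example.com"}})
    finally:
        # Always release the producer, even if push() fails.
        await producer.close()
    return producer
```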

aio_task.queue.dummy module

Implement Queue interface.

class aio_task.queue.dummy.Consumer(callback)[source]

Bases: aio_task.queue.base.Consumer

async ack(message)[source]

Remove a message from queue.

async close()[source]

Shutdown.

async classmethod create(queue_conf, callback)[source]

Create a new queue instance that consumes messages.

Parameters
  • queue_conf (dict) – not used

  • callback – coroutine called on new message with Message as param.

Return type

Queue

async start()[source]

Start to consume messages.

class aio_task.queue.dummy.Producer[source]

Bases: aio_task.queue.base.Producer

Dummy implementation for the producer interface.

async close()[source]

Shutdown.

async classmethod create(queue_conf)[source]

Create a new queue instance.

Parameters

queue_conf (dict) –

Return type

Queue

async push(task)[source]

Add a task to the queue.

Parameters

task (Task) –

aio_task.queue.rabbitmq module

Implement Queue for rabbitmq using aio_pika.

class aio_task.queue.rabbitmq.Consumer(rabbitmq_url, routing_key, callback)[source]

Bases: aio_task.queue.base.Consumer

Queue interface used to consume tasks.

async ack(message)[source]

Acknowledge that a message was processed and remove it from the queue.

Parameters

message (Message) – arg given to the callback

async close()[source]

Shutdown.

async connect()[source]

async classmethod create(queue_conf, callback)[source]

Create a new queue instance.

Parameters
  • queue_conf (dict) – see Producer.create()

  • callback – coroutine called on new message with Message as param.

Return type

Queue

async start()[source]

Start consuming messages.

class aio_task.queue.rabbitmq.Message(task, rmq_message)[source]

Bases: aio_task.queue.base.Message

Items in queue.

class aio_task.queue.rabbitmq.Producer(rabbitmq_url, routing_key)[source]

Bases: aio_task.queue.base.Producer

Queue interface used to produce tasks.

async close()[source]

Shutdown.

async connect()[source]

Acquire required connections.

async classmethod create(queue_conf)[source]

Create a new queue instance.

Parameters

queue_conf (dict) – see below

Return type

Queue

queue_conf = {
    "url": str,  # rabbitmq url
    "routing_key": str,  # default "tasks_queue"
}

async push(task)[source]

Add a task to the queue.

Parameters

task (Task) –
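The queue_conf dictionary documented for create() has two keys, and it might be assembled as in the sketch below. `make_queue_conf` is a hypothetical helper, not part of aio_task, and the URL is only an example value based on RabbitMQ's usual local defaults.

```python
def make_queue_conf(url, routing_key="tasks_queue"):
    """Build a queue_conf dict with the two documented keys."""
    if not isinstance(url, str) or not url:
        raise ValueError("url must be a non-empty string")
    # "tasks_queue" is the documented default routing key.
    return {"url": url, "routing_key": routing_key}


# Example URL only; adjust host and credentials for your broker.
conf = make_queue_conf("amqp://guest:guest@localhost:5672/")
```

The resulting dict is the kind of value that create(queue_conf) is documented to accept.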

async aio_task.queue.rabbitmq.acquire_connection(rabbitmq_url)[source]

Retry connecting until the connection succeeds, waiting 1 second between attempts.

Parameters

rabbitmq_url (str) – rabbitmq url to connect to.

Returns

aio_pika.connection
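The retry behaviour described for acquire_connection can be illustrated with a generic loop. This is a sketch of the pattern only, not the library's implementation: `connect_with_retry` and `flaky_connect` are hypothetical names, and catching `ConnectionError` is an assumption about which failures would be retried.

```python
import asyncio


async def connect_with_retry(connect, delay=1.0):
    """Call the `connect` coroutine function until it succeeds."""
    while True:
        try:
            return await connect()
        except ConnectionError:
            # Wait before the next attempt (1 second by default).
            await asyncio.sleep(delay)


async def demo():
    attempts = 0

    async def flaky_connect():
        # Fails twice, then returns a placeholder connection object.
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionError("broker not ready")
        return "connection"

    # delay=0 keeps the demo fast; the documented wait is 1 second.
    connection = await connect_with_retry(flaky_connect, delay=0)
    return connection, attempts
```

A loop like this never gives up, which matches "until success" but means a caller that needs a deadline would have to wrap it, for example with `asyncio.wait_for`.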