RabbitMQ Adapter¶
Module: Messaging — Module Guide Package:
pyfly.messaging.adapters.rabbitmqBackend: aio-pika 9.0+
Quick Start¶
Installation¶
Minimal Configuration¶
Minimal Example¶
from pyfly.messaging import message_listener
@message_listener(topic="orders", group="order-service")
async def handle_order(msg: Message) -> None:
print(f"Received order: {msg.value}")
Configuration Reference¶
| Key | Type | Default | Description |
|---|---|---|---|
pyfly.messaging.provider |
str |
"memory" |
Adapter selection (auto, kafka, rabbitmq, memory) |
pyfly.messaging.rabbitmq.url |
str |
"amqp://guest:guest@localhost/" |
RabbitMQ connection URL (AMQP) |
When provider is "auto", PyFly selects the adapter based on which library is installed. If aio-pika is found, the RabbitMQ adapter is used.
Note: The exchange name defaults to
"pyfly"and is not configurable viapyfly.yamlin the auto-configured adapter. To customise it, constructRabbitMQAdapter(url=..., exchange_name=...)manually as a@bean.
Adapter-Specific Features¶
RabbitMQAdapter¶
Implements MessageBrokerPort using aio_pika.connect_robust().
- Exchange: Uses a single direct exchange (default name:
"pyfly") with topics as routing keys - Queues: Declares durable queues per consumer group
- Publishing: Serializes messages to JSON and publishes with optional headers
- Subscribing: Creates async consumers with acknowledgment support
Consumer Groups¶
Consumer groups are mapped to RabbitMQ queues. Multiple instances with the same group share the queue for competing-consumer load balancing.
Lifecycle¶
start()— Establishes a robust connection, declares exchange and queues, starts consumersstop()— Closes the connection gracefully
Testing¶
When no broker library is installed, PyFly auto-configures InMemoryMessageBroker — no RabbitMQ needed for unit tests.
See Also¶
- Messaging Module Guide — Full API reference: publishing, consuming, message listeners
- Kafka Adapter — Alternative messaging backend
- Adapter Catalog