Kafka Adapter¶
Module: Messaging — Module Guide Package:
pyfly.messaging.adapters.kafkaBackend: aiokafka 0.10+
Quick Start¶
Installation¶
Minimal Configuration¶
Minimal Example¶
from pyfly.messaging import message_listener
@message_listener(topic="orders", group="order-service")
async def handle_order(msg: Message) -> None:
print(f"Received order: {msg.value}")
Configuration Reference¶
| Key | Type | Default | Description |
|---|---|---|---|
pyfly.messaging.provider |
str |
"memory" |
Adapter selection (auto, kafka, rabbitmq, memory) |
pyfly.messaging.kafka.bootstrap-servers |
str |
"localhost:9092" |
Comma-separated Kafka broker addresses |
When provider is "auto", PyFly selects the adapter based on which library is installed. If aiokafka is found, the Kafka adapter is used.
Adapter-Specific Features¶
KafkaAdapter¶
Implements MessageBrokerPort using AIOKafkaProducer and AIOKafkaConsumer.
- Publishing: Serializes messages to JSON and sends to the specified topic with optional headers
- Subscribing: Creates async consumer loops with consumer group support
- Headers: Encodes/decodes message headers (string values)
Consumer Groups¶
The group parameter on @message_listener maps directly to Kafka consumer groups for load-balanced consumption across instances.
Lifecycle¶
start()— Starts the Kafka producer and all consumer loopsstop()— Gracefully stops consumers and flushes the producer
Testing¶
When no broker library is installed, PyFly auto-configures InMemoryMessageBroker — no Kafka needed for unit tests.
See Also¶
- Messaging Module Guide — Full API reference: publishing, consuming, message listeners
- RabbitMQ Adapter — Alternative messaging backend
- Adapter Catalog