Skip to content

Message Brokers

  • Back to Bootstrap Overview

    Return to the Bootstrap overview page with all configuration options.

    Back to Overview


Overview

Message brokers are used to publish NotificationEvent and ECSTEvent events to external systems. The python-cqrs package supports multiple message broker implementations.

Broker Type Use Case Production Ready
DevnullMessageBroker Testing, development ❌ No
KafkaMessageBroker High-throughput, distributed systems ✅ Yes
AMQPMessageBroker RabbitMQ, traditional message queues ✅ Yes
Choosing a Message Broker
  • DevnullMessageBroker: Use for testing and development when you don't need actual message publishing
  • KafkaMessageBroker: Use for high-throughput, distributed systems with strong ordering guarantees
  • AMQPMessageBroker: Use for traditional message queue patterns, RabbitMQ integration

DevnullMessageBroker

The DevnullMessageBroker is a no-op broker used for testing. It doesn't actually send messages anywhere but logs warnings.

Development Only

Never use DevnullMessageBroker in production. It's designed for testing and development only.

from cqrs.message_brokers import devnull

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    message_broker=devnull.DevnullMessageBroker(),
)

KafkaMessageBroker

The KafkaMessageBroker publishes events to Apache Kafka topics.

from cqrs.message_brokers import kafka
from cqrs.adapters.kafka import KafkaProducerAdapter

# Create Kafka producer adapter
kafka_producer = KafkaProducerAdapter(
    bootstrap_servers=["localhost:9092"],
    client_id="my-app",
    acks="all",  # Wait for all replicas
    enable_idempotence=True,
)

# Create message broker
kafka_broker = kafka.KafkaMessageBroker(
    producer=kafka_producer,
    aiokafka_log_level="ERROR",  # Suppress verbose logging
)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=kafka_broker,
)

AMQPMessageBroker

The AMQPMessageBroker publishes events to RabbitMQ or other AMQP-compatible brokers.

from cqrs.message_brokers import amqp
from cqrs.adapters.amqp import AMQPPublisherAdapter
import aio_pika

# Create AMQP publisher
amqp_publisher = AMQPPublisherAdapter(
    dsn="amqp://user:password@localhost/",
)

# Create message broker
amqp_broker = amqp.AMQPMessageBroker(
    publisher=amqp_publisher,
    exchange_name="events",
    pika_log_level="ERROR",
)

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    domain_events_mapper=events_mapper,
    message_broker=amqp_broker,
)

Custom Message Broker

You can create custom message brokers by implementing the MessageBroker protocol:

from cqrs.message_brokers import protocol

class CustomMessageBroker(protocol.MessageBroker):
    async def send_message(self, message: protocol.Message) -> None:
        # Custom implementation
        print(f"Sending {message.message_name} to {message.topic}")
        # Send to your custom broker

mediator = bootstrap.bootstrap(
    di_container=di.Container(),
    commands_mapper=commands_mapper,
    message_broker=CustomMessageBroker(),
)