RabbitMQ is a reliable message broker for asynchronous communication. It decouples services from one another and, when durable queues, persistent messages, and consumer acknowledgements are combined, supports at-least-once delivery.
Installation
# Debian/Ubuntu
apt install rabbitmq-server
# RHEL/CentOS
dnf install rabbitmq-server
# Enable and start the service
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
Docker
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
Management Plugin
# Enable the plugin
rabbitmq-plugins enable rabbitmq_management
# Web UI: http://localhost:15672
# Default credentials: guest/guest (accepted from localhost only)
User Management
# Create an admin user (replace the example passwords)
rabbitmqctl add_user admin secret123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# Application user, limited to resources whose names start with "app."
rabbitmqctl add_user app_user password123
rabbitmqctl set_permissions -p / app_user "^app\." "^app\." "^app\."
# List users
rabbitmqctl list_users
# Remove the default guest user
rabbitmqctl delete_user guest
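The three patterns passed to set_permissions are regular expressions for configure, write, and read access, in that order, and they are matched against queue and exchange names. The following is a minimal sketch of what `^app\.` admits. It uses Python's `re` module as a stand-in for the broker's own matcher, so dialect details may differ; the helper name `is_permitted` and the sample resource names are hypothetical.

```python
import re

def is_permitted(pattern: str, resource_name: str) -> bool:
    """Approximate a permission check with an unanchored regex search."""
    return re.search(pattern, resource_name) is not None

APP_PATTERN = r"^app\."  # same pattern as granted to app_user above

# Hypothetical resource names, used only to illustrate the pattern
for name in ("app.tasks", "app.events", "tasks", "myapp.tasks", "amq.gen-abc"):
    print(name, is_permitted(APP_PATTERN, name))
```

Only the first two names pass. Note that a server-named queue (such names start with `amq.gen-`) would also be rejected by this pattern, which matters if app_user declares queues with an empty name.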
Virtual Hosts
# Create vhosts
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging
# Grant permissions
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"
# List vhosts
rabbitmqctl list_vhosts
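Clients select a vhost when they connect, typically through the path segment of an AMQP URI. The sketch below builds such a URI and percent-encodes each component, which matters for the default vhost `/` (it becomes `%2F`). The helper name `amqp_url` is hypothetical; the user, password, and vhost values are the examples used above.

```python
from urllib.parse import quote

def amqp_url(user: str, password: str, host: str, vhost: str, port: int = 5672) -> str:
    """Build an AMQP URI with percent-encoded credentials and vhost."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),  # "/" must be encoded as %2F
    )

print(amqp_url("app_user", "password123", "localhost", "production"))
print(amqp_url("app_user", "password123", "localhost", "/"))
```

With pika, a URI of this shape can be handed to `pika.URLParameters` instead of `pika.ConnectionParameters`.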
Queues
CLI
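# Declare a queue (preferably done in application code)
# rabbitmqctl has no declare command; rabbitmqadmin (shipped with the management plugin) does
rabbitmqadmin declare queue name=tasks durable=true
# List queues
rabbitmqctl list_queues name messages consumers
# Delete a queue
rabbitmqctl delete_queue tasks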
Queue Types
| Type | Description |
|---|---|
| Classic | Default queue type |
| Quorum | Replicated and highly available (Raft) |
| Stream | Log-like (append-only) |
Exchanges
| Type | Routing |
|---|---|
| direct | Exact routing key match |
| fanout | To all bound queues |
| topic | Pattern match on the routing key |
| headers | By header values |
Creating an Exchange
rabbitmqadmin declare exchange name=logs type=topic durable=true
Python Example
Producer
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Declare the queue (idempotent if the arguments match)
channel.queue_declare(queue='tasks', durable=True)
# Send a message
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Hello World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
    )
)
connection.close()
Consumer
import pika
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
def callback(ch, method, properties, body):
    print(f"Received: {body}")
    # Processing...
    # Acknowledge only after the work is done
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Fair dispatch: at most one unacknowledged message per consumer
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
    queue='tasks',
    on_message_callback=callback
)
channel.start_consuming()
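The consumer above acknowledges every message, but it does not say what happens when processing fails. One possible approach, sketched here under assumptions, is to wrap the handler so that a success is acked and an exception leads to `basic_nack` with `requeue=False`. The names `make_callback`, `RecordingChannel`, and `handle` are hypothetical; `RecordingChannel` is only a stand-in that lets the wrapper be tried without a broker.

```python
from types import SimpleNamespace

def make_callback(handler):
    """Wrap handler so success is acked and failure is nacked without requeue."""
    def callback(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # requeue=False avoids an endless redelivery loop; the broker then
            # drops the message or dead-letters it if the queue has a DLX
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback

class RecordingChannel:
    """Hypothetical stand-in for a pika channel that records ack/nack calls."""
    def __init__(self):
        self.calls = []
    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))
    def basic_nack(self, delivery_tag, requeue=True):
        self.calls.append(("nack", delivery_tag, requeue))

def handle(body):
    # Hypothetical handler: fails for one specific payload
    if body == b"bad":
        raise ValueError("cannot process message")

channel = RecordingChannel()
on_message = make_callback(handle)
on_message(channel, SimpleNamespace(delivery_tag=1), None, b"ok")
on_message(channel, SimpleNamespace(delivery_tag=2), None, b"bad")
print(channel.calls)
```

With a real pika channel, the wrapped function would be passed as `on_message_callback` in place of `callback`.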
Pub/Sub with an Exchange
# Publisher
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='Log message')
# Subscriber
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# An empty name requests a server-named queue; exclusive=True deletes it when the connection closes
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
# With auto_ack=True the callback must not call basic_ack itself
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
Topic Exchange
# Exchange
channel.exchange_declare(exchange='events', exchange_type='topic')
# Publisher
channel.basic_publish(
    exchange='events',
    routing_key='user.created',
    body='New user created'
)
# Consumer for user.*
channel.queue_bind(exchange='events', queue=queue_name, routing_key='user.*')
# Consumer for *.error
channel.queue_bind(exchange='events', queue=queue_name, routing_key='*.error')
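In a topic binding key, `*` stands for exactly one dot-separated word and `#` for zero or more words. The following sketch reimplements that matching rule in plain Python to make the semantics concrete. It is an illustration, not the broker's actual implementation, and the function name `topic_matches` is hypothetical.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches the topic binding_key."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(pattern):
            # Pattern exhausted: match only if all words were consumed
            return j == len(words)
        if pattern[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, n) for n in range(j, len(words) + 1))
        if j < len(words) and pattern[i] in ("*", words[j]):
            # '*' or a literal word consumes exactly one word
            return match(i + 1, j + 1)
        return False

    return match(0, 0)

print(topic_matches("user.*", "user.created"))     # one word after "user"
print(topic_matches("user.*", "user.created.eu"))  # two words after "user"
print(topic_matches("*.error", "payment.error"))
print(topic_matches("user.#", "user"))             # '#' matches zero words
```

The second call is the case that most often surprises: `user.*` does not match `user.created.eu`, whereas `user.#` would.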
Clustering
Creating a Cluster
# On nodes 2 and 3 (all nodes must share the same Erlang cookie)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Cluster status
rabbitmqctl cluster_status
Mirrored Queues (Classic)
# Classic queue mirroring is deprecated and was removed in RabbitMQ 4.0; prefer quorum queues
rabbitmqctl set_policy ha-all ".*" \
  '{"ha-mode":"all"}' \
  --apply-to queues
Quorum Queues (recommended)
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={'x-queue-type': 'quorum'}
)
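Quorum queues are built on Raft, so a queue stays available only while a majority of its replicas is reachable. The arithmetic behind cluster sizing can be sketched as follows; the function names are hypothetical.

```python
def quorum_size(nodes: int) -> int:
    """Smallest majority of a group with the given number of nodes."""
    return nodes // 2 + 1

def tolerated_failures(nodes: int) -> int:
    """How many nodes may fail while a majority remains."""
    return nodes - quorum_size(nodes)

for n in (1, 2, 3, 4, 5):
    print(f"{n} nodes: quorum {quorum_size(n)}, tolerates {tolerated_failures(n)} failure(s)")
```

Two nodes tolerate no failure and four nodes tolerate only one, which is why odd cluster sizes such as three or five are the usual choice.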
Monitoring
# Node status overview
rabbitmqctl status
# Queues with message counts
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Connections
rabbitmqctl list_connections user peer_host state
# Channels
rabbitmqctl list_channels connection consumer_count messages_unacknowledged
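Queue counters like these are often checked by a script rather than by eye. The sketch below assumes the listing was produced with `rabbitmqctl list_queues name messages_ready --formatter json` and that the output is a JSON array of objects keyed by column name; both points should be verified against the installed version. The function name, the threshold, and the sample data are hypothetical.

```python
import json

def backlogged_queues(json_text: str, max_ready: int) -> list:
    """Return names of queues with more than max_ready ready messages."""
    queues = json.loads(json_text)
    return [q["name"] for q in queues if q.get("messages_ready", 0) > max_ready]

# Made-up sample in the assumed output shape
SAMPLE = '[{"name": "tasks", "messages_ready": 1200}, {"name": "emails", "messages_ready": 3}]'

print(backlogged_queues(SAMPLE, max_ready=1000))
```

A steadily growing `messages_ready` count usually means consumers are too slow or absent, so a check of this kind is a reasonable first alert.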
Prometheus
# Enable the plugin
rabbitmq-plugins enable rabbitmq_prometheus
# Metrics: http://localhost:15692/metrics
Configuration
rabbitmq.conf
# /etc/rabbitmq/rabbitmq.conf
# Listener
listeners.tcp.default = 5672
# Management
management.tcp.port = 15672
# Memory and disk limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB
# Logging
log.file.level = info
Environment Variables
# /etc/rabbitmq/rabbitmq-env.conf
# Inside this file, variable names are written without the RABBITMQ_ prefix
NODENAME=rabbit@hostname
NODE_PORT=5672
DIST_PORT=25672
TLS
# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca.crt
ssl_options.certfile = /etc/rabbitmq/server.crt
ssl_options.keyfile = /etc/rabbitmq/server.key
ssl_options.verify = verify_peer
# false still admits clients that present no certificate; set to true to require one (mutual TLS)
ssl_options.fail_if_no_peer_cert = false
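On the client side, the TLS listener on port 5671 needs a matching SSL context. The following is a minimal sketch, assuming pika 1.x and its `pika.SSLOptions` class. The helper names are hypothetical, and pika is imported lazily so the context helper can be tried on its own.

```python
import ssl

def make_client_context(cafile=None):
    """Create a client context that verifies the server certificate and hostname."""
    # cafile=None falls back to the system trust store; pass the CA that
    # signed the broker certificate (e.g. /etc/rabbitmq/ca.crt) in practice
    return ssl.create_default_context(cafile=cafile)

def tls_parameters(host, context, port=5671):
    """Build pika connection parameters for the TLS listener."""
    import pika  # lazy import: only needed when actually connecting
    return pika.ConnectionParameters(
        host=host,
        port=port,
        ssl_options=pika.SSLOptions(context, server_hostname=host),
    )

context = make_client_context()
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```

The result of `tls_parameters(...)` would be passed to `pika.BlockingConnection`. If the broker requires client certificates, `context.load_cert_chain(certfile, keyfile)` adds them to the context.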
Dead Letter
# Dead letter exchange
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='dead_letters')
# Queue with a DLX
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        # Without this key, dead-lettered messages keep their original
        # routing key ('tasks'), which the direct binding would not match
        'x-dead-letter-routing-key': 'dead_letters',
        'x-message-ttl': 60000  # 60 seconds
    }
)
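When a message is dead-lettered, the broker records the event in an `x-death` header, which a consumer of the dead letter queue can inspect, for example to cap retries. The sketch below assumes the commonly documented layout (a list of entries with `queue`, `reason`, and `count` fields); the function name and the sample header are hypothetical.

```python
def death_count(headers, queue: str) -> int:
    """Sum the x-death counts recorded for the given queue."""
    deaths = (headers or {}).get("x-death", [])
    return sum(entry.get("count", 0) for entry in deaths if entry.get("queue") == queue)

# Made-up header in the assumed layout
sample_headers = {
    "x-death": [
        {"queue": "tasks", "reason": "expired", "count": 2},
        {"queue": "tasks", "reason": "rejected", "count": 1},
        {"queue": "other", "reason": "rejected", "count": 5},
    ]
}

print(death_count(sample_headers, "tasks"))
print(death_count(None, "tasks"))
```

In a pika callback, the headers are available as `properties.headers`, which is `None` when the message carries no headers; the helper accounts for that case.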
Shovel (Replication)
# Enable the plugins
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
# Dynamic shovel, defined as a runtime parameter (not a policy)
rabbitmqctl set_parameter shovel my-shovel \
  '{"src-uri": "amqp://localhost", "src-queue": "source", "dest-uri": "amqp://remote-host", "dest-queue": "destination"}'
Backup
# Export definitions (users, vhosts, queues, exchanges, bindings, policies; messages are not included)
rabbitmqctl export_definitions /backup/definitions.json
# Import definitions
rabbitmqctl import_definitions /backup/definitions.json
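Before relying on an exported file, it can help to check that it contains what is expected. The following sketch counts the entries per section of a definitions document. It assumes the usual top-level keys (`users`, `vhosts`, `queues`, and so on); the function name and the sample document are hypothetical.

```python
import json

def summarize_definitions(definitions: dict) -> dict:
    """Count entries in the main sections of a definitions document."""
    sections = ("users", "vhosts", "queues", "exchanges", "bindings", "policies")
    return {name: len(definitions.get(name, [])) for name in sections}

# Made-up, heavily shortened sample document
SAMPLE = json.loads("""
{
  "users": [{"name": "admin"}],
  "vhosts": [{"name": "/"}, {"name": "production"}],
  "queues": [{"name": "tasks", "vhost": "/", "durable": true}]
}
""")

print(summarize_definitions(SAMPLE))
```

For a real backup, the file would be loaded with `json.load(open("/backup/definitions.json"))` in place of the sample.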
Summary
| Command | Purpose |
|---|---|
| rabbitmqctl add_user | Create a user |
| rabbitmqctl list_queues | List queues |
| rabbitmqctl cluster_status | Show cluster status |
| rabbitmqctl set_policy | Set a policy |
| rabbitmq-plugins enable | Enable a plugin |

| Port | Service |
|---|---|
| 5672 | AMQP |
| 5671 | AMQPS (TLS) |
| 15672 | Management UI and HTTP API |
| 25672 | Inter-node (cluster) communication |

| Concept | Description |
|---|---|
| Queue | Buffer that holds messages until they are consumed |
| Exchange | Routing component |
| Binding | Link between an exchange and a queue |
| vhost | Logical isolation of resources |
Conclusion
RabbitMQ is a dependable choice for asynchronous communication. Quorum queues provide high availability, exchanges enable flexible routing, and the management plugin simplifies administration. For very high throughput, Kafka may be the better fit.