RabbitMQ ist ein zuverlässiger Message Broker für asynchrone Kommunikation. Er ermöglicht lose Kopplung zwischen Services und garantierte Nachrichtenzustellung.
Installation
# Debian/Ubuntu
apt install rabbitmq-server
# RHEL/CentOS
dnf install rabbitmq-server
# Starten
systemctl enable rabbitmq-server
systemctl start rabbitmq-serverDocker
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-managementManagement-Plugin
# Plugin aktivieren
rabbitmq-plugins enable rabbitmq_management
# Web-UI: http://localhost:15672
# Default: guest/guest (nur localhost)User-Verwaltung
# Admin-User erstellen
rabbitmqctl add_user admin secret123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# Anwendungs-User
rabbitmqctl add_user app_user password123
rabbitmqctl set_permissions -p / app_user "^app\." "^app\." "^app\."
# User auflisten
rabbitmqctl list_users
# Guest-Zugriff einschränken
rabbitmqctl delete_user guestVirtual Hosts
# vhost erstellen
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging
# Berechtigungen
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"
# vhosts auflisten
rabbitmqctl list_vhostsQueues
CLI
# Queue erstellen (besser über Code)
rabbitmqctl declare queue name=tasks durable=true
# Queues auflisten
rabbitmqctl list_queues name messages consumers
# Queue löschen
rabbitmqctl delete_queue tasksQueue-Typen
| Typ | Beschreibung | |-----|--------------| | Classic | Standard-Queue | | Quorum | Hochverfügbar (Raft) | | Stream | Log-ähnlich (Append-only) |
Exchanges
| Typ | Routing | |-----|---------| | direct | Exakter Routing Key | | fanout | An alle Queues | | topic | Pattern-basiert | | headers | Nach Header-Werten |
Exchange erstellen
rabbitmqctl declare exchange name=logs type=topic durable=truePython-Beispiel
Producer
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Queue erstellen
channel.queue_declare(queue='tasks', durable=True)
# Nachricht senden
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
)
)
connection.close()Consumer
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.queue_declare(queue='tasks', durable=True)
def callback(ch, method, properties, body):
print(f"Received: {body}")
# Verarbeitung...
ch.basic_ack(delivery_tag=method.delivery_tag)
# Fair Dispatch
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
queue='tasks',
on_message_callback=callback
)
channel.start_consuming()Pub/Sub mit Exchange
# Publisher
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='Log message')
# Subscriber
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)Topic Exchange
# Exchange
channel.exchange_declare(exchange='events', exchange_type='topic')
# Publisher
channel.basic_publish(
exchange='events',
routing_key='user.created',
body='New user created'
)
# Consumer für user.*
channel.queue_bind(exchange='events', queue=queue_name, routing_key='user.*')
# Consumer für *.error
channel.queue_bind(exchange='events', queue=queue_name, routing_key='*.error')Clustering
Cluster erstellen
# Auf Node 2 und 3
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Cluster-Status
rabbitmqctl cluster_statusMirrored Queues (Classic)
rabbitmqctl set_policy ha-all ".*" \
'{"ha-mode":"all"}' \
--apply-to queuesQuorum Queues (empfohlen)
channel.queue_declare(
queue='tasks',
durable=True,
arguments={'x-queue-type': 'quorum'}
)Monitoring
# Übersicht
rabbitmqctl status
# Queues mit Nachrichten
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
# Verbindungen
rabbitmqctl list_connections user peer_host state
# Channels
rabbitmqctl list_channels connection consumer_count messages_unacknowledgedPrometheus
# Plugin aktivieren
rabbitmq-plugins enable rabbitmq_prometheus
# Metriken: http://localhost:15692/metricsKonfiguration
rabbitmq.conf
# /etc/rabbitmq/rabbitmq.conf
# Listener
listeners.tcp.default = 5672
# Management
management.tcp.port = 15672
# Speicher-Limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB
# Logging
log.file.level = infoUmgebungsvariablen
# /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODENAME=rabbit@hostname
RABBITMQ_NODE_PORT=5672
RABBITMQ_DIST_PORT=25672TLS
# rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca.crt
ssl_options.certfile = /etc/rabbitmq/server.crt
ssl_options.keyfile = /etc/rabbitmq/server.key
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = falseDead Letter
# Dead Letter Exchange
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='')
# Queue mit DLX
channel.queue_declare(
queue='tasks',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx',
'x-message-ttl': 60000 # 60 Sekunden
}
)Shovel (Replikation)
# Plugin aktivieren
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management
# Shovel per Policy
rabbitmqctl set_parameter shovel my-shovel \
'{"src-uri": "amqp://localhost", "src-queue": "source", "dest-uri": "amqp://remote-host", "dest-queue": "destination"}'Backup
# Definitions exportieren
rabbitmqctl export_definitions /backup/definitions.json
# Definitions importieren
rabbitmqctl import_definitions /backup/definitions.jsonZusammenfassung
| Befehl | Funktion | |--------|----------| | rabbitmqctl add_user | User erstellen | | rabbitmqctl list_queues | Queues anzeigen | | rabbitmqctl cluster_status | Cluster-Status | | rabbitmqctl set_policy | Policy setzen | | rabbitmq-plugins enable | Plugin aktivieren |
| Port | Dienst | |------|--------| | 5672 | AMQP | | 5671 | AMQPS (TLS) | | 15672 | Management UI | | 25672 | Cluster |
| Konzept | Beschreibung | |---------|--------------| | Queue | Nachrichten-Warteschlange | | Exchange | Routing-Komponente | | Binding | Verbindung Exchange-Queue | | vhost | Virtuelle Trennung |
Fazit
RabbitMQ ist zuverlässig für asynchrone Kommunikation. Quorum Queues bieten Hochverfügbarkeit. Exchanges ermöglichen flexibles Routing. Das Management-Plugin vereinfacht die Verwaltung. Für sehr hohe Durchsätze kann Kafka besser geeignet sein.