RabbitMQ is a high-performance message broker for asynchronous communication. It provides reliable message delivery between applications and microservices.
Why RabbitMQ?
Use cases
- Asynchronous task processing
- Microservice communication
- Event-driven architecture
- Workload distribution
- Notification systems
- Log aggregation
Advantages
- Reliable delivery
- Flexible routing options
- Clustering and high availability
- Built-in management UI
- Many client libraries
Installation
Debian/Ubuntu
# Add the Erlang repository
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.deb.sh' | sudo bash
# Add the RabbitMQ repository
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh' | sudo bash
# Install
sudo apt update
sudo apt install erlang-base erlang-nox rabbitmq-server
# Start the service and enable it at boot
sudo systemctl enable --now rabbitmq-server
CentOS/RHEL
# Add the Erlang and RabbitMQ repositories
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.rpm.sh' | sudo bash
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.rpm.sh' | sudo bash
sudo dnf install erlang rabbitmq-server
sudo systemctl enable --now rabbitmq-server
Docker
docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=secure-password \
  rabbitmq:3-management
Enabling the management plugin
Enable the plugin
rabbitmq-plugins enable rabbitmq_management
Web interface
URL: http://server-ip:15672
User: guest (localhost only)
Password: guest
Create an admin user
# Create the user
rabbitmqctl add_user admin secure-password
# Grant the administrator tag
rabbitmqctl set_user_tags admin administrator
# Grant full permissions (configure, write, read) on the default vhost "/"
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
# Remove the guest user (production!)
rabbitmqctl delete_user guest
Basic concepts
Components
Producer → Exchange → Queue → Consumer
Producer: sends messages
Exchange: routes messages to queues
Queue: stores messages until they are consumed
Consumer: receives messages
Binding: links an exchange to a queue
Exchange types
| Type | Routing |
|------|---------|
| direct | Exact routing-key match |
| fanout | To all bound queues |
| topic | Pattern matching with wildcards |
| headers | Based on header attributes |
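To make the routing rules in the table concrete, here is a minimal in-memory sketch in plain Python. It does not talk to a broker: `Exchange`, `bind`, `route`, and `topic_matches` are hypothetical names invented for this illustration and are not part of RabbitMQ or any client library. The headers type is left out, and the queue names are just examples.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Check a topic binding pattern against a routing key.

    '*' stands for exactly one dot-separated word, '#' for zero or more.
    """
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' either consumes nothing, or consumes one word and stays active
            return match(p[1:], k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:
            return match(p[1:], k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


class Exchange:
    """In-memory stand-in for an exchange (illustration only)."""

    def __init__(self, exchange_type: str):
        self.exchange_type = exchange_type
        self.bindings = []  # list of (queue_name, binding_key)

    def bind(self, queue: str, binding_key: str = "") -> None:
        self.bindings.append((queue, binding_key))

    def route(self, routing_key: str) -> list:
        """Return the names of the queues that would receive the message."""
        matched = []
        for queue, binding_key in self.bindings:
            if self.exchange_type == "fanout":
                hit = True  # the routing key is ignored
            elif self.exchange_type == "direct":
                hit = binding_key == routing_key
            elif self.exchange_type == "topic":
                hit = topic_matches(binding_key, routing_key)
            else:
                raise ValueError(f"unsupported type: {self.exchange_type}")
            if hit and queue not in matched:
                matched.append(queue)
        return matched


logs = Exchange("direct")
logs.bind("error_logs", "error")
logs.bind("info_logs", "info")
print(logs.route("error"))               # ['error_logs']

notifications = Exchange("fanout")
notifications.bind("email")
notifications.bind("sms")
print(notifications.route("anything"))   # ['email', 'sms']

events = Exchange("topic")
events.bind("all_orders", "order.#")
events.bind("eu_orders", "order.eu.*")
print(events.route("order.eu.created"))  # ['all_orders', 'eu_orders']
print(events.route("order.us.created"))  # ['all_orders']
```

A real broker does this matching on the server side; the sketch only mirrors the decision of which bound queues get a copy of a message.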
Configuration
Configuration file
# /etc/rabbitmq/rabbitmq.conf
# Network
listeners.tcp.default = 5672
# Management
management.tcp.port = 15672
# Memory limit (40% of RAM)
vm_memory_high_watermark.relative = 0.4
# Free disk space limit
disk_free_limit.absolute = 2GB
# Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
Environment variables
# /etc/rabbitmq/rabbitmq-env.conf
# Variables in this file are written without the RABBITMQ_ prefix
NODENAME=rabbit@hostname
NODE_PORT=5672
DIST_PORT=25672
CLI administration
rabbitmqctl
# Status
rabbitmqctl status
rabbitmqctl cluster_status
# Users
rabbitmqctl list_users
rabbitmqctl add_user username password
rabbitmqctl delete_user username
rabbitmqctl change_password username newpassword
# Permissions (configure, write, and read patterns)
rabbitmqctl list_permissions
rabbitmqctl set_permissions -p vhost user "conf" "write" "read"
# Queues
rabbitmqctl list_queues
rabbitmqctl list_queues name messages consumers
# Exchanges
rabbitmqctl list_exchanges
# Connections and channels
rabbitmqctl list_connections
rabbitmqctl list_channels
# Purge a queue (discards all messages in it)
rabbitmqctl purge_queue queue_name
rabbitmqadmin
# Installation (the script is served by the management plugin)
wget http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/
# Create a queue
rabbitmqadmin declare queue name=my-queue durable=true
# Create an exchange
rabbitmqadmin declare exchange name=my-exchange type=direct
# Create a binding
rabbitmqadmin declare binding source=my-exchange destination=my-queue routing_key=my-key
# Publish a message
rabbitmqadmin publish exchange=my-exchange routing_key=my-key payload="Hello"
# Fetch a message
rabbitmqadmin get queue=my-queue
Python example
Installation
pip install pika
Producer
# producer.py
import pika
import json
# Open a connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        credentials=pika.PlainCredentials('admin', 'password')
    )
)
channel = connection.channel()
# Declare the queue (durable: the queue survives a broker restart)
channel.queue_declare(queue='task_queue', durable=True)
# Send a message via the default exchange, routed by queue name
message = {'task': 'process_image', 'image_id': 123}
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json'
    )
)
print(f"Sent: {message}")
connection.close()
Consumer
# consumer.py
import pika
import json
import time
def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received: {message}")
    # Process the task
    time.sleep(2)  # Simulated work
    print("Done")
    # Acknowledge only after the work has finished
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        credentials=pika.PlainCredentials('admin', 'password')
    )
)
channel = connection.channel()
# Declare the queue (same arguments as in the producer)
channel.queue_declare(queue='task_queue', durable=True)
# Fair dispatch: at most one unacknowledged message per consumer
channel.basic_qos(prefetch_count=1)
# Start consuming
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback
)
print('Waiting for messages...')
channel.start_consuming()
PHP example
Installation
composer require php-amqplib/php-amqplib
Producer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'password');
$channel = $connection->channel();
// Arguments: queue, passive, durable, exclusive, auto_delete
$channel->queue_declare('task_queue', false, true, false, false);
$data = json_encode(['task' => 'send_email', 'to' => 'user@example.com']);
$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'content_type' => 'application/json'
]);
// Publish via the default exchange, routed by queue name
$channel->basic_publish($msg, '', 'task_queue');
echo "Sent: {$data}\n";
$channel->close();
$connection->close();
Consumer
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'password');
$channel = $connection->channel();
$channel->queue_declare('task_queue', false, true, false, false);
$callback = function ($msg) {
    $data = json_decode($msg->body, true);
    echo "Received: " . print_r($data, true);
    // Process the task
    sleep(2);
    echo "Done\n";
    // Acknowledge only after the work has finished
    $msg->ack();
};
// Fair dispatch: at most one unacknowledged message per consumer
$channel->basic_qos(null, 1, null);
// Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
echo "Waiting for messages...\n";
$channel->consume();
Exchange patterns
Direct Exchange
# Routing based on an exact routing-key match
channel.exchange_declare(exchange='logs', exchange_type='direct')
# Queues must exist before they can be bound
channel.queue_declare(queue='error_logs')
channel.queue_declare(queue='info_logs')
# Bindings
channel.queue_bind(queue='error_logs', exchange='logs', routing_key='error')
channel.queue_bind(queue='info_logs', exchange='logs', routing_key='info')
# Publish
channel.basic_publish(exchange='logs', routing_key='error', body='Error message')
Fanout Exchange (Broadcast)
# Send to all queues
channel.exchange_declare(exchange='notifications', exchange_type='fanout')
# Every bound queue receives the message; the routing key is ignored
channel.basic_publish(exchange='notifications', routing_key='', body='Broadcast message')
Topic Exchange
# Pattern-based routing
channel.exchange_declare(exchange='events', exchange_type='topic')
# Bindings with wildcards (assumes both queues have already been declared)
# * = exactly one word
# # = zero or more words
channel.queue_bind(queue='all_orders', exchange='events', routing_key='order.#')
channel.queue_bind(queue='eu_orders', exchange='events', routing_key='order.eu.*')
# Publish (matches both bindings)
channel.basic_publish(exchange='events', routing_key='order.eu.created', body='New EU order')
Clustering
Setting up a cluster
# On node 2 and node 3 (note: reset wipes the node's existing data)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Cluster status
rabbitmqctl cluster_status
Erlang cookie
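# All nodes must share the same cookie
# /var/lib/rabbitmq/.erlang.cookie
Mirrored queues (HA)
# Policy for mirrored classic queues
# Note: classic queue mirroring is deprecated and was removed in RabbitMQ 4.0; quorum queues replace it there
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
Monitoring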
Prometheus plugin
rabbitmq-plugins enable rabbitmq_prometheus
Metrics are available at: http://localhost:15692/metrics
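The endpoint serves plain text in the Prometheus exposition format, so it can be inspected without a full Prometheus setup. The following is a minimal sketch using only the Python standard library. `parse_metrics` and `fetch_metrics` are hypothetical helper names, `SAMPLE` is made-up data for illustration rather than real broker output, and the parser assumes sample lines carry no trailing timestamp.

```python
from urllib.request import urlopen


def parse_metrics(text: str) -> dict:
    """Parse Prometheus text exposition into {metric_name: summed_value}.

    Samples that share a metric name but differ in labels are added up.
    Assumes no timestamp follows the value on a sample line.
    """
    totals = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        series, _, value = line.rpartition(" ")
        name = series.split("{", 1)[0]  # drop the label set, if any
        totals[name] = totals.get(name, 0.0) + float(value)
    return totals


def fetch_metrics(url: str = "http://localhost:15692/metrics") -> dict:
    """Download and parse the metrics page (needs a running node)."""
    with urlopen(url, timeout=5) as response:
        return parse_metrics(response.read().decode("utf-8"))


# Made-up sample text, for illustration only
SAMPLE = """\
# TYPE rabbitmq_queue_messages gauge
rabbitmq_queue_messages 42
# TYPE rabbitmq_queue_consumers gauge
rabbitmq_queue_consumers 3
"""

metrics = parse_metrics(SAMPLE)
print(metrics["rabbitmq_queue_messages"])   # 42.0
print(metrics["rabbitmq_queue_consumers"])  # 3.0
```

Against a live node, `fetch_metrics()` would return the same kind of dictionary, which is enough for a quick threshold check in a cron job or a smoke test.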
Key metrics
rabbitmq_queue_messages # Messages in queues
rabbitmq_queue_consumers # Consumers on queues
rabbitmq_connections # Open connections
rabbitmq_channel_messages_acked_total # Acknowledged messages
Health check
# CLI
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_local_alarms
rabbitmq-diagnostics check_port_connectivity
# HTTP API (the older /api/healthchecks/node endpoint is deprecated)
curl -u admin:password http://localhost:15672/api/health/checks/alarms
Security
Enable TLS
# /etc/rabbitmq/rabbitmq.conf
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ssl/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/ssl/server_key.pem
# Require and verify client certificates (mutual TLS)
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
Virtual hosts
# Create vhosts
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging
# Set permissions (configure, write, read) for app_user on the production vhost
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"
Firewall
ufw allow 5672/tcp # AMQP
ufw allow 15672/tcp # Management (internal only!)
ufw allow 25672/tcp # Cluster (inter-node communication)
ufw allow 4369/tcp # epmd (node discovery, needed for clustering)
Backup
Export definitions
# Via the HTTP API (covers users, vhosts, queues, exchanges, bindings, and policies, but not messages)
curl -u admin:password \
  http://localhost:15672/api/definitions > rabbitmq-backup.json
# Restore
curl -u admin:password \
  -H "Content-Type: application/json" \
  -X POST \
  -d @rabbitmq-backup.json \
  http://localhost:15672/api/definitions
Data directory
# Stop the service
systemctl stop rabbitmq-server
# Back up the data directory
tar -czf rabbitmq-data.tar.gz /var/lib/rabbitmq/
# Start the service again
systemctl start rabbitmq-server
Troubleshooting
Service does not start
# Check the logs
journalctl -u rabbitmq-server
cat /var/log/rabbitmq/rabbit*.log
# Check the Erlang version
erl -noshell -eval 'erlang:display(erlang:system_info(otp_release)), halt().'
Queue keeps growing
# Check whether consumers are attached
rabbitmqctl list_queues name messages consumers
# Purge the queue (discards all messages in it)
rabbitmqctl purge_queue queue_name
Memory alarm
# Check memory usage
rabbitmqctl status | grep -i memory
# Raise the limit
# rabbitmq.conf
vm_memory_high_watermark.relative = 0.6
Summary
| Command | Function |
|---------|----------|
| rabbitmqctl status | Show status |
| rabbitmqctl list_queues | List queues |
| rabbitmqctl add_user | Create a user |
| rabbitmqctl set_permissions | Set permissions |
| rabbitmqctl purge_queue | Purge a queue |
| rabbitmq-plugins enable | Enable a plugin |

| Exchange type | Use |
|---------------|-----|
| direct | Targeted routing |
| fanout | Broadcast |
| topic | Pattern matching |
| headers | Header-based |
Conclusion
RabbitMQ is a reliable message broker for asynchronous communication. Its exchange types cover exact-match, broadcast, pattern-based, and header-based routing. For production environments, a cluster with replicated queues is recommended: mirrored queues on older releases, quorum queues on current ones. The management plugin provides a convenient web interface for monitoring and administration.