RabbitMQ is a high-performance message broker for asynchronous communication. It provides reliable message delivery between applications and microservices.

Why RabbitMQ?

Use cases

- Asynchronous task processing
- Microservice communication
- Event-driven architecture
- Workload distribution
- Notification systems
- Log aggregation

Advantages

- Reliable delivery
- Flexible routing options
- Clustering and high availability
- Built-in management UI
- Many client libraries

Installation

Debian/Ubuntu

# Add the Erlang repository
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.deb.sh' | sudo bash

# Add the RabbitMQ repository
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.deb.sh' | sudo bash

# Install the packages
apt update
apt install erlang-base erlang-nox rabbitmq-server

# Start the service and enable it at boot
systemctl enable --now rabbitmq-server

CentOS/RHEL

# Erlang and RabbitMQ repositories
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/setup.rpm.sh' | sudo bash
curl -1sLf 'https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/setup.rpm.sh' | sudo bash

dnf install erlang rabbitmq-server

systemctl enable --now rabbitmq-server

Docker

docker run -d --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=sicheres-passwort \
  rabbitmq:3-management

Enabling the management plugin

Enable the plugin

rabbitmq-plugins enable rabbitmq_management

Web interface

URL: http://server-ip:15672
User: guest (localhost only)
Password: guest

Create an admin user

# Create the user
rabbitmqctl add_user admin sicheres-passwort

# Grant administrator rights
rabbitmqctl set_user_tags admin administrator

# Full permissions (configure, write, read) on the default vhost "/"
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# Remove the guest user (in production!)
rabbitmqctl delete_user guest

Core concepts

Components

Producer → Exchange → Queue → Consumer

Producer:   Sends messages
Exchange:   Routes messages
Queue:      Stores messages
Consumer:   Receives messages
Binding:    Links an exchange to a queue

Exchange types

| Type | Routing |
|------|---------|
| direct | Exact routing-key match |
| fanout | To all bound queues |
| topic | Pattern matching with wildcards |
| headers | Based on header attributes |
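
The routing rules in the table can be illustrated with a small in-memory model. This is only a sketch of the semantics, not how the broker is implemented: the `route` function and the `(queue, binding_key)` tuples are hypothetical names chosen for this example, and only the direct and fanout types are covered.

```python
# Minimal in-memory model of exchange routing (illustration only, no broker involved).
# `route` and the (queue, binding_key) tuples are hypothetical names for this sketch.

def route(exchange_type, bindings, routing_key):
    """Return the names of the queues that would receive a message."""
    if exchange_type == "fanout":
        # Fanout ignores the routing key: every bound queue gets a copy.
        return [queue for queue, _ in bindings]
    if exchange_type == "direct":
        # Direct delivers only where the binding key equals the routing key.
        return [queue for queue, key in bindings if key == routing_key]
    raise ValueError(f"unsupported exchange type in this sketch: {exchange_type}")


bindings = [("error_logs", "error"), ("info_logs", "info")]

print(route("direct", bindings, "error"))   # ['error_logs']
print(route("fanout", bindings, "error"))   # ['error_logs', 'info_logs']
print(route("direct", bindings, "debug"))   # []
```

The last call returns an empty list because no binding key equals `debug`, so in this model the message reaches no queue.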

Configuration

Configuration file

# /etc/rabbitmq/rabbitmq.conf

# Network
listeners.tcp.default = 5672

# Management
management.tcp.port = 15672

# Memory limit (40% of RAM)
vm_memory_high_watermark.relative = 0.4

# Free disk space limit
disk_free_limit.absolute = 2GB

# Logging
log.console = true
log.console.level = info
log.file = /var/log/rabbitmq/rabbit.log
log.file.level = info
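
To get a feel for what a relative memory watermark means in absolute terms, the arithmetic can be sketched as follows. The helper `watermark_bytes` is hypothetical and the 8 GiB machine size is an assumption for the example; the broker detects the available memory itself.

```python
# Sketch: how a relative memory watermark translates into bytes.
# `watermark_bytes` is a hypothetical helper, not part of RabbitMQ.

def watermark_bytes(total_ram_bytes, relative):
    """Memory threshold implied by vm_memory_high_watermark.relative."""
    if relative <= 0:
        raise ValueError("relative watermark must be positive")
    return int(total_ram_bytes * relative)


GIB = 1024 ** 3
total = 8 * GIB  # assumed machine size for the example

threshold = watermark_bytes(total, 0.4)
print(f"{threshold / GIB:.1f} GiB")  # 3.2 GiB
```

With the setting above, a node on such a machine would raise a memory alarm at roughly 3.2 GiB of broker memory use.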

Environment variables

# /etc/rabbitmq/rabbitmq-env.conf
# Variables in this file are written without the RABBITMQ_ prefix

NODENAME=rabbit@hostname
NODE_PORT=5672
DIST_PORT=25672

CLI administration

rabbitmqctl

# Status
rabbitmqctl status
rabbitmqctl cluster_status

# Users
rabbitmqctl list_users
rabbitmqctl add_user username password
rabbitmqctl delete_user username
rabbitmqctl change_password username newpassword

# Permissions (configure, write, and read patterns)
rabbitmqctl list_permissions
rabbitmqctl set_permissions -p vhost user "conf" "write" "read"

# Queues
rabbitmqctl list_queues
rabbitmqctl list_queues name messages consumers

# Exchanges
rabbitmqctl list_exchanges

# Connections and channels
rabbitmqctl list_connections
rabbitmqctl list_channels

# Purge a queue
rabbitmqctl purge_queue queue_name

rabbitmqadmin

# Installation
wget http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
mv rabbitmqadmin /usr/local/bin/

# Declare a queue
rabbitmqadmin declare queue name=my-queue durable=true

# Declare an exchange
rabbitmqadmin declare exchange name=my-exchange type=direct

# Declare a binding
rabbitmqadmin declare binding source=my-exchange destination=my-queue routing_key=my-key

# Publish a message
rabbitmqadmin publish exchange=my-exchange routing_key=my-key payload="Hello"

# Fetch a message
rabbitmqadmin get queue=my-queue

Python example

Installation

pip install pika

Producer

# producer.py
import pika
import json

# Open a connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        credentials=pika.PlainCredentials('admin', 'password')
    )
)
channel = connection.channel()

# Declare the queue (durable, so it survives a broker restart)
channel.queue_declare(queue='task_queue', durable=True)

# Publish a message
message = {'task': 'process_image', 'image_id': 123}

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json'
    )
)

print(f"Sent: {message}")
connection.close()

Consumer

# consumer.py
import pika
import json
import time

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Received: {message}")

    # Process the task
    time.sleep(2)  # Simulated work

    print("Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Connection
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        credentials=pika.PlainCredentials('admin', 'password')
    )
)
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue='task_queue', durable=True)

# Fair dispatch: at most one unacknowledged message per consumer
channel.basic_qos(prefetch_count=1)

# Start consuming
channel.basic_consume(
    queue='task_queue',
    on_message_callback=callback
)

print('Waiting for messages...')
channel.start_consuming()
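
The consumer above acknowledges every message and would raise on a malformed body. One possible way to handle bad input is sketched below: acknowledge on success, and reject without requeueing when the body is not valid JSON. The names `safe_callback`, `FakeChannel`, and `FakeMethod` are hypothetical; the two fake classes only stand in for the pika objects so the sketch runs without a broker.

```python
import json


def safe_callback(ch, method, properties, body):
    """Ack on success; reject without requeue if the body is not valid JSON."""
    try:
        message = json.loads(body)
    except ValueError:
        # Covers invalid JSON and undecodable bytes. Such a message would fail
        # on every redelivery, so it is not requeued.
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return
    print(f"Processing: {message}")
    ch.basic_ack(delivery_tag=method.delivery_tag)


class FakeChannel:
    """Stand-in for a pika channel (assumption: only ack/nack are needed here)."""

    def __init__(self):
        self.calls = []

    def basic_ack(self, delivery_tag):
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag, requeue=True):
        self.calls.append(("nack", delivery_tag, requeue))


class FakeMethod:
    """Stand-in for the delivery metadata passed to the callback."""

    def __init__(self, delivery_tag):
        self.delivery_tag = delivery_tag


ch = FakeChannel()
safe_callback(ch, FakeMethod(1), None, b'{"task": "process_image"}')
safe_callback(ch, FakeMethod(2), None, b'not json')
print(ch.calls)  # [('ack', 1), ('nack', 2, False)]
```

With a real connection, `safe_callback` could be passed as `on_message_callback` in place of `callback`. Note that a message rejected with `requeue=False` is discarded unless a dead-letter exchange is configured for the queue.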

PHP example

Installation

composer require php-amqplib/php-amqplib

Producer

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'password');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = json_encode(['task' => 'send_email', 'to' => 'user@example.com']);

$msg = new AMQPMessage($data, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
    'content_type' => 'application/json'
]);

$channel->basic_publish($msg, '', 'task_queue');

echo "Sent: {$data}\n";

$channel->close();
$connection->close();

Consumer

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'password');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$callback = function ($msg) {
    $data = json_decode($msg->body, true);
    echo "Received: " . print_r($data, true);

    // Process the task
    sleep(2);

    echo "Done\n";
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

echo "Waiting for messages...\n";
$channel->consume();

Exchange patterns

Direct Exchange

# Routing based on the routing key
channel.exchange_declare(exchange='logs', exchange_type='direct')

# Bindings (both queues must already be declared)
channel.queue_bind(queue='error_logs', exchange='logs', routing_key='error')
channel.queue_bind(queue='info_logs', exchange='logs', routing_key='info')

# Publish
channel.basic_publish(exchange='logs', routing_key='error', body='Error message')

Fanout Exchange (Broadcast)

# Deliver to every bound queue
channel.exchange_declare(exchange='notifications', exchange_type='fanout')

# All bound queues receive the message; the routing key is ignored
channel.basic_publish(exchange='notifications', routing_key='', body='Broadcast message')

Topic Exchange

# Pattern-based routing
channel.exchange_declare(exchange='events', exchange_type='topic')

# Bindings with wildcards
# * = exactly one word
# # = zero or more words
channel.queue_bind(queue='all_orders', exchange='events', routing_key='order.#')
channel.queue_bind(queue='eu_orders', exchange='events', routing_key='order.eu.*')

# Publish
channel.basic_publish(exchange='events', routing_key='order.eu.created', body='New EU order')
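
The wildcard rules for topic bindings can be checked without a broker. The function below is a sketch of the matching semantics described in the comments above (`*` for exactly one word, `#` for zero or more words); `topic_matches` is a hypothetical helper written for this example, not part of pika or RabbitMQ.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key."""

    def match(p, k):
        if not p:
            # Pattern exhausted: match only if the key is exhausted too.
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may absorb zero words, or one word and then try again.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        if head == "*" or head == k[0]:
            # '*' consumes exactly one word; a literal must be equal.
            return match(rest, k[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


print(topic_matches("order.#", "order.eu.created"))        # True
print(topic_matches("order.eu.*", "order.eu.created"))     # True
print(topic_matches("order.eu.*", "order.us.created"))     # False
print(topic_matches("order.eu.*", "order.eu.created.v2"))  # False
```

Under these rules the message published with `order.eu.created` would reach both `all_orders` and `eu_orders`.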

Clustering

Setting up a cluster

# On nodes 2 and 3
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Cluster status
rabbitmqctl cluster_status
# All nodes must share the same Erlang cookie
# /var/lib/rabbitmq/.erlang.cookie

Mirrored Queues (HA)

# Policy for mirrored queues (RabbitMQ 3.x only; classic queue mirroring was removed in 4.0)
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

Monitoring

Prometheus plugin

rabbitmq-plugins enable rabbitmq_prometheus

Metrics are available at: http://localhost:15692/metrics

Key metrics

rabbitmq_queue_messages                 # Messages in queues
rabbitmq_queue_consumers                # Consumers on queues
rabbitmq_connections                    # Open connections
rabbitmq_channel_messages_acked_total   # Acknowledged messages
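
As a rough illustration of how these metrics might be consumed, the sketch below parses Prometheus text output and flags a backlog that nobody is consuming. The sample text and its values are invented for the example and simplified (no labels, no timestamps); `parse_metrics` and `backlog_without_consumers` are hypothetical helpers.

```python
# Sample scrape output; the values are invented for illustration and the
# format is simplified (no labels, no timestamps).
SAMPLE = """\
# TYPE rabbitmq_queue_messages gauge
rabbitmq_queue_messages 42
# TYPE rabbitmq_queue_consumers gauge
rabbitmq_queue_consumers 0
"""


def parse_metrics(text):
    """Parse 'name value' lines, skipping comments and blank lines."""
    metrics = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # The value is the last whitespace-separated token on the line.
        name, _, value = line.rpartition(" ")
        metrics[name] = float(value)
    return metrics


def backlog_without_consumers(metrics):
    """True if messages are waiting but no consumer is attached."""
    return (
        metrics.get("rabbitmq_queue_messages", 0) > 0
        and metrics.get("rabbitmq_queue_consumers", 0) == 0
    )


metrics = parse_metrics(SAMPLE)
print(backlog_without_consumers(metrics))  # True
```

In practice a Prometheus server and alerting rules would do this job; the sketch only shows which two metrics such a rule would combine.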

Health Check

# CLI
rabbitmq-diagnostics check_running
rabbitmq-diagnostics check_local_alarms
rabbitmq-diagnostics check_port_connectivity

# HTTP API
curl -u admin:password http://localhost:15672/api/healthchecks/node

Security

Enabling TLS

# /etc/rabbitmq/rabbitmq.conf

listeners.ssl.default = 5671

ssl_options.cacertfile = /etc/rabbitmq/ssl/ca_certificate.pem
ssl_options.certfile   = /etc/rabbitmq/ssl/server_certificate.pem
ssl_options.keyfile    = /etc/rabbitmq/ssl/server_key.pem
ssl_options.verify     = verify_peer
ssl_options.fail_if_no_peer_cert = true

Virtual Hosts

# Create vhosts
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging

# Set permissions
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"

Firewall

ufw allow 5672/tcp   # AMQP
ufw allow 15672/tcp  # Management (internal only!)
ufw allow 25672/tcp  # Cluster (inter-node communication)
ufw allow 4369/tcp   # epmd (node discovery, needed for clustering)
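
After changing firewall rules it can be useful to verify which ports are actually reachable. The following is a minimal sketch using only the standard library; `port_open` is a hypothetical helper, and `localhost` is an assumption (from a client machine you would use the broker's address).

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused, timed out, unreachable, or name resolution failed.
        return False


# Ports taken from the firewall rules above.
for port in (5672, 15672, 25672):
    state = "open" if port_open("localhost", port) else "closed"
    print(f"{port}: {state}")
```

A successful TCP connection only shows that something is listening and the firewall lets it through; it does not confirm that the listener is RabbitMQ or that authentication works.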

Backup

Exporting definitions

# Via the HTTP API
curl -u admin:password \
  http://localhost:15672/api/definitions > rabbitmq-backup.json

# Restore
curl -u admin:password \
  -H "Content-Type: application/json" \
  -X POST \
  -d @rabbitmq-backup.json \
  http://localhost:15672/api/definitions
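
The export can also be scripted. The sketch below builds the same authenticated request as the first curl call using only the standard library; `definitions_request` is a hypothetical helper, and the URL and credentials are the placeholder values used above. The request is built but not sent, so the example runs without a broker.

```python
import base64
import urllib.request


def definitions_request(base_url, user, password):
    """Build (but do not send) a GET request for /api/definitions."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        f"{base_url}/api/definitions",
        headers={"Authorization": f"Basic {token}"},
    )


req = definitions_request("http://localhost:15672", "admin", "password")
print(req.get_method(), req.full_url)  # GET http://localhost:15672/api/definitions

# To actually export, uncomment the lines below (requires a running broker
# with the management plugin enabled):
# with urllib.request.urlopen(req) as resp, open("rabbitmq-backup.json", "wb") as f:
#     f.write(resp.read())
```

Definitions cover users, vhosts, policies, exchanges, queues, and bindings, but not the messages themselves; those live in the data directory.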

Data directory

# Stop the service
systemctl stop rabbitmq-server

# Back up
tar -czf rabbitmq-data.tar.gz /var/lib/rabbitmq/

# Start the service
systemctl start rabbitmq-server

Troubleshooting

Service does not start

# Check the logs
journalctl -u rabbitmq-server
cat /var/log/rabbitmq/rabbit*.log

# Check the Erlang version
erl -eval 'erlang:display(erlang:system_info(otp_release)), halt().'

Queue keeps growing

# Check whether consumers are attached
rabbitmqctl list_queues name messages consumers

# Purge the queue (discards all messages in it)
rabbitmqctl purge_queue queue_name

Memory Alarm

# Check memory usage
rabbitmqctl status | grep -i memory

# Raise the limit
# rabbitmq.conf
vm_memory_high_watermark.relative = 0.6

Summary

| Command | Purpose |
|---------|---------|
| rabbitmqctl status | Show status |
| rabbitmqctl list_queues | List queues |
| rabbitmqctl add_user | Create a user |
| rabbitmqctl set_permissions | Set permissions |
| rabbitmqctl purge_queue | Purge a queue |
| rabbitmq-plugins enable | Enable a plugin |

| Exchange type | Use |
|---------------|-----|
| direct | Targeted routing |
| fanout | Broadcast |
| topic | Pattern matching |
| headers | Header-based |

Conclusion

RabbitMQ is a reliable message broker for asynchronous communication. Its exchange types support flexible routing scenarios. For production environments, a clustered setup with mirrored queues is recommended. The management plugin provides a convenient web interface for monitoring and administration.