RabbitMQ ist ein zuverlässiger Message Broker für asynchrone Kommunikation. Er ermöglicht lose Kopplung zwischen Services und garantierte Nachrichtenzustellung.

Installation

# Debian/Ubuntu
apt install rabbitmq-server

# RHEL/CentOS
dnf install rabbitmq-server

# Starten
systemctl enable rabbitmq-server
systemctl start rabbitmq-server

Docker

docker run -d \
    --name rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    rabbitmq:3-management

Management-Plugin

# Plugin aktivieren
rabbitmq-plugins enable rabbitmq_management

# Web-UI: http://localhost:15672
# Default: guest/guest (nur localhost)

User-Verwaltung

# Admin-User erstellen
rabbitmqctl add_user admin secret123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# Anwendungs-User
rabbitmqctl add_user app_user password123
rabbitmqctl set_permissions -p / app_user "^app\." "^app\." "^app\."

# User auflisten
rabbitmqctl list_users

# Guest-User entfernen (Standard-Zugang deaktivieren)
rabbitmqctl delete_user guest

Virtual Hosts

# vhost erstellen
rabbitmqctl add_vhost production
rabbitmqctl add_vhost staging

# Berechtigungen
rabbitmqctl set_permissions -p production app_user ".*" ".*" ".*"

# vhosts auflisten
rabbitmqctl list_vhosts

Queues

CLI

# Queue erstellen (besser über Code); rabbitmqctl kennt kein "declare", daher rabbitmqadmin
rabbitmqadmin declare queue name=tasks durable=true

# Queues auflisten
rabbitmqctl list_queues name messages consumers

# Queue löschen
rabbitmqctl delete_queue tasks

Queue-Typen

| Typ | Beschreibung |
|-----|--------------|
| Classic | Standard-Queue |
| Quorum | Hochverfügbar (Raft) |
| Stream | Log-ähnlich (Append-only) |

Exchanges

| Typ | Routing |
|-----|---------|
| direct | Exakter Routing Key |
| fanout | An alle Queues |
| topic | Pattern-basiert |
| headers | Nach Header-Werten |

Exchange erstellen

rabbitmqadmin declare exchange name=logs type=topic durable=true

Python-Beispiel

Producer

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Queue erstellen
channel.queue_declare(queue='tasks', durable=True)

# Nachricht senden
channel.basic_publish(
    exchange='',
    routing_key='tasks',
    body='Hello World!',
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
    )
)

connection.close()

Consumer

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

channel.queue_declare(queue='tasks', durable=True)

def callback(ch, method, properties, body):
    print(f"Received: {body}")
    # Verarbeitung...
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Fair Dispatch
channel.basic_qos(prefetch_count=1)

channel.basic_consume(
    queue='tasks',
    on_message_callback=callback
)

channel.start_consuming()

Pub/Sub mit Exchange

# Publisher
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='Log message')

# Subscriber
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

Topic Exchange

# Exchange
channel.exchange_declare(exchange='events', exchange_type='topic')

# Publisher
channel.basic_publish(
    exchange='events',
    routing_key='user.created',
    body='New user created'
)

# Consumer für user.*
channel.queue_bind(exchange='events', queue=queue_name, routing_key='user.*')

# Consumer für *.error
channel.queue_bind(exchange='events', queue=queue_name, routing_key='*.error')

Clustering

Cluster erstellen

# Auf Node 2 und 3
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# Cluster-Status
rabbitmqctl cluster_status

Mirrored Queues (Classic, veraltet – in RabbitMQ 4.0 entfernt)

rabbitmqctl set_policy ha-all ".*" \
    '{"ha-mode":"all"}' \
    --apply-to queues

Quorum Queues (empfohlen)

channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={'x-queue-type': 'quorum'}
)

Monitoring

# Übersicht
rabbitmqctl status

# Queues mit Nachrichten
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged

# Verbindungen
rabbitmqctl list_connections user peer_host state

# Channels
rabbitmqctl list_channels connection consumer_count messages_unacknowledged

Prometheus

# Plugin aktivieren
rabbitmq-plugins enable rabbitmq_prometheus

# Metriken: http://localhost:15692/metrics

Konfiguration

rabbitmq.conf

# /etc/rabbitmq/rabbitmq.conf

# Listener
listeners.tcp.default = 5672

# Management
management.tcp.port = 15672

# Speicher-Limits
vm_memory_high_watermark.relative = 0.6
disk_free_limit.absolute = 2GB

# Logging
log.file.level = info

Umgebungsvariablen

# /etc/rabbitmq/rabbitmq-env.conf

RABBITMQ_NODENAME=rabbit@hostname
RABBITMQ_NODE_PORT=5672
RABBITMQ_DIST_PORT=25672

TLS

# rabbitmq.conf

listeners.ssl.default = 5671

ssl_options.cacertfile = /etc/rabbitmq/ca.crt
ssl_options.certfile = /etc/rabbitmq/server.crt
ssl_options.keyfile = /etc/rabbitmq/server.key
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = false

Dead Letter

# Dead Letter Exchange
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='')

# Queue mit DLX
channel.queue_declare(
    queue='tasks',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-message-ttl': 60000  # 60 Sekunden
    }
)

Shovel (Replikation)

# Plugin aktivieren
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management

# Dynamischer Shovel per Runtime-Parameter
rabbitmqctl set_parameter shovel my-shovel \
    '{"src-uri": "amqp://localhost", "src-queue": "source", "dest-uri": "amqp://remote-host", "dest-queue": "destination"}'

Backup

# Definitions exportieren
rabbitmqctl export_definitions /backup/definitions.json

# Definitions importieren
rabbitmqctl import_definitions /backup/definitions.json

Zusammenfassung

| Befehl | Funktion |
|--------|----------|
| rabbitmqctl add_user | User erstellen |
| rabbitmqctl list_queues | Queues anzeigen |
| rabbitmqctl cluster_status | Cluster-Status |
| rabbitmqctl set_policy | Policy setzen |
| rabbitmq-plugins enable | Plugin aktivieren |

| Port | Dienst |
|------|--------|
| 5672 | AMQP |
| 5671 | AMQPS (TLS) |
| 15672 | Management UI |
| 25672 | Cluster |

| Konzept | Beschreibung |
|---------|--------------|
| Queue | Nachrichten-Warteschlange |
| Exchange | Routing-Komponente |
| Binding | Verbindung Exchange-Queue |
| vhost | Virtuelle Trennung |

Fazit

RabbitMQ ist zuverlässig für asynchrone Kommunikation. Quorum Queues bieten Hochverfügbarkeit. Exchanges ermöglichen flexibles Routing. Das Management-Plugin vereinfacht die Verwaltung. Für sehr hohe Durchsätze kann Kafka besser geeignet sein.