Logstash is a data processing pipeline that collects data from a variety of sources, transforms it, and forwards it to destinations such as Elasticsearch.

Architecture

Pipeline Concept

Input → Filter → Output

Input:   receive data (files, Beats, syslog, ...)
Filter:  parse and transform data
Output:  forward data (Elasticsearch, files, ...)
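
A minimal, self-contained pipeline showing all three stages (a sketch for experimentation; it reads lines from stdin, tags every event, and pretty-prints the result):

input {
  stdin { }
}

filter {
  mutate {
    add_tag => ["demo"]   # mark each event passing through the filter stage
  }
}

output {
  stdout {
    codec => rubydebug    # print the full event structure
  }
}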

Multiple Pipelines

Pipeline 1: Syslog → Filter → Elasticsearch
Pipeline 2: Apache → Filter → Elasticsearch
Pipeline 3: Beats  → Filter → Elasticsearch

Installation

Debian/Ubuntu

# Add the Elastic repository (if not already present)
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | gpg --dearmor -o /usr/share/keyrings/elasticsearch.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | tee /etc/apt/sources.list.d/elastic-8.x.list

# Install
apt update
apt install logstash

Starting the Service

systemctl enable --now logstash

# Status
systemctl status logstash
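
If the service does not start, the systemd journal and Logstash's own log file usually show why:

# Follow the service log
journalctl -u logstash -f

# Logstash's own log file
tail -f /var/log/logstash/logstash-plain.log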

Configuration Files

/etc/logstash/logstash.yml       # Main configuration
/etc/logstash/jvm.options        # JVM settings
/etc/logstash/pipelines.yml      # Pipeline definitions
/etc/logstash/conf.d/            # Pipeline configurations

Basic Configuration

logstash.yml

# /etc/logstash/logstash.yml

path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d
path.logs: /var/log/logstash

pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50

config.reload.automatic: true
config.reload.interval: 3s

JVM Settings

# /etc/logstash/jvm.options

-Xms1g
-Xmx1g

Pipeline Syntax

Basic Structure

# /etc/logstash/conf.d/example.conf

input {
  # Data sources
}

filter {
  # Processing
}

output {
  # Destinations
}

Conditional Processing

filter {
  if [type] == "syslog" {
    # Syslog-specific filters
  }

  if [status] >= 400 {
    mutate { add_tag => ["error"] }
  }

  if "error" in [tags] {
    # Error handling
  }
}

Inputs

File Input

input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb_nginx"
    type => "nginx"
  }
}

Beats Input

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }
}

Syslog Input

input {
  syslog {
    port => 514
    type => "syslog"
  }
}
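
# Note: the logstash service runs as an unprivileged user and cannot bind
# to ports below 1024 by default. Either listen on a high port (e.g. 5514)
# and forward port 514 to it, or grant the capability via a systemd
# override (AmbientCapabilities=CAP_NET_BIND_SERVICE).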

# Or TCP/UDP separately
input {
  tcp {
    port => 5000
    type => "syslog"
  }
  udp {
    port => 5000
    type => "syslog"
  }
}

HTTP Input

input {
  http {
    port => 8080
    type => "webhook"
  }
}

Kafka Input

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"
    topics => ["logs"]
    group_id => "logstash"
    codec => "json"
  }
}

Filters

Grok (Pattern Matching)

filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

# Custom Pattern
filter {
  grok {
    match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATH:request}" }
  }
}

# Multiple patterns
filter {
  grok {
    match => { "message" => [
      "%{COMBINEDAPACHELOG}",
      "%{COMMONAPACHELOG}"
    ]}
  }
}

Important Grok Patterns

| Pattern | Description |
|---------|--------------|
| %{IP} | IP address |
| %{WORD} | Single word |
| %{NUMBER} | Number |
| %{DATA} | Arbitrary characters (non-greedy) |
| %{GREEDYDATA} | Arbitrary characters (greedy) |
| %{TIMESTAMP_ISO8601} | ISO timestamp |
| %{COMBINEDAPACHELOG} | Apache combined log |
| %{SYSLOGLINE} | Syslog line |
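
Grok captures can be cast to a numeric type directly by appending `:int` or `:float` to the field name, which saves a separate `mutate`/`convert` step. A small sketch (field names chosen for illustration):

filter {
  grok {
    # duration becomes a float, bytes an integer
    match => { "message" => "%{IP:client_ip} %{NUMBER:duration:float} %{NUMBER:bytes:int}" }
  }
}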

Grok Debugger

https://grokdebugger.com/
https://grokconstructor.appspot.com/
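
Kibana also ships a built-in Grok Debugger under Dev Tools.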

Date Filter

filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }
}

# Multiple formats
filter {
  date {
    match => [ "timestamp",
      "ISO8601",
      "yyyy-MM-dd HH:mm:ss",
      "dd/MMM/yyyy:HH:mm:ss Z"
    ]
  }
}
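
If the source timestamps carry no timezone information, the `timezone` option tells the date filter how to interpret them (a sketch; pick the zone your servers actually log in):

filter {
  date {
    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    timezone => "Europe/Berlin"   # interpret offset-less timestamps as local time
  }
}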

Mutate Filter

filter {
  mutate {
    # Rename a field
    rename => { "old_field" => "new_field" }

    # Remove fields
    remove_field => [ "unwanted_field", "another_field" ]

    # Add a field
    add_field => { "environment" => "production" }

    # Convert the type
    convert => { "response_code" => "integer" }

    # Lowercase
    lowercase => [ "http_method" ]

    # Replace substrings
    gsub => [ "message", "\n", " " ]

    # Split a string into an array
    split => { "tags" => "," }
  }
}

GeoIP Filter

filter {
  geoip {
    source => "client_ip"
    target => "geoip"
    database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
  }
}
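
If `database` is omitted, the plugin falls back to the GeoLite2 City database bundled with it.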

DNS Filter

filter {
  dns {
    reverse => [ "client_ip" ]
    action => "replace"
  }
}

JSON Filter

filter {
  json {
    source => "message"
    target => "parsed"
  }
}

# Parse into the event root
filter {
  json {
    source => "message"
  }
}

KV Filter (Key-Value)

filter {
  kv {
    source => "message"
    field_split => "&"
    value_split => "="
  }
}

Drop Filter

filter {
  if [level] == "DEBUG" {
    drop { }
  }
}
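
`drop` can also sample instead of discarding everything: with `percentage`, only a share of the matching events is dropped. A sketch that keeps roughly 10% of DEBUG events:

filter {
  if [level] == "DEBUG" {
    drop {
      percentage => 90   # drop ~90%, keep ~10% as a sample
    }
  }
}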

Outputs

Elasticsearch Output

output {
  elasticsearch {
    hosts => ["https://localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
    user => "elastic"
    password => "password"
    ssl_certificate_verification => true
    cacert => "/etc/logstash/certs/ca.crt"
  }
}
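
Note: newer versions of the Elasticsearch output plugin deprecate `cacert` and `ssl_certificate_verification` in favor of `ssl_certificate_authorities` and `ssl_verification_mode`.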

Index Routing

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

# Route by type
output {
  if [type] == "nginx" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "nginx-%{+YYYY.MM.dd}"
    }
  } else if [type] == "syslog" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-%{+YYYY.MM.dd}"
    }
  }
}

File Output

output {
  file {
    path => "/var/log/logstash/output-%{+YYYY-MM-dd}.log"
    codec => "json_lines"
  }
}

Stdout Output (Debugging)

output {
  stdout {
    codec => rubydebug
  }
}
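
For quick experiments, a pipeline can also be passed inline with `-e` instead of a config file:

/usr/share/logstash/bin/logstash -e 'input { stdin {} } output { stdout { codec => rubydebug } }'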

Multiple Outputs

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }

  if "error" in [tags] {
    email {
      to => "admin@example.com"
      subject => "Error in logs"
      body => "%{message}"
    }
  }
}

Practical Examples

Nginx Access Logs

# /etc/logstash/conf.d/nginx.conf

input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    type => "nginx-access"
  }
}

filter {
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }

    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    }

    geoip {
      source => "clientip"
    }

    mutate {
      convert => { "bytes" => "integer" }
      convert => { "response" => "integer" }
      remove_field => [ "message", "timestamp" ]
    }
  }
}

output {
  if [type] == "nginx-access" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "nginx-access-%{+YYYY.MM.dd}"
    }
  }
}

Syslog

# /etc/logstash/conf.d/syslog.conf

input {
  syslog {
    port => 514
    type => "syslog"
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }

    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }

    mutate {
      remove_field => [ "syslog_timestamp" ]
    }
  }
}

output {
  if [type] == "syslog" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-%{+YYYY.MM.dd}"
    }
  }
}

JSON Application Logs

# /etc/logstash/conf.d/app.conf

input {
  file {
    path => "/var/log/myapp/*.json"
    codec => "json"
    type => "application"
  }
}

filter {
  if [type] == "application" {
    date {
      match => [ "timestamp", "ISO8601" ]
    }

    if [level] == "ERROR" or [level] == "FATAL" {
      mutate { add_tag => ["alert"] }
    }
  }
}

output {
  if [type] == "application" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "app-logs-%{+YYYY.MM.dd}"
    }

    if "alert" in [tags] {
      email {
        to => "oncall@example.com"
        subject => "Application Error: %{[level]}"
        body => "%{message}"
      }
    }
  }
}

Multiple Pipelines

pipelines.yml

# /etc/logstash/pipelines.yml

- pipeline.id: nginx
  path.config: "/etc/logstash/conf.d/nginx.conf"
  pipeline.workers: 2

- pipeline.id: syslog
  path.config: "/etc/logstash/conf.d/syslog.conf"
  pipeline.workers: 2

- pipeline.id: application
  path.config: "/etc/logstash/conf.d/app.conf"
  pipeline.workers: 4
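
Pipelines can also be chained via the `pipeline` input and output plugins, which hand events over through an in-memory address; this is useful for fan-out or for separating receiving from processing. A sketch (the `parsed-events` address is made up for illustration):

# In the upstream pipeline's config:
output {
  pipeline { send_to => ["parsed-events"] }
}

# In the downstream pipeline's config:
input {
  pipeline { address => "parsed-events" }
}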

Monitoring

API

# Node Info
curl -X GET "localhost:9600/_node"

# Pipeline Stats
curl -X GET "localhost:9600/_node/stats/pipelines"

# Hot Threads
curl -X GET "localhost:9600/_node/hot_threads"

Logging

# /etc/logstash/logstash.yml

log.level: info
# debug, info, warn, error

log.format: json
path.logs: /var/log/logstash

Troubleshooting

Testing the Configuration

# Check the syntax
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/

# Run in the foreground with automatic config reload (add --log.level debug for verbose output)
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --config.reload.automatic

Common Problems

# Grok pattern does not match
# → use a Grok debugger
# → build the pattern up step by step

# Date is not parsed
# → check the format exactly
# → mind the timezone

# Fields are missing
# → check the filter order
# → check the conditionals

# High CPU load
# → anchor grok patterns (^...$), see the sketch below
# → avoid overly greedy patterns
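
Anchoring, as referenced above, lets the regex engine fail fast instead of retrying the pattern at every position. A sketch:

filter {
  grok {
    # ^ and $ pin the pattern to the whole line
    match => { "message" => "^%{IP:client_ip} %{WORD:method} %{URIPATH:request}$" }
  }
}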

Summary

| Component | Function |
|------------|----------|
| Input | Receive data |
| Filter | Transform data |
| Output | Forward data |
| Codec | Encode/decode data |

| Filter | Use case |
|--------|------------|
| grok | Pattern matching |
| date | Parse timestamps |
| mutate | Modify fields |
| geoip | IP geolocation |
| json | Parse JSON |

| Port | Service |
|------|--------|
| 5044 | Beats |
| 514 | Syslog |
| 9600 | API |

Conclusion

Logstash is a powerful tool for log processing. Its plugin architecture provides flexibility across inputs, filters, and outputs, and grok is the most important tool for parsing unstructured logs. For simple use cases, Beats combined with ingest pipelines can be a lighter-weight alternative; in complex scenarios involving transformations, however, Logstash remains indispensable.