Logstash is a data processing pipeline that collects data from various sources, transforms it, and forwards it to destinations such as Elasticsearch.
Architecture
Pipeline Concept
Input → Filter → Output
Input: receive data (files, Beats, syslog, ...)
Filter: parse and transform data
Output: forward data (Elasticsearch, files, ...)
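To make this structure concrete, a minimal sketch: a pipeline that reads lines from stdin and prints the resulting events to stdout with the rubydebug codec.
input {
stdin { }
}
output {
stdout { codec => rubydebug }
}
For a quick test, the same pipeline can also be passed inline via the -e flag, e.g. /usr/share/logstash/bin/logstash -e 'input { stdin {} } output { stdout { codec => rubydebug } }'.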
Multiple Pipelines
Pipeline 1: Syslog → Filter → Elasticsearch
Pipeline 2: Apache → Filter → Elasticsearch
Pipeline 3: Beats → Filter → Elasticsearch
Installation
Debian/Ubuntu
# Add the repository (if not already present)
wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | gpg --dearmor -o /usr/share/keyrings/elasticsearch.gpg
echo "deb [signed-by=/usr/share/keyrings/elasticsearch.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | tee /etc/apt/sources.list.d/elastic-8.x.list
# Install
apt update
apt install logstash
Start the Service
systemctl enable --now logstash
# Status
systemctl status logstash
Configuration Files
/etc/logstash/logstash.yml   # Main configuration
/etc/logstash/jvm.options    # JVM settings
/etc/logstash/pipelines.yml  # Pipeline definitions
/etc/logstash/conf.d/        # Pipeline configurations
Basic Configuration
logstash.yml
# /etc/logstash/logstash.yml
path.data: /var/lib/logstash
path.config: /etc/logstash/conf.d
path.logs: /var/log/logstash
pipeline.workers: 4
pipeline.batch.size: 125
pipeline.batch.delay: 50
config.reload.automatic: true
config.reload.interval: 3s
JVM Settings
# /etc/logstash/jvm.options
-Xms1g
-Xmx1g
Pipeline Syntax
Basic Structure
# /etc/logstash/conf.d/example.conf
input {
# Data sources
}
filter {
# Processing
}
output {
# Destinations
}
Conditional Processing
filter {
if [type] == "syslog" {
# Syslog-specific filters
}
if [status] >= 400 {
mutate { add_tag => ["error"] }
}
if "error" in [tags] {
# Error handling
}
}
Inputs
File Input
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
sincedb_path => "/var/lib/logstash/sincedb_nginx"
type => "nginx"
}
}
Beats Input
input {
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
}
Syslog Input
input {
syslog {
port => 514
type => "syslog"
}
}
# Or TCP/UDP separately
input {
tcp {
port => 5000
type => "syslog"
}
udp {
port => 5000
type => "syslog"
}
}
HTTP Input
input {
http {
port => 8080
type => "webhook"
}
}
Kafka Input
input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092"
topics => ["logs"]
group_id => "logstash"
codec => "json"
}
}
Filters
Grok (Pattern Matching)
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
# Custom Pattern
filter {
grok {
match => { "message" => "%{IP:client_ip} %{WORD:method} %{URIPATH:request}" }
}
}
# Multiple patterns
filter {
grok {
match => { "message" => [
"%{COMBINEDAPACHELOG}",
"%{COMMONAPACHELOG}"
]}
}
}
Important Grok Patterns
| Pattern | Description |
|---------|-------------|
| %{IP} | IP address |
| %{WORD} | Single word |
| %{NUMBER} | Number |
| %{DATA} | Arbitrary characters (non-greedy) |
| %{GREEDYDATA} | Arbitrary characters (greedy) |
| %{TIMESTAMP_ISO8601} | ISO timestamp |
| %{COMBINEDAPACHELOG} | Apache combined log |
| %{SYSLOGLINE} | Syslog line |
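As a short illustration of how these patterns compose (the log line and field names here are invented for the example), a line such as 2024-01-15T10:23:45Z web01 ERROR connection refused could be parsed with:
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{HOSTNAME:hostname} %{LOGLEVEL:level} %{GREEDYDATA:log_message}" }
}
}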
Grok Debugger
https://grokdebugger.com/
https://grokconstructor.appspot.com/
Date Filter
filter {
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
target => "@timestamp"
}
}
# Multiple formats
filter {
date {
match => [ "timestamp",
"ISO8601",
"yyyy-MM-dd HH:mm:ss",
"dd/MMM/yyyy:HH:mm:ss Z"
]
}
}
Mutate Filter
filter {
mutate {
# Rename a field
rename => { "old_field" => "new_field" }
# Remove fields
remove_field => [ "unwanted_field", "another_field" ]
# Add a field
add_field => { "environment" => "production" }
# Convert type
convert => { "response_code" => "integer" }
# Lowercase
lowercase => [ "http_method" ]
# Replace substrings
gsub => [ "message", "\n", " " ]
# Split
split => { "tags" => "," }
}
}
GeoIP Filter
filter {
geoip {
source => "client_ip"
target => "geoip"
database => "/usr/share/GeoIP/GeoLite2-City.mmdb"
}
}
DNS Filter
filter {
dns {
reverse => [ "client_ip" ]
action => "replace"
}
}
JSON Filter
filter {
json {
source => "message"
target => "parsed"
}
}
# Parse at the root level
filter {
json {
source => "message"
}
}
KV Filter (Key-Value)
filter {
kv {
source => "message"
field_split => "&"
value_split => "="
}
}
Drop Filter
filter {
if [level] == "DEBUG" {
drop { }
}
}
Outputs
Elasticsearch Output
output {
elasticsearch {
hosts => ["https://localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
user => "elastic"
password => "password"
ssl_certificate_verification => true
cacert => "/etc/logstash/certs/ca.crt"
}
}
Index Routing
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
# By type
output {
if [type] == "nginx" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-%{+YYYY.MM.dd}"
}
} else if [type] == "syslog" {
elasticsearch {
hosts => ["localhost:9200"]
index => "syslog-%{+YYYY.MM.dd}"
}
}
}
File Output
output {
file {
path => "/var/log/logstash/output-%{+YYYY-MM-dd}.log"
codec => "json_lines"
}
}
Stdout Output (Debugging)
output {
stdout {
codec => rubydebug
}
}
Multiple Outputs
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logs-%{+YYYY.MM.dd}"
}
if "error" in [tags] {
email {
to => "admin@example.com"
subject => "Error in logs"
body => "%{message}"
}
}
}
Practical Examples
Nginx Access Logs
# /etc/logstash/conf.d/nginx.conf
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
type => "nginx-access"
}
}
filter {
if [type] == "nginx-access" {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
geoip {
source => "clientip"
}
mutate {
convert => { "bytes" => "integer" }
convert => { "response" => "integer" }
remove_field => [ "message", "timestamp" ]
}
}
}
output {
if [type] == "nginx-access" {
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx-access-%{+YYYY.MM.dd}"
}
}
}
Syslog
# /etc/logstash/conf.d/syslog.conf
input {
syslog {
port => 514
type => "syslog"
}
}
filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
}
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
mutate {
remove_field => [ "syslog_timestamp" ]
}
}
}
output {
if [type] == "syslog" {
elasticsearch {
hosts => ["localhost:9200"]
index => "syslog-%{+YYYY.MM.dd}"
}
}
}
JSON Application Logs
# /etc/logstash/conf.d/app.conf
input {
file {
path => "/var/log/myapp/*.json"
codec => "json"
type => "application"
}
}
filter {
if [type] == "application" {
date {
match => [ "timestamp", "ISO8601" ]
}
if [level] == "ERROR" or [level] == "FATAL" {
mutate { add_tag => ["alert"] }
}
}
}
output {
if [type] == "application" {
elasticsearch {
hosts => ["localhost:9200"]
index => "app-logs-%{+YYYY.MM.dd}"
}
if "alert" in [tags] {
email {
to => "oncall@example.com"
subject => "Application Error: %{[level]}"
body => "%{message}"
}
}
}
}
Multiple Pipelines
pipelines.yml
# /etc/logstash/pipelines.yml
- pipeline.id: nginx
path.config: "/etc/logstash/conf.d/nginx.conf"
pipeline.workers: 2
- pipeline.id: syslog
path.config: "/etc/logstash/conf.d/syslog.conf"
pipeline.workers: 2
- pipeline.id: application
path.config: "/etc/logstash/conf.d/app.conf"
pipeline.workers: 4
Monitoring
API
# Node Info
curl -X GET "localhost:9600/_node"
# Pipeline Stats
curl -X GET "localhost:9600/_node/stats/pipelines"
# Hot Threads
curl -X GET "localhost:9600/_node/hot_threads"Logging
# /etc/logstash/logstash.yml
log.level: info
# debug, info, warn, error
log.format: json
path.logs: /var/log/logstash
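While debugging it also helps to follow Logstash's own log; with package installations the plain-text log usually lives under path.logs (the exact file name may vary by version and log.format):
# Follow the Logstash service log (file name may differ)
tail -f /var/log/logstash/logstash-plain.log
# Or read it from the systemd journal
journalctl -u logstash -f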
Troubleshooting
Test the Configuration
# Check the syntax
/usr/share/logstash/bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/
# Run in the foreground with automatic config reload
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/test.conf --config.reload.automatic
Common Problems
# Grok pattern does not match
# → Use a Grok debugger
# → Build the pattern up step by step
# Date is not parsed
# → Check the format exactly
# → Mind the timezone
# Fields are missing
# → Check the filter order
# → Check the conditionals
# High CPU load
# → Use grok anchoring (^...$), see the sketch below
# → Prefer less greedy patterns
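A small sketch of the anchoring advice above, reusing the field names from the earlier grok examples; the anchored variant rejects non-matching lines immediately instead of letting the regex engine backtrack across the whole message:
filter {
grok {
# Anchored pattern: the line must start with an IP and end after the request path
match => { "message" => "^%{IP:client_ip} %{WORD:method} %{URIPATH:request}$" }
}
}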
Summary
| Component | Function |
|-----------|----------|
| Input | Receive data |
| Filter | Transform data |
| Output | Forward data |
| Codec | Encoding/decoding |
| Filter | Usage |
|--------|-------|
| grok | Pattern matching |
| date | Parse dates |
| mutate | Modify fields |
| geoip | IP geolocation |
| json | Parse JSON |
| Port | Service |
|------|---------|
| 5044 | Beats |
| 514 | Syslog |
| 9600 | API |
Conclusion
Logstash is a powerful tool for log processing. Its plugin architecture provides flexibility for inputs, filters, and outputs. Grok is the most important tool for parsing unstructured logs. For simple use cases, Beats combined with ingest pipelines can be a lighter-weight alternative; in complex scenarios with heavy transformations, however, Logstash remains indispensable.