Python is ideal for system administration: readable, powerful, and equipped with a huge standard library. This article shows practical applications for everyday admin work.
Why Python for Sysadmins?
Advantages:
- Readable syntax
- Extensive standard library
- Cross-platform
- Many system-related modules
- Good error handling
- Remote automation (Paramiko, Fabric)

Basic System Operations
Operating System Information
```python
#!/usr/bin/env python3
import os
import platform
import psutil  # pip install psutil

# System information
print(f"Hostname: {platform.node()}")
print(f"System: {platform.system()} {platform.release()}")
print(f"Architecture: {platform.machine()}")
print(f"Python: {platform.python_version()}")

# Resources
print(f"CPUs: {os.cpu_count()}")
print(f"RAM: {psutil.virtual_memory().total / (1024**3):.1f} GB")
print(f"Disk: {psutil.disk_usage('/').total / (1024**3):.1f} GB")
```

Environment Variables
```python
import os

# Read
home = os.environ.get('HOME')
path = os.environ.get('PATH')

# Set
os.environ['MY_VAR'] = 'value'

# Print all variables
for key, value in os.environ.items():
    print(f"{key}={value}")
```

Executing Commands
```python
import subprocess

# Simple command
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)

# With a shell
result = subprocess.run('echo $HOME', shell=True, capture_output=True, text=True)

# Error handling
try:
    result = subprocess.run(['command'], check=True, capture_output=True)
except subprocess.CalledProcessError as e:
    print(f"Error: {e.returncode}")

# Timeout (raises subprocess.TimeoutExpired after 5 seconds)
result = subprocess.run(['sleep', '10'], timeout=5)
```

File Operations
Files and Directories
```python
import os
import shutil
from pathlib import Path

# Pathlib (the modern approach)
path = Path('/var/log')

# Create a directory
Path('/tmp/mydir').mkdir(parents=True, exist_ok=True)

# List files
for file in path.glob('*.log'):
    print(f"{file.name}: {file.stat().st_size} bytes")

# Recursively
for file in path.rglob('*.log'):
    print(file)

# Copy a file
shutil.copy('/etc/passwd', '/tmp/passwd.bak')

# Copy a directory
shutil.copytree('/etc/nginx', '/tmp/nginx-backup')

# Delete
os.remove('/tmp/file.txt')
shutil.rmtree('/tmp/mydir')
```

Reading and Writing Files
```python
from collections import deque
from pathlib import Path

# Read
content = Path('/etc/hostname').read_text()

# Write
Path('/tmp/test.txt').write_text('Hello World')

# With explicit encoding
content = Path('/etc/passwd').read_text(encoding='utf-8')

# Line by line
with open('/var/log/syslog') as f:
    for line in f:
        if 'error' in line.lower():
            print(line.strip())

# Large files: stream through the file, keeping only the last N lines in memory
def tail(filename, lines=10):
    with open(filename) as f:
        return list(deque(f, maxlen=lines))
```

Configuration Files
```python
import configparser

# Read INI files
config = configparser.ConfigParser()
config.read('/etc/myapp.conf')
value = config.get('section', 'key')

# With a default value
value = config.get('section', 'key', fallback='default')

# Write
config['newsection'] = {'key': 'value'}
with open('/etc/myapp.conf', 'w') as f:
    config.write(f)
```

JSON and YAML
```python
import json
import yaml  # pip install pyyaml

# Read JSON
with open('config.json') as f:
    data = json.load(f)

# Write JSON
with open('output.json', 'w') as f:
    json.dump(data, f, indent=2)

# Read YAML
with open('config.yaml') as f:
    data = yaml.safe_load(f)

# Write YAML
with open('output.yaml', 'w') as f:
    yaml.dump(data, f, default_flow_style=False)
```

System Monitoring
CPU and Memory
```python
#!/usr/bin/env python3
import psutil
import time

def monitor_system(interval=5):
    while True:
        cpu = psutil.cpu_percent(interval=1)
        mem = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        print(f"CPU: {cpu}%")
        print(f"RAM: {mem.percent}% ({mem.used / (1024**3):.1f} GB)")
        print(f"Disk: {disk.percent}%")
        print("-" * 40)
        time.sleep(interval)

if __name__ == '__main__':
    monitor_system()
```

Monitoring Processes
```python
import psutil

# All processes
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
    print(proc.info)

# Top 10 by CPU
processes = sorted(psutil.process_iter(['name', 'cpu_percent']),
                   key=lambda p: p.info['cpu_percent'],
                   reverse=True)[:10]
for p in processes:
    print(f"{p.info['name']}: {p.info['cpu_percent']}%")

# Find a specific process
for proc in psutil.process_iter(['name']):
    if proc.info['name'] == 'nginx':
        print(f"Nginx PID: {proc.pid}")
```

Disk Usage
```python
#!/usr/bin/env python3
import os
import psutil

# All partitions
for partition in psutil.disk_partitions():
    try:
        usage = psutil.disk_usage(partition.mountpoint)
        print(f"{partition.mountpoint}:")
        print(f"  Total: {usage.total / (1024**3):.1f} GB")
        print(f"  Used:  {usage.used / (1024**3):.1f} GB ({usage.percent}%)")
        print(f"  Free:  {usage.free / (1024**3):.1f} GB")
    except PermissionError:
        continue

# Directory size
def get_dir_size(path):
    total = 0
    for entry in os.scandir(path):
        if entry.is_file():
            total += entry.stat().st_size
        elif entry.is_dir():
            total += get_dir_size(entry.path)
    return total
```

Log Analysis
```python
#!/usr/bin/env python3
import re
from collections import Counter

def analyze_apache_log(logfile):
    ip_counter = Counter()
    status_counter = Counter()
    # Apache log format: IP ... [timestamp] "REQUEST" STATUS ...
    pattern = r'^(\d+\.\d+\.\d+\.\d+).*\[(.+?)\].*" (\d{3})'
    with open(logfile) as f:
        for line in f:
            match = re.match(pattern, line)
            if match:
                ip = match.group(1)
                status = match.group(3)
                ip_counter[ip] += 1
                status_counter[status] += 1
    print("Top 10 IPs:")
    for ip, count in ip_counter.most_common(10):
        print(f"  {ip}: {count}")
    print("\nStatus codes:")
    for status, count in status_counter.most_common():
        print(f"  {status}: {count}")

if __name__ == '__main__':
    analyze_apache_log('/var/log/apache2/access.log')
```

Network
Network Information
```python
import psutil
import socket

# Hostname
print(f"Hostname: {socket.gethostname()}")

# IP addresses
for interface, addresses in psutil.net_if_addrs().items():
    for addr in addresses:
        if addr.family == socket.AF_INET:
            print(f"{interface}: {addr.address}")

# Network traffic
io = psutil.net_io_counters()
print(f"Bytes sent: {io.bytes_sent / (1024**2):.1f} MB")
print(f"Bytes recv: {io.bytes_recv / (1024**2):.1f} MB")

# Connections (may require elevated privileges on some systems)
for conn in psutil.net_connections():
    if conn.status == 'ESTABLISHED':
        print(f"{conn.laddr} -> {conn.raddr}")
```

Port Check
```python
import socket

def check_port(host, port, timeout=3):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    try:
        sock.connect((host, port))
        return True
    except (socket.timeout, socket.error):
        return False
    finally:
        sock.close()

# Check several ports
services = {
    'SSH': 22,
    'HTTP': 80,
    'HTTPS': 443,
    'MySQL': 3306,
}
for name, port in services.items():
    status = "UP" if check_port('localhost', port) else "DOWN"
    print(f"{name} ({port}): {status}")
```

HTTP Requests
```python
import requests  # pip install requests

# GET
response = requests.get('https://api.example.com/data')
print(response.status_code)
print(response.json())

# POST
data = {'key': 'value'}
response = requests.post('https://api.example.com/submit', json=data)

# With authentication
response = requests.get('https://api.example.com/secure',
                        auth=('user', 'password'))

# Timeout
response = requests.get('https://example.com', timeout=5)

# Disable SSL verification (not recommended)
response = requests.get('https://self-signed.example.com', verify=False)
```

SSH and Remote Execution
With Paramiko
```python
import paramiko  # pip install paramiko

def ssh_command(hostname, username, password, command):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(hostname, username=username, password=password)
        stdin, stdout, stderr = client.exec_command(command)
        return stdout.read().decode()
    finally:
        client.close()

# With an SSH key
def ssh_with_key(hostname, username, keyfile, command):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    key = paramiko.RSAKey.from_private_key_file(keyfile)
    client.connect(hostname, username=username, pkey=key)
    stdin, stdout, stderr = client.exec_command(command)
    return stdout.read().decode()
```

Files via SFTP
```python
import paramiko

def sftp_download(hostname, username, password, remote, local):
    transport = paramiko.Transport((hostname, 22))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.get(remote, local)
    sftp.close()
    transport.close()

def sftp_upload(hostname, username, password, local, remote):
    transport = paramiko.Transport((hostname, 22))
    transport.connect(username=username, password=password)
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put(local, remote)
    sftp.close()
    transport.close()
```

Backup Script
```python
#!/usr/bin/env python3
"""
Backup script with rotation
"""
import os
import tarfile
from datetime import datetime
from pathlib import Path

CONFIG = {
    'source_dirs': ['/var/www', '/etc'],
    'backup_dir': '/backup',
    'keep_days': 7,
}

def create_backup():
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_name = f"backup_{timestamp}.tar.gz"
    backup_path = Path(CONFIG['backup_dir']) / backup_name
    # Create the backup directory
    backup_path.parent.mkdir(parents=True, exist_ok=True)
    # Create the tar archive
    with tarfile.open(backup_path, 'w:gz') as tar:
        for source in CONFIG['source_dirs']:
            if os.path.exists(source):
                print(f"Backing up {source}...")
                tar.add(source, arcname=os.path.basename(source))
    print(f"Backup created: {backup_path}")
    print(f"Size: {backup_path.stat().st_size / (1024**2):.1f} MB")
    return backup_path

def cleanup_old_backups():
    backup_dir = Path(CONFIG['backup_dir'])
    cutoff = datetime.now().timestamp() - (CONFIG['keep_days'] * 86400)
    for backup_file in backup_dir.glob('backup_*.tar.gz'):
        if backup_file.stat().st_mtime < cutoff:
            print(f"Deleting old backup: {backup_file}")
            backup_file.unlink()

if __name__ == '__main__':
    create_backup()
    cleanup_old_backups()
```

Service Monitoring
```python
#!/usr/bin/env python3
"""
Service monitor with e-mail notification
"""
import subprocess
import smtplib
from email.mime.text import MIMEText
import time

SERVICES = ['nginx', 'mysql', 'redis']
EMAIL_CONFIG = {
    'smtp_server': 'smtp.example.com',
    'smtp_port': 587,
    'username': 'alert@example.com',
    'password': 'password',
    'recipient': 'admin@example.com',
}

def check_service(name):
    result = subprocess.run(
        ['systemctl', 'is-active', name],
        capture_output=True,
        text=True
    )
    return result.stdout.strip() == 'active'

def send_alert(service, status):
    msg = MIMEText(f"Service {service} is {status}")
    msg['Subject'] = f"Alert: {service} {status}"
    msg['From'] = EMAIL_CONFIG['username']
    msg['To'] = EMAIL_CONFIG['recipient']
    with smtplib.SMTP(EMAIL_CONFIG['smtp_server'], EMAIL_CONFIG['smtp_port']) as server:
        server.starttls()
        server.login(EMAIL_CONFIG['username'], EMAIL_CONFIG['password'])
        server.send_message(msg)

def monitor():
    # Assume all services are up at start; alert only on state changes
    last_status = {s: True for s in SERVICES}
    while True:
        for service in SERVICES:
            is_active = check_service(service)
            if is_active != last_status[service]:
                status = "UP" if is_active else "DOWN"
                print(f"{service}: {status}")
                send_alert(service, status)
                last_status[service] = is_active
        time.sleep(60)

if __name__ == '__main__':
    monitor()
```

Useful Modules
| Module | Purpose |
|--------|---------|
| os, shutil | File operations |
| pathlib | Modern path handling |
| subprocess | Running commands |
| psutil | System monitoring |
| paramiko | SSH/SFTP |
| requests | HTTP requests |
| logging | Logging |
| argparse | Command-line arguments |
| schedule | Task scheduling |
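The last three modules in the table are not shown elsewhere in this article. As a minimal sketch of how argparse and logging typically fit into an admin script, the following hypothetical disk-check script parses command-line options and logs a warning when a filesystem crosses a usage threshold; the script name, default threshold, and log format are illustrative assumptions, not part of the examples above.

```python
#!/usr/bin/env python3
# Minimal sketch: argparse + logging in a small admin script.
# Threshold, paths, and messages are illustrative assumptions.
import argparse
import logging
import shutil

def main():
    parser = argparse.ArgumentParser(description='Warn when a filesystem is nearly full')
    parser.add_argument('path', nargs='?', default='/', help='mount point to check')
    parser.add_argument('--threshold', type=int, default=90,
                        help='usage percentage that triggers a warning')
    parser.add_argument('-v', '--verbose', action='store_true')
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format='%(asctime)s %(levelname)s %(message)s'
    )

    usage = shutil.disk_usage(args.path)
    percent = usage.used / usage.total * 100
    logging.debug("Checked %s: %.1f%% used", args.path, percent)
    if percent >= args.threshold:
        logging.warning("%s is %.1f%% full (threshold %d%%)",
                        args.path, percent, args.threshold)
    else:
        logging.info("%s is at %.1f%%", args.path, percent)

if __name__ == '__main__':
    main()
```

Run it, for example, as `python3 disk_check.py /var --threshold 80 -v`; a cron entry or the third-party schedule package (pip install schedule) can then execute it periodically.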
Conclusion
Python is an excellent fit for system administration. The standard library covers most requirements, and packages such as psutil and paramiko extend the possibilities considerably. Start with small scripts and build up your toolbox step by step. Python scripts are maintainable and easy to extend.