Datenbank-Änderungen mit PostgreSQL verfolgen
Autor: Ronny Zimmermann
Einführung
In diesem Artikel zeige ich, wie man Änderungen an einer PostgreSQL-Datenbank verfolgen kann. Dazu nutzen wir ein Python-Skript, das Änderungen zwischen zwei Snapshots einer Datenbank analysiert, die betroffenen Tabellen ausgibt und die Änderungen in einer Log-Datei speichert. Zusätzlich werden SQL-Befehle generiert, die die Änderungen nachvollziehbar machen.
Installation der benötigten Pakete
Bevor wir das Skript ausführen können, müssen einige Pakete installiert werden. Diese Pakete beinhalten sowohl systemweite Abhängigkeiten als auch Python-Module.
1. Systempakete über apt
installieren
sudo apt update
sudo apt install python3-psycopg2 python3-sqlparse libpq-dev python3-dev
2. Python-Pakete mit pip
installieren
Falls du eine virtuelle Umgebung nutzt, aktiviere sie zuerst:
python3 -m venv venv
source venv/bin/activate
Dann installiere die notwendigen Pakete:
pip install psycopg2 sqlparse
Konfiguration der PostgreSQL-Datenbank
Für die Analyse der Änderungen müssen wir pg_stat_statements
aktivieren. Füge in der Datei postgresql.conf
folgende Zeile hinzu:
shared_preload_libraries = 'pg_stat_statements'
Starte anschließend den PostgreSQL-Dienst neu:
sudo systemctl restart postgresql
Erstelle dann die Erweiterung in der Datenbank:
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
Python-Skript zur Analyse von Änderungen
Nachfolgend findest du das Python-Skript, das Änderungen zwischen zwei Zeitpunkten vergleicht und die betroffenen Tabellen sowie die entsprechenden SQL-Befehle protokolliert.
import psycopg2
import difflib
import time
import sqlparse
import os
import json
LOG_FILE = "db_changes.log"
CONFIG_FILE = "db_config.json"
def load_db_config():
"""Lädt die Zugangsdaten aus einer Konfigurationsdatei."""
with open(CONFIG_FILE, "r") as file:
return json.load(file)
def check_pg_stat_statements(conn):
"""Überprüft, ob pg_stat_statements in shared_preload_libraries aktiviert ist."""
cursor = conn.cursor()
cursor.execute("SHOW shared_preload_libraries;")
result = cursor.fetchone()
if result and "pg_stat_statements" not in result[0]:
print("\nWARNUNG: pg_stat_statements ist nicht in shared_preload_libraries aktiviert.")
print("Bitte fügen Sie folgende Zeile zu Ihrer postgresql.conf hinzu und starten Sie PostgreSQL neu:")
print("\n shared_preload_libraries = 'pg_stat_statements'\n")
print("Anschließend können Sie das Skript erneut ausführen.")
return False
return True
def enable_pg_stat_statements(conn):
"""Aktiviert pg_stat_statements falls es noch nicht aktiv ist."""
cursor = conn.cursor()
cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_stat_statements;")
conn.commit()
def disable_pg_stat_statements(conn):
"""Deaktiviert pg_stat_statements nach der Skriptausführung."""
cursor = conn.cursor()
cursor.execute("DROP EXTENSION IF EXISTS pg_stat_statements;")
conn.commit()
def get_recent_sql_queries(conn):
"""Holt die letzten ausgeführten INSERT, UPDATE oder DELETE SQL-Statements."""
cursor = conn.cursor()
cursor.execute("""
SELECT query FROM pg_stat_statements
WHERE query LIKE 'INSERT%'
OR query LIKE 'UPDATE%'
OR query LIKE 'DELETE%'
ORDER BY calls DESC LIMIT 10;
""")
return [sqlparse.format(q[0], reindent=True) for q in cursor.fetchall()]
def get_db_snapshot(conn):
"""Erstellt einen Snapshot der aktuellen Datenbanktabellen."""
snapshot = {}
cursor = conn.cursor()
cursor.execute("SELECT tablename FROM pg_tables WHERE schemaname='public'")
tables = [table[0] for table in cursor.fetchall()]
for table in tables:
cursor.execute(f"SELECT * FROM {table} ORDER BY ctid")
snapshot[table] = cursor.fetchall()
return snapshot
def compare_snapshots(snapshot1, snapshot2):
"""Vergleicht zwei Snapshots und gibt die Unterschiede aus."""
diffs = {}
for table in set(snapshot1.keys()).union(snapshot2.keys()):
old_data = snapshot1.get(table, [])
new_data = snapshot2.get(table, [])
diff = list(difflib.unified_diff(
[str(row) for row in old_data],
[str(row) for row in new_data],
lineterm=""
))
if diff:
diffs[table] = diff
return diffs
def get_table_structure(conn, table):
"""Gibt die Spaltennamen und Datentypen einer Tabelle zurück."""
cursor = conn.cursor()
cursor.execute(f"""
SELECT column_name FROM information_schema.columns
WHERE table_name = '{table}' ORDER BY ordinal_position;
""")
return [row[0] for row in cursor.fetchall()]
def generate_sql_statements(diffs, conn):
"""Erstellt SQL-Befehle basierend auf den festgestellten Änderungen."""
sql_statements = []
cursor = conn.cursor()
for table, diff in diffs.items():
columns = get_table_structure(conn, table)
for change in diff:
if change.startswith("+"):
values = change[1:].strip("() ").split(", ")
sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({', '.join(values)});"
elif change.startswith("-"):
values = change[1:].strip("() ").split(", ")
conditions = " AND ".join([f"{col}={val}" for col, val in zip(columns, values)])
sql = f"DELETE FROM {table} WHERE {conditions};"
else:
continue
sql_statements.append(sql)
return sql_statements
def log_changes(diffs, sql_queries, generated_sql):
"""Speichert die Änderungen und SQL-Befehle in einer Log-Datei."""
with open(LOG_FILE, "a") as log:
log.write("\n===== Änderungen erkannt =====\n")
for table in diffs.keys():
log.write(f"\n-- Änderungen in Tabelle: {table}\n")
print(f"Tabelle geändert: {table}")
log.write("\n===== SQL-Befehle aus pg_stat_statements =====\n")
log.writelines("\n".join(sql_queries) + "\n")
log.write("\n===== Generierte SQL-Befehle =====\n")
log.writelines("\n".join(generated_sql) + "\n")
print("Änderungen und SQL-Befehle wurden in die Log-Datei geschrieben.")
def main():
config = load_db_config()
conn = psycopg2.connect(dbname=config["dbname"], user=config["user"], password=config["password"], host=config["host"])
conn.autocommit = True # Verhindert Sperren in der DB
if not check_pg_stat_statements(conn):
conn.close()
return
enable_pg_stat_statements(conn)
print("Drücken Sie 'q' + Enter, um das Programm zu beenden.")
while True:
print("Erster Snapshot wird erstellt...")
snapshot1 = get_db_snapshot(conn)
input("Drücken Sie Enter, nachdem Sie Änderungen an der Datenbank vorgenommen haben...")
print("Zweiter Snapshot wird erstellt...")
snapshot2 = get_db_snapshot(conn)
print("Vergleiche Snapshots...")
diffs = compare_snapshots(snapshot1, snapshot2)
generated_sql = generate_sql_statements(diffs, conn)
print("Hole ausgeführte SQL-Befehle...")
sql_queries = get_recent_sql_queries(conn)
log_changes(diffs, sql_queries, generated_sql)
user_input = input("Drücken Sie 'q' + Enter, um das Programm zu beenden oder Enter zum Fortfahren: ")
if user_input.lower() == 'q':
break
disable_pg_stat_statements(conn)
conn.close()
print("Programm beendet.")
if __name__ == "__main__":
main()
Speichere das Skript unter db_diff.py
und erstelle eine Konfigurationsdatei db_config.json
mit folgendem Inhalt:
{
"dbname": "testfirma",
"user": "postgres",
"password": "dein_passwort",
"host": "localhost"
}
Nutzung des Skripts
Starte das Skript mit:
python db_diff.py testfirma
Das Skript wird:
- Einen Snapshot der aktuellen Datenbank speichern.
- Warten, bis der Nutzer Änderungen in der Datenbank vorgenommen hat.
- Einen zweiten Snapshot speichern und die Unterschiede ermitteln.
- Die betroffenen Tabellen und generierte SQL-Befehle in der Konsole und einer Log-Datei ausgeben.
Conclusio
Mit diesem Skript lassen sich Änderungen an einer PostgreSQL-Datenbank einfach nachverfolgen. Es ist besonders nützlich für Debugging-Zwecke und das Erstellen von Änderungsprotokollen.