ETL/ELT mit Pandas
Für die Verwendung mit SKOOR Dashboards können Daten aus verschiedenen Quellen wie Datenbanken, REST-APIs oder Dateien gelesen und in eine lokale Datenbank geladen werden. Falls erforderlich, werden sie entweder vor oder nach dem Laden transformiert. Diese Verarbeitung, die als ETL (Extrahieren, Transformieren, Laden) oder ELT (Extrahieren, Laden, Transformieren) bezeichnet wird, wird von so genannten Konvertern in der SKOOR-Lösung durchgeführt. SKOOR-Konverter werden typischerweise mit Talend oder Pandas erstellt. Nach einer bestimmten Verzeichnisstruktur können sie erstellt und für den Upload in den Datenimportbereich gezippt werden. Dieser Leitfaden behandelt die Grundeinstellungen für einen Pandas-Konverter.
Bitte besuchen Sie die Website der Pandas-Dokumentation, um mehr über das Produkt, die Befehlsreferenz usw. zu erfahren.
Voraussetzungen
Die folgenden Python-Module werden typischerweise von Konvertern verwendet:
pandas (die Pandas-Software)
pyarrow (eine hilfreiche Bibliothek)
openpyxl (Bibliothek zum Lesen/Schreiben von Microsoft Excel-Dateien)
sqlalchemy (Bibliothek für den Zugriff auf verschiedene Datenbanken)
psycopg2-binary (Adapter für PostgreSQL-Datenbanken)
Beispiel Konverter
Grundlegende Einrichtung
Pandas Skript-Kopfzeile:
import pandas as pd from sqlalchemy import create_engine import numpy as np import sys, getopt import datetime # Variables # ========================================================================================= # local PostgreSQL database to load data pgUser = '<db user>' pgPass = '<db password>' pgDatabase = '<database name>' pgHost = 'localhost' # Change this host if necessary pgPort = 5432 # Change this port if necessary # Functions # ========================================================================================= # process SKOOR ETL service args def getArg(argument): try: opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="]) for arg in args[1:]: splitArg = arg.split('=') if splitArg[0] == argument: return splitArg[1] except getopt.GetoptError: print(args[0], 'sourceFile=<source.file>') sys.exit(1)
Excel-Datei lesen und in die Datenbank schreiben
Der folgende Teil des Skripts kann als Ausgangspunkt für die Verarbeitung einer Excel-Datei verwendet werden und kann unterhalb des obigen Skriptkopfes eingefügt werden:
# Main script # ========================================================================================= source_file = getArg("sourceFile") # Check if a filename argument is available if isinstance(source_file, str): print("Processing file " + source_file) else: print("No input file defined!") exit(1) # Create SqlAlchemy engine engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase) # Read raw data from a sheet called "customer" customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer') # Transform data if required # ------------------------------------------------------------------------ <transformation code> print(customers.head()) # Write data into table "customers" of PostgreSQL database # ------------------------------------------------------------------------ print("Writing data to database...") customers.to_sql(name='customers', con=engine, if_exists='replace') print("Done")
Daten in PostgreSQL einfügen oder aktualisieren
Die Funktion pandas to_sql kann verwendet werden, um Daten einzufügen oder zu ersetzen. Wenn Daten aktualisiert werden müssen, kann die SQLAlchemy-Bibliothek verwendet werden. Der folgende Codeschnipsel zeigt, wie dies erreicht werden kann:
# Write DataFrame to temporary table on database customers.to_sql(name='customers_temp', con=engine, if_exists='replace') # Update target table using temporary table values update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address) SELECT customer_id, customer_name, customer_address FROM customers_temp t ON CONFLICT (customer_id) DO UPDATE SET customer_name = EXCLUDED.customer_name, customer_address = EXCLUDED.customer_address''' with engine.begin() as conn: conn.execute(text(update_sql))
Erstellen eines Konverters
In jedem Konverterverzeichnis sucht der SKOOR ETL-Service nach einem Skript mit dem Namen <Verzeichnisname>_run.sh, das ausgeführt wird, wenn der Konverter gestartet wird. Erstellen Sie ein Verzeichnis namens <Verzeichnisname> mit diesem Skript und dem Python-Skript, das die Verarbeitungslogik enthält:
$ ls -1 customers/ customers.py customers_run.sh
Beispielskript customers_run.sh:
#!/bin/bash cd $(dirname $0) home=$(pwd) /opt/eranger/python3-env/bin/python3 $home/customers.py $@
Nun muss eine Zip-Datei erstellt werden, die das Konverterverzeichnis mit den Skripten enthält. Diese Zip-Datei kann im Abschnitt Daten laden des SKOOR Dashboards geladen werden.
Erstellen eines SKOOR-Jobs zur Automatisierung eines Konverters
Bei Bedarf können Konverter automatisch durch einen Job ausgeführt werden. Konfigurieren Sie einen Ausführungsjob, um das obige Skript *_run.sh in der Verzeichnisstruktur von SKOOR etl auszuführen:
/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh
Beispiel:
/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh