ETL/ELT mit Pandas

Für die Verwendung mit SKOOR-Dashboards können Daten aus verschiedenen Quellen wie Datenbanken, REST-APIs oder Dateien gelesen und in eine lokale Datenbank geladen werden. Bei Bedarf werden sie vor oder nach dem Laden transformiert. Diese Verarbeitung, die als ETL (Extract, Transform, Load) oder ELT (Extract, Load, Transform) bezeichnet wird, erfolgt durch sogenannte Konverter in der SKOOR-Lösung. SKOOR-Konverter werden in der Regel mit Talend oder Pandas erstellt. Entsprechend einer bestimmten Verzeichnisstruktur können sie erstellt und zum Hochladen in den Bereich „Datenimport“ komprimiert werden. Dieser Leitfaden behandelt die grundlegende Einrichtung eines Pandas-Konverters.

Weitere Informationen zum Produkt, zur Befehlsreferenz usw. finden Sie auf der Pandas-Dokumentationswebsite.

Voraussetzungen

Die folgenden Python-Module werden in der Regel von Konvertern verwendet:

  • pandas (die Pandas-Software)

  • pyarrow (eine hilfreiche Bibliothek)

  • openpyxl (Bibliothek zum Lesen/Schreiben von Microsoft Excel-Dateien)

  • sqlalchemy (Bibliothek für den Zugriff auf verschiedene Datenbanken)

  • psycopg2-binary (PostgreSQL-Datenbankadapter)

Beispielkonverter

Grundlegende Einrichtung

Pandas-Skript-Header:

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

Excel-Datei lesen und in Datenbank schreiben

Der folgende Teil des Skripts kann als Ausgangspunkt für die Verarbeitung einer Excel-Datei verwendet werden und kann unterhalb des oben genannten Skript-Headers hinzugefügt werden:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

Daten in PostgreSQL einfügen oder aktualisieren

Die Pandas-Funktion „to_sql“ kann zum Einfügen oder Ersetzen von Daten verwendet werden. Wenn Daten aktualisiert werden müssen, kann die SQLAlchemy-Bibliothek verwendet werden. Der folgende Codeausschnitt zeigt, wie dies erreicht werden kann:

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

Konverter erstellen

In jedem Konverterverzeichnis sucht der SKOOR ETL-Dienst nach einem Skript namens <Verzeichnisname>_run.sh, das bei Ausführung des Konverters ausgeführt wird. Erstellen Sie ein Verzeichnis namens <Verzeichnisname> mit diesem Skript und dem Python-Skript, das die Verarbeitungslogik enthält:

$ ls -1 customers/
customers.py
customers_run.sh

Beispielskript customers_run.sh:

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

Nun muss eine ZIP-Datei erstellt werden, die das Konverterverzeichnis mit den Skripten enthält. Diese ZIP-Datei kann im Abschnitt „Daten laden” des SKOOR Dashboard geladen werden.

Erstellen Sie einen SKOOR-Job, um einen Konverter zu automatisieren

Bei Bedarf können Konverter automatisch durch einen Job ausgeführt werden. Konfigurieren Sie einen Ausführungsjob, um das oben genannte Skript *_run.sh in der Verzeichnisstruktur von SKOOR etl auszuführen:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Beispiel:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh