ETL/ELT mit Pandas

Für die Verwendung mit SKOOR Dashboards können Daten aus verschiedenen Quellen wie Datenbanken, REST-APIs oder Dateien gelesen und in eine lokale Datenbank geladen werden. Falls erforderlich, werden sie entweder vor oder nach dem Laden transformiert. Diese Verarbeitung, die als ETL (Extrahieren, Transformieren, Laden) oder ELT (Extrahieren, Laden, Transformieren) bezeichnet wird, wird von so genannten Konvertern in der SKOOR-Lösung durchgeführt. SKOOR-Konverter werden typischerweise mit Talend oder Pandas erstellt. Nach einer bestimmten Verzeichnisstruktur können sie erstellt und für den Upload in den Datenimportbereich gezippt werden. Dieser Leitfaden behandelt die Grundeinstellungen für einen Pandas-Konverter.

Bitte besuchen Sie die Website der Pandas-Dokumentation, um mehr über das Produkt, die Befehlsreferenz usw. zu erfahren.

Voraussetzungen

Die folgenden Python-Module werden typischerweise von Konvertern verwendet:

  • pandas (die Pandas-Software)

  • pyarrow (eine hilfreiche Bibliothek)

  • openpyxl (Bibliothek zum Lesen/Schreiben von Microsoft Excel-Dateien)

  • sqlalchemy (Bibliothek für den Zugriff auf verschiedene Datenbanken)

  • psycopg2-binary (Adapter für PostgreSQL-Datenbanken)

Beispiel Konverter

Grundlegende Einrichtung

Pandas Skript-Kopfzeile:

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

Excel-Datei lesen und in die Datenbank schreiben

Der folgende Teil des Skripts kann als Ausgangspunkt für die Verarbeitung einer Excel-Datei verwendet werden und kann unterhalb des obigen Skriptkopfes eingefügt werden:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

Daten in PostgreSQL einfügen oder aktualisieren

Die Funktion pandas to_sql kann verwendet werden, um Daten einzufügen oder zu ersetzen. Wenn Daten aktualisiert werden müssen, kann die SQLAlchemy-Bibliothek verwendet werden. Der folgende Codeschnipsel zeigt, wie dies erreicht werden kann:

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

Erstellen eines Konverters

In jedem Konverterverzeichnis sucht der SKOOR ETL-Service nach einem Skript mit dem Namen <Verzeichnisname>_run.sh, das ausgeführt wird, wenn der Konverter gestartet wird. Erstellen Sie ein Verzeichnis namens <Verzeichnisname> mit diesem Skript und dem Python-Skript, das die Verarbeitungslogik enthält:

$ ls -1 customers/
customers.py
customers_run.sh

Beispielskript customers_run.sh:

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

Nun muss eine Zip-Datei erstellt werden, die das Konverterverzeichnis mit den Skripten enthält. Diese Zip-Datei kann im Abschnitt Daten laden des SKOOR Dashboards geladen werden.

Erstellen eines SKOOR-Jobs zur Automatisierung eines Konverters

Bei Bedarf können Konverter automatisch durch einen Job ausgeführt werden. Konfigurieren Sie einen Ausführungsjob, um das obige Skript *_run.sh in der Verzeichnisstruktur von SKOOR etl auszuführen:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Beispiel:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh