ETL/ELT con panda
Per l'utilizzo con i dashboard SKOOR, i dati possono essere letti da varie fonti, come database, API REST o file, e caricati in un database locale. Se necessario, vengono trasformati prima o dopo il caricamento. Questa elaborazione, chiamata ETL (extract, transform, load) o ELT (extract, load, transform), viene effettuata dai cosiddetti convertitori della soluzione SKOOR. I convertitori SKOOR sono creati tipicamente con Talend o Pandas. Seguendo una specifica struttura di directory, possono essere costruiti e zippati per essere caricati nella sezione Importazione dati. Questa guida illustra la configurazione di base di un convertitore pandas.
Per maggiori informazioni sul prodotto, sui comandi di riferimento e così via, consultare il sito web della documentazione di pandas.
Prerequisiti
I seguenti moduli Python sono tipicamente utilizzati dai convertitori:
pandas (il software pandas)
pyarrow (una libreria utile)
openpyxl (libreria per leggere/scrivere file Microsoft Excel)
sqlalchemy (libreria per accedere a vari database)
psycopg2-binary (adattatore per database PostgreSQL)
Esempio di convertitore
Configurazione di base
Intestazione dello script Pandas:
import pandas as pd from sqlalchemy import create_engine import numpy as np import sys, getopt import datetime # Variables # ========================================================================================= # local PostgreSQL database to load data pgUser = '<db user>' pgPass = '<db password>' pgDatabase = '<database name>' pgHost = 'localhost' # Change this host if necessary pgPort = 5432 # Change this port if necessary # Functions # ========================================================================================= # process SKOOR ETL service args def getArg(argument): try: opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="]) for arg in args[1:]: splitArg = arg.split('=') if splitArg[0] == argument: return splitArg[1] except getopt.GetoptError: print(args[0], 'sourceFile=<source.file>') sys.exit(1)
Leggere file Excel e scrivere su database
La seguente parte dello script può essere utilizzata come punto di partenza per l'elaborazione di un file Excel e può essere aggiunta sotto l'intestazione dello script:
# Main script # ========================================================================================= source_file = getArg("sourceFile") # Check if a filename argument is available if isinstance(source_file, str): print("Processing file " + source_file) else: print("No input file defined!") exit(1) # Create SqlAlchemy engine engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase) # Read raw data from a sheet called "customer" customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer') # Transform data if required # ------------------------------------------------------------------------ <transformation code> print(customers.head()) # Write data into table "customers" of PostgreSQL database # ------------------------------------------------------------------------ print("Writing data to database...") customers.to_sql(name='customers', con=engine, if_exists='replace') print("Done")
Inserire o aggiornare dati in PostgreSQL
La funzione pandas to_sql può essere utilizzata per inserire o sostituire dati. Se i dati devono essere aggiornati, è possibile utilizzare la libreria SQLAlchemy. Il seguente frammento di codice mostra come si può ottenere questo risultato:
# Write DataFrame to temporary table on database customers.to_sql(name='customers_temp', con=engine, if_exists='replace') # Update target table using temporary table values update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address) SELECT customer_id, customer_name, customer_address FROM customers_temp t ON CONFLICT (customer_id) DO UPDATE SET customer_name = EXCLUDED.customer_name, customer_address = EXCLUDED.customer_address''' with engine.begin() as conn: conn.execute(text(update_sql))
Creare un convertitore
In ogni directory del convertitore, il servizio SKOOR ETL cerca uno script chiamato <nome-directory>_run.sh che verrà eseguito quando il convertitore viene eseguito. Creare una directory chiamata <nome-directory> con all'interno questo script e lo script Python contenente la logica di elaborazione:
$ ls -1 customers/ customers.py customers_run.sh
Script di esempio customers_run.sh:
#!/bin/bash cd $(dirname $0) home=$(pwd) /opt/eranger/python3-env/bin/python3 $home/customers.py $@
A questo punto, è necessario creare un file zip contenente la directory del convertitore con gli script. Questo file zip può essere caricato nella sezione Carico dati delle dashboard di SKOOR.
Creare un lavoro SKOOR per automatizzare un convertitore
Se necessario, i convertitori possono essere eseguiti automaticamente da un job. Configurare un job execute per eseguire lo script *_run.sh di cui sopra nella struttura di directory di SKOOR etl:
/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh
Esempio:
/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh