ETL/ELT con panda

Per l'utilizzo con i dashboard SKOOR, i dati possono essere letti da varie fonti come database, API REST o file e caricati in un database locale. Se necessario, vengono trasformati prima o dopo il caricamento. Questa elaborazione, chiamata ETL (extract, transform, load) o ELT (extract, load, transform), viene effettuata dai cosiddetti convertitori della soluzione SKOOR. I convertitori SKOOR sono creati tipicamente con Talend o Pandas. Seguendo una specifica struttura di directory, possono essere costruiti e zippati per essere caricati nella sezione Importazione dati. Questa guida illustra la configurazione di base di un convertitore pandas.

Per maggiori dettagli sul prodotto, sui comandi di riferimento e così via, consultare il sito web della documentazione di pandas.

Prerequisiti

I seguenti moduli Python sono tipicamente utilizzati dai convertitori:

  • pandas (il software pandas)

  • pyarrow (una libreria utile)

  • openpyxl (libreria per leggere/scrivere file Microsoft Excel)

  • sqlalchemy (libreria per accedere a vari database)

  • psycopg2-binary (adattatore per database PostgreSQL)

Esempio di convertitore

Configurazione di base

Intestazione dello script Pandas:

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

Leggere file Excel e scrivere su database

La seguente parte dello script può essere utilizzata come punto di partenza per l'elaborazione di un file Excel e può essere aggiunta sotto l'intestazione dello script:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

Inserire o aggiornare dati in PostgreSQL

La funzione pandas to_sql può essere utilizzata per inserire o sostituire dati. Se i dati devono essere aggiornati, è possibile utilizzare la libreria SQLAlchemy. Il seguente frammento di codice mostra come si può ottenere questo risultato:

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

Creare un convertitore

In ogni directory del convertitore, il servizio SKOOR ETL cerca uno script chiamato <nome-directory>_run.sh che verrà eseguito quando il convertitore viene eseguito. Creare una directory chiamata <nome-directory> con all'interno questo script e lo script Python contenente la logica di elaborazione:

$ ls -1 customers/
customers.py
customers_run.sh

Script di esempio customers_run.sh:

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

A questo punto, è necessario creare un file zip contenente la directory del convertitore con gli script. Questo file zip può essere caricato nella sezione Carico dati delle dashboard di SKOOR.

Creare un lavoro SKOOR per automatizzare un convertitore

Se necessario, i convertitori possono essere eseguiti automaticamente da un job. Configurare un job execute per eseguire lo script *_run.sh di cui sopra nella struttura di directory di SKOOR etl:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Esempio:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh