ETL/ELT con panda

Per l'utilizzo con i SKOOR Dashboard, i dati possono essere letti da varie fonti come database, API REST o file e caricati in un database locale. Se necessario, vengono trasformati prima o dopo il caricamento. Questa elaborazione, denominata ETL (extract, transform, load) o ELT (extract, load, transform), viene eseguita dai cosiddetti convertitori nella soluzione SKOOR. I convertitori SKOOR vengono in genere creati utilizzando Talend o Pandas. Seguendo una struttura di directory specifica, possono essere creati e compressi per essere caricati nella sezione Importazione dati. Questa guida illustra la configurazione di base di un convertitore pandas.

Per ulteriori dettagli sul prodotto, i comandi di riferimento ecc., visitare il sito web della documentazione Pandas.

Prerequisiti

I seguenti moduli Python sono tipicamente utilizzati dai convertitori:

  • pandas (il software pandas)

  • pyarrow (una libreria utile)

  • openpyxl (libreria per leggere/scrivere file Microsoft Excel)

  • sqlalchemy (libreria per accedere a vari database)

  • psycopg2-binary (adattatore per database PostgreSQL)

Convertitore di esempio

Configurazione di base

Intestazione dello script Pandas:

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

Leggi il file Excel e scrivi nel database

La parte seguente dello script può essere utilizzata come punto di partenza per l'elaborazione di un file Excel e può essere aggiunta sotto l'intestazione dello script sopra riportata:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

Inserire o aggiornare dati in PostgreSQL

La funzione pandas to_sql può essere utilizzata per inserire o sostituire dati. Se i dati devono essere aggiornati, è possibile utilizzare la libreria SQLAlchemy. Il seguente frammento di codice mostra come ottenere questo risultato:

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

Creare un convertitore

In ogni directory del convertitore, il servizio SKOOR ETL cerca uno script chiamato <nome-directory>_run.sh che verrà eseguito quando viene avviato il convertitore. Creare una directory chiamata <nome-directory> con questo script al suo interno e lo script Python contenente la logica di elaborazione:

$ ls -1 customers/
customers.py
customers_run.sh

Esempio di script customers_run.sh:

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

Ora è necessario creare un file zip contenente la directory del convertitore con gli script. Questo file zip può essere caricato nella sezione Caricamento dati delle SKOOR Dashboard.

Creare un job SKOOR per automatizzare un convertitore

Se necessario, i convertitori possono essere eseguiti automaticamente da un job. Configurare un job di esecuzione per eseguire lo script *_run.sh sopra indicato nella struttura delle directory di SKOOR etl:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Esempio:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh