ETL/ELT avec pandas

Pour une utilisation avec les SKOOR Dashboard, les données peuvent être lues à partir de diverses sources telles que des bases de données, des API REST ou des fichiers, puis chargées dans une base de données locale. Si nécessaire, elles sont transformées avant ou après leur chargement. Ce traitement, appelé ETL (extract, transform, load) ou ELT (extract, load, transform), est effectué par des convertisseurs dans la solution SKOOR. Les convertisseurs SKOOR sont généralement créés à l'aide de Talend ou Pandas. Suivant une structure de répertoires spécifique, ils peuvent être créés et compressés pour être téléchargés dans la section Importation de données. Ce guide couvre la configuration de base d'un convertisseur Pandas.

Veuillez consulter le site web de documentation Pandas pour plus de détails sur le produit, la référence des commandes, etc.

Prérequis

Les modules Python suivants sont généralement utilisés par les convertisseurs :

  • pandas (le logiciel pandas)

  • pyarrow (une bibliothèque utile)

  • openpyxl (bibliothèque permettant de lire/écrire des fichiers Microsoft Excel)

  • sqlalchemy (bibliothèque permettant d'accéder à diverses bases de données)

  • psycopg2-binary (adaptateur de base de données PostgreSQL)

Exemple de convertisseur

Configuration de base

En-tête du script Pandas :

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

Lire le fichier Excel et écrire dans la base de données

La partie suivante du script peut être utilisée comme point de départ pour le traitement d'un fichier Excel et peut être ajoutée sous l'en-tête du script ci-dessus :

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

Insérer ou mettre à jour des données dans PostgreSQL

La fonction pandas to_sql peut être utilisée pour insérer ou remplacer des données. Si les données doivent être mises à jour, la bibliothèque SQLAlchemy peut être utilisée. L'extrait de code suivant montre comment cela peut être réalisé :

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

Créer un convertisseur

Dans chaque répertoire de convertisseur, le service SKOOR ETL recherche un script appelé <nom-du-répertoire>_run.sh qui sera exécuté lorsque le convertisseur sera lancé. Créez un répertoire appelé <nom-du-répertoire> contenant ce script et le script Python contenant la logique de traitement :

$ ls -1 customers/
customers.py
customers_run.sh

Exemple de script customers_run.sh :

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

Vous devez maintenant créer un fichier zip contenant le répertoire du convertisseur avec les scripts. Ce fichier zip peut être chargé dans la section Chargement des données des SKOOR Dashboard.

Créer une tâche SKOOR pour automatiser un convertisseur

Si nécessaire, les convertisseurs peuvent être exécutés automatiquement par une tâche. Configurez une tâche d'exécution pour exécuter le script *_run.sh ci-dessus dans la structure de répertoires de SKOOR etl :

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Exemple :

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh