ETL/ELT avec pandas

Pour l'utilisation avec les tableaux de bord SKOOR, les données peuvent être lues à partir de différentes sources comme des bases de données, des API REST ou des fichiers et chargées dans une base de données locale. Si nécessaire, elles sont transformées avant ou après le chargement. Ce traitement, appelé ETL (extraction, transformation, chargement) ou ELT (extraction, chargement, transformation), est effectué par des convertisseurs dans la solution SKOOR. Les convertisseurs SKOOR sont généralement créés à l'aide de Talend ou de Pandas. En suivant une structure de répertoire spécifique, ils peuvent être construits et zippés pour être téléchargés dans la section d'importation de données. Ce guide couvre une configuration de base pour un convertisseur pandas.

Veuillez visiter le site web de documentation de pandas pour plus de détails sur le produit, la référence des commandes, etc.

Conditions préalables

Les modules Python suivants sont généralement utilisés par les convertisseurs :

  • pandas (le logiciel pandas)

  • pyarrow (une bibliothèque utile)

  • openpyxl (bibliothèque pour lire/écrire des fichiers Microsoft Excel)

  • sqlalchemy (bibliothèque permettant d'accéder à diverses bases de données)

  • psycopg2-binary (adaptateur de base de données PostgreSQL)

Exemple de convertisseur

Configuration de base

En-tête de script Pandas :

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

Lire le fichier Excel et écrire dans la base de données

La partie suivante du script peut être utilisée comme point de départ pour le traitement d'un fichier Excel et peut être ajoutée sous l'en-tête du script ci-dessus :

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

Insérer ou mettre à jour des données dans PostgreSQL.

La fonction pandas to_sql peut être utilisée pour insérer ou remplacer des données. Si les données doivent être mises à jour, la bibliothèque SQLAlchemy peut être utilisée. L'extrait de code suivant montre comment cela peut être réalisé :

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

Créer un convertisseur

Dans chaque répertoire de convertisseur, le service ETL SKOOR recherche un script appelé <nom du répertoire>_run.sh qui sera exécuté lors de l'exécution du convertisseur. Créez un répertoire appelé <nom-de-répertoire> avec ce script à l'intérieur et le script Python contenant la logique de traitement :

$ ls -1 customers/
customers.py
customers_run.sh

Exemple de script clients_run.sh :

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

Il faut maintenant créer un fichier zip contenant le répertoire du convertisseur avec les scripts. Ce fichier zip peut être chargé dans la section Data load des tableaux de bord SKOOR.

Créer un job SKOOR pour automatiser un convertisseur

Si nécessaire, les convertisseurs peuvent être exécutés automatiquement par un job. Configurez un job execute pour exécuter le script *_run.sh ci-dessus dans la structure de répertoire de SKOOR etl :

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Exemple :

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh