ETL/ELT avec pandas
Pour l'utilisation avec les tableaux de bord SKOOR, les données peuvent être lues à partir de différentes sources comme des bases de données, des API REST ou des fichiers et chargées dans une base de données locale. Si nécessaire, elles sont transformées avant ou après le chargement. Ce traitement, appelé ETL (extraction, transformation, chargement) ou ELT (extraction, chargement, transformation), est effectué par des convertisseurs dans la solution SKOOR. Les convertisseurs SKOOR sont généralement créés à l'aide de Talend ou de Pandas. En suivant une structure de répertoire spécifique, ils peuvent être construits et zippés pour être téléchargés dans la section d'importation de données. Ce guide couvre une configuration de base pour un convertisseur pandas.
Veuillez visiter le site web de documentation de pandas pour plus de détails sur le produit, la référence des commandes, etc.
Conditions préalables
Les modules Python suivants sont généralement utilisés par les convertisseurs :
pandas (le logiciel pandas)
pyarrow (une bibliothèque utile)
openpyxl (bibliothèque pour lire/écrire des fichiers Microsoft Excel)
sqlalchemy (bibliothèque permettant d'accéder à diverses bases de données)
psycopg2-binary (adaptateur de base de données PostgreSQL)
Exemple de convertisseur
Configuration de base
En-tête de script Pandas :
import pandas as pd from sqlalchemy import create_engine import numpy as np import sys, getopt import datetime # Variables # ========================================================================================= # local PostgreSQL database to load data pgUser = '<db user>' pgPass = '<db password>' pgDatabase = '<database name>' pgHost = 'localhost' # Change this host if necessary pgPort = 5432 # Change this port if necessary # Functions # ========================================================================================= # process SKOOR ETL service args def getArg(argument): try: opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="]) for arg in args[1:]: splitArg = arg.split('=') if splitArg[0] == argument: return splitArg[1] except getopt.GetoptError: print(args[0], 'sourceFile=<source.file>') sys.exit(1)
Lire le fichier Excel et écrire dans la base de données
La partie suivante du script peut être utilisée comme point de départ pour le traitement d'un fichier Excel et peut être ajoutée sous l'en-tête du script ci-dessus :
# Main script # ========================================================================================= source_file = getArg("sourceFile") # Check if a filename argument is available if isinstance(source_file, str): print("Processing file " + source_file) else: print("No input file defined!") exit(1) # Create SqlAlchemy engine engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase) # Read raw data from a sheet called "customer" customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer') # Transform data if required # ------------------------------------------------------------------------ <transformation code> print(customers.head()) # Write data into table "customers" of PostgreSQL database # ------------------------------------------------------------------------ print("Writing data to database...") customers.to_sql(name='customers', con=engine, if_exists='replace') print("Done")
Insérer ou mettre à jour des données dans PostgreSQL.
La fonction pandas to_sql peut être utilisée pour insérer ou remplacer des données. Si les données doivent être mises à jour, la bibliothèque SQLAlchemy peut être utilisée. L'extrait de code suivant montre comment cela peut être réalisé :
# Write DataFrame to temporary table on database customers.to_sql(name='customers_temp', con=engine, if_exists='replace') # Update target table using temporary table values update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address) SELECT customer_id, customer_name, customer_address FROM customers_temp t ON CONFLICT (customer_id) DO UPDATE SET customer_name = EXCLUDED.customer_name, customer_address = EXCLUDED.customer_address''' with engine.begin() as conn: conn.execute(text(update_sql))
Créer un convertisseur
Dans chaque répertoire de convertisseur, le service ETL SKOOR recherche un script appelé <nom du répertoire>_run.sh qui sera exécuté lors de l'exécution du convertisseur. Créez un répertoire appelé <nom-de-répertoire> avec ce script à l'intérieur et le script Python contenant la logique de traitement :
$ ls -1 customers/ customers.py customers_run.sh
Exemple de script clients_run.sh :
#!/bin/bash cd $(dirname $0) home=$(pwd) /opt/eranger/python3-env/bin/python3 $home/customers.py $@
Il faut maintenant créer un fichier zip contenant le répertoire du convertisseur avec les scripts. Ce fichier zip peut être chargé dans la section Data load des tableaux de bord SKOOR.
Créer un job SKOOR pour automatiser un convertisseur
Si nécessaire, les convertisseurs peuvent être exécutés automatiquement par un job. Configurez un job execute pour exécuter le script *_run.sh ci-dessus dans la structure de répertoire de SKOOR etl :
/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh
Exemple :
/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh