ETL/ELT with pandas
For use with SKOOR dashboards, data can be read from various sources such as databases, REST APIs, or files and loaded into a local database. If necessary, it is transformed either before or after loading. This processing, called ETL (extract, transform, load) or ELT (extract, load, transform), is done by so-called converters in the SKOOR solution. SKOOR converters are typically created using Talend or pandas. Following a specific directory structure, they can be built and zipped for upload into the Data import section. This guide covers a basic setup for a pandas converter.
Please visit the pandas documentation website for details about the library, its command reference, and more.
Prerequisites
The following Python modules are typically used by converters:
pandas (data analysis and manipulation library)
pyarrow (Apache Arrow library, used by pandas for columnar data and file formats such as Parquet)
openpyxl (library to read/write Microsoft Excel files)
sqlalchemy (library to access various databases)
psycopg2-binary (PostgreSQL database adapter)
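Before building a converter, it can help to confirm that these modules are installed in the Python environment that will run it. The following is a minimal sketch, not part of the SKOOR tooling: the helper name missing_modules is hypothetical, and the list uses import names, which differ from the package name in one case (psycopg2-binary is imported as psycopg2).

```python
import importlib.util

# Import names of the modules listed above.
# Note: the psycopg2-binary package is imported as "psycopg2".
REQUIRED_MODULES = ["pandas", "pyarrow", "openpyxl", "sqlalchemy", "psycopg2"]


def missing_modules(names):
    """Return the module names that cannot be found by this interpreter."""
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_modules(REQUIRED_MODULES)
if missing:
    print("Missing modules:", ", ".join(missing))
else:
    print("All converter modules are available.")
```

Run the check with the same interpreter the converter will use, since a different Python environment may have a different set of packages installed.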
Example converter
Basic setup
Pandas script header:
import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'  # Change this host if necessary
pgPort = 5432         # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
    try:
        # Arguments are expected as name=value, so getopt returns them unchanged in args
        opts, args = getopt.getopt(sys.argv, "", ["sourceFile=", "sessionId="])
        for arg in args[1:]:
            # Split on the first '=' only, so that values may contain '='
            splitArg = arg.split('=', 1)
            if len(splitArg) == 2 and splitArg[0] == argument:
                return splitArg[1]
    except getopt.GetoptError:
        # args is not assigned when getopt fails, so use sys.argv here
        print(sys.argv[0], 'sourceFile=<source.file>')
        sys.exit(1)
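To make the argument handling easier to follow, here is a stand-alone sketch of the same idea without getopt. It assumes, as the getArg function above implies, that the SKOOR ETL service passes arguments in name=value form. The function name parse_key_value_args and the example values are made up for illustration.

```python
def parse_key_value_args(argv):
    """Return a dict of every 'name=value' item that follows the script name."""
    parsed = {}
    for item in argv[1:]:
        # partition splits on the first '=' only, so values may contain '='
        key, sep, value = item.partition("=")
        if sep:  # skip items that contain no '='
            parsed[key] = value
    return parsed


# Example call; the path and session id are illustrative values only
example_argv = ["customers.py", "sourceFile=/tmp/customers.xlsx", "sessionId=abc=123"]
args = parse_key_value_args(example_argv)
print(args.get("sourceFile"))
```

In a real converter the function would be called with sys.argv. Using dict.get returns None for a missing argument, which matches the isinstance check a converter can perform before processing.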
Read Excel file and write to database
The following part of the script can be used as a starting point for processing an Excel file and may be added below the above script header:
# Main script
# =========================================================================================

source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
    print("Processing file " + source_file)
else:
    print("No input file defined!")
    sys.exit(1)

# Create SQLAlchemy engine
# Note: special characters in the password must be URL-encoded
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------
# <transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')
print("Done")
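What goes in place of the <transformation code> placeholder depends entirely on the source data. As one possible illustration, the sketch below normalises column headers, trims names, and removes rows with a missing or duplicate key. The function name clean_customers, the column names, and the sample rows are assumptions made for this example and do not come from a real SKOOR data set.

```python
import pandas as pd


def clean_customers(df):
    """Hypothetical clean-up step; the column names are assumptions."""
    out = df.copy()
    # Normalise headers: trim, lower-case, replace spaces with underscores
    out.columns = [str(c).strip().lower().replace(" ", "_") for c in out.columns]
    # Trim surrounding whitespace in the name column
    out["customer_name"] = out["customer_name"].astype(str).str.strip()
    # Drop rows without a key, then keep the last row for each customer_id
    out = out.dropna(subset=["customer_id"])
    out = out.drop_duplicates(subset="customer_id", keep="last")
    return out.reset_index(drop=True)


# Sample data standing in for the result of pd.read_excel
raw = pd.DataFrame(
    {
        "Customer ID": [1, 2, 2, None],
        "Customer Name": [" Alice ", "Bob", "Bob B.", "Nobody"],
    }
)
cleaned = clean_customers(raw)
print(cleaned)
```

Removing duplicate keys before loading also matters when rows are later merged with INSERT ... ON CONFLICT DO UPDATE, because PostgreSQL rejects a statement that would update the same target row twice.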
Insert or update data in PostgreSQL
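The pandas to_sql method can create a table, append rows to an existing table, or replace a table, depending on its if_exists parameter. It cannot update existing rows. If data must be updated, SQL statements can be executed through the SQLAlchemy library. The following code snippet writes the DataFrame to a temporary table and then uses the PostgreSQL INSERT ... ON CONFLICT statement to insert new rows and update existing ones. This requires a primary key or unique constraint on customer_id in the target table:
from sqlalchemy import text

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address
    FROM customers_temp t
    ON CONFLICT (customer_id) DO UPDATE
    SET customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))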
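When several tables are loaded this way, writing each statement by hand becomes repetitive. The sketch below generates the same kind of statement from a list of column names. The helper build_upsert_sql is hypothetical and not part of pandas or SQLAlchemy. It interpolates identifiers directly into the SQL text, so it should only be used with fixed, trusted table and column names.

```python
def build_upsert_sql(target, staging, key, columns):
    """Build an INSERT ... ON CONFLICT DO UPDATE statement for PostgreSQL.

    Identifiers are inserted as-is, so pass only trusted, fixed names.
    """
    all_columns = [key] + list(columns)
    column_list = ", ".join(all_columns)
    # Every non-key column is overwritten with the incoming (EXCLUDED) value
    assignments = ", ".join(f"{col} = EXCLUDED.{col}" for col in columns)
    return (
        f"INSERT INTO {target} ({column_list}) "
        f"SELECT {column_list} FROM {staging} "
        f"ON CONFLICT ({key}) DO UPDATE SET {assignments}"
    )


update_sql = build_upsert_sql(
    target="customers",
    staging="customers_temp",
    key="customer_id",
    columns=["customer_name", "customer_address"],
)
print(update_sql)
```

The resulting string can be passed to conn.execute(text(update_sql)) in the same way as the handwritten statement.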
Create a converter
In each converter directory, the SKOOR ETL service looks for a script called <directory-name>_run.sh, which is executed when the converter is run. Create a directory called <directory-name> that contains this script and the Python script with the processing logic:
$ ls -1 customers/
customers.py
customers_run.sh
Example script customers_run.sh:
#!/bin/bash
cd "$(dirname "$0")"
home=$(pwd)
/opt/eranger/python3-env/bin/python3 "$home/customers.py" "$@"
Now, a zip file containing the converter directory with the scripts must be created. This zip file can be loaded in the Data load section of the SKOOR dashboards.
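The archive can be created with any zip tool. As one option, the following sketch uses the Python standard library. It assumes that the archive should contain the converter directory itself as the top-level entry, which is how the sentence above reads. The function name build_converter_zip is hypothetical, and the demonstration uses a throwaway directory with placeholder scripts.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path


def build_converter_zip(converter_dir, out_dir):
    """Zip converter_dir so that the archive contains the directory itself."""
    converter_dir = Path(converter_dir).resolve()
    run_script = converter_dir / f"{converter_dir.name}_run.sh"
    if not run_script.is_file():
        raise FileNotFoundError(f"Missing run script: {run_script}")
    archive = shutil.make_archive(
        str(Path(out_dir) / converter_dir.name),  # archive name without ".zip"
        "zip",
        root_dir=converter_dir.parent,  # archive paths are relative to this
        base_dir=converter_dir.name,    # only this directory is included
    )
    return Path(archive)


# Demonstration with a temporary directory and placeholder scripts
with tempfile.TemporaryDirectory() as tmp:
    conv = Path(tmp) / "customers"
    conv.mkdir()
    (conv / "customers.py").write_text("print('hello')\n")
    (conv / "customers_run.sh").write_text("#!/bin/bash\n")
    out = Path(tmp) / "out"
    out.mkdir()
    zip_path = build_converter_zip(conv, out)
    with zipfile.ZipFile(zip_path) as zf:
        # List files only; directory entries end with "/"
        names = sorted(n for n in zf.namelist() if not n.endswith("/"))
    print(names)
```

The check for the run script fails early if the directory does not follow the <directory-name>_run.sh naming convention described above.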
Create a SKOOR job to automate a converter
If required, converters can be run automatically by a job. Configure an execute job to run the above *_run.sh script in the directory structure of the SKOOR ETL service:
/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh
Example:
/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh
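Note that the converter name appears twice as a directory and once more in the script name. A small sketch that derives the job path from a converter name may help avoid typing mistakes. The helper run_script_path is hypothetical, and the base directory is copied from the example path above.

```python
from pathlib import PurePosixPath

# Base directory taken from the example path above
CONVERTERS_DIR = PurePosixPath("/var/opt/run/eranger/eranger-etl/converters")


def run_script_path(converter):
    """Return the *_run.sh path for a converter name (directory nested twice)."""
    return CONVERTERS_DIR / converter / converter / f"{converter}_run.sh"


print(run_script_path("customers"))
```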