ETL/ELT with pandas

For use with SKOOR dashboards, data can be read from various sources such as databases, REST APIs or files and loaded into a local database. If necessary, it is transformed either before or after loading. This processing, called ETL (extract, transform, load) or ELT (extract, load, transform), is done by so-called converters in the SKOOR solution. SKOOR converters are typically created using Talend or pandas. Following a specific directory structure, they can be built and zipped for upload into the Data import section. This guide covers a basic setup for a pandas converter.

Please visit the pandas documentation website for details about the library, its API reference and more.

Prerequisites

The following Python modules are typically used by converters:

  • pandas (the data analysis library itself)

  • pyarrow (Apache Arrow library, used by pandas for columnar data and Parquet support)

  • openpyxl (library to read/write Microsoft Excel files)

  • sqlalchemy (library to access various databases)

  • psycopg2-binary (PostgreSQL database adapter)
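These modules can be installed with pip. The interpreter path below matches the Python environment used later in this guide; it is an assumption and may differ on your system:

```shell
# Install the typical converter dependencies
# (adjust the path to your Python environment if it differs)
/opt/eranger/python3-env/bin/pip3 install pandas pyarrow openpyxl sqlalchemy psycopg2-binary
```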

Example converter

Basic setup

Pandas script header:

import pandas as pd
from sqlalchemy import create_engine, text
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
    try:
        # The service passes plain key=value pairs; skip the script name in sys.argv
        opts, args = getopt.getopt(sys.argv[1:], "", ["sourceFile=", "sessionId="])
        for arg in args:
            splitArg = arg.split('=', 1)
            if splitArg[0] == argument:
                return splitArg[1]

    except getopt.GetoptError:
        print(sys.argv[0], 'sourceFile=<source.file>')
        sys.exit(1)
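Because the arguments arrive as plain key=value pairs, the same lookup can be sketched without getopt. This simplified variant (names chosen here for illustration) behaves like getArg for well-formed arguments:

```python
import sys

def get_arg(argument):
    # Return the value of a key=value argument, or None if it is absent
    for arg in sys.argv[1:]:
        key, sep, value = arg.partition('=')
        if sep and key == argument:
            return value
    return None

# Simulate the arguments the SKOOR ETL service would pass
sys.argv = ['customers.py', 'sourceFile=data.xlsx', 'sessionId=42']
print(get_arg('sourceFile'))
```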

Read Excel file and write to database

The following part of the script can be used as a starting point for processing an Excel file and may be added below the above script header:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine(f'postgresql://{pgUser}:{pgPass}@{pgHost}:{pgPort}/{pgDatabase}')

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
# index=False avoids writing the DataFrame index as an extra column
customers.to_sql(name='customers', con=engine, if_exists='replace', index=False)

print("Done")
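As an illustration of the transformation step left open above, the following standalone sketch normalizes column names and trims stray whitespace. The column names and sample rows are assumptions, not part of the SKOOR sample:

```python
import pandas as pd

# Hypothetical raw data as it might arrive from an Excel sheet
customers = pd.DataFrame({
    'Customer ID': [1, 2],
    'Customer Name': ['  Acme AG ', 'Beta GmbH'],
})

# Normalize column names to snake_case
customers.columns = [c.strip().lower().replace(' ', '_') for c in customers.columns]

# Strip stray whitespace from string values
customers['customer_name'] = customers['customer_name'].str.strip()

print(customers)
```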

Insert or update data in PostgreSQL

The pandas to_sql function can be used to insert or replace data. If existing rows must be updated instead, the SQLAlchemy library can be used; note that the text function must be imported from sqlalchemy. The following code snippet shows how this can be achieved:

# Write DataFrame to temporary table on database (index=False skips the DataFrame index)
customers.to_sql(name='customers_temp', con=engine, if_exists='replace', index=False)

# Update target table using temporary table values
# (ON CONFLICT requires a UNIQUE or PRIMARY KEY constraint on customer_id)
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))
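The upsert pattern can be exercised end to end against an in-memory SQLite database, which supports the same ON CONFLICT syntax as PostgreSQL. Table and column names follow the snippet above; the sample rows are made up:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# ON CONFLICT requires a UNIQUE or PRIMARY KEY constraint on customer_id
conn.execute('CREATE TABLE customers ('
             'customer_id INTEGER PRIMARY KEY, customer_name TEXT, customer_address TEXT)')
conn.execute("INSERT INTO customers VALUES (1, 'Old Name', 'Old Street')")

# Same statement shape as the PostgreSQL snippet above:
# row 1 collides and is updated, row 2 is inserted
conn.execute('''INSERT INTO customers (customer_id, customer_name, customer_address)
    VALUES (1, 'Acme AG', 'Main St 1'), (2, 'Beta GmbH', 'Side St 2')
    ON CONFLICT (customer_id)
    DO UPDATE SET
        customer_name = excluded.customer_name,
        customer_address = excluded.customer_address''')

rows = conn.execute(
    'SELECT customer_id, customer_name FROM customers ORDER BY customer_id').fetchall()
print(rows)
```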

Create a converter

In each converter directory, the SKOOR ETL service looks for a script called <directory-name>_run.sh, which is executed when the converter runs. Create a directory called <directory-name> containing this script and the Python script with the processing logic:

$ ls -1 customers/
customers.py
customers_run.sh

Example script customers_run.sh:

#!/bin/bash

cd "$(dirname "$0")"
home=$(pwd)

/opt/eranger/python3-env/bin/python3 "$home/customers.py" "$@"

Now create a zip file containing the converter directory with both scripts. This zip file can be uploaded in the Data import section of the SKOOR dashboards.

Create a SKOOR job to automate a converter

If required, converters can be run automatically by a job. Configure an execute job to run the *_run.sh script described above within the directory structure of the SKOOR ETL service:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

Example:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh