ETL/ELT مع الباندا

للاستخدام مع لوحات معلومات SKOOR، يمكن قراءة البيانات من مصادر مختلفة مثل قواعد البيانات أو واجهات برمجة تطبيقات REST أو الملفات وتحميلها في قاعدة بيانات محلية. إذا لزم الأمر، يتم تحويلها إما قبل أو بعد التحميل. تتم هذه المعالجة، التي تسمى ETL (استخراج وتحويل وتحميل) أو ELT (استخراج وتحميل وتحويل)، بواسطة ما يسمى بالمحوّلات في حل SKOOR. عادةً ما يتم إنشاء محولات SKOOR باستخدام Talend أو Pandas. باتباع بنية دليل محددة، يمكن إنشاؤها وتحميلها للتحميل في قسم استيراد البيانات. يغطي هذا الدليل الإعداد الأساسي لمحول بانداس.

يُرجى زيارة موقع وثائق بانداس على الويب للحصول على تفاصيل حول المنتج ومرجع الأوامر وما إلى ذلك.

المتطلبات الأساسية

تستخدم وحدات بايثون التالية عادةً من قبل المحولات:

  • بانداس (برنامج بانداس)

  • بايرو (مكتبة مفيدة)

  • openpyxl (مكتبة لقراءة/كتابة ملفات مايكروسوفت إكسل)

  • sqlalchemy (مكتبة للوصول إلى قواعد البيانات المختلفة)

  • psycopg2-binary (محول قاعدة بيانات PostgreSQL)

مثال على المحول

الإعداد الأساسي

رأس البرنامج النصي لـ Pandas

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

قراءة ملف Excel والكتابة إلى قاعدة البيانات

يمكن استخدام الجزء التالي من البرنامج النصي كنقطة بداية لمعالجة ملف Excel ويمكن إضافته أسفل رأس البرنامج النصي أعلاه:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

إدراج البيانات أو تحديثها في PostgreSQL

يمكن استخدام الدالة pandas to_sql لإدراج البيانات أو استبدالها. إذا كان يجب تحديث البيانات، يمكن استخدام مكتبة SQLAlchemy. يوضح مقتطف الشيفرة التالي كيف يمكن تحقيق ذلك:

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

إنشاء محول

في كل دليل محول، تبحث خدمة SKOOR ETL في كل دليل محول عن برنامج نصي يسمى <directory-name>_run.sh والذي سيتم تنفيذه عند تشغيل المحول. أنشئ دليلاً يسمى <directory-name> بداخله هذا النص البرمجي وبرنامج Python النصي الذي يحتوي على منطق المعالجة:

$ ls -1 customers/
customers.py
customers_run.sh

مثال على البرنامج النصي clients_run.sh:

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

الآن، يجب إنشاء ملف مضغوط يحتوي على دليل المحول مع النصوص البرمجية. يمكن تحميل هذا الملف المضغوط في قسم تحميل البيانات في لوحات معلومات SKOOR.

إنشاء مهمة SKOOR لأتمتة المحول

إذا لزم الأمر، يمكن تشغيل المحولات تلقائيًا بواسطة مهمة. قم بتكوين مهمة تنفيذ لتشغيل البرنامج النصي *_run.sh أعلاه في بنية دليل SKOOR etl:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

مثال:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh