ETL/ELT مع الباندا

للاستخدام مع لوحات معلومات SKOOR، يمكن قراءة البيانات من مصادر مختلفة مثل قواعد البيانات أو واجهات برمجة التطبيقات REST أو الملفات وتحميلها في قاعدة بيانات محلية. إذا لزم الأمر، يتم تحويلها إما قبل أو بعد التحميل. تتم هذه المعالجة، التي تسمى ETL (استخراج، تحويل، تحميل) أو ELT (استخراج، تحميل، تحويل)، بواسطة ما يسمى بالمحولات في حل SKOOR. عادةً ما يتم إنشاء محولات SKOOR باستخدام Talend أو Pandas. وفقًا لهيكل دليل محدد، يمكن إنشاؤها وضغطها لتحميلها في قسم استيراد البيانات. يغطي هذا الدليل الإعداد الأساسي لمحول pandas.

يرجى زيارة موقع الوثائق الخاص بـ pandas للحصول على تفاصيل حول المنتج ومرجع الأوامر وما إلى ذلك.

المتطلبات الأساسية

عادةً ما تستخدم المحولات وحدات Python التالية:

  • pandas (برنامج pandas)

  • pyarrow (مكتبة مفيدة)

  • openpyxl (مكتبة لقراءة/كتابة ملفات Microsoft Excel)

  • sqlalchemy (مكتبة للوصول إلى قواعد بيانات مختلفة)

  • psycopg2-binary (محول قاعدة بيانات PostgreSQL)

مثال على المحول

الإعداد الأساسي

رأس ملف بانداس:

import pandas as pd
from sqlalchemy import create_engine
import numpy as np
import sys, getopt
import datetime

# Variables
# =========================================================================================

# local PostgreSQL database to load data
pgUser = '<db user>'
pgPass = '<db password>'
pgDatabase = '<database name>'
pgHost = 'localhost'    # Change this host if necessary
pgPort = 5432           # Change this port if necessary

# Functions
# =========================================================================================

# process SKOOR ETL service args
def getArg(argument):
   try:
      opts, args = getopt.getopt(sys.argv,"",["sourceFile=", "sessionId="])
      for arg in args[1:]:
          splitArg = arg.split('=')
          if splitArg[0] == argument:
              return splitArg[1]

   except getopt.GetoptError:
      print(args[0], 'sourceFile=<source.file>')
      sys.exit(1)

قراءة ملف Excel والكتابة إلى قاعدة البيانات

يمكن استخدام الجزء التالي من البرنامج النصي كنقطة انطلاق لمعالجة ملف Excel ويمكن إضافته أسفل رأس البرنامج النصي أعلاه:

# Main script
# =========================================================================================
source_file = getArg("sourceFile")

# Check if a filename argument is available
if isinstance(source_file, str):
   print("Processing file " + source_file)

else:
   print("No input file defined!")
   exit(1)

# Create SqlAlchemy engine
engine = create_engine('postgresql://' + pgUser + ':' + pgPass + '@' + pgHost + ':' + str(pgPort) + '/' + pgDatabase)

# Read raw data from a sheet called "customer"
customers = pd.read_excel(source_file, engine='openpyxl', sheet_name='customer')

# Transform data if required
# ------------------------------------------------------------------------

<transformation code>

print(customers.head())

# Write data into table "customers" of PostgreSQL database
# ------------------------------------------------------------------------
print("Writing data to database...")
customers.to_sql(name='customers', con=engine, if_exists='replace')

print("Done")

إدراج أو تحديث البيانات في PostgreSQL

يمكن استخدام وظيفة pandas to_sql لإدراج أو استبدال البيانات. إذا كان يجب تحديث البيانات، فيمكن استخدام مكتبة SQLAlchemy. يوضح مقتطف الشفرة التالي كيفية تحقيق ذلك:

# Write DataFrame to temporary table on database
customers.to_sql(name='customers_temp', con=engine, if_exists='replace')

# Update target table using temporary table values
update_sql = '''INSERT INTO customers (customer_id, customer_name, customer_address)
    SELECT customer_id, customer_name, customer_address FROM customers_temp t
    ON CONFLICT (customer_id)
    DO UPDATE SET 
        customer_name = EXCLUDED.customer_name,
        customer_address = EXCLUDED.customer_address'''

with engine.begin() as conn:
    conn.execute(text(update_sql))

إنشاء محول

في كل دليل محول، تبحث خدمة SKOOR ETL عن برنامج نصي يسمى <directory-name>_run.sh والذي سيتم تنفيذه عند تشغيل المحول. قم بإنشاء دليل يسمى <directory-name> يحتوي على هذا البرنامج النصي وبرنامج Python النصي الذي يحتوي على منطق المعالجة:

$ ls -1 customers/
customers.py
customers_run.sh

مثال على البرنامج النصي customers_run.sh:

#!/bin/bash

cd $(dirname $0)
home=$(pwd)

/opt/eranger/python3-env/bin/python3 $home/customers.py $@ 

الآن، يجب إنشاء ملف مضغوط يحتوي على دليل المحول مع البرامج النصية. يمكن تحميل هذا الملف المضغوط في قسم تحميل البيانات في لوحات معلومات SKOOR.

إنشاء مهمة SKOOR لأتمتة المحول

إذا لزم الأمر، يمكن تشغيل المحولات تلقائيًا بواسطة مهمة. قم بتكوين مهمة تنفيذ لتشغيل البرنامج النصي *_run.sh أعلاه في بنية الدليل لـ SKOOR etl:

/var/opt/run/eranger/eranger-etl/converters/<converter>/<converter>/<converter>_run.sh

مثال:

/var/opt/run/eranger/eranger-etl/converters/customers/customers/customers_run.sh