ETL

Kategória: Adat

ETL

Az adatfeldolgozás során tipikus művelet az adatok beolvasása egy adatforrásból, annak valamilyen átalakítása és más adattárolóba történő mentése. Az ETL ezt a három lépést rövidíti:

  • E: Extract, azaz az adatok kinyerése,
  • T: Transform, azaz az adatok átalakítása, valamint
  • L: Load, azaz az adatok mentése.

Ezeket a lépéseket tetszőleges programozási nyelven végre lehet hajtani. A kifejezetten ETL rendszerekhez viszont többnyire nem kell programozói tudás, a folyamatokat általában grafikusan kell összeollózni. Ezeket a folyamatokat angolul pipeline-nak, azaz csővezetéknek hívjuk: az adat valahonnan jön, valami történik vele és valahova kerül.

Az ETL rendszerek tipikusan számos (több tucat, vagy akár több száz) adatforrást támogatnak: különböző fájlformátumokat, adatbázisokat, szolgáltatásokat. Az ETL rendszerek sok esetben szorosan kapcsolódnak az üzleti intelligenciához: a nyers adatokat előkészítik megjelenítésre, például szűrik, aggregálják, és az ETL outputja lesz az üzleti intelligencia rendszer inputja.

Számos ETL rendszer létezik, néhány példa:

  • Pentaho Data Integration (Kettle): Java alapú, nagy tudású, de nehézkes (ingyenes változat nincs, a 30 napos kipróbáláshoz céges e-mail cím szükséges).
  • Apache Hop: nyílt forráskódú Pentaho fork.
  • Microsoft SSIS (SQL Server Integration Services): a Microsoft ökoszisztéma része.
  • Apache Beam: az SDK támogatja a kötegelt (batch) és folyamatos (stream) adatfeldolgozást is.
  • Apache Nifi: áramlás alapú rendszer.

Apache Hop

Ezt a szakaszt a Cubix [|Business Intelligence rendszerek fejlesztése] című képzés második házi feladata ihlette.

Telepítés

Töltsük le az Apache Hop rendszer a https://hop.apache.org/download/ oldalról és csomagoljuk ki. Az alkalmazás indítása: hop-gui.bat.

Pipeline és Workflow

A rendszer két fő fogalma a Pipeline és a Workflow. Mindkettőt a kék alapú kerek + ikonra kattintva tudjuk létrehozni; ott rá kell keresni a megfelelő elemre.

  • Pipeline: ez tartalmazza a tulajdonképpeni adatbetöltés → átalakítás → adatmentés folyamatot. Ahhoz, hogy használni tudjuk, fájlba kell menteni, melynek kiterjesztése hpl.
  • Workflow: ez pipeline-okat tartalmazhat. Tipikus felépítése: az egyik pipeline eredménye a másik pipeline inputja. Ezt is le kell menteni külön fájlba, melynek tipikus kiterjesztése hfw.

Előkészületek

Adatok beszerzése:

Adatbázis beállítása:

  • Töltsük le a MySQL Community verzióját. A Windows-os telepítőt, ami minden lényeges komponenst tartalmaz, innen lehet letölteni: https://dev.mysql.com/downloads/installer/.
  • Telepítsük fel. A telepítés során be kell állítani azt, hogy azonosítóval és jelszóval lehessen csatlakozni. A root jelszót meg kell adni. A teszteléshez nem szükséges, de jó gyakorlat külön felhasználót létrehozni.
  • Indítsuk el a MySQL Workbench nevű alkalmazást.
  • Hozzunk létre egy adatbázist transactions_db néven.
  • Hozzuk létre a következő tranzakciós adattáblát:
CREATE TABLE transactions (
    Transaction_ID INT PRIMARY KEY,
    User_Name VARCHAR(255),
    Age INT,
    Country VARCHAR(100),
    Product_Category VARCHAR(100),
    Purchase_Amount DECIMAL(10, 2),
    Payment_Method VARCHAR(50),
    Transaction_Date DATE
);
  • Hozzuk létre a következő országokat tartalmazó adattáblát:
CREATE TABLE countries (
    Country VARCHAR(100),
    Region VARCHAR(100),
    Population INT,
    Area_sq_mi INT,
    Pop_Density DECIMAL(10,2),
    Coastline DECIMAL(10,2),
    Net_migration DECIMAL(10,2),
    Infant_mortality DECIMAL(10,2),
    GDP INT,
    Literacy DECIMAL(10,2),
    Phones DECIMAL(10,2),
    Arable DECIMAL(10,2),
    Crops DECIMAL(10,2),
    Other DECIMAL(10,2),
    Climate DECIMAL(10,2),
    Birthrate DECIMAL(10,2),
    Deathrate DECIMAL(10,2),
    Agriculture DECIMAL(10,4),
    Industry DECIMAL(10,4),
    Service DECIMAL(10,4)
);

Meghajtó beállítása:

  • Töltsük le a MySQL connector meghajtót innen: https://dev.mysql.com/downloads/connector/j/.
  • Másoljuk be a connector fájlt ide: hop\lib\jdbc\mysql-connector-j-9.6.0.jar.
  • Indítsuk újra az Apache Hop-ot.
  • Bal oldalon kattintsunk a Matadata ikonra.
  • Kattintsunk jobb egérgombbal a Relational Database Connection-re, és válasszuk ki a New menüpontot.
  • A Connection Type a legördülő menüből legyen MySQL. A többit töltsük ki értelemszerűen: adjunk neki nevet (pl. transactions_mysql), a felhasználónév root (ha nem állítottunk be mást), a jelszó amit megadtunk telepítéskor, a Server host name localhost, a port maradjon 3306, a database name transactions_db, a Database type Mysql 8+.
  • Kattintsunk a Test nyomógombra hogy meggyőződjünk, a kapcsolat működik.
  • Bezáráskor mentsük el.

Fájlból olvasás és adatbázisba írás

Ennyi előkészület után jöhet az első Pipeline!

  • Extract: CSV file input (a tranzakciók betöltése CSV-ből)
    • Ha a navigálás során "eltévedünk", akkor kattintsunk bal oldalon a legfelső ikonra (File Explorer).
    • Kattintsunk bal oldalon felül a + ikonra → keressünk meg a Pipeline-t.
    • Kattintsunk a bal gombbal a megjelenő munkafelületen.
    • Válasszuk ki ezt: CSV file input. Itt elég sok ikon van, érdemes rákeresni (és ezt a későbbiekben is megtenni).
    • Kattintsunk a CSV file input feliratra a megjelenő ikon alatt.
    • A Transform name akár maradhat, de érdemes átnevezni valami egyértelműbbre, pl. Transactions CSV input.
    • A Filename sor végén kattintsunk a Browse… nyomógombra, és keressük meg és válasszuk ki ezt a fájlt: ecommerce_transactions.csv.
    • Alul kattintsunk a Get Fields nyomógombra, majd az OK-ra. Itt beolvassa az első 100 sort (vagy amennyit megadtunk), és az alapján megpróbálja kitalálni az adatformátumot.
    • Kattintsunk a Preview gombra. Ha minden rendben történt, megjelenik az első 1000 (vagy amennyit megadtunk) sor.
    • Zárjuk be a dialógusablakokat az OK gomb(ok) megnyomásával.
  • Load: Table output (kiírás adatbázisba)
    • A fentihez hasonlóan adjunk hozzá egy Table output ikont.
    • A Shift-et nyomva tartva húzzunk egy nyilat az inputból az outputba. Itt tehát most nem lesz transzformáció. Az output végénél rákérdez, hogy mi legyen a szerepe; ezt válasszuk: Main output of transform.
    • Kattintsunk rá az ikon alatti Table output feliratra.
    • Adjunk neki egy megfelelő nevet, pl. Transactions table output.
    • A Connection legyen a korábban megadott kapcsolat, pl. transactions_mysql.
    • A Target schema legyen a korábban létrehozott adatbázis, pl. transactions_db.
    • A Target table legyen a korábban létrehozott tábla: transactions. (Ez utóbbit a Browse… segítségével is ki tudjuk választani, aminek következtében a Target schema is kiválasztódik.)
    • Kattintsuk be a Truncate table-t, hogy a lefutások során üres táblával kezdjünk.
    • Kattintsunk az OK-ra.
transactions_load.jpg

További lépések:

  • Mentsük le a Save ikonra kattintva. Adjunk neki nevet, pl. CSV_load.hpl.
  • Kattintsunk a lejátszás ikonra.
  • A megjelenő Run Options ablakban kattintsunk a Launch-ra.

Ha minden rendben történt, a Logging fülön nincs hiba, az ikonokon megjelenik egy-egy zöld pipa, a MySQL Workbench-ben pedig ha ráfrissítünk a Transactions táblára, akkor látjuk benne az adatokat.

transactions_table.jpg

Átalakítások

Következő lépésben töltsük be az országokat. Itt már szükség lesz átalakításokra. Használhatjuk a már megkezdet pipeline-t, de kezdhetünk újat is.

  • Extract: CSV file input (az országok betöltése CSV-ből)
    • A fentiekhez hasonlóan töltsük be a countries of the world.csv fájlt.
  • Transfer 1: String operations (a felesleges szóközök leszedése)
    • Adjunk hozzá egy String operations lépést.
    • Húzzunk nyilat a CSV inputról az imént hozzáadotthoz.
    • Kattintsunk a String operations feliratra.
    • Adjunk neki megfelelő nevet, pl. Trim fields.
    • Kattintsunk a Get field-re. Két mező kell, hogy megjelenjen: Country és Region.
    • A Trim type alatt mindkettő legyen both.
  • Transfer 2: Value mapper (országok átnevezése)
    • A tranzakció és az ország táblában két ország másképp van elnevezve, emiatt át kell az egyikben nevezni, praktikusan az országok táblában (ott a gyorsabb).
    • Adjunk hozzá egy Value mapper lépést.
    • A Field name to use ez legyen: Country.
    • Source value: United States, Target value: USA.
    • Source value: United Kingdom, Target value: UK.
  • Load: Table output (az eredmény kiírása adatbázisba)
    • A fentiek mintájára hozzunk létre egy Table output-ot az országok számára is.
    • Target table: countries.
    • A Truncate table legyen kiválasztva.
    • A Specify database fields is legyen itt kiválasztva.
    • Ez esetben aktiválódik a Get fields. Kattintsunk rá.
    • Kattintsunk erre: Enter field mapping,
    • Megjelenik egy figyelmeztetés, hogy bizonyos mező hivatkozások nem találhatóak. Kattintsunk a Yes-re.
    • Ebben az esetben az ajánlott kapcsolat megfelelő. Egyesével adjuk hozzá a párokat bal oldalon az Add segítségével, majd kattintsunk az OK-ra.
    • Kattintsunk az OK-ra.

Indítsuk el a folyamatot, majd ellenőrizzük az eredményt. Figyeljük meg, hogy USA és UK szerepel az United States és United Kingdom helyett.

Aggregálás

Következő lépésben országok szerint aggregáljuk az eladásokat.

  • Extract: Table input (a tranzakciók betöltése adatbázisból)
    • Adjunk hozzá egy Table input lépést.
    • A Connection maradjon az alapértelmezett (transactions_mysql).
    • Kattintsunk a Get SQL select statement…-re.
    • Válasszuk ki ezt: Tables → transactions_db.transactions, majd kattintsunk az OK-ra.
    • Fogadjuk el, hogy beletegye a lekérdezésbe az oszlopneveket.
    • Kattintsunk a Preview-ra, hogy lássuk, rendben betölti-e.
    • Kattintsunk az OK-ra.
  • Transform: Memory group by (aggregálás országok szerint)
    • Adjunk hozzá egy Memory group by lépést.
    • Kössük össze az előző lépést ezzel.
    • Nyissuk meg a lépés részleteit a névre kattintva.
    • Felül válasszuk ki azt, hogy mi szerint szeretnénk aggregálni: Country.
    • Alul válasszuk ki, hogy mi legyen az aggregálás művelete.
      • A vásárlások száma: a Name legyen total_purchase_count, a Subject legyen a Transaction_ID, a Type pedig Number of Values (N).
      • A vásárlás teljes összege: a Name legyen total_purchase_amount, a Subject legyen a Purchase_Amount, a Type pedig Sum.
    • Zárjuk be az OK-ra kattintva.
  • Load: Table output (az eredmény kiírása adatbázisba)
    • Hozzunk létre egy Table output lépést.
    • Kapcsoljuk össze az előzővel.
    • Nyissuk meg a részleteket.
    • A Target table legyen ez: aggregated_by_country.
    • Kapcsoljuk be ezt: Truncate table.
    • Alul kattintsunk az SQL gombra.
    • Ellenőrizzük le a generált SQL parancsot. Ha elfogadjuk, akkor kattintsunk az Execute gombra.
    • A felugró, sikeres végrehajtást jelző dialógus ablakot rájuk be az OK-ra kattintva, majd - nem túl intuitív módon í kattintsunk a Cancel-re a bezáráshoz.
    • Zárjuk be a lépés részleteit az OK-ra kattintva.

Mentsük le a Pipeline-t pl. aggregate by country.hpl néven. Futtassuk le. Ha nem ír ki hibát, akkor frissítsük a táblák listáját a MySQL Workbench-ben, és ellenőrizzük az új tábla tartalmát:

aggregated_by_country.jpg

Összekapcsolás

Kapcsoljuk össze a két táblát ország szerint, és csoportosítsuk régió szerint.

  • Extract 1: Table input (a tranzakciók betöltése adatbázisból)
    • Hozzunk létre egy Table input lépést, és olvassuk be a tranzakciókat.
  • Extract 2: Table input (az országok tranzakciók betöltése adatbázisból)
    • Hozzunk létre egy Table input lépést, és olvassuk be az országokat.
  • Transform 1: Stream lookup (az adattáblák összekapcsolása országok szerint)
    • Adjunk hozzá egy Stream lookup lépést.
    • Hozzuk kapcsoljuk össze a tranzakció beolvasást és az ország beolvasást is ezzel.
    • A részleteknél Lookup transform: Countries. Amiatt kellett korábban átnevezni két országot, hogy ez a lépés sikeres legyen.
    • Felül a Field és a Lookup field is legyen Country. Itt kapcsoljuk össze a két táblát.
    • Alul a Field legyen Region; ezt fogjuk tovább adni. Majd kattintsunk az OK-ra.
  • Transform 2: Memory group by (aggregálás régió szerint)
    • Hozzunk létre egy Memory group by lépést. Kössük össze az előző aggregálást ezzel.
    • A Group field legyen Region. Amiatt kellett az előző lépésben beállítani a Region-t, hogy itt ki tudjuk választani.
    • Állítsuk be a total_purchase_count-ot és a total_purchase_amount-ot a fent leírtak szerint, és kattintsunk az OK-ra.
  • Load: Table output (az eredmény kiírása adatbázisba)
    • Hozzunk létre egy Table output lépést.
    • A fent leírtaknak megfelelően hozzuk létre az aggregated_by_region táblát.

Mentsük le és futtassuk le a Pipeline-t. Az elvárt eredmény a MySQL Workbench-ben:

aggregated_by_region.jpg

Számított mező hozzáadása

Számoljuk ki az évenkénti összesítést! A dátum napra pontosan van megadva, így ehhez a lépéshez ki kell nyerni az évet. A lépések nagy része a már megtanult.

  • Extract: Table input (a tranzakciók betöltése adatbázisból)
    • A fent leírtak szerint olvassuk be a tranzakciókat.
  • Transform 1: Calculator (az év kinyerése a tranzakció dátumából)
    • Adjunk hozzá egy Calculator lépést.
    • New field: Transaction_Year (ez az új mező neve)
    • Calculation: Year of date A
    • Field A: Transaction_Date
  • Transform 2: Memory group by (aggregálás év szerint)
    • A Group field legyen Transaction_Year.
  • Load: Table output (az eredmény kiírása adatbázisba)
    • Az output tábla legyen aggregated_by_year.
aggregated_by_year.jpg

Aggregálás több érték alapján

Aggregáljuk a vásárlásokat ország és év szerint.

  • Extract: Table input (a tranzakciók betöltése adatbázisból)
  • Transform 1: Calculator (az év kinyerése a tranzakció dátumából)
  • Transform 2: Memory group by (aggregálás ország és év szerint)
    • Itt két Group field legyen: Country és Transaction_Year.
  • Load: Table output (az eredmény kiírása adatbázisba)
    • Írjuk ki a aggregated_by_country_and_year táblába.

Számított érték szakasz alapján

Az aggregálást az egy főre eső GDP szerint hajtjuk végre. Az országokat 3 részre osztjuk: szegény (5000 dollár alatti), közepes (5000 és 20000 dollár közötti) és gazdag (20000 dollár feletti).

  • Extract 1: Table input (a tranzakciók betöltése adatbázisból)
  • Extract 2: Table input (az országok betöltése adatbázisból)
  • Transform 1: összekapcsolás
    • A Stream lookup lépésben a Specify fields to retrieve alá vegyük fel a GDP-t.
  • Transform 2: Number range (GDP kategória létrehozása egy főre eső GDP szerint)
    • Adjunk hozzá egy Number range lépést.
    • Input field: GDP
    • Output field: GDP_category
    • Default value: unknown
    • Ranges: Lower bound, Upper bound, Value
      • (üres), 5000, low (below 5000)
      • 5000, 20000, medium (5000-20000)
      • 20000, (üres), high (above 20000)
  • Transform 3: Memory group by (aggregálás GDP kategória szerint)
    • A Group field legyen GDP_category.
  • Load: Table output (az eredmény kiírása adatbázisba)
    • Írjuk ki a aggregated_by_gdp_category táblába.

Eredmény:

aggregated_by_gdp_category.jpg

Workflow létrehozása

A Pipeline mintájára hozzunk létre Workflow-t. Vegyünk fel Pipeline lépéseket és kössük össze. Itt párhuzamosíthatóak a lépések. Egy lehetséges eredmény:

workflow.jpg

Mentsük le és futtassuk le.

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License