Kategória: Adat
ETL
Az adatfeldolgozás során tipikus művelet az adatok beolvasása egy adatforrásból, annak valamilyen átalakítása és más adattárolóba történő mentése. Az ETL ezt a három lépést rövidíti:
- E: Extract, azaz az adatok kinyerése,
- T: Transform, azaz az adatok átalakítása, valamint
- L: Load, azaz az adatok mentése.
Ezeket a lépéseket tetszőleges programozási nyelven végre lehet hajtani. A kifejezetten ETL rendszerekhez viszont többnyire nem kell programozói tudás, a folyamatokat általában grafikusan kell összeollózni. Ezeket a folyamatokat angolul pipeline-nak, azaz csővezetéknek hívjuk: az adat valahonnan jön, valami történik vele és valahova kerül.
Az ETL rendszerek tipikusan számos (több tucat, vagy akár több száz) adatforrást támogatnak: különböző fájlformátumokat, adatbázisokat, szolgáltatásokat. Az ETL rendszerek sok esetben szorosan kapcsolódnak az üzleti intelligenciához: a nyers adatokat előkészítik megjelenítésre, például szűrik, aggregálják, és az ETL outputja lesz az üzleti intelligencia rendszer inputja.
Számos ETL rendszer létezik, néhány példa:
- Pentaho Data Integration (Kettle): Java alapú, nagy tudású, de nehézkes (ingyenes változat nincs, a 30 napos kipróbáláshoz céges e-mail cím szükséges).
- Apache Hop: nyílt forráskódú Pentaho fork.
- Microsoft SSIS (SQL Server Integration Services): a Microsoft ökoszisztéma része.
- Apache Beam: az SDK támogatja a kötegelt (batch) és folyamatos (stream) adatfeldolgozást is.
- Apache Nifi: áramlás alapú rendszer.
Apache Hop
Ezt a szakaszt a Cubix [|Business Intelligence rendszerek fejlesztése] című képzés második házi feladata ihlette.
Telepítés
Töltsük le az Apache Hop rendszer a https://hop.apache.org/download/ oldalról és csomagoljuk ki. Az alkalmazás indítása: hop-gui.bat.
Pipeline és Workflow
A rendszer két fő fogalma a Pipeline és a Workflow. Mindkettőt a kék alapú kerek + ikonra kattintva tudjuk létrehozni; ott rá kell keresni a megfelelő elemre.
- Pipeline: ez tartalmazza a tulajdonképpeni adatbetöltés → átalakítás → adatmentés folyamatot. Ahhoz, hogy használni tudjuk, fájlba kell menteni, melynek kiterjesztése hpl.
- Workflow: ez pipeline-okat tartalmazhat. Tipikus felépítése: az egyik pipeline eredménye a másik pipeline inputja. Ezt is le kell menteni külön fájlba, melynek tipikus kiterjesztése hfw.
Előkészületek
Adatok beszerzése:
- Töltsük le a tranzakciós adatokat innen: https://www.kaggle.com/datasets/smayanj/e-commerce-transactions-dataset
- Töltsük le az országok adatait innen: https://www.kaggle.com/datasets/fernandol/countries-of-the-world
Adatbázis beállítása:
- Töltsük le a MySQL Community verzióját. A Windows-os telepítőt, ami minden lényeges komponenst tartalmaz, innen lehet letölteni: https://dev.mysql.com/downloads/installer/.
- Telepítsük fel. A telepítés során be kell állítani azt, hogy azonosítóval és jelszóval lehessen csatlakozni. A root jelszót meg kell adni. A teszteléshez nem szükséges, de jó gyakorlat külön felhasználót létrehozni.
- Indítsuk el a MySQL Workbench nevű alkalmazást.
- Hozzunk létre egy adatbázist transactions_db néven.
- Hozzuk létre a következő tranzakciós adattáblát:
CREATE TABLE transactions (
Transaction_ID INT PRIMARY KEY,
User_Name VARCHAR(255),
Age INT,
Country VARCHAR(100),
Product_Category VARCHAR(100),
Purchase_Amount DECIMAL(10, 2),
Payment_Method VARCHAR(50),
Transaction_Date DATE
);- Hozzuk létre a következő országokat tartalmazó adattáblát:
CREATE TABLE countries (
Country VARCHAR(100),
Region VARCHAR(100),
Population INT,
Area_sq_mi INT,
Pop_Density DECIMAL(10,2),
Coastline DECIMAL(10,2),
Net_migration DECIMAL(10,2),
Infant_mortality DECIMAL(10,2),
GDP INT,
Literacy DECIMAL(10,2),
Phones DECIMAL(10,2),
Arable DECIMAL(10,2),
Crops DECIMAL(10,2),
Other DECIMAL(10,2),
Climate DECIMAL(10,2),
Birthrate DECIMAL(10,2),
Deathrate DECIMAL(10,2),
Agriculture DECIMAL(10,4),
Industry DECIMAL(10,4),
Service DECIMAL(10,4)
);Meghajtó beállítása:
- Töltsük le a MySQL connector meghajtót innen: https://dev.mysql.com/downloads/connector/j/.
- Másoljuk be a connector fájlt ide: hop\lib\jdbc\mysql-connector-j-9.6.0.jar.
- Indítsuk újra az Apache Hop-ot.
- Bal oldalon kattintsunk a Matadata ikonra.
- Kattintsunk jobb egérgombbal a Relational Database Connection-re, és válasszuk ki a New menüpontot.
- A Connection Type a legördülő menüből legyen MySQL. A többit töltsük ki értelemszerűen: adjunk neki nevet (pl. transactions_mysql), a felhasználónév root (ha nem állítottunk be mást), a jelszó amit megadtunk telepítéskor, a Server host name localhost, a port maradjon 3306, a database name transactions_db, a Database type Mysql 8+.
- Kattintsunk a Test nyomógombra hogy meggyőződjünk, a kapcsolat működik.
- Bezáráskor mentsük el.
Fájlból olvasás és adatbázisba írás
Ennyi előkészület után jöhet az első Pipeline!
- Extract: CSV file input (a tranzakciók betöltése CSV-ből)
- Ha a navigálás során "eltévedünk", akkor kattintsunk bal oldalon a legfelső ikonra (File Explorer).
- Kattintsunk bal oldalon felül a + ikonra → keressünk meg a Pipeline-t.
- Kattintsunk a bal gombbal a megjelenő munkafelületen.
- Válasszuk ki ezt: CSV file input. Itt elég sok ikon van, érdemes rákeresni (és ezt a későbbiekben is megtenni).
- Kattintsunk a CSV file input feliratra a megjelenő ikon alatt.
- A Transform name akár maradhat, de érdemes átnevezni valami egyértelműbbre, pl. Transactions CSV input.
- A Filename sor végén kattintsunk a Browse… nyomógombra, és keressük meg és válasszuk ki ezt a fájlt: ecommerce_transactions.csv.
- Alul kattintsunk a Get Fields nyomógombra, majd az OK-ra. Itt beolvassa az első 100 sort (vagy amennyit megadtunk), és az alapján megpróbálja kitalálni az adatformátumot.
- Kattintsunk a Preview gombra. Ha minden rendben történt, megjelenik az első 1000 (vagy amennyit megadtunk) sor.
- Zárjuk be a dialógusablakokat az OK gomb(ok) megnyomásával.
- Load: Table output (kiírás adatbázisba)
- A fentihez hasonlóan adjunk hozzá egy Table output ikont.
- A Shift-et nyomva tartva húzzunk egy nyilat az inputból az outputba. Itt tehát most nem lesz transzformáció. Az output végénél rákérdez, hogy mi legyen a szerepe; ezt válasszuk: Main output of transform.
- Kattintsunk rá az ikon alatti Table output feliratra.
- Adjunk neki egy megfelelő nevet, pl. Transactions table output.
- A Connection legyen a korábban megadott kapcsolat, pl. transactions_mysql.
- A Target schema legyen a korábban létrehozott adatbázis, pl. transactions_db.
- A Target table legyen a korábban létrehozott tábla: transactions. (Ez utóbbit a Browse… segítségével is ki tudjuk választani, aminek következtében a Target schema is kiválasztódik.)
- Kattintsuk be a Truncate table-t, hogy a lefutások során üres táblával kezdjünk.
- Kattintsunk az OK-ra.
További lépések:
- Mentsük le a Save ikonra kattintva. Adjunk neki nevet, pl. CSV_load.hpl.
- Kattintsunk a lejátszás ikonra.
- A megjelenő Run Options ablakban kattintsunk a Launch-ra.
Ha minden rendben történt, a Logging fülön nincs hiba, az ikonokon megjelenik egy-egy zöld pipa, a MySQL Workbench-ben pedig ha ráfrissítünk a Transactions táblára, akkor látjuk benne az adatokat.
Átalakítások
Következő lépésben töltsük be az országokat. Itt már szükség lesz átalakításokra. Használhatjuk a már megkezdet pipeline-t, de kezdhetünk újat is.
- Extract: CSV file input (az országok betöltése CSV-ből)
- A fentiekhez hasonlóan töltsük be a countries of the world.csv fájlt.
- Transfer 1: String operations (a felesleges szóközök leszedése)
- Adjunk hozzá egy String operations lépést.
- Húzzunk nyilat a CSV inputról az imént hozzáadotthoz.
- Kattintsunk a String operations feliratra.
- Adjunk neki megfelelő nevet, pl. Trim fields.
- Kattintsunk a Get field-re. Két mező kell, hogy megjelenjen: Country és Region.
- A Trim type alatt mindkettő legyen both.
- Transfer 2: Value mapper (országok átnevezése)
- A tranzakció és az ország táblában két ország másképp van elnevezve, emiatt át kell az egyikben nevezni, praktikusan az országok táblában (ott a gyorsabb).
- Adjunk hozzá egy Value mapper lépést.
- A Field name to use ez legyen: Country.
- Source value: United States, Target value: USA.
- Source value: United Kingdom, Target value: UK.
- Load: Table output (az eredmény kiírása adatbázisba)
- A fentiek mintájára hozzunk létre egy Table output-ot az országok számára is.
- Target table: countries.
- A Truncate table legyen kiválasztva.
- A Specify database fields is legyen itt kiválasztva.
- Ez esetben aktiválódik a Get fields. Kattintsunk rá.
- Kattintsunk erre: Enter field mapping,
- Megjelenik egy figyelmeztetés, hogy bizonyos mező hivatkozások nem találhatóak. Kattintsunk a Yes-re.
- Ebben az esetben az ajánlott kapcsolat megfelelő. Egyesével adjuk hozzá a párokat bal oldalon az Add segítségével, majd kattintsunk az OK-ra.
- Kattintsunk az OK-ra.
Indítsuk el a folyamatot, majd ellenőrizzük az eredményt. Figyeljük meg, hogy USA és UK szerepel az United States és United Kingdom helyett.
Aggregálás
Következő lépésben országok szerint aggregáljuk az eladásokat.
- Extract: Table input (a tranzakciók betöltése adatbázisból)
- Adjunk hozzá egy Table input lépést.
- A Connection maradjon az alapértelmezett (transactions_mysql).
- Kattintsunk a Get SQL select statement…-re.
- Válasszuk ki ezt: Tables → transactions_db.transactions, majd kattintsunk az OK-ra.
- Fogadjuk el, hogy beletegye a lekérdezésbe az oszlopneveket.
- Kattintsunk a Preview-ra, hogy lássuk, rendben betölti-e.
- Kattintsunk az OK-ra.
- Transform: Memory group by (aggregálás országok szerint)
- Adjunk hozzá egy Memory group by lépést.
- Kössük össze az előző lépést ezzel.
- Nyissuk meg a lépés részleteit a névre kattintva.
- Felül válasszuk ki azt, hogy mi szerint szeretnénk aggregálni: Country.
- Alul válasszuk ki, hogy mi legyen az aggregálás művelete.
- A vásárlások száma: a Name legyen total_purchase_count, a Subject legyen a Transaction_ID, a Type pedig Number of Values (N).
- A vásárlás teljes összege: a Name legyen total_purchase_amount, a Subject legyen a Purchase_Amount, a Type pedig Sum.
- Zárjuk be az OK-ra kattintva.
- Load: Table output (az eredmény kiírása adatbázisba)
- Hozzunk létre egy Table output lépést.
- Kapcsoljuk össze az előzővel.
- Nyissuk meg a részleteket.
- A Target table legyen ez: aggregated_by_country.
- Kapcsoljuk be ezt: Truncate table.
- Alul kattintsunk az SQL gombra.
- Ellenőrizzük le a generált SQL parancsot. Ha elfogadjuk, akkor kattintsunk az Execute gombra.
- A felugró, sikeres végrehajtást jelző dialógus ablakot rájuk be az OK-ra kattintva, majd - nem túl intuitív módon í kattintsunk a Cancel-re a bezáráshoz.
- Zárjuk be a lépés részleteit az OK-ra kattintva.
Mentsük le a Pipeline-t pl. aggregate by country.hpl néven. Futtassuk le. Ha nem ír ki hibát, akkor frissítsük a táblák listáját a MySQL Workbench-ben, és ellenőrizzük az új tábla tartalmát:
Összekapcsolás
Kapcsoljuk össze a két táblát ország szerint, és csoportosítsuk régió szerint.
- Extract 1: Table input (a tranzakciók betöltése adatbázisból)
- Hozzunk létre egy Table input lépést, és olvassuk be a tranzakciókat.
- Extract 2: Table input (az országok tranzakciók betöltése adatbázisból)
- Hozzunk létre egy Table input lépést, és olvassuk be az országokat.
- Transform 1: Stream lookup (az adattáblák összekapcsolása országok szerint)
- Adjunk hozzá egy Stream lookup lépést.
- Hozzuk kapcsoljuk össze a tranzakció beolvasást és az ország beolvasást is ezzel.
- A részleteknél Lookup transform: Countries. Amiatt kellett korábban átnevezni két országot, hogy ez a lépés sikeres legyen.
- Felül a Field és a Lookup field is legyen Country. Itt kapcsoljuk össze a két táblát.
- Alul a Field legyen Region; ezt fogjuk tovább adni. Majd kattintsunk az OK-ra.
- Transform 2: Memory group by (aggregálás régió szerint)
- Hozzunk létre egy Memory group by lépést. Kössük össze az előző aggregálást ezzel.
- A Group field legyen Region. Amiatt kellett az előző lépésben beállítani a Region-t, hogy itt ki tudjuk választani.
- Állítsuk be a total_purchase_count-ot és a total_purchase_amount-ot a fent leírtak szerint, és kattintsunk az OK-ra.
- Load: Table output (az eredmény kiírása adatbázisba)
- Hozzunk létre egy Table output lépést.
- A fent leírtaknak megfelelően hozzuk létre az aggregated_by_region táblát.
Mentsük le és futtassuk le a Pipeline-t. Az elvárt eredmény a MySQL Workbench-ben:
Számított mező hozzáadása
Számoljuk ki az évenkénti összesítést! A dátum napra pontosan van megadva, így ehhez a lépéshez ki kell nyerni az évet. A lépések nagy része a már megtanult.
- Extract: Table input (a tranzakciók betöltése adatbázisból)
- A fent leírtak szerint olvassuk be a tranzakciókat.
- Transform 1: Calculator (az év kinyerése a tranzakció dátumából)
- Adjunk hozzá egy Calculator lépést.
- New field: Transaction_Year (ez az új mező neve)
- Calculation: Year of date A
- Field A: Transaction_Date
- Transform 2: Memory group by (aggregálás év szerint)
- A Group field legyen Transaction_Year.
- Load: Table output (az eredmény kiírása adatbázisba)
- Az output tábla legyen aggregated_by_year.
Aggregálás több érték alapján
Aggregáljuk a vásárlásokat ország és év szerint.
- Extract: Table input (a tranzakciók betöltése adatbázisból)
- Transform 1: Calculator (az év kinyerése a tranzakció dátumából)
- Transform 2: Memory group by (aggregálás ország és év szerint)
- Itt két Group field legyen: Country és Transaction_Year.
- Load: Table output (az eredmény kiírása adatbázisba)
- Írjuk ki a aggregated_by_country_and_year táblába.
Számított érték szakasz alapján
Az aggregálást az egy főre eső GDP szerint hajtjuk végre. Az országokat 3 részre osztjuk: szegény (5000 dollár alatti), közepes (5000 és 20000 dollár közötti) és gazdag (20000 dollár feletti).
- Extract 1: Table input (a tranzakciók betöltése adatbázisból)
- Extract 2: Table input (az országok betöltése adatbázisból)
- Transform 1: összekapcsolás
- A Stream lookup lépésben a Specify fields to retrieve alá vegyük fel a GDP-t.
- Transform 2: Number range (GDP kategória létrehozása egy főre eső GDP szerint)
- Adjunk hozzá egy Number range lépést.
- Input field: GDP
- Output field: GDP_category
- Default value: unknown
- Ranges: Lower bound, Upper bound, Value
- (üres), 5000, low (below 5000)
- 5000, 20000, medium (5000-20000)
- 20000, (üres), high (above 20000)
- Transform 3: Memory group by (aggregálás GDP kategória szerint)
- A Group field legyen GDP_category.
- Load: Table output (az eredmény kiírása adatbázisba)
- Írjuk ki a aggregated_by_gdp_category táblába.
Eredmény:
Workflow létrehozása
A Pipeline mintájára hozzunk létre Workflow-t. Vegyünk fel Pipeline lépéseket és kössük össze. Itt párhuzamosíthatóak a lépések. Egy lehetséges eredmény:
Mentsük le és futtassuk le.






