Big Data

Áttekintés

Definíció

A big data napjaink egyik felfutó témája. Magyarra szó szerint úgy lehetne fordítani, hogy nagy adat, de nincs elterjedt magyar elnevezése. Ebben a leírásban az szerepel, ahogy a témát én megértettem. Igyekeztem az objektivitásra törekedni, de mivel nem vagyok a téma szakértője, hibák az oldalon lehetségesek.

Óriási mennyiségű adat áll rendelkezésünkre, hatalmas sebességgel keletkezik az új adat, ráadásul az új adat keletkezésének az üteme exponenciálisan gyorsul. Nincs tűéles határ a nagy mennyiségű adat és a big data között. Definíció szerint big datának nevezzük azt az adatmennyiséget, amelynek a feldolgozása hagyományos eszközökkel nem lehetséges, vagy legalábbis jelentős nehézségekbe ütközne. A big datát az alábbi 3, az angol elnevezések kezdőbetűi alapján 3V-nek nevezett dolog jellemzi:

  • Volume, azaz adatmennyiség: az adat nem fér el egy tárolóegységen, vagy a feldolgozáshoz szükséges adatmennyiség nem fér el a memóriában.
  • Velocity, azaz sebesség: az adatok keletkezési sebessége olyan mértékű, hogy egyetlen számítógép nem képes annak feldolgozására.
  • Variety, azaz változatosság: az adatokat nem tudjuk hagyományos módon, pl. egy relációs adatbázisnak sorokból és oszlopokból álló adattáblájában tárolni.

Ehhez még hozzá jön a negyedik V, a Veracity, ami az adatok megbízhatóságát jelenti. Ez egyfajta, a big data témában felmerülő probléma: hogyan bizonyosodjunk meg a nagy adathalmazban arról, hogy egy adat mennyire megbízható?

Példaként vegyük mondjuk a YouTube-ot, mely mindhárom feltételnek megfelel: nincs az a hatalmas merevlemez melyen elférne az összes feltöltött videó; nincs az a szuperszámítógép, mely képes feldolgozni a feltöltés alatt álló videókat és képes kiszolgálni a megtekintéseket; egy filmet (különösen egy élő stream-et) nem lehet beszúrni egy adatbázis cellába.

Az alapötlet

A big data esetén az adat tehát nem fér rá egy szokásos tárolóra, így azt el kell osztani több tárolón és meg kell valósítani a szokásos műveleteket, mint pl. másolás, olvasás, törlés stb., valamint gondoskodni kell arról is, hogy megfelelő legyen a rendelkezésre állás akkor is, ha kiesik egy komponens. Az adatok keletkezési sebessége pedig megköveteli a párhuzamos feldolgozást, mégpedig úgy, hogy a részfeladatokkal már egyetlen számítógép is meg tudjon birkózni, és a részeredmények egyesíthetőek legyenek.

A Hadoop volt az első ismert big data keretrendszer, ami a fenti problémákat kezelte. Egyrészt bevezette az elosztott adattárolót (a HDFS fájlrendszert), valamint a MapReduce néven ismertté vált big data feldolgozót. A ma ismert megoldások ennek a továbbgondolásai, így ahhoz, hogy otthonosan járjunk a témában, nélkülözhetetlen az alapötlet megértése.

A probléma kezelésére az alapötlet a MapReduce algoritmus. Ez az adatokat részekre osztja, párhuzamosan feldolgozza és az eredményt egyesíti. A lépései:

  • leképezés (map): <kulcs1, érték1> → lista (<kulcs2, érték2>)
  • kombinálás (combine): ugyanahhoz a kulcshoz tartozó értékek listájának elkészítése
  • redukálás (reduce): <kulcs2, lista (<érték2>)> → lista (<kulcs3, érték3>)

Itt ugyanaz a kulcs többször is szerepelhet. A fejlesztőnek a map és reduce függvényeket kell megvalósítania, a kombinálást a keretrendszer végzi.

A big data helló világ programja a szó számlálás; ebben a feladatban van egy óriási szöveg adatbázis, mely nemcsak hogy a memóriában nem fér el, de még egy számítógépen sem. A feladat megszámolni azt, hogy melyik szó hányszor fordul elő. A fenti algoritmusban viszont mindhárom lépés végrehajtható a megszokott módon. A megoldás megértéséhez lássuk, hogyan működik a fenti módszerrel a szószámláló!

Az input tehát egy óriási szövegfájl. Bontsuk fel sorokra; a kulcs1 tehát legyen egy sor ebben a fájlban, az érték1 pedig legyen üres. Itt azt feltételezzük, hogy egy sor már belefér a memóriába. Vegyük észre, hogy párhuzamosan akárhány számítógép feldolgozhatja a sorokat. Ebben a példában tehát van egy beolvasó, amely rögtön három folyamnak (stream) adja tovább a feladatot.

Vegyük a következő példát:

nem minden fajta szarka farka tarka csak a tarka fajta szarka farka tarka
sokat akar a szarka de nem birja a farka

Itt lesz két kulcs1: a két sor, az érték pedig mindkét esetben nem definiált. Tegyük fel, hogy az A feldolgozó kapja az első, a B a második sort.

Leképezés: bontsuk fel a kulcs1-et (tehát az épp feldolgozás alatt álló sort) szavakra. Ebből képezzünk kulcs2 - érték2 párokból álló listát, mégpedig úgy, hogy a kulcs2 a szó legyen, az érték2 pedig minden esetben 1. Ezt a lépést el lehet végezni egy számítógépen belül, és vegyük észre azt is, hogy más adatra nem volt szükség a lépés elvégzéséhez.

A példában a A számítógépen ez lesz az eredmény: (nem → 1), (minden → 1), (fajta → 1), (szarka → 1), (farka → 1), (tarka → 1), (csak → 1), (a → 1), (tarka → 1), (fajta → 1), (szarka → 1), (farka → 1), (tarka → 1).

A B számítógépen pedig a következő: (sokat → 1), (akar → 1), (a → 1), (szarka → 1), (de → 1), (nem → 1), (birja → 1), (a → 1), (farka → 1).

Kombinálás: a Hadoop rendszer képes arra, hogy az egyes folyamokból érkező outputokat, azaz kulcs-érték listákat kombinálja, azaz létre hozza az ugyanahhoz a kulcshoz (jelen esetben: ugyanahhoz a szavakhoz) tartozó értékeket. Ebben az esetben az érték mindegyik esetben 1 lesz, és a lista olyan hosszú lesz, ahányszor az adott szó előfordul. Itt feltételezzük, hogy a részeredmény belefér a memóriába, azaz jelen esetben maga a szó is és annyi egyes, ahányszor az adott szó előfordul összesen az adatbázisban.

A fenti példa folytatása; a kulcs2 - érték2 lista az alábbi lesz:

  • a → (1, 1, 1)
  • akar → (1)
  • birja → (1)
  • csak → (1)
  • de → (1)
  • fajta → (1, 1)
  • farka → (1, 1, 1)
  • minden → (1)
  • nem → (1, 1)
  • sokat → (1)
  • szarka → (1, 1, 1)
  • tarka → (1, 1, 1)

Redukálás: itt tehát egy kulcsból és hozzá tartozó érték listából kell egy kulcsot (ami lehet ugyanaz, mint a bejövő, de más is) és egyetlen értéket produkálni. Vegyük itt is észre azt, hogy a feladat elvégzéséhez nem kell látni a feldolgozandó szöveg egyéb részeit, így bele fér a memóriába.

A példában, mivel előfordulást számolunk, a számokat össze kell adnunk; a kulcs nem változik. A 12 összeadást tudjuk párhuzamosítani, pl. az A számítógép végzi az egyik, a B a másik felét. Az eredmény az alábbi lesz:

  • a → 3
  • akar → 1
  • birja → 1
  • csak → 1
  • de → 1
  • fajta → 2
  • farka → 3
  • minden → 1
  • nem → 2
  • sokat → 1
  • szarka → 3
  • tarka → 3

A műveletnek (jelen esetben ez az összeadás) kommutatívnak és asszociatívnak kell lennie. Ez esetben a műveletet már el lehet kezdeni helyben, majd a részeredményeket egy helyre gyűjtve folytatni.

Felhő szolgáltatások

Azzal, hogy a big data kategóriába eső adatokat nem tudjuk egyetlen számítógép segítségével végrehajtani, automatikusan megjelenik az igény a felhő szolgáltatások iránt, ahol tetszőleges mennyiségű erőforrást vásárolhatunk, és ezt ráadásul néhány kattintással meg is tudjuk változtatni.

Számos felhő szolgáltatás létezik (pl. Amazon Web Service, Google Cloud Platform, Microsoft Azure stb.). Nekem a Microsoft Azure-rel van némi tapasztalatom (https://portal.azure.com/), így azon keresztül mutatom be a felhő szolgáltatások tipikus lehetőségeit. Ez a szolgáltatás fizetős, de van ingyenes kipróbálási lehetőség. Az alábbi lista ill. kategóriák a saját elképzelésemet tükrözik.

  • Tárhely: a big datának hely kell, így a felhőben természetesen alap a nagy méretű tárhely szolgáltatás. Ilyen pl. a Azure Blob Storage vagy az Amazon S3.
  • Virtuális számítógépek: a felhő szolgáltatók lehetőséget biztosítanak arra, hogy virtuális számítógépet hozhassunk létre, amit valamilyen távoli eléréssel (pl. Remote Desktop Connection, ssh) tudunk kezelni. A létrehozáskor meg kell adnunk a szükséges paramétereket (memória, CPU, tárhely, operációs rendszer, kell-e fix IP stb.), és utána mi magunk alakítjuk ki a rendszert: telepítünk szoftvereket stb. Az Azure-ben lehetőségünk van Windows és Linux alapú virtuális gépek létrehozására is.
  • Szoftver mint szolgáltatás (software as a service, SaS): a felhő szolgáltatók tipikusan számos szoftvert kínálnak közvetlenül. Ez esetben nem kell nekünk magunknak feltelepíteni és gondoskodni a frissítésekről, ezt megoldja a felhő szolgáltató, és szakmai segítséget is nyújt szükség esetén. Mielőtt a saját virtuális környezetünkben telepítenénk egy szolgáltatást, érdemes megnézni, hogy a szolgáltató nyújtja-e azt SaS módon. Néhány példa abból a rengetegből, amit az Azure nyújt:
    • SQL adatbázis szerverek: Microsoft SQL, MySQL, MardiaDB; sőt, közvetlenül lehet SQL adatbázist is készíteni.
    • Data Warehouse: nagy méretű relációs adattáblák tárolására szolgál.
    • Analysis Services: az SQL egyfajta továbbgondolása, melyben a nagy méretű tény (fact) táblákat különböző dimenziók mentén aggregáljuk és számolunk ki értékeket MDX (SQL-re hasonlít) ill. DAX (Excel függvényekre hasonlít) nyelveken.
    • Databricks: Spark szolgáltatás. Nagyrészt e köré épül ez az oldal.
    • PowerBI: üzleti intelligencia szolgáltatás, melyben az adatokat diagramok formájában tudjuk megjeleníteni.
    • HDInsight: Kafka szolgáltatás, nagy mennyiségű üzenetek kezelésére.
    • Event hubs: a nagy mennyiségű üzenetek kezelésének Microsoft-os megoldása (hasonló a Kafkához).
    • Data factory: adatfolyam kezelő (ETL) rendszer.
    • Kubernates: automatikus alkalmazás telepítés.
  • Alapszolgáltatások: azokat a szolgáltatásokat sorolom ide, amelyek erőforrástól függetlenek, mindegyik erőforrásra érvényesek.
    • Felhasználók kezelése: beállítható, hogy mely felhasználók milyen erőforrásokhoz milyen szinten férhetnek hozzá. Az Azure esetén ez az Azure Active Directory. A felhasználókat tipikusan csoportokba szokás szervezni, és egy-egy csoportnak tudunk jogosultságokat adni adott erőforrásokhoz, pl. olvasási, írási vagy akár adminisztrátori jogosultság. Így aki az adott csoportba belekerül, akkor automatikusan örökli a csoport jogosultságait. Egy felhasználó több csoportnak is lehet a tagja, és a jogosultság halmaza az összes csoportja jogosultságának az uniója.
    • Hozzáférés kezelés: mindegyik erőforrásra beállítható, hogy mely IP tartományokról lehet elérni.
    • Monitorozás: a rendelkezésre állásról, a használatról, a hardver állapotáról ad kimutatásokat.

Spark

Áttekintés

Az első elterjedt big data technológia a Hadoop volt. Az Apache Spark valójában ennek a továbbgondolása. Néhány fontosabb komponense:

  • Fürt (cluster) vezérlés: ennek segítségével egyben láthatjuk az háttérben valójában elosztott erőforrásokat.
  • Az elosztott rendszert támogató adattípusok: rugalmas elosztott adathalmaz (Resilient Distributed Dataset, RDD), adat keret (DataFrame), adat halmaz (DataSet).
  • Spark SQL, mellyel a fenti adattípusokon lekérdezéseket lehet végrehajtani.
  • Spark Streaming, melynek segítségével az adatfolyamokat lehet kezelni.
  • MLib: gépi tanulás komponens.
  • GraphX a gráf műveletek végrehajtásához.

A Spark egy elosztott rendszer. A komponensek közül az egyik a vezérlő (vagy irányító, angolul driver), a többi a végrehajtó (worker). Négy programozási nyelvet támogat: Scala, Java, Python és R, valamint ötödikként a natív SQL-t is. Ebben a leírásban a Scalaról lesz szó, és a példák megértéséhez nélkülözhetetlen a nyelv ismerete.

A Sparkot alapvetőek kétféleképpen tudjuk használni: mi magunk alakítjuk ki a környezetet, vagy felhő szolgáltatásként vesszük igénybe. A gyakorlatban szinte kizárólag ez utóbbi a jellemző. Mindkét esetben az alábbiakra van lehetőségünk:

  • A parancsokat ki tudjuk adni parancssorból (kivéve Java). Ezt leginkább teszt jelleggel szoktuk használni.
  • Önálló alkalmazást tudunk készíteni (Scala esetén jar formában), és azt el tudjuk indítani. A gyakorlatban ez a tipikusabb.

A szószámlálós példa a felhőben

Regisztráció

Ha felhő szolgáltatásként szeretnénk igénybe venni, akkor egy kicsit előre kell szaladnunk, és a Databricks nevű szolgáltatásra kell előfizetnünk (erről részletesen lesz szó lejjebb). Létezik ingyenes kipróbálási lehetőség az alábbi módon:

A Databricks rendszerbe belépve hozzunk létre egy új fürtöt: bal oldalon Clusters → + Create Cluster → adjunk neki nevet, a többi maradjon alapértelmezett, majd kattintsunk a Create Cluster gombra. Miután létrejött, indítsuk el. Alapértelmezésben 120 perc után automatikusan leáll; ez esetben újra kell indítani.

Interaktív mód

Hozzunk létre egy notebookot: bal oldalon felül Home → a nevünk melletti kis lefele mutató nyílvégre kattintsunk → Create → Notebook. A neve legyen pl. ez: Spark Example, a Language pedig Scala.

A fenti példához kell a fájl a megfelelő tartalommal. A Databricks fájlrendszerről még lesz szó a későbbiekben; most megadom azt a két sort, amit másoljunk be a notebook-ba (Cmd 1), majd futtassuk le a jobb felső sarokban található kis háromszögre kattintva → Run cell:

dbutils.fs.rm("/FileStore/file.txt")
dbutils.fs.put("/FileStore/file.txt", "nem minden fajta szarka farka tarka csak a tarka fajta szarka farka tarka\nsokat akar a szarka de nem birja a farka")

Hozzunk létre egy új parancsot: az aktuális parancsnál lent középre tesszük a kurzort, akkor megjelenik egy + jel, arra kattintsunk rá. Az új parancsbeviteli mezőbe írjuk be a következőt:

sc.textFile("/FileStore/file.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).collect().foreach(println)

Ha mindent jól csináltunk, akkor kiírja alatta a következőket:

(csak,1)
(sokat,1)
(birja,1)
(fajta,2)
(nem,2)
(szarka,3)
(a,3)
(de,1)
(farka,3)
(minden,1)
(tarka,3)
(akar,1)

Önálló alkalmazás Databricksben

Hozzuk létre a fenti példát önállóan futtatható formában! A build.sbt fájlhoz hozzá kell adni a Spark függőséget:

name := "Word Count"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.1" % "provided"

A provided kulcsszó azt jelenti, hogy az említett függőséget nem kell beletenni az eredménybe, mert a futtató környezetben jelen van, csak a fordításhoz kell.

Az src/main/scala/hu/faragocsaba/spark/wordcount/WordCount.scala tartalma:

package hu.faragocsaba.spark.wordcount

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Word Count").getOrCreate()
    val sc = spark.sparkContext
    sc.textFile("/FileStore/file.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).collect().foreach(println)
  }
}

Fordítsuk le:

build package

Az eredményt töltsük fel a fürtbe: Clusters → kattintsunk a megfelelő fürt nevére → Libraries → Install New → Drop JAR here → válasszuk ki a target/scala-2.11/word-count_2.11-1.0.jar fájlt → Open → Install. Indítsuk újra a fürtöt: Restart felül. Majd hozzunk létre egy új parancsot akár a meglévő notebook-on, akár egy újon, az alábbi tartalommal:

hu.faragocsaba.spark.WordCount.main(Array())

Ha ezt lefuttatjuk, akkor szintén a fenti eredményt kapjuk.

A szószámlálós példa lokálisan

Mivel a lokális kialakítás nehézkes és nem is tipikus, lenyitható módon készítettem el, hogy ne foglaljon helyet, de szükség esetén meglegyen.

Egy tipikus Spark program felépítése

Egy tipikus Spark program beolvas valahonnan adatot, azt átalakítja, majd az eredményt valahova kiírja. Ezt a folyamatot hívjuk rövidítve ETL-nek (extract = kinyerés, transform = átalakítás, load = betöltés). A működés dióhéjban a következő: a vezérlő (driver) program a feladatot részekre tudja osztani, azt el tudja küldeni a fürtben (cluster) levő végrehajtóknak (executor), majd összesíti az eredményt. Az adatokon végrehajtható műveleteket két fő csoportba oszthatjuk, és a különbség megértése talán a legkritikusabb az egész big data technológia megértése szempontjából:

  • Átalakítások (transformations): ezek a függvények azt határozzák meg, hogy az adatok milyen átalakuláson menjenek keresztül. A legfontosabb a MapReduce algoritmusból örökölt map() függvény, ami átalakítja az adott adatot valami másra, pl. val lineLengths = textFile.map(s => s.length). Fontos függvény még a szűrés (filter()). ezek a függvények ún. lusta (lazy) kiértékelésűek: önmagában ezek hatására egyelőre semmi sem történik. Ez a tulajdonsága teszi lehetővé azt, hogy a Spark megfelelően tudjon optimalizálni.
  • Akciók (actions): e műveletek hatására számolódik ki az eredmény, azaz egy ilyen művelet eredményezi azt, hogy lefussanak az átalakítások is. A MapReduce algoritmusból ide tartozik a reduce() függvény, amely egy olyan függvényt vár paraméterül, melynek két azonos típusú paramétere van, az eredménye egy ugyanolyan típusú érték, a végrehajtott művelet pedig asszociatív és kommutatív.

Amint arról már szó volt, van egy vezérlő, amely kiosztja a feladatokat, és összegzi az eredményt. Produktív környezetben ez tipikusan azt jelenti, hogy a vezérlést más számítógép más processzora kapja meg, ahol a saját virtuális gépen történik a számolás. Ez a következőket jelenti:

  • Az egyes feladatok megkapják azokat a külső változókat, melyeket felhasznál a művelet. Azt viszont tilos módosítani, mivel a vezérlőnél található prototípus nem változik.
  • Mivel az egyes komponenseken fut a feldolgozás párhuzamosan, pl. a kiírás is az adott környezetben történik.
  • Ez azt is jelenti, hogy a működés eltér a lokális és az elosztott környezetben. Pl. lokális környezetben a kiírás is lokálisan történik, ill. a globális változó ténylegesen módosul. Emiatt óvatosan kell programoznunk, és mindig fel kell tennünk a kérdést, hogy ugyanúgy fog-e működni a programunk lokálisan mint élesben.

A függvényeket többféleképpen is megadhatjuk:

  • Scala anonymous függvényként. Egyszerűbb esetekben ezt célszerű használni.
  • Globális singleton objektumok statikus függvényeként. Összetettebb esetekben, ha ki szeretnénk szervezni a logikát, akkor használhatjuk ezt a módszert.
  • Egy osztálypéldány függvényét is átadhatjuk, viszont ez esetben a teljes példány szerializálódik és átadódik; ezt lehetőleg kerülnünk érdemes.

Minden egyes akció a transzformációk újbóli lefutását eredményezik. Tetszőleges elosztott adatra kiadhatjuk a persist() függvényt, amivel azt jelezzük a rendszernek, hogy szeretnénk, ha eltárolná a részeredményt, és ez esetben a következő művelet azt használja fel. Alapértelmezésben a memóriában próbálja tárolni az eredményt, és ha nem fér el, akkor bizonyosakat nem tárol el, hanem szükség esetén újra kiszámolja. Az finomhangolható, hogy pontosan hogyan hajtsa végre, pl. ha nem fér el a memóriában, akkor lemezre is írhatja.

Léteznek olyan műveletek, melyek a feladatok újra elosztását (shuffle) váltják ki. Az alap MapReduce is tartalmaz a map és a reduce között egy shuffle-t. Ezt a Sparkban a reduceByKey() függvény. Ebben az esetben (valójában az eredeti MapReduce algoritmusnak megfelelően) kulcs-érték párokról van szó, és az azonos kulcsokhoz tartozó értékeken szeretnénk végrehajtani a reduce-t. Viszont az azonos kulcsokhoz tartozó értékek szét vannak szórva a fürt különböző részein, tehát a művelet során össze kell gyűjteni minden egyes kulcshoz tartozó minden értéket minden egységről, majd újra szétosztani a feladatokat. Ez egy nagyon erőforrás igényes művelet, így óvatosan hajtsuk végre. Nem elég ismételni: a műveletnek magának kommutatívnak és asszociatívnak kell lennie, így a Spark tud úgy optimalizálni, hogy az egyes komponenseken végrehajtja a rész reduce műveletet, utána hajtja csak végre (kisebb adatmennyiségen) a shuffle-t, majd osztja szét ismét, hogy folytassák a reduce-t, végül összegyűjti az eredményeket. A shuffle műveletek is lazy műveletek.

A fenti szószámolós példában a bemenet egy szövegfájl volt, az átalakítás a szavak előfordulásainak megszámlálása, a kimenet pedig a képernyő. Vegyük jobban szemügyre a példát!

sc
  .textFile("/FileStore/file.txt")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_+_)
  .collect()
  .foreach(println)

A Spark shellben a "semmiből" előkerül egy sc nevű objektum. Valójában két objektumot nyújt alapból a Spark rendszer:

Ezeket tudjuk tehát használni az elosztott adatkezeléshez. Elméletben tehát elképzelhető, hogy a futás még ezen a kicsi példán is elosztottan történt. Viszont azzal, hogy a kód egy része a vezérlőn más részei pedig a végrehajtón futnak, különös odafigyelést igényel az erőforrás kezelés. Pl. ha egy kapcsolatot megnyitunk mondjuk a vezérlőn vagy az egyik végrehajtón, akkor az nem lesz aktív egy másik végrehajtón, arról külön kell gondoskodni, hogy minden végrehajtón létrejöjjön.

Az önálló alkalmazás esetén ezt létre kell hozni, amit többek között a példában megadott módokon tudunk megtenni. Ahhoz, hogy a programunk elosztottan fusson, ezen objektumok valamelyikét kell használnunk. Ha "csak úgy" megírjuk a programot, az futni fog Sparkon is, de nem lesz elosztott.

Szedjük szét kicsit a fenti kódot!

val file = sc.textFile("/FileStore/file.txt")

A file típusa org.apache.spark.rdd.RDD[String] lesz. Az RDD a Resilient Distributed Dataset (rugalmas elosztott adathalmaz) a Spark egyik alaptípusa; ahogy a nevéből is következik, egy olyan adattípusról van szó, melyen elosztott műveleteket lehet végrehajtani, és erről még lesz szó részletesen. Felfoghatjuk úgy is mint egy lista, melynek adott típusú (jelen esetben String) elemei vannak.

A példában Databricks rendszert feltételezünk, és a Databricks fájlrendszer egy speciális részét használjuk.

Az RDD a Scala gyűjtemények leggyakoribb műveleteit biztosítja. Ezek folyam (stream) jellegűek. Megjegyzés a fogalommal kapcsolatban: míg Scala-ban folyamként tekintünk minden gyűjteményre, a Spark szövegkörnyezetben stream alatt a (potenciálisan) végtelen folyamokat értik. Ezeknek semmi közük a kezdeti Java-ban használt stream fogalomhoz úgy, mint pl. FileInputStream. Lássuk a példaprogram műveleteit:

  • flatMap(line => line.split(" ")): ez a művelet az alap Scala-ban a listák listáját módosítja egyetlen listává. A Sparkban egy függvényt adunk át paraméterül, ami listává bontja az elemeket, és ezekből készít ismét RDD-t. A példában az átalakítás inputja sorok tetszőleges számú szóval, az outputja pedig szavak RDD-je. A típus marad RDD[String]
  • map(word => (word, 1)): elem ketteseket (tuple) készít a szavakból. A lépés eredménye tehát olyan RDD, melynek típusa (String, Int), és a második elem (tehát a szám) mindig 1.
  • reduceByKey(_+_): a fenti elemketteseket kulcs-érték párokként értelmezve végrehajtja az ugyanahhoz a kulcshoz tartozó értékeken a megadott műveletet, jelen esetben az összeadást. Az eredmény szintén egy (String, Int) típusú RDD, melyben a bal oldali elem csak egyszer fordul elő, a jobb oldali pedig annak előfordulása. Ezt amiatt tehetjük meg, mert az összeadás kommutatív és asszociatív, így párhuzamosíthatjuk a műveletet.

A fenti pontig valójában nem történik semmi! Ha itt fejeződne be a program, akkor nem indulna el a feldolgozás. A fenti műveletek ún. transzformációs műveletek, és azt mondják meg, hogy ha rájuk kerül a sor, akkor milyen átalakulást hajtson végre rajtuk a rendszer. Egy nagyon fontos művelet hiányzik innen: a filter(), ami egy predikátum függvényt vár paraméterül, ami tehát egy paramétert vár, és logikai értéket ad vissza, és a szűrést hajtja végre.

Ahhoz, hogy a fenti műveletek végrehajtódjanak, akcióra van szükség. A példában ez a collect(). Igazából enélkül is menne, ha rögtön a foreach jönne; az is olyan akciónak számít, ami kiváltja a lefutást. A collect nélkül csak a naplófájlban jelent meg az eredmény, a konzolon nem. Tehát collect nélkül a foreach egy RDD függvény; a collect után viszont már a collect eredményeként létrejövő gyűjteményen hajtódik végre, az már nem RDD eljárás. Számos egyéb akció művelet van, ezek némelyikéről még lesz szó bővebben.

Globális változók

Amint arról szó volt, kerülnünk kell a globális változók használatát az átalakítások és az akciók során is. Itt érdemes megismerkednünk a closure fogalmával, ami dióhéjban egy függvény környezetét jelenti, tehát magát a függvényt, a paramétereket és azokat a külső változókat, melyeket használ. A programunk megírásakor mindig arra kell gondolnunk, hogy ez élesben akár több tucat rendszeren fog futni párhuzamosan, és egy-egy műveletnél mindegyiknek el kell küldeni a closure-t, ami nem gazdaságos. Ráadásul ha módosítjuk is a globális változót, az nehezen felderíthető hibákhoz vezet. Ha mindenképpen globális változót szeretnénk használni, akkor az alábbi lehetőségeink vannak:

  • Ha nagyobb mennyiségű változót szeretnénk csak olvasni, akkor használhatjuk a broadcast-ot (pl. val broadcasted = sc.broadcast(Array(1, 2, 3))).
  • Ha módosítani szeretnénk egy globális értéket, akkor azt ún. akkumulátor változókban tehetjük meg. Léteznek egész (longAccumulator) és lebegőpontos (doubleAccumulator) akkumulátor változók, de mi magunk is készíthetünk saját típust, az AccumulatorV2 absztrakt osztályból származva, megvalósítva a szükséges műveleteket. Nagyon fontos, hogy a műveletek kommutatívak és asszociatívak legyenek, különben helytelen eredményt kapunk. Egy példa a helyes működésre: val accum = sc.longAccumulator("My Accumulator"), majd sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)).

Lássunk egy példát! Tekintsük az alábbi nem túl szép, de működő programot!

var sum = 0
Array(1, 2, 3, 4).foreach(x => sum = sum + x)
println(sum)

Működik, azt írja ki, amit elvárunk tőle (10), de nem párhuzamosítható. Ebben a formában persze ennek nincs jelentősége, de nagy adazmennyiség esetén már lenne. Először próbáljuk meg átírni Spark RDD megoldássá!

var sum = 0
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => sum = sum + x)
println(sum)

Ez már elosztott módon fut, az eredmény viszont 0! A problémát az okozza, hogy a sum globális változót a driver node-on inicializáltuk, amit átadtunk a worker node-oknak, a növelése ott történt, vissza viszont nem került, és a kiírás ismét a driver node-on történt.

Módosítsuk úgy, hogy egyrészt elosztott legyen, másrészt működjön is!

val sum = sc.longAccumulator("Sum Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => sum.add(x))
println(sum)

Ez már úgy viselkedik, ahogy elvárjuk.

Adattípusok

Az adattípusok bemutatásakor a következő feladatot hajtjuk végre. Egy adat két részből áll: egy megnevezésből és egy számból. A példában a megnevezés gyümölcs lesz, a szám meg mondjuk értelmezhető darabként. Az alábbi műveletet fogjuk többféleképpen végrehajtani:

  • Vegyük csak azokat a gyümölcsöket, melyek melletti szám nagyobb mint kettő.
  • Számoljuk ki és jelenítsük meg a gyümölcsönkénti összegeket és átlagokat.

Mindhárom adattípusnál megpróbáljuk többféleképpen létrehozni az adatot majd többféleképpen feldolgozni. Mielőtt belevágunk, hozzuk is létre a gyumolcsok.txt fájlt az alábbi tartalommal:

alma,5
körte,8
szilva,3
alma,7
banán,2
körte,1

RDD

Az RDD a Resilient Distributed Dataset rövidítése, ami magyarul kb. rugalmas elosztott adathalmazt jelent. Erről részletesen a https://spark.apache.org/docs/latest/rdd-programming-guide.html oldalon olvashatunk, az API itt található: https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html. Ez az adattípus már kissé túlhaladott, a DataFrame és Dataset átvette a szerepét, viszont a működés megértéséhez nélkülözhetetlen az RDD ismerete.

RDD-t alapvetően háromféleképpen lehet létrehozni:

  • Egy létező Scala struktúrából az sc.parallelize() függvénnyel, pl. val gyumolcsokRDD = sc.parallelize(Seq("alma,5", "körte,8", "szilva,3", "alma,7", "banán,2", "körte,1")).
  • Beolvasással, pl. val gyumolcsokRDD = sc.textFile("gyumolcsok.txt").
  • Egy másik elosztott adattípusból (RDD, DataFrame, Dataset), átalakítással.
  • DataFrame-ből vagy Dataset-ből (ld. lejjebb), az rdd függvény segítségével.

Az adatokat legegyszerűbben a következő utasítással tudjuk megjeleníteni: gyumolcsokRDD.collect.foreach(println). Az imént létrehozott példákban az adat típusa szöveg. Bontsuk fel (szöveg, szám) típusra, egyúttal megismerkedünk a map() függvénnyel:

val gyumolcsokRDDSplit = gyumolcsokRDD.map(gyumolcs => {
    val splitted = gyumolcs.split(",")
    (splitted(0), (splitted(1).toInt))
})

Ezt megjelenítve a következőt kapjuk eredményül:

scala> gyumolcsokRDDSplit.collect.foreach(println)
(alma,5)
(körte,8)
(szilva,3)
(alma,7)
(banán,2)
(körte,1)

Végül végrehajtjuk a fent vázolt algoritmust, miközben megismerünk pár fontos függvényt:

  • A filter() függvény egy predikátum függvényt vár paraméterül. Jelen esetben a második paraméternek kell nagyobbnak lennie 2-nél, azaz filter(x => x._2 > 2) (vagy tömörebben filter(_._2 > 2)).
  • Az átlagszámításhoz az összegen kívül a darabszámra is szükség van. Mivel a következő függvény, a reduceByKey() olyan adattípust vár, mely két részből áll, a mostani (név, szám) értéket átalakítjuk a következőre: (név, (szám, 1)): map(x => (x._1, (x._2, 1))).
  • A shuffle művelet során összeadjuk az azonos kulcshoz tartozó érték kettős értékeit: reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)). Ez nagy adathalmaz esetén drága művelet, mert a vezérlő összegyűjti a részeredményeket, majd kulcsonként ismét szétküldi. A művelet egyébként nem az RDD osztályon belül definiált, hanem a PairRDDFunctions osztályon belül (https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/PairRDDFunctions.html), ahova implicit konverzióval jutunk. Az API dokumentáció ezt a függvényt javasolja használni az RDD osztályban található groupBy függvény helyett.
  • Végül kiszámoljuk az összeget és átlagot: map(x => (x._1, x._2._1, 1.0 * x._2._1 / x._2._2)).

Összefoglalva:

val gyumolcsokRDDOsszegzett = gyumolcsokRDDSplit
    .filter(x => x._2 > 2)
    .map(x => (x._1, (x._2, 1)))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .map(x => (x._1, x._2._1, 1.0 * x._2._1 / x._2._2))
gyumolcsokRDDOsszegzett.collect.foreach(println)

Az eredmény:

(körte,8,8.0)
(szilva,3,3.0)
(alma,12,6.0)

A feladat megoldható, de nehezen áttekinthető, ráadásul fordítási időben szinte semmilyen hibát nem tudunk felderíteni.

DataFrame

A DataFrame adattípus az R-hez hasonlóan az adatok táblázatos elrendezését jelenti. A cél az volt, hogy segítségével SQL utasításokat tudjunk kiadni. A Spark 2.0 verziótól kezdve a DataFrame nem más, mint egy Dataset[Row] alias. Így az API dokumentáció is közös: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html; a DataFrame-re nem típusos Dataset-ként hivatkozik. A vonatkozó tutorial ez: https://spark.apache.org/docs/latest/sql-programming-guide.html.

DataFrame-et is többféleképpen létrehozhatunk:

  • Tetszőleges szekvenciából a toDF() függvény segítségével, pl. val gyumolcsokDF = Seq(("alma", 5), ("körte", 8), ("szilva", 3), ("alma", 7), ("banán", 2), ("körte", 1)).toDF, vagy az oszlopnevek megadásával: val gyumolcsokDF = Seq(("alma", 5), ("körte", 8), ("szilva", 3), ("alma", 7), ("banán", 2), ("körte", 1)).toDF("nev", "darab"). (Ehhez az önálló alkalmazások esetén szükség van erre: import spark.implicits._, mégpedig azt követően, hogy létrejött a spark.)
  • Egy RDD-ből, szintén a toDF() függvény segítségével, pl. val gyumolcsokDF = gyumolcsokRDDSplit.toDF("nev", "darab")
  • A spark.createDataFrame(data, schema) segítségével, ahol az első paraméter egész sokféle adat lehet, a sémát pedig a lent megadott módon tudjuk megadni.
  • Beolvasással, pl. a gyumolcsok.txt CSV fájlként történő kezeléssel (ehhez is szükség van a sémára): val gyumolcsokDF = spark.read.format("csv").schema(schema).load("gyumolcsok.txt").

A séma kétféleképpen is megadható, az alábbi kettő ekvivalens

import org.apache.spark.sql.types._
val schema1 = new StructType()
    .add(StructField("nev", StringType, true))
    .add(StructField("darab", LongType, true))
val schema2 = StructType(List(
    StructField("nev", StringType, true),
    StructField("darab", LongType, true)
))

Vegyük most példaként a beolvasást, ezzel dolgozzunk tovább, egyúttal megismerünk két nagyon hasznos függvényt: a show táblázatosan kirajzolja a DataFrame tartalmát:

import org.apache.spark.sql.types._

val gyumolcsokDF = spark.read.format("csv").schema(StructType(List(
          StructField("nev",StringType,true),
          StructField("darab",LongType,true))
      )).load("gyumolcsok.txt")

gyumolcsokDF.show

Eredmény:

+------+-----+
|   nev|darab|
+------+-----+
|  alma|    5|
| körte|    8|
|szilva|    3|
|  alma|    7|
| banán|    2|
| körte|    1|
+------+-----+

A printSchema a sémát rajzolja ki:

gyumolcsokDF.printSchema

Eredmény:

root
 |-- nev: string (nullable = true)
 |-- darab: long (nullable = true)

És most lássuk a DataFrame igazi erejét! Ha még nem töltöttük be a spark.implicits._-t, akkor most tegyük meg, a createOrReplaceTempView() függvény segítségével hozzunk létre egy ideiglenes adattáblát, majd hajtsunk végre egy szokásos SQL utasítást:

import spark.implicits._

gyumolcsokDF.createOrReplaceTempView("gyumolcsok")
spark.sql("SELECT nev, sum(darab), avg(darab) FROM gyumolcsok WHERE darab > 2 GROUP BY nev").show

Eredmény:

+------+----------+----------+
|   nev|sum(darab)|avg(darab)|
+------+----------+----------+
|szilva|         3|       3.0|
|  alma|        12|       6.0|
| körte|         8|       8.0|
+------+----------+----------+

Ez a kód sokkal olvashatóbb mint az RDD segítségével megvalósított! (Megjegyzés: előfordult, hogy nem Windows-on nem működött.)

Az olvashatóság már meg van, viszont ha elvétjük az SQL utasítást, akkor az csak futásidőben derül ki, ráadásul nem túl segítőkész hibaüzenet kíséretében. A fentivel lényegében ekvivalens a következő:

import org.apache.spark.sql.functions._

gyumolcsokDF.select($"nev", $"darab").where($"darab" > 2).groupBy($"nev").agg(sum("darab"), avg("darab")).show

Eredmény:

+------+----------+----------+
|   nev|sum(darab)|avg(darab)|
+------+----------+----------+
|szilva|         3|       3.0|
|  alma|        12|       6.0|
| körte|         8|       8.0|
+------+----------+----------+

Valószínűleg ez az a szintaxis, ami miatt megalkották a DataFrame adattípust: itt már a fordító is sokat segít, ha valamit elrontunk. Ráadásul az RDD-ben használható függvények itt is működnek.

A groupBy eredményének típusa már nem DataFrame, hanem RelationalGroupedDataset (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/RelationalGroupedDataset.html). Az ezen belül végrehajtott agg() metódus eredménye viszont már DataFrame típusú.

Dataset

Amint arról már szó volt, a DataFrame egy Row típusú Dataset. A Dataset típusa viszont bármilyen case class lehet. Dataset-et legegyszerűbben úgy tudunk létrehozni, hogy a DataFrame-en meghívjuk az as[Tipus] típusmódosítót. Pl.:

case class Gyumolcs(nev: String, darab: Long)
val gyumolcsokDS = gyumolcsokDF.as[Gyumolcs]

Ezen is működik a show, ill. a korábban bemutatott eljárások.

A Dataset viszont mivel már típusos, a fordító fordítási időben jelzi a hibákat. Pl.: gyumolcsokDS.filter(gyumolcs => gyumolcs.darab > 2).show. Ezen felül működik rajta a DataFrame-nél bemutatott függvények is.

Azonban - sajnos - a select lekérdezést nem lehet tovább egyszerűsíteni (legalábbis nekem nem sikerült), és mindegyik RDD függvény sem működik. A fent megfogalmazott logika megvalósítása az alábbi:

gyumolcsokDS
  .filter(_.darab > 2)
  .groupByKey(_.nev)
  .reduceGroups((a, b) => Gyumolcs(a.nev, a.darab + b.darab))
  .map(_._2)
  .show()

A kód magyarázata:

  • filter(_.darab > 2): hasonló az RDD-hez. De figyeljük meg: itt mezőnévre hivatkozunk.
  • groupByKey(_.nev): a megadott mező alapján képez csoportokat. A visszatérési típusa KeyValueGroupedDataset<K,T>.
  • reduceGroups((a, b) => Gyumolcs(a.nev, a.darab + b.darab)): ez egy KeyValueGroupedDataset osztályon belüli metódus, amely a csoportokon belül elvégzi a megadott műveletet. Láthatjuk, hogy itt is mezőnévvel hivatkozunk az adatra.
  • map(_._2): az előző függvény visszatérési típusa egy elem kettes. A bal oldali elem tartalmazza a példában a csoport nevét, ami a gyümölcs neve, a jobb oldali pedig az eredményt, ami egy Gyumolcs típusú objektum. Nekünk ez utóbbira van szükségünk.

Adatfolyamok

A folyamok a nagy adathalmazok természetes velejárói. A hagyományos algoritmusokban az összes adatot betöltjük a memóriába, és mindegyik elemet meg tudjuk címezni. Viszont ha az adat nem fér be a memóriába, akkor egyesével kell végigpásztáznunk az elemeken, lokálisan végrehajtható műveleteket hajtunk végre rajtuk, ill. asszociatív és kommutatív műveletekkel egyszerűsítünk. Léteznek véges és végtelen folyamok.

A Spark-ban kétféle folyam módszertan alakult ki: az első a DStreams volt, az újabb pedig a strukturált folyam. Mivel a big datában a folyamok egy alapvető eszköze a Kafka, azt külön megnézzük.

A példákhoz szükség van a nc programra, amit Windows-on közvetlenül nem tudunk használni, és nem egyértelmű a beállítása.

DStreams

Mivel ennek a helyét már átvette a strukturált streaming, ráadásul technikai nehézségek is adódtak vele, a technológia leírása alapból rejtve van, hogy helyet ne foglaljon.

Structured Streaming

A strukturált streaming felfogható egy végtelen táblának, amelyben folyamatosan új sorok jelennek meg. A leírása itt található: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

A szokásos szószámolós példa, ami a localhost:9999-ről érkező szavakat számolja, Spark Shellből a következő:

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()

Ugyanez önálló alkalmazásként:

build.sbt:

name := "Structured Streaming Example"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.1" % "provided"

src/main/scala/hu/faragocsaba/spark/StructuredStreamingExample.scala:

package hu.faragocsaba.spark

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object StructuredStreamingExample {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()
    val query = wordCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
  }
}

Az import spark.implicits._ nem egy megszokott helyre került. Ennek az az oka, hogy a spark objektum létrejötte utánra kell tenni, hogy a végrehajtókon is meglegyenek.

Az output mode (ami a fenti példában complete) a következő lehet:

  • Complete: mindent kiír; a szószámolós példában tehát az összes szót, ami a program indulása óta érkezett.
  • Append: csak az új sorokat írja ki; a példában tehát azokat a szavakat, amelyek a legutóbbi futás óta érkeztek. (Ez nálam nem működött.)
  • Update: csak azt írja ki, ami változott.

A fenti példa socket-ről olvassa az adatokat, melyet csak tesztelési céllal javasolnak. Lehet még fájlból olvasni (példát lejjebb találunk), ill. (ami a leggyakoribb) Kafka topic-ról. Ez utóbbiról még részletesen lesz szó.

Típustól függően lehet paraméterezni a forrást az .option(kulcs, érték) hívással. A fenti példában a szervert és a portot állítjuk be. A fenti oldalon láthatjuk, hogy melyik típusnál milyen beállítási lehetőségek vannak.

A gyakorlatban sokszor előfordul, hogy a végrehajtás során a folyamat ne folyamatosan menjen, hanem ablakokban dolgozza fel azt. Itt három fogalmat kell megismernünk:

  • A futás gyakorisága: ezt mutatja meg, hogy milyen gyakran fusson le a feldolgozás; a lenti példában 10 perc.
  • Egy ablak hossza: adott lefutáskor mennyi adatot vegyen figyelembe; a lenti példában 20 perc (tehát minden adat kétszer fog szerepelni).
  • Watermarking: mennyi ideig vegye figyelembe a későn érkező adatokat. A lenti példában ez 30 perc, ami azt jelenti, hogy ha legfeljebb ennyit késik az adat (tehát pl. most érkezik meg egy 28 perccel ezelőtti adat), akkor visszamenőlegesen aktualizálja azt a két ablakot.

Az alábbi utasításokat a Spark Shell-ből adjuk ki:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val wordSchema = StructType(Array(
  StructField("timestampStr", StringType),
  StructField("word", StringType)
))

val streamingWords = spark.readStream.schema(wordSchema).csv("words").select(unix_timestamp($"timestampStr", "yyyy/MM/dd HH:mm:ss").cast(TimestampType).as("timestamp"), current_timestamp().as("current"), $"word")
val windowedWords = streamingWords.withWatermark("timestamp", "30 minutes").groupBy(window($"timestamp", "20 minutes", "10 minutes"), $"word").count()
val query = windowedWords.writeStream.format("console").option("truncate", false).start()

Hozzunk létre egy words nevű könyvtárat, és oda helyezzünk el fájlokat (a fájlnév mindegy) megfelelő tartalommal, kb. így:

2019/04/19 12:10:15, alma
2019/04/19 12:10:20, alma
2019/04/19 12:10:25, barack
2019/04/19 12:10:30, alma
2019/04/19 12:10:35, alma

Az időpont a futáskori időpont legyen. Majd adjunk hozzá újabb és újabb fájlokat; a konzolon látni fogjuk az eredményt. (Megjegyzés: önálló alkalmazásként futtatva nálam nem az elvárt módon működött.)

A stream-elésnél a szokásos DataFrame és Dataset függvények nagy része használható (pl. select(), where() ill. filter(), map()), de van pár, amely vagy logikailag nem értelmezhető (pl. count() vagy distinct()) vagy még technikailag nincs megvalósítva (pl. többszörös stream aggregáció).

A fenti példákban a kimenet a konzolon történt (ld. writeStream.format("console")). További lehetőségek:

  • Fájl: pl. writeStream.format("parquet"). A parquet fájl a szokásos adattábla transzponáltja; nagy mennyiségű adatnál bizonyos műveletek végrehajtása így gazdaságosabb, mivel nagyobb eséllyel kerül ugyanarra a fizikai lemezre egy olyan adat, amire szűrünk.
  • Kafka: writeStream.format("kafka"). Opcióként meg kell adni a a szervert, a topic nevet és még opcionálisan pár adatot.
  • Memória (egészen pontosan egy virtuális adattábla a memóriában): .format("memory").queryName("tableName").
  • Foreach: writeStream.foreach(…), ahol paraméterül egy ForeachWriter példányt kell átadni. Itt 3 műveletet kell megvalósítani: open(), process(), close(). Ügyeljünk arra, hogy az egyes lépések nem a vezérlőn hajtódnak végre, így lokálisan kell megvalósítani az adatkapcsolatot. Mivel alapból minden egyes elemre felépíti a teljes kapcsolatot, érdemes egy közös készletet (angolul pool-t) létrehozni, azokat nyitva tartani és onnan választani egy írót.
  • Foreach batch: writeStream.foreachBatch(…). Ez hatékonyabb mint a foreach(), mivel itt nagyobb egységenként hajtódik végre a művelet.

Databricks

Amint arról már volt szó, a Databricks az a cég, amely a Sparkot fejleszti, és egyúttal maga a termék (mint szolgáltatás) maga is ezt a nevet viseli. Korábban már volt szó a regisztrációról, az ingyenes kipróbálási lehetőségről. A dokumentációja (https://docs.databricks.com/) igen alapos és hasznos, különösen amiatt, mert egy az egyben használható példakódokat tartalmaz. Ez a szakasz a Databricks legfontosabb komponenseit mutatja be.

Fő komponensek

A Databricks fő komponensei az alábbiak:

Clusters (fürtök)

Itt tudunk létrehozni olyan virtuális környezeteket, amelyen fut majd a program. Meg tudjuk határozni az alapadatokat: a végrehajtók számának minimumát és maximumát (a tényleges terheléstől függően ez dinamikusan változik), a szoftververziót, a végrehajtó típusát (ez határozza meg pl. a memória mennyiségét) stb. Minél izmosabb konfigurációt választunk, annál gyorsabb lesz a futás, de annál többet kell fizetnünk érte. Legalább egy fürtöt létre kell hoznunk.

Egy fürt meghatározott idejő inaktivitást követően automatikusan leáll, és ezzel tudunk takarékoskodni. Alapértelmezésben ez 120 perc. Rákattintva egy fürtre az alábbiakat láthatjuk:

  • Configuration: itt találjuk az alap konfigurációt, pl. a végrehajtók száma stb. Itt tudjuk azt is beállítani, hogy mennyi hány perc inaktivitás után álljon le automatikusan.
  • Notebooks: az erre a fürtre csatlakozó füzetek listája.
  • Libraries: a fürthöz hozzácsatolt könyvtárak, azaz jar fájlok. Alapértelmezésben is tartalmaz párat, ill. ide tudjuk feltölteni a programunk által használt külső könyvtárakat, valamint az önállóan futó programokat is.
  • Event Log: az alap műveletek naplója, pl. indulás, átméretezés, leállás stb.
  • Spark UI, Spark Cluster UI - Master: itt láthatunk további részleteket a futásról, pl. mikor mekkora volt a terhelés, részleteket a környezetről stb.
  • Driver Logs: itt találhatóak a naplófájlok. A naplófájlok tehát fürthöz, és nem füzethez kötöttek.
  • Metrics: megmutatja, hogy hogyan változott a terhelés, ill. negyedóránként készít egy automatikus képernyőképet az összefoglaló oldalról.

Notebooks (füzetek)

Bal oldalon a Home, Workspace és Recents ikonokkal érhető el. A Home a saját füzeteket jelenti, a Workspace a közös (azaz lényegében ez tekinthető produktív környezetben), a Recents pedig segítséget nyújt a nemrég megnyitott füzetek gyors eléréséhez. Egy füzet létrehozásakor meg kell adnunk a programozási nyelvet, és azt is, hogy meny fürthöz csatlakozzon. Ez utóbbit utólag egy kattintással meg tudjuk változtatni. A füzeten belül parancsok vannak (Cmd). Új parancsot úgy tudunk beszúrni, hogy egy létező parancs alatt vagy fölött középre visszük az egeret, és rákattintunk a megjelenő + jelre. Lehetőség van csak egy adott cella parancsait végrehajtani, adott cella előttieket ill. adott cella utániakat, vagy az összes cellát. A módosítások történetét is meg tudjuk nézni a jobb felső sarokban.

Egy hasznos, csak itt működő parancs a display, ami megfelelő módon (táblázatosan és/vagy grafikus módon) megjeleníti a paraméterül átadott adatokat. Példaként hozzuk ismét létre a fenti gyumolcsokDF-et, majd adjuk ki a következő parancsot:

display(gyumolcsokDF)

Táblázatosan jelenik meg az adat. Kattintsunk alul a második ikonra, és válasszunk egyet, pl. ezt: Pie. Az eredmény egy szép tortadiagram lesz, automatikusan összegezve a számokat.

gyumolcsok.png

Jobs (feladatok)

Az időzített feladatokat tudjuk itt adminisztrálni. Egy Spark program tipikusan időzített formában fut, pl. naponta egyszer. Itt tudjuk megadni azt, hogy melyik füzet milyen gyakorisággal és melyik fürtön fusson. Egy feladat tipikus felépítése a következő: önálló alkalmazásként megírjuk, a keletkezett jar fájlt hozzáfűzzük a megfelelő fürthöz, létrehozunk egy füzetet, amely elindítja a fő programot, végül létrehozunk egy feladatot, ami ütemezetten futtatja a megfelelő füzetet. Ha a füzet indulásakor nem fut a fürt, akkor automatikusan elindul, majd meghatározott idő után leáll.

Data (adatok)

Az egyes fürtökhöz tartozhatnak adattáblák; ezeket találjuk bal oldalon, középen, a Data ikonra kattintva. Létre tudunk hozni adattáblákat, ill. az egyes táblákra kattintva meg tudjuk nézni a sémát, néhány sort meg tudunk nézni belőle, láthatjuk a méretét, valamint azt is, hogy ki és mikor módosította.

DBFS

A DBFS a DataBricks FileSystem rövidítése. A háttérben elosztott módon tárolt adatokat tudjuk ennek segítségével kezelni és az alapműveleteket végrehajtani. Dokumentáció:

Ezt többféleképpen el tudjuk érni.

Databricks CLI

Telepítsük fel a https://docs.databricks.com/dev-tools/cli/index.html oldalon leírtak szerint. Ehhez fel kell telepíteni először a Python-t (https://www.python.org/downloads/), a bin könyvtárat bele kell tenni a PATH környezeti változóba, majd pip3 install databricks-cli paranccsal telepítsük fel a Databricks CLI-t, végül adjuk ki a databricks configure —token parancsot.

Használat:

databricks fs ls

Rövidített formátum, és adott alkönyvtár listázása:

dbfs ls dbfs:/FileStore

Parancsok:

  • configure: egyszer kell végrehajtani, ld. fenn.
  • cp: ezzel lehet másolni a helyi fájlrendszer és a DBFS között.
  • ls: fájlok listázása.
  • mkdirs: könyvtár létrehozása.
  • mv: átnevezés DBFS-en belül.
  • rm: törlés.

dbutils.fs

Egy füzetből a következőt tudjuk kiadni:

dbutils.fs.ls("dbfs:/FileStore")

Ill. táblázatosan megjelenítve:

display(dbutils.fs.ls("dbfs:/FileStore"))

Erre (és az alábbiakra is) van egy rövidítés is:

%fs ls dbfs:/FileStore

A visszatérési érték egy struktúra, melynek elemei az alábbiak (https://docs.databricks.com/dev-tools/databricks-utils.html):

  • path: elérési útvonal (string)
  • name: a fájl vagy könyvtár neve (string)
  • isDir: logikai, ami azt mondja meg, hogy ez a bejegyzés fájl-e vagy könyvtár
  • size: a fájl mérete (64 bites egész szám), ill. 0, ha könyvtárról van szó

Pl. az elérési útvonalat a következőképpen tudjuk kinyerni.

dbutils.fs.ls("dbfs:/FileStore").map(fi => fi.path)

További példák:

dbutils.fs.help()
dbutils.fs.mkdirs("/mydir/")
dbutils.fs.put("/mydir/myfile.txt", "Hello, World!")

DBFS API

HTTP műveletekkel is tudjuk kezelni a fájlrendszert, melynek leírását itt találjuk: https://docs.databricks.com/dev-tools/api/latest/dbfs.html#dbfs-api.

Spark API

Az egyes adattípusokat ki tudjuk írni fájlba, ill. onnan be tudjuk olvasni, pl.:

df.write.text("/mydir/mydf.txt")

FileStore

A fenti példákban a fájlokat a /FileStore/ alá tettük, pl. /FileStore/file.txt. Ennek az az előnye, hogy a böngészőből is el tudjuk érni. Pl. az említett fájlt a következőképpen: https://<databricks-instance>/files/file.txt?o=######.

Secrets

Biztonsági szempontból talán az egyik leggyakoribb probléma a jelszavak kezelése: pl. egy adatbázist csakis jelszóval védetten érdemes üzemeltetni, ugyanakkor erre a jelszóra szüksége van a programoknak is, amelyek hozzá csatlakoznak. A legnyilvánvalóbb megoldás közvetlenül beleírni a forrásba, vagy egy property fájlba, így viszont könnyen illetéktelen kezekbe kerülhet. A jelszó környezeti változóban történő tárolása se sokkal jobb, a titkosított tárolása pedig komplikálttá teszi a kódot.

A Spark megoldása erre a problémára a Secrets (titkok): segítéségével titkosítva tároljuk az érzékeny adatokat a Spark rendszerben, és azonosítóval hivatkozunk rájuk. Dokumentáció: https://docs.databricks.com/security/secrets/. A Secrets részletes ismertetése túlmutat az oldal keretein, itt csak a legfontosabbak szerepelnek. A Secrets kezelése történhet Databricks CLI-ból vagy API-n keresztül, és a szokásos műveleteket (létrehozás, listázás, törlés) valamint a hozzáférés finomhangolását végre lehet hajtani; erről a hivatkozott oldalról kiindulva olvashatunk bővebben.

Példaként vegyük a JDBC adatbázis kapcsolatot! Először a hatókört (scope) kell definiálnunk. A legegyszerűbb ezt a lépést Databricks CLI segítségével végrehajtani:

databricks secrets create-scope --scope myJdbc --initial-manage-principal users

Utána betehetjük a titkainkat. A példában a felhasználónevet és a jelszót is titkosítjuk:

databricks secrets put --scope myJdbc --key username
databricks secrets put --scope myJdbc --key password

A felugró ablakban meg kell adnunk a tényleges értékeket. Használni a következőképpen tudjuk:

val jdbcUsername = dbutils.secrets.get(scope = "myJdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "myJdbc", key = "password")

Widget-ek

A widget-ek segítségével könnyebben tudjuk paraméterezni a notebook-ot. Négyféle módon tudunk paramétereket bekérni:

  • Szöveges beviteli mező (text): ide szabad szöveget írhatunk.
  • Legördülő lista (dropdown): egy előre definiált listából választhatunk.
  • Kombinált (combobox): van egy lista, amiből választhatunk, de ugyanoda szabad szöveget is beírhatunk.
  • Többszörös választás (multiselect): többszörös választási lehetőség.

Mindegyiknél meg kell adnunk a nevet, ami alapján hivatkozunk majd rá, az alapértelmezett értéket, valamint annál, ami listát is tartalmazhat, a lista elemeit, valamint opcionálisan megadhatunk egy feliratot, amit megjelenít a változónév helyett. A https://docs.databricks.com/notebooks/widgets.html oldal egy kiváló kiinduló hely a téma megismeréséhez. Segítséget a következő paranccsal kérhetünk:

dbutils.widgets.help()

Adott típusú widget-re vonatkozó segítség kérése:

dbutils.widgets.help("dropdown")

Szöveges widget készítése:

dbutils.widgets.text("fruitText", "apple", "Fruit (text)")

Legördülő lista widget készítése:

dbutils.widgets.dropdown("fruitDropdown", "apple", Seq("apple", "orange", "banana"), "Fruit (dropdown)")

Kombinált widget készítése:

dbutils.widgets.combobox("fruitCombobox", "apple", Seq("apple", "orange", "banana"), "Fruit (combobox)")

Többszörös választás widget készítése:

dbutils.widgets.multiselect("fruitMultiselect", "apple", Seq("apple", "orange", "banana"), "Fruit (multiselect)")

Aktuális értékek lekérdezése:

val fruitTextValue = dbutils.widgets.get("fruitText")
val fruitDropdownValue = dbutils.widgets.get("fruitDropdown")
val fruitComboboxValue = dbutils.widgets.get("fruitCombobox")
val fruitMultiselectValue = dbutils.widgets.get("fruitMultiselect")

Ez utóbbit elég egyszer lefuttatni, és ha megváltoztatjuk felül a beviteli mező értékét, akkor automatikusan változik itt is. Az első 3 esetben az adott elemet kapjuk eredményül, a negyedikben pedig a kiválasztott elemek listáját, vesszővel elválasztva.

Egy widget törlése:

dbutils.widgets.remove("fruitText")

Az összes widget törlése:

dbutils.widgets.removeAll

Támogatott formátumok

Van jó pár adatformátum, amit a Spark alapból támogat, valamilyen beépített könyvtáron keresztül. Ha az általunk használni kívánt formátumot támogatja, akkor a kód lényegi része a következőre egyszerűsödik (ez Spark szinten támogatott, ld. bővebben itt: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html):

  • val df = spark.read.format("formátum").option(…).option(…)….load("…"): beolvasás esetén,
  • df.write.format("formátum").option(…).option(…)….save("…"): kiírás esetén.

Tehát meg kell adnunk a formátumot, a megadott opciókkal lehet finomhangolni, ami már formátum függő (pl. ha a formátum CSV, akkor meg tudjuk adni pl. azt, hogy van-e fejléce, vagy mi legyen az elválasztó karakter). A támogatott formátumokat megtaláljuk ezen az oldalon: https://docs.databricks.com/data/data-sources/. (Ha nem Databricks-en keresztül használjuk, akkor megfelelő könyvtárakra lehet szükségünk.)

A lenti listán jól látható, hogy igen sok a támogatott formátum, ami folyamatosan növekszik. Tehát mielőtt belefognánk a saját formátumunk implementálásába, mindenképpen érdemes megnézni, hogy van-e már rá támogatás, ill. fordítva, ha még nem döntöttünk az adatformátum mellett, akkor érdemes olyat választani, amit alapból támogat a Spark.

Adat tárházak

A felhő szolgáltatóknak általában van valamilyen adat tárház szolgáltatásuk, ahol fájlrendszer-szerűen tudjuk tárolni az adatokat. Két példa:

Fájl formátumok

CSV

A CSV a comma separated values (vesszővel elválasztott értékek) rövidítése. Gyakorlatilag egy nagyon leegyszerűsített táblázat, melyben az adatokat vessző (esetleg más elválasztó karakter) választja el. Tipikusan van neki fejléce, ami megmondja, hogy melyik oszlop mit jelent. Részletes leírás: https://docs.databricks.com/data/data-sources/read-csv.html.

Igen kiforrottnak tűnik a támogatása, számos opciót tudunk használni:

  • path: a fájlok elérési útvonala.
  • header: megadhatjuk, hogy az első sor fejléc-e. Alapértelmezett érték: false.
  • sep: mi legyen az elválasztó karakter. Alapértelmezés: , (vessző).
  • quote: mi legyen az idézőjel karakter. Alapértelmezés (nem túl meglepő módon): ".
  • escape: mi legyen az a karakter, ami elnyomja a rákövetkező speciális hatását. Pl. ha az elválasztó karakter a vessző, de vesszőt tartalmaz maga a cella, akkor ezt kell használni. Alapértelmezés: \; ez tehát azt jelenti, hogy ha egy cella vesszőt tartalmaz, az így jelenik meg: \,.
  • mode: ezzel azt tudjuk vezérelni, hogy mit kezdjen a rendszer a hibás sorral. Lehetséges értékek:
    • PERMISSIVE: mindegyik sort megpróbálja beolvasni. Szükség esetén null értékek jelennek meg, ill. ha túl sok az adat, akkor figyelmen kívül hagyja. Ez az alapértelmezett.
    • DROPMALFORMED: a hibás bejegyzéseket eldobja.
    • FAILFAST: hibás bejegyzés esetén kivétellel leáll a beolvasás.
  • charset: karakterkódolás. Alapértelmezés: UTF-8.
  • inferSchema: ezzel tudjuk azt jelezni, hogy megpróbálja-e kitalálni a sémát az adatokból. Ez esetben kétszer végig kell lépkednie az adatokon. Alapértelmezés: hamis (false).
  • comment: ezzel adhatjuk meg a megjegyzés karaktert, tehát azt, amit beolvasáskor figyelmen kívül hagy, ha ezzel a karakterrel kezdődik a sor. Alapértelmezett érték: #.
  • nullValue: mit tekintünk null értéknek a fájlban, azaz a beolvasás után a DataFrame-ben mi legyen null.
  • dateFormat: ezzel tudjuk megadni a dátumformátumot.

A sémát az opciók után a schema() függvénnyel tudjuk megadni. Sémát a fent megadott módon tudunk megadni, StructType segítségével. Példa a beolvasásra:

val myDf = spark.read.format("csv")
  .option("header", "true")
  .option("sep", ";")
  .schema(mySchema)
  .load("/.../data.csv")

JSON

A JSON formátum az internetes adatcsere egyik kedvelt formátuma annak tömörsége, olvashatósága és egyszerűsége révén, no meg amiatt, mert a JavaScript közvetlenül ezt használja az objektumok szerializálásához. Leírások:

Egy dataframe kiírása JSON-ba talán nem is lehetne egyszerűbb:

val gyumolcsokDF = Seq(("alma", 5), ("körte", 8), ("szilva", 3), ("alma", 7), ("banán", 2), ("körte", 1)).toDF("nev", "darab")
gyumolcsokDF.write.json("/FileStore/gyumolcsok.json")

Ha kilistázzuk:

sc.textFile("/FileStore/gyumolcsok.json").collect().foreach(println)

akkor a következőt kapjuk:

{"nev":"alma","darab":5}
{"nev":"körte","darab":8}
{"nev":"szilva","darab":3}
{"nev":"alma","darab":7}
{"nev":"banán","darab":2}
{"nev":"körte","darab":1}

Tehát egy sor egy JSON formában kapjuk az eredményt.

A valóságban a /FileStore/gyumolcsok.json nem egy fájl, hanem könyvtár! Győződjünk meg róla!

display(dbutils.fs.ls("/FileStore/gyumolcsok.json"))

Mindegyik JSON fizikailag külön fájlban van part-00000x-tid-[UUID]-… formában + még van 3 egyéb fájl. Ennek a törlése:

dbutils.fs.rm("/FileStore/gyumolcsok.json", true)

A törlés akkor sem fut hibára, ha nem létezik a könyvtár, csak a visszatérési értéke lesz false.

A beolvasás is ugyanolyan egyszerű mint a kiírás:

val gyumik = spark.read.json("/FileStore/gyumolcsok.json")
gyumik.show

Ez a módszer akkor is működik, ha nem a fent generált módon jön létre a JSON, hanem küldő forrásból. Ez esetben viszont ha több sorban található a JSON, arra nem jön rá automatikusan, azt külön jelezni kell: option("multiline", "true").

Avro

Az Apache Avro egy bináris adatformátum. Dokumentáció:

Az Spark újabb verziói alapból támogatják, a következő módon:

val df = spark.read.format("avro").load("/.../file.avro")

Hasonlóan írhatunk is ebben a formátumban.

A beállításokat viszont nem opciókként kell megadnunk, hanem globális paraméterekként, spark.conf.set("[kulcs]", "[érték]") formátumban, melyről a fenti linken olvashatunk bővebben.

Parquet

Az Apache Parquet formátum megfordítja a tárolás logikáját. A fenti adatformátumok mindegyike soronként tárolja az adatokat. A Parquet formátum helyett oszlopokként tárolja, azaz először jön az első oszlop, utána a második stb. Fizikailag ez nem egy fájl, hanem egy könyvtár, több fájllal. Nagy méretű adatoke setén bizonyos műveletek hatékonyabban így, mint a szokásos, sor orientált tárolás esetén.

A Parquet a Spark alapértelmezett formátuma. Dokumentáció:

Mivel ez az alapértelmezett formátum (hacsak nem állítottuk át a spark.sql.sources.default beállítással), akkor alapból használhatjuk így:

val df = spark.read.load("example.parquet")

Ha ki szeretnénk hangsúlyozni a formátumot, akkor írhatjuk így is:

val df = spark.read.parquet("example.parquet")

Egyéb formátumok

A Spark még számos egyéb formátumot támogat, melyek közül álljon itt pár, felsorolás szinten:

Adatbázisok

Spark táblázatok

A Data ikonra kattintva találjuk a Spark táblázatokat. Ez valójában csak egy nézet, és alatta sokféle adatformátum lehet.

Alapból normál SQL utasításokat adhatunk ki. Adatbázis létrehozása ill. törlése:

CREATE DATABASE mydb
...
DROP DATABASE mydb

Ha a füzet nyelve a Scala, akkor kétféleképpen tudunk SQL utasításokat kiadni:

  • ha %sql-t írunk a sor elejére, akkor közvetlenül;
  • a spark.sql("[SQL command]") utasítás segítségével.

Táblát létrehozni ill. törölni szintén a megszokott szintaxissal tudunk:

CREATE TABLE mydb.gyumolcsok (nev STRING, darab INT)
...
DROP TABLE mydb.gyumolcsok

Beleírni a táblába szintén SQL utasításokkal tudunk:

INSERT INTO mydb.gyumolcsok VALUES ('alma', 2);
INSERT INTO mydb.gyumolcsok VALUES ('barack', 3);

A lekérdezés szintén szokásos:

SELECT * FROM mydb.gyumolcsok

Ugyanezt Scala kódból is el tudjuk érni, a következőképpen:

val gyumik = spark.table("mydb.gyumolcsok")
display(gyumik)

Ha megnézzük a Data ikonra kattintva, akkor látjuk a létrehozott adatbázist, táblát és arra kattintva a sémát és az adatokat is. Számos egyéb lehetőség van, aminek az ismeretése nem célja ennek a leírásnak. Dokumentáció:

Delta Lake

A Delta Lake elosztott rendszerekre optimalizált relációs adatbázis rendszer. A Databricks alapértelmezett adatbázis formátuma. Dokumentáció:

Az írás és az olvasás teljesen megszokott:

gyumolcsokDF.write.format("delta").save("/delta/gyumolcsok")
...
val gyumolcsok = spark.read.format("delta").load("/delta/gyumolcsok")

Ha megnézzük fájlrendszer szinten, akkor láthatjuk, hogy Parquet formátumban menti az adatokat.

Amit arról fent már volt szó, a Spark adattáblák tulajdonképpen csak nézetek, alatta számos formátum lehetséges. Ha például lementettük a fenti helyre a gyümölcsöket, akkor a következő utasítással érhetjük el azt, hogy a mydb adatbázison belül gyumolcsok táblanévvel láthassuk (a mydb adatbázisnak léteznie kell):

CREATE TABLE mydb.gyumolcsok USING DELTA LOCATION '/delta/gyumolcsok/'

Ezt követően a fenti Spark SQL parancsokat is ki tudjuk adni.

A Delta táblák néhány további tulajdonsága:

  • Stream módban is tudunk beléjük írni. Opcióként meg kell adnunk a checkpoint könyvtárat és a módot (pl. append).
  • Lekérdezhetőek a historikus adatok, pl. SELECT * FROM mydata VERSION AS OF 0, vagy adott időpontra vonatkoztatva: SELECT * FROM mydata TIMESTAMP AS OF '2020-01-06 08:28:15'
  • Időnként célszerű optimalizálni a táblákat: OPRIMIZE mydata
  • Historikus adatok törlése: VACUUM mydata (az összes historikus adat törlése) ill. VACUUM mydata RETAIN 24 HOURS (az utolsó 24 óra megtartása, a többi törlése).

JDBC

Az SQL adatbázis támogatás Spark szinten történik. Dokumentáció:

A JDBC adatbázis kapcsolat általános felépítése a következő:

val jdbcHostname = "[hostname]"
val jdbcPort = 1433
val jdbcDatabase = "[dbname]"
val jdbcUsername = "[username]"
val jdbcPassword = "[password]"

val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

import java.util.Properties
val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
Class.forName(driverClass)
connectionProperties.setProperty("Driver", driverClass)

val df = spark.read.jdbc(jdbcUrl, "[table]", connectionProperties)
display(df)

A példában a jelszó közvetlenül benne van a kódban, ami nem biztonságos, ehelyett célszerű a Spark Secrets módszert használni, melyről korábban olvashattunk. Az egyes adatbázis rendszerek meghajtói (dirver) és a kapcsolatot leíró string eltérő, a többi viszont megegyezik.

A példában Microsoft SQL szerverhez kapcsolódtunk. Ezt a Databricks alapból támogatja, ahogyan a legtöbb elterjedt adatbázist. Érdemes tájékozódni a lehetőségekről. További támogatott rendszerek:

Egyéb szolgáltatások

A Spark egyéb tipikusan felhő szolgáltatásokat is támogat, melyekről lesz szó részletesen lejjebb, most felsorolás szinten álljon itt néhány:

További Spark-ot használó rendszerek

Nem a DataBricks az egyedüli rendszer, amely a Spark-ot használja. Néhány egyéb, csupán felsorolás szinten:

TODO: nem csak felsorolás szinten legyenek.

Big Data technológiák

Azure Blob tároló

Áttekintés

Az Azure Blob Storage a Microsoft felhő tároló szolgáltatása. Azure Portalon a Storage Accounts alatt érhető el. Felépítése sokban hasonlít a fa szerkezetű fájlrendszerre.

  • A legfelső szint a Storage Account.
  • Mindegyik Storage Accounthoz tartozhat tetszőleges számú Blob konténer.
  • Egyéb lehetőségek (ezekkel nincs tapasztalatom): fájl megosztások, sorok, táblák.

Használata:

  • Storage Explorer (preview): ezt a megfelelő Storage Account kiválasztásával érhetjük el böngészőből.
  • Storage Explorer: önálló grafikus alkalmazás, mely a fő operációs rendszereken (Windows, Linux, MacOS) elérhető. Innen tudjuk letölteni: https://azure.microsoft.com/hu-hu/features/storage-explorer/. A Blob tárolókat vagy bejelentkezéssel, vagy kapcsolat stringgel tudjuk elérni; ez utóbbit a az Access Keys alatt érhetjük el (valamelyik Connection string). Ennek a tudása meghaladja a böngészőben elérhető változatot, pl. segítségével rekurzívan le tudunk teljes könyvtárakat tölteni (a fentivel csak egyesével).
  • azcopy: ez egy parancssori alkalmazás, melynek segítségével fájlokat tudunk másolni Blobról ill. Blobra. Letöltés és dokumentáció: https://docs.microsoft.com/hu-hu/azure/storage/common/storage-use-azcopy-v10. Használata nem egyszerű. A dokumentációban hivatkozott SAS token a következőképpen érhető el: Shared access signature → Generate SAS and conenction string → jobb oldalon le kell görgetni → az SAS token sorban található szöveget kell kimásolni.
  • Programból: lent látunk egy példát arra, hogy Sparkból hogyan tudjuk elérni.

Használata Sparkból

Az Azure Blob tárolót a Spark alapból támogatja. Leírás: https://docs.databricks.com/data/data-sources/azure/azure-storage.html. Alapvetően kétféleképpen tudjuk használni:

  • Felcsatolással (mount): ezután úgy érhetjük el, mintha a Spark fájlrendszer része lenne (dbfs.fs…).
  • DataFrame API segítségével: ez esetben egy globális konfigurációs paraméterként meg kell adni az elérési kulcsot, majd a szokásos fájlműveleteket végre tudjuk hajtani rajta.

Ez utóbbit nézzük meg most. Nyissuk meg a használni kívánt Blob tárolót az Azure Portálban (portal.azure.com → All Resources → Type = Storage account → katt a megfelelőn), majd kattintsunk a Settings alatt Access keys menüpontra. Ott két dologra lesz szükségünk:

  • Storage account name
  • Key (pl. a key1 alatti)

A Spark programba szúrjuk be a következő sort:

spark.conf.set("fs.azure.account.key.[storage-account-name].blob.core.windows.net", "[key]")

ahol a [storage-account-name] a Storage account name (írjuk be a két pont közé simán), a [key] pedig a Key (ez egy hosszú kód, és ahogy a példában is láthatjuk, ezt idézőjelek közé kell tenni).

Ezt követően a következőképpen tudjuk végrehajtani a fájlműveleteket:

dbutils.fs.ls("wasbs://<container-name>@<storage-account-name>.blob.core.windows.net/<directory-name>")

A többi fájlműveletet is tudjuk hasonlóképpen használni.

Data Warehouse

Az Azure Data Warehouse az SQL adatbázis továbbfejlesztése egyrészt úgy, hogy jóval több adat fér el benne (egy adatbázis maximális mérete 1PT, míg a standard Microsoft SQL-én 4 TB), és támogatja a párhuzamos feldolgozást. Mindennek viszont ára van:

  • Szó szerint: havi tízezer Eurós nagyságrendű a bérleti díja.
  • Jóval kevesebb párhuzamos kapcsolatot ill. párhuzamos lekérdezést tesz lehetővé.
  • Van pár technikai hiányosság is, pl. nincsenek benne privát kulcsok, így referenciák se az adattáblák között.
  • Nem igazán kiforrott, sok benne a hiba.

A használata viszont szinte egy az egyben megegyezik a Microsoft SQL használatával.

Kafka

Az Apache Kafka egy elosztott folyam rendszer (distributed streaming platform). Alapfunkciója tehát végső soron az, hogy adatokat fogadjon és továbbítson, hasonlóan pl. Az Apache Camelhez, a különbség a kettő között viszont az, hogy az Apache Kafka elosztott módon, akármennyi feldolgozóval tud működni párhuzamosan, így lényegesen nagyobb (felső korlát nélküli) az "áteresztő képessége". Emiatt a Kafka a big data alapvető építőköve, ráadásul úgy, hogy egymáshoz rendelődnek a Kafka és Spark példányok. Ennek persze ára van: a Kafkának sokkal kevesebb funkciója van, mint a Camelnek. Viszont kiválóan integrálható a Sparkkal, mivel ha ugyanannyi végrehajtó van a Sparkon és a Kafkán is, akkor egy-egy megfeleltetés képezhető közöttük.

Ezt is alapvetően felhő szolgáltatásként szoktuk igénybe venni, az Azure-ön pl. a következőképpen https://docs.microsoft.com/en-us/azure/hdinsight/kafka/. Ha mi magunk szeretnénk beállítani lokálisan (fejlesztési, tesztelési céllal), akkor a lenyíló részben találjuk a leírást.

A lokálisan feltelepített Kafka alkalmas arra, hogy az alapműveleteket végrehajtsuk távoli Kafka példányokon is: téma (topic) létrehozása, azok listázása, üzenet küldése és üzenet fogadása. Ezekre normál esetben alapvetően nincs szükség (a topic automatikusan éltrejön, tipikusan programból írunk a topicra és szintén programból olvasunk belőle), de tesztelési céllal viszont hasznos lehet a tudás. A lenyíló részben láthatjuk ezeket a műveleteket.

A használata során könnyen problémákban ütközünk, így fontosnak tartom leírni, hogy nálam hogyan működött. A legfontosabb dokumentumok, melyeket érdemes elolvasni:

A programok általános felépítése

A lenti programok általános felépítése a következő.

build.sbt

name := "Kafka Receiver Batch"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.1"

A név (Kafka Receiver Batch) helyre az aktuális nevet kell írni.

src/main/scala/hu/faragocsaba/kafka/KafkaReceiverBatch.scala

package hu.faragocsaba.kafka

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._

object KafkaReceiverBatch {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("KafkaReceiverBatch").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    // ide jön a program
  }
}

Az objektum neve és az appName (KafkaReceiverBatch) az aktuális legyen. A naplózást érdemes alacsony (ERROR) szintre venni, egyébként túl sok naplóbejegyzés jelenik meg.

Interaktív módban elég a lent megadott parancsokat beírni. A jar fájlt elküldve a fentit kell lefordítani szokásosan (sbt clean package), és a fent leírtak szerint kell eljárni. Ha hibára fut (fog elég gyakran…), akkor próbálkozzunk a checkpoints könyvtár törlésével.

Az alábbiakban csak a program lényeges részét látjuk.

Adatok írása Kafka topicra kötegelt módban

Először írjunk egy Kafka topicra valamit! A program neve legyen Kafka Sender Batch ill. KafkaSenderBatch.

    val textDs = Seq(
      "nem minden fajta szarka farka tarka csak a tarka fajta szarka farka tarka",
      "sokat akar a szarka de nem birja a farka"
    ).toDS()
    textDs
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "WordCountTopic")
      .save()

A szerver nevét írjuk át a megfelelőre. A következő paranccsal tudunk meggyőződni a program sikerességéről:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic WordCountTopic --from-beginning

Adatok olvasása Kafka topicról kötegelt módban

Olvassuk ki az adott topic tartalmát, és készítsünk róla összegzést! A program javasolt neve {Kafka Receiver Batch}} ill. KafkaReceiverBatch.

    val wordsDf = spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "WordCountTopic")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
    val wordCounts = wordsDf
      .select(explode(split($"value".cast("string"), "\\s+")).as("word"))
      .groupBy($"word")
      .count
    wordCounts.collect.foreach(println)

Az offsettel tudjuk beállítani azt, hogy mettől meddig olvassa ki az adatokat. Ebben a példában a Kafka topic teljes tartalmát kiolvassuk. A valóságban gyakoribb az, hogy az aktuális kiolvasáskor lementjük azt, ameddig kiolvastuk, és a következőben onnan kezdjük.

Ha mindent jól csináltunk, akkor a következő jelenik meg a képernyőn:

[csak,1]
[sokat,1]
[tarka,3]
[fajta,2]
[minden,1]
[akar,1]
[nem,2]
[de,1]
[farka,3]
[birja,1]
[a,3]
[szarka,3]

TODO: offset

Adatok írása Kafka topicra folyam módban

A fenti megoldások kötegelt (batch) módúak voltak: egyszer végrehajtódott a művelet, és befejeződött a program. Létezik egy másik mód is: a folyam (stream), melyben többé-kevésbé folyamatosan dolgozza fel a program az adatokat (egészen pontosan meghatározott ablakokban úgy, hogy maga a program folyamatosan fut). Mivel a Kafka leginkább nagy adatmennyiségek továbbítására, feldolgozására szolgál, érdemes ezzel is megismerkedni.

Az folyam módú Kafka topicra írás kicsit róka fogta csuka, csuka fogta róka, ugyanis ahhoz, hogy ezt meg tudjuk tenni, már létező folyamra van szükségünk, amit a gyakorlatban nagyrészt valamilyen folyam szolgáltatótól nyerünk, mint pl. a Kafka. Tesztelési céllal használhatjuk a socketről olvasást. Indítsuk el a netcat alkalmazást a fent leírt módon (nc -l -p 9999). Majd a következő módon tudunk ebből kiolvasni (a program javasolt neve Kafka Sender Stream ill. KafkaSenderStream):

    val lines = spark
      .readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val words = lines.as[String].flatMap(_.split(" "))
    val query = words
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "WordCountTopic")
      .option("checkpointLocation", "checkpoints")
      .start()
    query.awaitTermination()

A példa a beolvasott szöveget szavakra bontja, és úgy írja a topicra. Írjuk be pl. ezt a NetCat konzolon: tarka barka mint a szarka; az eredményt a felbontott szöveg lesz.

Adatok olvasása Kafka topicról folyam módban

Kiolvasásra példa a következő (Kafka Receiver Stream ill. KafkaReceiverStream):

    val wordsDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "WordCountTopic")
      .option("startingOffsets", "latest")
      .load()
    val wordCounts = wordsDf.select(explode(split($"value".cast("string"), "\\s+")).as("word"))
      .groupBy($"word")
      .count
    val query = wordCounts
      .writeStream
      .outputMode("complete")
      .format("console")
      .trigger(ProcessingTime(30000))
      .start()
    query.awaitTermination()

Itt a start offset a legutolsó, azaz a korábbiakat nem veszi figyelembe. Ha az előző program még fut, akkor időnként írjuk valamit a NetCat konzolra, majd úgy félpercenként nézzünk rá erre az ablakra is. Az eredmény kb. így fog kinézni:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| tarka|    1|
|szarka|    1|
| barka|    1|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| tarka|    1|
| marka|    1|
| farka|    1|
|szarka|    1|
| barka|    2|
+------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
| tarka|    2|
| fajta|    1|
| marka|    1|
|minden|    1|
|   nem|    1|
| farka|    2|
|szarka|    2|
| barka|    3|
+------+-----+

Event Hubs

Az Event Hubs a Microsoft elosztott folyam megoldása, kb. a Kafkával egyenértékű. Az Azure-ben Event Hubs Namespace néven érhető el, és a tulajdonképpeni Event Hubokat (amelyek tehát egy témakörnek, azaz topicnak felelnek meg). Egy-egy Event Hubnál megadhatjuk azt, hogy hány párhuzamos részre legyen osztva (célszerű a várható párhuzamos olvasók számára állítani; alapból maximum 32 lehet, de külön lehet kérni többet is), valamint az, hogy mennyi ideig tárolja az adatokat (1-7 nap). Be lehet állítani a rendszeres mentést, megadva a mentési gyakoriságot, a könyvtárstruktúrát stb.

A Databricks alapból támogatja az Event Hubot: https://docs.databricks.com/spark/latest/structured-streaming/streaming-event-hubs.html. A kódban megadandó kapcsolat stringet a következőképpen határozhatjuk meg:

  • Az Azure Portalon válasszuk ki a megfelelő Event Hub Namespace-t, azon belül az Event Hubot.
  • Oldalt kattintsunk a Shared access policies menüpontra.
  • Válasszunk ki egy létezőt, vagy kattintsunk az Add-ra, ott adjunk neki nevet, és adjuk meg, hogy ennek irányítási (Manage), küldési (Send) és/vagy fogadási (Listen) jogosultsága van-e.
  • Jobb oldalon találjuk a kulcsokat és a kapcsolat stringeket is. Használhatjuk bármelyiket.

Az Event Hubs csatlakozáshoz több programozási nyelvhez van könyvtár. Java csatlakozási lehetőségről olvashatunk itt: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send.

TODO: példa

Redis Cache

A Redis Cache (https://redis.io/) egy olyan adatstruktúra, amelyet a szerver a memóriában tárol, és melybe kulcs-érték párokat lehet tenni. Elosztott módban is lehet telepíteni, így megnövelve a tárolható adatmennyiséget. A szokásos műveleteket lehet segítségével végrehajtani: egy kulcsot beállítani ill. azt lekérdezni. A kulcs mindig string, az érték pedig sokféle lehet: string, lista, halmaz, asszociatív tömb (tehát egy kulcs egy kulcs-érték párokat tartalmazó struktúrára hivatkozik), folyam stb. Elvileg van egy online kipróbálható Redis (http://try.redis.io/), de az írás pillanatában nem működött.

A honlapról csak a forrást lehet letölteni, és nem is olyan egyszerű Windows binárist találni. Ez nekem működött: https://github.com/MicrosoftArchive/redis/releases/. Ha feltelepítjük, akkor Windows szervizként állítja be. Ez tartalmaz egy redis-cli.exe programot, azzal tudunk csatlakozni, és kipróbálni a parancsokat. Az alapértelmezett port a 6379, az alapértelmezett titkosított port pedig a 6380. Alapértelmezésben nem kér azonosítót és jelszót, de ez is beállítható.

Egy egyszerű kulcs-érték példa, melyben a SET segítségével állítjuk be az értékeket, GET segítségével kérdezzük le, míg a KEYS a kulcsok listáját adja vissza:

127.0.0.1:6379> SET alma 5
OK
127.0.0.1:6379> SET szilva 3
OK
127.0.0.1:6379> GET alma
"5"
127.0.0.1:6379> KEYS *
1) "szilva"
2) "alma"
127.0.0.1:6379> KEYS a*
1) "alma"

Listák esetén a LPUSH ill. RPUSH műveletet használhatjuk elemek hozzáadásához az elejére (L, left) ill. a végére (R, right), lekérdezni a LRANGE paranccsal tudjuk (a kezdő és a vég indexet kell megadni, 0-tól sorszámozva, ill. -1 jelenti az összeset), a LREM pedig törlésre való (a lista neve után gy számot kell megadni, ami azt jelenti, hogy hány előfordulást szeretnénk törölni; 0 jelenti azt, hogy mindet, a negatív pedig azt, hogy jobbról):

127.0.0.1:6379> LPUSH gyumolcsok alma
(integer) 1
127.0.0.1:6379> LPUSH gyumolcsok korte
(integer) 2
127.0.0.1:6379> LPUSH gyumolcsok szilva
(integer) 3
127.0.0.1:6379> RPUSH gyumolcsok barack
(integer) 4
127.0.0.1:6379> LRANGE gyumolcsok 0 -1
1) "szilva"
2) "korte"
3) "alma"
4) "barack"
127.0.0.1:6379> LREM gyumolcsok 0 alma
(integer) 1
127.0.0.1:6379> LRANGE gyumolcsok 0 -1
1) "szilva"
2) "korte"
3) "barack"

A SADD és SPOP eljárásokkal tudunk halmazokba behelyezni adatokat, ill. véletlenszerű sorrendben kivenni belőle:

127.0.0.1:6379> SADD zoldsegek repa
(integer) 1
127.0.0.1:6379> SADD zoldsegek salata
(integer) 1
127.0.0.1:6379> SADD zoldsegek retek
(integer) 1
127.0.0.1:6379> SPOP zoldsegek
"retek"
127.0.0.1:6379> SPOP zoldsegek
"repa"
127.0.0.1:6379> SPOP zoldsegek
"salata"

Asszociatív tömb esetén beállítani a HMSET segítségével tudunk, ahol a kulcs után egymás után felsoroljuk a kulcsokat és értékeket. A HGETALL az összes kucslot és értéket visszaadja, a HGET egy értéket (itt tehát meg kell adnunk a fő kulcsot és az érték kulcsát is), a HSET pedig egy kulcs értékát átállítja:

127.0.0.1:6379> HMSET gyumolcs1 nev alma ar 250
OK
127.0.0.1:6379> HMSET gyumolcs2 nev korte ar 500
OK
127.0.0.1:6379> HMSET gyumolcs3 nev szilva ar 300
OK
127.0.0.1:6379> HGETALL gyumolcs2
1) "nev"
2) "korte"
3) "ar"
4) "500"
127.0.0.1:6379> HGET gyumolcs3 nev
"szilva"
127.0.0.1:6379> HSET gyumolcs3 nev cseresznye
(integer) 0
127.0.0.1:6379> HGETALL gyumolcs3
1) "nev"
2) "cseresznye"
3) "ar"
4) "300"

Igen sok programozási nyelvet támogat: https://redis.io/clients. Kettőt emelek ki:

Cassandra

TODO: kidolgozni

Elastic Search

TODO: kidolgozni

Apache Hive

TODO: kidolgozni

Apache Nifi

TODO: kidolgozni

Apache Beam

TODO: kidolgozni

Kubernates

TODO: kidolgozni

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License