Big Data

Áttekintés

A big data napjaink egyik felfutó témája. Magyarra szó szerint úgy lehetne fordítani, hogy nagy adat, de nincs elterjedt magyar elnevezése. Fontos: ebben a leírásban az szerepel, ahogy a témát én megértettem, ahogy én látom. Igyekeztem az objektivitásra törekedni, de mivel nem vagyok a téma szakértője, hibák az oldalon lehetségesek. Bízom abban, hogy mást is segít a megértésben, viszont egy fontos döntést senki ne alapozzon pusztán erre.

Óriási mennyiségű adat áll rendelkezésünkre, hatalmas sebességgel keletkezik az új adat, ráadásul az új adat keletkezésének az üteme exponenciálisan gyorsul. Nincs tűéles határ a nagy mennyiségű adat és a big data között. Definíció szerint big datának nevezzük azt az adatmennyiséget, amelynek a feldolgozása hagyományos eszközökkel nem lehetséges, vagy legalábbis jelentős nehézségekbe ütközne. Pontosabban definiálva, a következő 3 feltétel legalább egyikének teljesülnie kell ahhoz, hogy az adatot big datának nevezhessünk (az angol elnevezés kezdőbetűjéből kiindulva ezt 3V-nek nevezzük):

  • Volume, azaz adatmennyiség: az adat nem fér el egy tárolóegységen, vagy a feldolgozáshoz szükséges adatmennyiség nem fér el a memóriában.
  • Velocity, azaz sebesség: az adatok keletkezési sebessége olyan mértékű, hogy egyetlen számítógép nem képes annak feldolgozására.
  • Variety, azaz változatosság: az adatokat nem tudjuk hagyományos módon, pl. egy relációs adatbázisnak sorokból és oszlopokból álló adattáblájában tárolni.

Példaként vegyük mondjuk a YouTube-ot, mely mindhárom feltételnek megfelel: nincs az a hatalmas merevlemez melyen elférne az összes feltöltött videó; nincs az a szuperszámítógép, mely képes feldolgozni a feltöltés alatt álló videókat és képes kiszolgálni a megtekintéseket; egy filmet (különösen egy élő stream-et) nem lehet beszúrni egy adatbázis cellába.

A feladatot olyan kis egységekre kell bontani, mellyel már egy számítógép egység is megbirkózik, de ez nem egyszerű. Az alábbiakban bemutatom a legelterjedtebb eszközt, melynek segítségével ez megvalósítható. Előtte viszont lássunk két fogalmat, melynek megjelenése, vagy legalábbis elterjedése szorosan kapcsolható a big datához:

  • Felhő (cloud): számítási- és tárhely szolgáltatásokat vásárolhatunk, akár virtuális környezet formájában (ahova mi magunk telepítjük fel az eszközöket), akár magukat a szolgáltatásokat (pl. adatbázis) is közvetlenül megvásárolhatjuk. Előnye a változtathatóság: ha nincs rá szükségünk, visszaadhatjuk, ha pedig nagyobbra van igényünk, akkor dinamikusan növelhetjük, így pont annyit fizetünk, amennyit használunk belőle. A big datához több pontos is kapcsolódik: az igényelt környezetet a háttérben több számítógép szolgálhatja ki, ill. közvetlenül az, hogy a lent felsorolt big datás technológiákat is megvásárolhatjuk.
  • Folyam (stream): valószínűleg a big datától függetlenül is megjelent, a big datában viszont megkerülhetetlen, így egyre több programozási nyelvbe kerül bele. A Java nyelvbe a 8-as verzióval került. Arról van szó, hogy a big datában olyan nagy mennyiségű adatokról beszélünk, amely nem fér bele egyszerre a memóriába, ezért mintegy folyamként kell egyesével "végighúzni" az elemeket és azokat feldolgozni. Aki a konvencionális programozáshoz szokott, ahol egy-egy utasítás konkrét hardver szintű végrehajtási lépéseket jelent, annak nehézséget jelenthet a stream filozófia megértése. Ebben ugyanis alapvetően kétféle utasítás van: az egyik azt mondja meg, hogy ha rá kerül a sor, akkor milyen lépést hajtson végre (tipikusan ezek az átalakító és szűrő utasítások), de ezek valójában önmagukban nem csinálnak semmit. A másik viszont kiváltja a folyamot annak minden lépésével, mert az már eredményt számol ki. Ilyen lehet pl. számfolyam esetén a maximális elem meghatározása. Pl. ha a bemenet egy számfolyam, az első kategóriába olyan utasításokat írhatunk, hogy szűrd ki a negatív számokat vagy szorozd meg a számokat kettővel, de önmagában ha csak ennyit adnunk meg, a végrehajtás nem kezdődik el. A maximális elem meghatározásához viszont már vére kell hajtani a folyamat minden lépését.
  • Funkcionális programozás (functional programming): ez létezett korábban is, viszont új reneszánszát éli a big datával. Amint arról már volt szó, a megoldásban olyan lépésekre kell bontani a feladatot, mely egy számítógépen elvégezhető. Egy-egy ilyen egység paraméterül megkapja a feladat elvégéséhez szükséges összes adatot, ami visszaadja a kiszámolt eredményt. Ezek tehát valójában melléghatás nélküli függvények, melyek a funkcionális programozás legfontosabb építőkövei. A folyam műveleteit (tehát pl. azt, hogy mit mire konvertálunk vagy hogy hogyan szűrünk) általában megfelelő függvényekkel adjuk meg. A funkcionális nyelvekben a függvény ún. első osztályú állampolgár (first class citizen): pl. át tudunk adni függvényt paraméterként. Valamint helyben is megadhatjuk, hogy milyen inputra milyen outputot adjon, ezáltal tömörítve a kódon. A folyam műveletekkel a Java 8-ban a funkcionális programozás alapjai is megjelentek.

Bit data keretrendszerek

A big data keretrendszerek fő feladata az, hogy elrejtse a nagy rendszerek elosztott voltából adódó komplexitást a fejlesztő elől. Mivel adatokról beszélünk, a legfontosabb műveletek a beolvasás, az átalakítás és az eredmény kiírása.

Apache Hadoop

Ez volt az első ismert big data keretrendszer, ami egyrészt bevezette az elosztott adattárolót (a HDFS fájlrendszert), valamint a MapReduce néven ismertté vált big data feldolgozót. Ma már a helyét átvette a vele kompatibilis Spark, de a történeti hűség érdekében érdemes ezzel is nagy vonalakban megismerkedni.

A MapReduce algoritmus az adatokat részekre osztja, párhuzamosan feldolgozza és az eredményt egyesíti. A lépései:

  • leképezés (map): <kulcs1, érték1> → lista (<kulcs2, érték2>)
  • kombinálás (combine): ugyanahhoz a kulcshoz tartozó értékek listájának elkészítése
  • redukálás (reduce): <kulcs2, lista (<érték2>)> → lista (<kulcs3, érték3>)

A fejlesztőnek a map és reduce függvényeket kell megvalósítania, a kombinálást a keretrendszer végzi.

A big data helló világ programja a szó számlálás; ebben a feladatban van egy óriási szöveg adatbázis, mely nemcsak hogy a memóriában nem fér el, de még egy számítógépen sem. A feladat megszámolni azt, hogy melyik szó hányszor fordul elő. A fenti algoritmusban viszont mindhárom lépés végrehajtható a megszokott módon. A megoldás megértéséhez lássuk, hogyan működik a fenti módszerrel a szószámláló!

  • Az input tehát egy óriási szövegfájl. Bontsuk fel sorokra; a kulcs1 tehát legyen egy sor ebben a fájlban, az érték1 pedig legyen üres. Itt azt feltételezzük, hogy egy sor már belefér a memóriába. Vegyük észre, hogy Párhuzamosan akárhány számítógép feldolgozhatja a sorokat, pl. az elsőt, a negyediket, hetediket stb. az A, a másodikat, ötödiket, nyolcadikat stb. a B, a harmadikat, hatodikat, kilencediket stb. pedig a C. Ebben a példában tehát van egy beolvasó, amely rögtön három folyamnak (stream-nek) adja tovább a feladatot. Vegyünk egy példát is: egy ilyen sor legyen az, hogy "Az alma nem esik messze a fájától." A kulcs1 tehát ez a mondat lesz, az érték pedig nem definiált.
  • Leképezés: bontsuk fel a kulcs1-et (tehát az épp feldolgozás alat tálló sort) szavakra. Ebből képezzünk kulcs-érték párokból álló listát, mégpedig úgy, hogy a kulcs a szó legyen, az érték pedig minden esetben 1. Ezt a lépést el lehet végezni egy számítógépen belül, és vegyük észre azt is, hogy más adatra nem volt szükség a lépés elvégzéséhez. A példában ez lesz az eredmény: (az, 1), (alma, 1), (nem, 1), (esik, 1), (messze, 1), (a, 1), (fájától, 1).
  • Kombinálás: a Hadoop rendszer képes arra, hogy az egyes folyamokból érkező outputokat, azaz kulcs-érték listákat kombinálja, azaz létre hozza az ugyanahhoz a kulcshoz (jelen esetben: ugyanahhoz a szavakhoz) tartozó értékeket. Ebben az esetben az érték mindegyik esetben 1 lesz, és a lista olyan hosszú lesz, ahányszor az adott szó előfordul. Itt feltételezzük, hogy a részeredmény belefér a memóriába, azaz jelen esetben maga a szó is és annyi egyes, ahányszor az adott szó előfordul összesen az adatbázisban. Egy példa: az outputra: "fájától" → (1, 1, 1, 1, 1).
  • Redukálás: itt tehát egy kulcsból és hozzá tartozó érték listából kell egy kulcsot (ami lehet ugyanaz, mint a bejövő, de más is) és egyetlen értéket produkálni. Vegyük itt is észre azt, hogy a feladat elvégzéséhez nem kell látni a feldolgozandó szöveg egyéb részeit, így bele fér a memóriába. A példában, mivel előfordulást számolunk, a számokat össze kell adnunk. A kulcs nem változik. Pl. ebből az inputból: "fájától" → (1, 1, 1, 1, 1) a következő output lesz: "fájától" → 5.
  • Végül a keretrendszer ismét összekombinálja a részeredményeket. Itt is feltételezhetjük azt, hogy az eredmény elfér a memóriában.

Apache Spark

Az Apache Spark valójában a Hadoop továbbgondolása. Néhány fontosabb komponense:

  • Fürt (cluster) vezérlés, melynek része a HDFS fájlrendszer is. Ez az a rész tehát, melynek segítségével egyben láthatjuk az háttérben valójában elosztott erőforrásokat.
  • Az elosztott rendszert támogató adattípusok: rugalmas elosztott adathalmaz (Resilient Distributed Dataset, RDD), adat keret (DataFrame); adat halmaz (DataSet).
  • Spark SQL, mellyel a fenti adattípusokon lekérdezéseket lehet végrehajtani.
  • Spark Streaming, melynek segítségével az adatfolyamokat lehet kezelni.
  • MLib: gépi tanulás komponens.
  • GraphX a gráf műveletek végrehajtásához.

Négy programozási nyelvet támogat: Scala, Java, Python és R, valamint ötödikként a natív SQL-t is; ebben a leírásban a Scala-ról lesz szó.

Kezdő lépések

A Spark-ot élesben alapvetően felhő szolgáltatásként vesszük igénybe (ld. lejjebb a Databricks részt), és ritkán alakítjuk ki magunknak a környezetet. Ebben a részben a lokális használat lépéseit láthatjuk.

Telepítés

A telepítés Windows operációs rendszer alá nem egy egyszerű letöltés, indítás majd Next -> Next -> … -> Finish, hanem elég sok buktatója van. A legtöbb ezzel kapcsolatos leírás elnagyolt és használhatatlan. Nekem ez a leírás segített: https://dzone.com/articles/working-on-apache-spark-on-windows, bár itt is van pár dolog, ami másképpen működött. A következő recept nálam működött:

  • Telepítsük fel a Java SDK-t. Ellenőrizzük a következő parancsokkal: javac -version, java -version
  • Telepítsük fel a Scala SDK-t. Ellenőrizzük a következő parancsokkal: scala -version, sbt version
  • Töltsük le a Spark-ot innen: https://spark.apache.org/downloads.html (pl. 2.4.0 Hadoop 2.7-tel).
  • Tömörítsük ki egy könyvtárba, pl. ide: c:\programs\spark-2.4.0-bin-hadoop2.7.
  • Töltsük le a winutils.exe állományt innen: https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe.
  • Hozzunk létre egy kamu Hadoop könyvtárat, pl. c:\programs\hadoop, és annak a bin könyvtárába másoljuk a fenti winutils.exe fájlt. Tehát ebben a példában ez ide kerül: c:\programs\hadoop\bin\winutils.exe.
  • Állítsuk be az alábbi környezeti változókat:
    • SPARK_HOME legyen a Spark telepítés gyökere, pl. c:\programs\spark-2.4.0-bin-hadoop2.7.
    • HADOOP_HOME legyen a kamu Hadoop könyvtár, pl. c:\programs\hadoop.
    • Adjuk a PATH környezeti változóhoz a Spark bin könyvtárát, pl. c:\programs\spark-2.4.0-bin-hadoop2.7\bin.
    • Adjuk a PATH környezeti változóhoz a kamu Hadoop bin könyvtárat, pl. c:\programs\hadoop\bin.
  • Hozzuk létre a következő könyvtárat: c:\tmp\hive.
  • Adjuk ki a következő parancsot: winutils.exe chmod -R 777 C:\tmp\hive

Ezt követően a https://spark.apache.org/docs/latest/quick-start.html oldalon leírtak már nagyobbrészt használhatóak, bizonyos módosításokkal.

Spark shell

Indítsuk el a Spark Shell-t: spark-shell, és utána Spark-os Scala shell parancsokat tudunk kiadni. Példaként adjuk ki az alábbi parancsokat:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_161)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val textFile = spark.read.textFile("c:\\programs\\spark-2.4.0-bin-hadoop2.7\\README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count()
res0: Long = 105

scala>

A Spark Shell két fő objektumot nyújt:

A fenti példában egy kis szövegfájlt olvasunk be a SparkSession.read függvény segítségével, viszont azzal, hogy a Spark beolvasóját használtuk, elosztott rendszeren levő adatmennyiséget is kezelni tudunk, amit a háttérben a rendszer megfelelően bont le feladatokra. Figyeljük meg, hogy a beolvasás eredménye Dataset típusú.

Ha olyan külső függőségünk is van, ami nem része a Spark-nak, akkor a következő lehetőségeink vannak:

  • spark-shell —jars [filename].jar - abban az esetben célszerű használni, ha nem található meg a központi repository-ban.
  • spark-shell —packages [group]:[artifact]:[version] - ha megtalálható a központi repository-ban, akkor ezt érdemes használnunk.

Futás közben a http://localhost:4040 oldalon tudjuk megnézni a részleteket.

Önálló alkalmazások

Valódi értelme az önálló alkalmazásoknak van. Ehhez az alábbiakat kell végrehajtanunk:

  • Létre kell hoznunk egy Scala projektet, a Spark függőséggel.
  • Példányosítanunk kell a SparkSession-t (amit fent a Spark Shell megtett helyettünk).
  • A lefordított alkalmazást el kell küldenünk a Spark-nak.

Lássuk a lépéseket! Hozzunk létre egy build.sbt fájlt az alábbi tartalommal:

name := "Spark Example"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"

Hozzuk létre a src/main/scala/SparkExample.scala fájlt az alábbi tartalommal:

import org.apache.spark.sql.SparkSession

object SparkExample {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("Spark Example").getOrCreate()
    val textFile = spark.read.textFile("c:\\programs\\spark-2.4.0-bin-hadoop2.7\\README.md")
    println("Number of lines: " + textFile.count())
    spark.stop()
  }
}

Fordítsuk le a példaprogramot az alábbi paranccsal:

sbt package

Végül küldjük el az eredményt ezzel a paranccsal:

spark-submit --class "SparkExample" --master local[4] target/scala-2.11/spark-example_2.11-1.0.jar

A példa a fenti pár Spark Shell sor megvalósítása. Ha minden rendben ment, a hosszú log üzenetek között elveszve megtaláljuk ezt a rövidke sort: Number of lines: 105. Pár megjegyzés:

  • A fordításkor ha proxy mögött vagyunk, akkor az SBT_OPTS környezeti változót állítsuk be megfelelően, pl. set SBT_OPTS=-Dhttp.proxyHost=[proxy] -Dhttp.proxyPort=[port] -Dhttps.proxyHost=[proxy] -Dhttps.proxyPort=[port] -Dhttp.nonProxyHosts=localhost.
  • Ez érzékeny a Scala verzióra; az SBT fájlban akkor is a 2.11.12-t kell megadnunk, ha valamelyik 2.12-es verziójú Scala van feltelepítve. Ellenkező esetben fordításkor a kicsit semmitmondó java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/LambdaDeserialize hibaüzenetet kapjuk.
  • A quick start példában nem szerepel a "provided" rész. Ez a kulcsszó azt jelenti, hogy az említett függőséget nem kell beletenni az eredménybe, csak a fordításhoz kell, mert a futtató környezetben jelen van, így ebben az esetben érdemes odatenni.
  • Ha ténylegesen 2.12-es Scala van telepítve, akkor kapunk egy ilyen hibaüzenetet: The Java Development Kit (JDK) installation you have is not up to date. sbt requires at least version 6+, you have version 0. Ez hülyeség, hagyjuk figyelmen kívül. Ha a Scala 2.11.12-t használunk, akkor ez a hibaüzenet nem jelentkezik.
  • A végrehajtáskor a végén kapunk egy ilyen hibaüzenetet, hosszú stack trace-szel: java.io.IOException: Failed to delete: C:\Users\[username]\AppData\Local\Temp\spark-[some-long-hexa-code]\userFiles-[some-another-long-hexa-code]\[projectname].jar. Ez egy Windows specifikus hiba lehet, hagyjuk ezt is figyelmen kívül.
  • A local[4]-ben a 4 a szálak száma; ezt célszerű a processzorok számára állítani.
  • A külső függőségeket a Spark Shell szakaszban leírtak szerint tudjuk itt is alkalmazni.
  • Az alkalmazások küldéséről részletesen ezen az oldalon olvashatunk: https://spark.apache.org/docs/latest/submitting-applications.html.

Ha programból RDD-t szeretnénk használni (ld. lejjebb), akkor a már említett az sc-re van szükségünk. Ezt kétféleképpen is megkaphatjuk: közvetlenül létrehozva, vagy a spark-tól lekérve.

A fenti példa a Dataset adattípust használja. Közvetlenül RDD használatához álljon itt a fenti példa! A Spark Shell futtatás eredményének lényegi része az alábbi:

scala> val textFile = sc.textFile("c:\\programs\\spark-2.4.0-bin-hadoop2.7\\README.md")
textFile: org.apache.spark.rdd.RDD[String] = c:\programs\spark-2.4.0-bin-hadoop2.7\README.md MapPartitionsRDD[1] at textFile at <console>:24

scala> textFile.count()
res0: Long = 105

Láthatjuk, hogy az eredmény ugyanaz mint fent, viszont a létrejött adattípus org.apache.spark.rdd.RDD[String]. Ugyanez önálló alkalmazásban: a fent leírtakat kövessük, a fájl neve legyen RddExample.scala, tartalma pedig az alábbi:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object RddExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("RDD Example").setMaster("local")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile("c:\\programs\\spark-2.4.0-bin-hadoop2.7\\README.md")
    println("Number of lines: " + textFile.count())
    sc.stop()
  }
}

Értelemszerűen nevezzük át a build.sbt fájlban is a nevet, valamint a küldésnél is figyeljünk a különbségre.

Ez utóbbit valószínűleg ritkán fogjuk használni, a SparkSession-ös megoldás ugyanis felülről kompatibilis a SparkContext-tel: ha ugyanis van egy SparkSession típusú, tipikusan spark nevű objektumunk, akkor a SparkContext típusú, sc nevű objektumot lekérdezéssel megkapjuk: val sc = spark.sparkContext().

Az alábbiakban a példák úgy vannak összeállítva, hogy azok Spark Shell-ből és önálló alkalmazásként is működjenek. Spark Shell-ben adott a spark és az sc objektum, az önálló alkalmazások esetében pedig a fent megadott módokon kell létrehozni; a példákban az azt követő részleketet kell lecserélni.

Látható, hogy (legalábbis az írás pillanatában) eléggé kínszenvedés végrehajtani a kezdő lépéseket, nem tartom igazán kiforrottnak, különösen nem házi használatra. A valóságban a Spark-ot tipikusan nem saját magunknak kell összeraknunk, hanem felhő szolgáltatásként vásároljuk, így a fentiekkel szerencsére nemigazán kell törődnünk.

Folyamat

Amint azt a következő alfejeztben látni fogjuk, a Spark definiál néhány adattípust (ezek a háttérben egyébként szoros kapcsolatban vannak egymással), amelyek segítségével elosztott műveleteket lehet végrehajtani. Ezek az adattípusok nem módosíthatóak.

A működés dióhéjban a következő: az említett adattípusok segítségével a vezérlő (driver) program a feladatot részekre tudja osztani, azt el tudja küldeni a fürtben (cluster) levő végrehajtóknak (executor), majd összesíti az eredményt.

Az adatokon végrehajtható műveleteket két fő csoportba oszthatjuk, és a különbség megértése talán a legkritikusabb az egész big data technológia megértése szempontjából:

  • Átalakítások (transformations): ezek a függvények azt határozzák meg, hogy az adatok milyen átalakuláson menjenek keresztül. A legfontosabb a MapReduce algoritmusból örökölt map() függvény, ami átalakítja az adott adatot valami másra, pl. val lineLengths = textFile.map(s => s.length). A Spark talán legnagyobb előrelépése a Hadoop-hoz képest az, hogy ezen a ponton egyéb műveleteket is végre tud hajtani, melyekről a fenti linkeken olvashatunk. Fontos függvény még a szűrés (filter()). Nagyon fontos, hogy ezek a függvények ún. lusta (lazy) kiértékelésűek: önmagában ezek hatására egyelőre semmi sem történik! Ez a tulajdonsága teszi lehetővé azt, hogy a Spark megfelelően tudjon optimalizálni.
  • Akciók (actions): e műveletek hatására számolódik ki az eredmény, azaz egy ilyen művelet eredményezi azt, hogy lefussanak az átalakítások is. A MapReduce algoritmusból ide tartozik a reduce() függvény, amely egy olyan függvényt vár paraméterül, melynek két azonos típusú paramétere van, az eredménye egy ugyanolyan típusú érték, a végrehajtott művelet pedig asszociatív és kommutatív. A fenti példát folytatva a val totalLength = lineLengths.reduce((a, b) => a + b) a fájl teljes hosszát adja eredményül. Itt is számos egyéb művelet van a reduce() mellett, melyeket szintén a fenti linkeken olvashatunk.

Amint arról már szó volt, van egy vezérlő, amely kiosztja a feladatokat, és összegzi az eredményt. Produktív környezetben ez tipikusan azt jelenti, hogy a vezérlést más számítógép más processzora kapja meg, ahol a saját virtuális gépen történik a számolás. Ez a következőket jelenti (a példákban a distData tetszőleges elosztott adatot jelent, pl. akár a fenti lineLengths is lehet ilyen):

  • Az egyes feladatok megkapják azokat a külső változókat, melyeket felhasznál a művelet (tegyük fel, hogy a fenti példában (a, b) => a + b + c szerepel, ahol c egy globális változó), azt viszont tilos módosítani, mivel a vezérlőnél található "prototípus" nem változik. (Azaz ha módosítjuk, a végeredmény szinte biztos, hogy nem az elvárt lesz.)
  • Mivel az egyes komponenseken fut a feldolgozás párhuzamosan, pl. a kiírás (pl. a distData.foreach(println)) is az adott környezetben történik.
  • Ez azt is jelenti, hogy a működés eltér a lokális és az elosztott környezetben. (Pl. lokális környezetben a kiírás is lokálisan történik, ill. a globális változó ténylegesen módosul.) Emiatt óvatosan kell programoznunk, és mindig fel kell tennünk a kérdést, hogy ugyanúgy fog-e működni a programunk lokálisan mint élesben.
  • A collect egy olyan függvény, amelynek következtében az aktuális értékek a vezérlőhöz kerülnek (pl. distData.collect.foreach(println)). Ezzel vigyáznunk kell, mert nagyméretű adat esetén elfogyhat a memória. Tipikusan szűrést követően szokás használni. Tesztelési céllal vehetjük pl. az első 10 éréket (distData.take(10).foreach(println)) és azon hajthatjuk végre a műveletet.

A függvényeket többféleképpen is megadhatjuk:

  • Scala anonymous függvényként, ahogy fent is láthattuk. Egyszerűbb esetekben ezt célszerű használni.
  • Globális singleton objektumok statikus függvényeként. Összetettebb esetekben, ha ki szeretnénk szervezni a logikát, akkor használhatjuk ezt a módszert.
  • Egy osztálypéldány függvényét is átadhatjuk, viszont ez esetben a teljes példány szerializálódik és átadódik; ezt lehetőleg kerülnünk érdemes.

Minden egyes akció a transzformációk újbóli lefutását eredményezik. Tetszőleges elosztott adatra kiadhatjuk a persist() függvényt, amivel azt jelezzük a rendszernek, hogy szeretnénk, ha eltárolná a részeredményt, és a következő művelet azt használja fel. Alapértelmezésben a memóriában próbálja tárolni az eredményt, és ha nem fér el, akkor bizonyosakat nem tárol el, hanem szükség esetén újra kiszámolja. Az finomhangolható, hogy pontosan hogyan hajtsa végre, pl. ha nem fér el a memóriában, akkor lemezre is írhatja.

Léteznek olyan műveletek, melyek a feladatok újra elosztását (shuffle) váltják ki. Valójában az alap MapReduce is tartalmaz a map és a reduce között egy implicit shuffle-t, de ezt a Spark-ban explicit is kiválhatjuk, pl. a reduceByKey() függvénnyel. Ebben az esetben (valójában az eredeti MapReduce algoritmusnak megfelelően) kulcs-érték párokról van szó, és az azonos kulcsokhoz tartozó értékeken szeretnénk végrehajtani a reduce-t. Viszont az azonos kulcsokhoz tartozó értékek szét vannak szórva a fürt különböző részein, tehát a művelet során össze kell gyűjteni minden egyes kulcshoz tartozó minden értéket minden egységről, majd újra szétosztani a feladatokat. Ez egy nagyon erőforrás igényes művelet, így óvatosan hajtsuk végre. Nem elég ismételni: a műveletnek magának kommutatívnak és asszociatívnak kell lennie, így a Spark tud úgy optimalizálni, hogy az egyes komponenseken végrehajtja a rész reduce műveletet, utána hajtja csak végre (kisebb adatmennyiségen) a shuffle-t, majd osztja szét ismét, hogy folytassák a reduce-t, végül összegyűjti az eredményeket. A shuffle műveletek is lazy műveletek.

Amint arról szó volt, kerülnünk kell a globális változók használatát az átalakítások és az akciók során is. Itt érdemes megismerkednünk a closure fogalmával, ami dióhéjban egy függvény környezetét jelenti, tehát magát a függvényt, a paramétereket és azokat a külső változókat, melyeket használ. A programunk megírásakor mindig arra kell gondolnunk, hogy ez élesben akár több tucat rendszeren fog futni párhuzamosan, és egy-egy műveletnél mindegyiknek el kell küldeni a closure-t, ami nem gazdaságos. Ráadásul ha írni is szeretnénk a globális változót, akkor az nehezen felderíthető hibákhoz vezet. Ha mindenképpen globális változót szeretnénk használni, akkor az alábbi lehetőségeink vannak:

  • Ha nagyobb mennyiségű változó szeretnénk csak olvasni, akkor használhatjuk a broadcast-ot (pl. val broadcasted = sc.broadcast(Array(1, 2, 3))).
  • Ha módosítani szeretnénk egy globális értéket, akkor azt ún. akkumulátor változókban tehetjük meg. Léteznek egész (longAccumulator) és lebegőpontos (doubleAccumulator) akkumulátor változók, de mi magunk is készíthetünk saját típust, az AccumulatorV2 absztrakt osztályból származva, megvalósítva a szükséges műveleteket. Nagyon fontos, hogy a műveletek kommutatívak és asszociatívak legyenek, különben helytelen eredményt kapunk. Egy példa a helyes működésre: val accum = sc.longAccumulator("My Accumulator"), majd sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)).

RDD, DataFrame, Dataset

Az adattípusok bemutatásakor a következő feladatot hajtjuk végre. Egy adat két részből áll: egy megnevezésből és egy számból. A példában a megnevezés gyümölcs lesz, a szám meg mondjuk értelmezhető darabként. Az alábbi műveletet fogjuk többféleképpen végrehajtani:

  • Vegyük csak azokat a gyümölcsöket, melyek melletti szám nagyobb mint kettő.
  • Számoljuk ki és jelenítsük meg a gyümölcsönkénti összegeket és átlagokat.

Mindhárom adattípusnál megpróbáljuk többféleképpen létrehozni az adatot majd többféleképpen feldolgozni. Mielőtt belevágunk, hozzuk is létre a gyumolcsok.txt fájlt az alábbi tartalommal:

alma,5
körte,8
szilva,3
alma,7
banán,2
körte,1

RDD

Az RDD a Resilient Distributed Dataset rövidítése, ami magyarul kb. rugalmas elosztott adathalmazt jelent. Erről részletesen a https://spark.apache.org/docs/latest/rdd-programming-guide.html oldalon olvashatunk, az API itt található: https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html. Ez az adattípus már kissé túlhaladott, a DataFrame és Dataset átvette a szerepét, viszont a működés megértéséhez nélkülözhetetlen az RDD ismerete.

RDD-t alapvetően háromféleképpen lehet létrehozni:

  • Egy létező Scala struktúrából az sc.parallelize() függvénnyel, pl. val gyumolcsokRDDCreated = sc.parallelize(Seq("alma,5", "körte,8", "szilva,3", "alma,7", "banán,2", "körte,1")).
  • Beolvasással, pl. val gyumolcsokRDDRead = sc.textFile("gyumolcsok.txt").
  • Egy másik elosztott adattípusból (RDD, DataFrame, Dataset), átalakítással.
  • DataFrame-ből vagy Dataset-ből (ld. lejjebb), az rdd függvény segítségével.

Az adatokat legegyszerűbben a következő utasítással tudjuk megjeleníteni: gyumolcsokRDDCreated.collect.foreach(println), gyumolcsokRDDRead.collect.foreach(println). Az eredményből láthatjuk, hogy ugyanazt kaptuk. Válasszuk ki az egyiket, adjuk értékül a gyumolcsokRDD objektumnak, és azon dolgozzunk tovább: val gyumolcsokRDD = gyumolcsokRDDCreated.

Az imént létrehozott példákban az adat típusa szöveg. Bontsuk fel (szöveg, szám) típusra, egyúttal megismerkedünk a map() függvénnyel:

val gyumolcsokRDDSplit = gyumolcsokRDD.map(gyumolcs => {
    val splitted = gyumolcs.split(",")
    (splitted(0), (splitted(1).toInt))
})

Ezt megjelenítve a következőt kapjuk eredményül:

scala> gyumolcsokRDDSplit.collect.foreach(println)
(alma,5)
(körte,8)
(szilva,3)
(alma,7)
(banán,2)
(körte,1)

Végül végrehajtjuk a fent vázolt algoritmust, miközben megismerünk pár fontos függvényt:

  • A filter() függvény egy predikátum függvényt vár paraméterül. Jelen esetben a második paraméternek kell nagyobbnak lennie 2-nél, azaz filter(x => x._2 > 2) (vagy tömörebben filter(_._2 > 2)).
  • Az átlagszámításhoz az összegen kívül a darabszámra is szükség van. Mivel a következő függvény, a reduceByKey() olyan adattípust vár, mely két részből áll, a mostani (név, szám) értéket átalakítjuk a következőre: (név, (szám, 1)): map(x => (x._1, (x._2, 1))).
  • A shuffle művelet során összeadjuk az azonos kulcshoz tartozó érték kettős értékeit: reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)). Ez nagy adathalmaz esetén drága művelet, mert a vezérlő összegyűjti a részeredményeket, majd kulcsonként ismét szétküldi.
  • Végül kiszámoljuk az összeget és átlagot: map(x => (x._1, x._2._1, 1.0 * x._2._1 / x._2._2)).

Összefoglalva:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val gyumolcsokRDDOsszegzett = gyumolcsokRDDSplit
    .filter(x => x._2 > 2)
    .map(x => (x._1, (x._2, 1)))
    .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    .map(x => (x._1, x._2._1, 1.0 * x._2._1 / x._2._2))

// Exiting paste mode, now interpreting.

gyumolcsokRDDOsszegzett: org.apache.spark.rdd.RDD[(String, Int, Double)] = MapPartitionsRDD[28] at map at <pastie>:29

scala> gyumolcsokRDDOsszegzett.collect.foreach(println)
(körte,8,8.0)
(szilva,3,3.0)
(alma,12,6.0)

scala>

A feladat megoldható, de nehezen áttekinthető, ráadásul fordítási időben szinte semmilyen hibát nem tudunk felderíteni.

DataFrame

A DataFrame adattípus az R-hez hasonlóan az adatok táblázatos elrendezését jelenti. A cél az volt, hogy segítségével SQL utasításokat tudjunk kiadni. A Spark 2.0 verziótól kezdve a DataFrame nem más, mint egy Dataset[Row] alias. Így az API dokumentáció is közös: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html; a DataFrame-re nem típusos Dataset-ként hivatkozik.

DataFrame-et is többféleképpen létrehozhatunk:

  • Tetszőleges szekvenciából a toDS() függvény segítségével, pl. val gyumolcsokDF1 = Seq(("alma", 5), ("körte", 8), ("szilva", 3), ("alma", 7), ("banán", 2), ("körte", 1)).toDF, vagy az oszlopnevek megadásával: val gyumolcsokDF2 = Seq(("alma", 5), ("körte", 8), ("szilva", 3), ("alma", 7), ("banán", 2), ("körte", 1)).toDF("nev", "darab"). (Ehhez az önálló alkalmazások esetén szükség van erre: import spark.implicits._, mégpedig azt követően, hogy létrejött a spark.)
  • Egy RDD-ből, szintén a toDS() függvény segítségével, pl. val gyumolcsokDF3 = gyumolcsokRDDSplit.toDF("nev", "darab")
  • A spark.createDataFrame(data, schema) segítségével, ahol az első paraméter egész sokféle adat lehet, a sémát pedig a lent megadott módon tudjuk megadni.
  • Beolvasással, pl. a gyumolcsok.txt CSV fájlként történő kezeléssel (ehhez is szükség van a sémára): val gyumolcsokDFRead = spark.read.format("csv").schema(schema).load("gyumolcsok.txt").

A séma kétféleképpen is megadható, az alábbi kettő ekvivalens

import org.apache.spark.sql.types._
val schema1 = new StructType()
    .add(StructField("nev", StringType, true))
    .add(StructField("darab", LongType, true))
val schema2 = StructType(List(
    StructField("nev", StringType, true),
    StructField("darab", LongType, true))
)

Vegyük most példaként a beolvasást, ezzel dolgozzunk tovább, egyúttal megismerünk két nagyon hasznos függvényt: a show táblázatosan kirajzolja a DataFrame tartalmát, míg a printSchema a sémát rajzolja ki:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val gyumolcsokDF = spark.read.format("csv").schema(StructType(List(
          StructField("nev",StringType,true),
          StructField("darab",LongType,true))
      )).load("gyumolcsok.txt")
gyumolcsokDF: org.apache.spark.sql.DataFrame = [nev: string, darab: bigint]

scala> gyumolcsokDF.show
+------+-----+
|   nev|darab|
+------+-----+
|  alma|    5|
| körte|    8|
|szilva|    3|
|  alma|    7|
| banán|    2|
| körte|    1|
+------+-----+

scala> gyumolcsokDF.printSchema
root
 |-- nev: string (nullable = true)
 |-- darab: long (nullable = true)

scala>

És most lássuk a DataFrame igazi erejét! Ha még nem töltöttük be a spark.implicits._-t, akkor most tegyük meg, a createOrReplaceTempView() függvény segítségével hozzunk létre egy ideiglenes adattáblát, majd hajtsunk végre egy szokásos SQL utasítást:

scala> import spark.implicits._
import spark.implicits._

scala> gyumolcsokDF.createOrReplaceTempView("gyumolcsok")

scala> spark.sql("SELECT nev, sum(darab), avg(darab) FROM gyumolcsok WHERE darab > 2 GROUP BY nev").show
+------+----------+----------+
|   nev|sum(darab)|avg(darab)|
+------+----------+----------+
|szilva|         3|       3.0|
|  alma|        12|       6.0|
| körte|         8|       8.0|
+------+----------+----------+

Ez a kód sokkal olvashatóbb mint az RDD segítségével megvalósított! (Megjegyzés: előfordult, hogy nem Windows-on nem működött.)

Az olvashatóság már meg van, viszont ha elvétjük az SQL utasítást, akkor az csak futásidőben derül ki, ráadásul nem túl segítőkész hibaüzenet kíséretében. A fentivel lényegében ekvivalens a következő:

scala> gyumolcsokDF.select($"nev", $"darab").where($"darab" > 2).groupBy($"nev").agg(sum("darab"), avg("darab")).show
+------+----------+----------+
|   nev|sum(darab)|avg(darab)|
+------+----------+----------+
|szilva|         3|       3.0|
|  alma|        12|       6.0|
| körte|         8|       8.0|
+------+----------+----------+

Valószínűleg ez az a szintaxis, ami miatt megalkották a DataFrame adattípust: itt már a fordító is sokat segít, ha valamit elrontunk. Ráadásul az RDD-ben használható függvények itt is működnek.

Dataset

Amint arról már szó volt, a DataFrame egy Row típusú Dataset. A Dataset típusa viszont bármilyen case class lehet. Dataset-et legegyszerűbben úgy tudunk létrehozni, hogy a DataFrame-en meghívjuk az as[Tipus] típusmódosítót. Pl.:

case class Gyumolcs(nev: String, darab: Long)
val gyumolcsokDS = gyumolcsokDF.as[Gyumolcs]

Ezen is működik a show, ill. a korábban bemutatott eljárások.

A Dataset viszont mivel már típusos, a fordító fordítási időben jelzi a hibákat. Pl.: gyumolcsokDS.filter(gyumolcs => gyumolcs.darab > 2).show. Ezen felül működik rajza a DataFrame-nél bemutatott függvények is.

Folyamok

A folyamok a nagy adathalmazok természetes velejárói. A hagyományos algoritmusokban az összes adatot betöltjük a memóriába, és mindegyik elemet meg tudjuk címezni. Viszont ha az adat nem fér be a memóriába, akkor egyesével kell végigpásztáznunk az elemeken, lokálisan végrehajtható műveleteket hajtunk végre rajtuk, ill. asszociatív és kommutatív műveletekkel egyszerűsítünk. Léteznek véges és végtelen folyamok.

A Spark-ban kétféle folyam módszertan alakult ki: az első a DStreams volt, az újabb pedig a strukturált folyam. Mivel a big datában a folyamok egy alapvető eszköze a Kafka, azt külön megnézzük.

DStreams

A streaming (a stream magyarul folyamot jelent) a big data alapvető technikája. A fent ismertetett adattípusok is tkp. stream-ek, azaz nem egyszerre látjuk az összes adatot, hanem egyesével, a feldolgozás lokálisan történik, az összegzés pedig kommutatív és asszociatív műveletekkel. Az ebben a fejezetben bemutatott technika annyiba más a fentiekhez képest, hogy itt végtelen folyamokról van szó.

A DStream a Spark kezdeti megoldása a problémára. A következőkben látni fogjuk, hogy van egy újabb technológia is, viszont a lényeg megértéséhez elengedhetetlen, hogy a DStream-et is megismerjük. A dokumentáció itt található: https://spark.apache.org/docs/latest/streaming-programming-guide.html. A leírásban szereplő nc eszközt Windows-on közvetlenül nem tudjuk használni, és máris elérkeztünk ahhoz a ponthoz, hogy miért nem szeretjük igazán a nyílt forráskódú világot. Helyettesítők:

  • Ncat (https://nmap.org/ncat/): ez ingyen letölthető a honlapjukról, viszont elég nehézkes megtalálni magát a telepítőt, ezért megadom a közvetlen linket: https://nmap.org/dist/nmap-7.70-setup.exe. A telepítés önmaga meglepően lassú, és a folyamat során számos hiba lép fel, ezt ignoráljuk. Indítás: ncat -lk 9999. Itt viszont ismét várnunk kell annyit, amit normál esetben lefagyásnak gondolnánk (mert a program "természetesen" nem ír ki semmit…).
  • Netcat for Windows (https://joncraton.org/blog/46/netcat-for-windows/): a safe verziót töltsük le, e megadott jelszóval tömörítsük ki, majd másoljuk az exe fájlt ey tetszőleges könyvtárba (a forráskódot meg felejtsük el). Használat: nc -l -p 9999. "Természetesen" ez sem működik megbízhatóan, és fogalmam sincs, hogy mi miatt… Az indítással próbálkozhatunk git bash-ból, úgy alapvetően működött, bár ami furcsa, hogy Ctrl+C után megpróbálja parancsokként értelmezni a beírt szavakat…

A végtelenített szószámolós példa az alábbi. A kövtkező utasításokat a Spark Shell-ben egyesével adjuk ki, majd a Netcat program segítségével küldjünk a 9999-es portra szavakat:

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()

Az önálló alkalmazás megfelelője sajnos nemigazán egyértelmű; amire én jutottam, az a következő. A build.sbt a következő (időközben a Spark 2.4.1-esre váltva):

name := "Streaming Example"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1" % "provided"

A scalaVersion-nek továbbra is 2.11.12-nek kell maradnia (ha ClassNotFoundException lép fel, akkor ezt a sort segíthet: scalacOptions ++= Seq("-no-specialization")
). Az src/main/scala/StreamingExample.scala tartalma a következő:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming._

object StreamingExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Streaming Example").setMaster("local")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
    sc.stop()
  }
}

Emlékeztetőül, a következő utasítással fordítjuk és küldjük a Spark-nak:

sbt package
spark-submit --class "StreamingExample" --master local[4] target/scala-2.11/streaming-example_2.11-1.0.jar

A fordítás és indítás megfelel a fentieknek. Viszont hiába írunk be bármit a Netcat konzolon, a kívánt eredmény nem jelenik meg, csak valami logbejegyzés. Amikor Ctrl+C-vel kilövöm a Spark processzt, akkor viszont a hosszú logban megjelennek az egyes futások eredményei. Szóval nem igazán sikerült ezt működésre bírni… (TODO: működésre bírni…)

Lássuk a kód magyarázatát! A streaming fő belépési pontja a StreamingContext objektum, melynek tipikus neve ssc. Ez alapból a Spark Shell-ben sincs benne, létre kell hozni. A példában a ssc.socketTextStream() hívással inicializáltuk a folyamot, ami - ahogy a neve is jelzi, ill. ahogy már szó volt róla - egy socketről olvas szöveget. Ezt követően az átalakítások és transzformációk hasonlóak a már korábban bemutatottakhoz. A stream-et az ssc.start()-tal indítjuk, és az önálló alkalmazásban a ssc.awaitTermination() gondoskodik arról, hogy ne érjen véget azonnal. (Valószínűleg az sc.stop()-nak nincs jelentősége.)

A fenti oldalon érdemes megnézni a transzformációs lehetőségeket. Pl. lehetőség van két stream uniójára is.

A bemutatott lehetőség mellett létezik ablakos megoldás is. Meg kell adni, hogy milyen gyakorisággal fusson le és mekkora intervallumot vegyen figyelembe. Szintaktikailag nincs jelentős különbség a kettő között; a fenti példában a pairs.reduceByKey(_ + _) részt kell lecserélni pl. erre: pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10)). Ebben a példában 10 másodpercenként fut le a szó számolás 30 másodperces ablakokon, tehát egy bemenet háromszor jelenik meg a statisztikában

Structured Streaming

Kicsit úgy viszonyul a strukturált streaming a DStream-hez mint a DataFrame/Dataset az RDD-hez. Sok hasonlóság van a kettő között. Talán a leglényegesebb koncepcionális újítás az, hogy a strukturált streaming felfogható egy végtelen táblának, amelyben folyamatosan új sorok jelennek meg. Ugyanakkor számos egyéb újítás is belekerült. A leírása itt található: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

A szokásos sószámolós példa, ami a localhost:9999-ről érkező szavakat számolja, Spark Shellből a következő:

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream.outputMode("complete").format("console").start()

Ugyanez önálló alkalmazásként:

build.sbt:

name := "Structured Streaming Example"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.1" % "provided"

src/main/scala/StructuredStreamingExample.scala:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

object StructuredStreamingExample {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()
    import spark.implicits._
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    val words = lines.as[String].flatMap(_.split(" "))
    val wordCounts = words.groupBy("value").count()
    val query = wordCounts.writeStream.outputMode("complete").format("console").start()
    query.awaitTermination()
  }
}

Futtatás:

spark-submit --class "StructuredStreamingExample" --master local[4] target/scala-2.11/structured-streaming-example_2.11-1.0.jar

Ez utóbbi a tapasztalatom szerint elképesztően hosszan logol, melyben szinte elvész az eredmény, de alapvetően működik.

Az output mode (ami a fenti példában complete) a következő lehet:

  • Complete: mindent kiír; a szószámolós példában tehát az összes szót, ami a program indulása óta érkezett.
  • Append: csak az új sorokat írja ki; a példában tehát azokat a szavakat, amelyek a legutóbbi futás óta érkeztek. (Ez nálam nem működött.)
  • Update: csak azt írja ki, ami változott.

A fenti példa socket-ről olvassa az adatokat, melyet csak tesztelési céllal javasolnak. Lehet még fájlból olvasni (példát lejjebb találunk), ill. (ami a leggyakoribb) Kafka topic-ról. Ez utóbbiról még részletesen lesz szó.

Típustól függően lehet paraméterezni a forrást az .option(kulcs, érték) hívással. A fenti példában a szervert és a portot állítjuk be. A fenti oldalon láthatjuk, hogy melyik típusnál milyen beállítási lehetőségek vannak.

A gyakorlatban sokszor előfordul, hogy a végrehajtás során a folyamat ne folyamatosan menjen, hanem ablakokban dolgozza fel azt. Itt három fogalmat kell megismernünk:

  • A futás gyakorisága: ezt mutatja meg, hogy milyen gyakran fusson le a feldolgozás; a lenti példában 5 perc.
  • Egy ablak hossza: adott lefutáskor mennyi adatot vegyen figyelembe; a lenti példában 10 perc (tehát minden adat kétszer fog szerepelni).
  • Watermarking: mennyi ideig vegye figyelembe a későn érkező adatokat. A lenti példában ez 30 perc, ami azt jelenti, hogy ha legfeljebb ennyit késik az adat (tehát pl. most érkezik meg egy 28 perccel ezelőtti adat), akkor visszamenőlegesen aktualizálja azt a két ablakot.

Az alábbi utasításokat a Spark Shell-ből adjuk ki:

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val wordSchema = StructType(Array(
  StructField("timestampStr", StringType),
  StructField("word", StringType)
))

val streamingWords = spark.readStream.schema(wordSchema).csv("words").select(unix_timestamp($"timestampStr", "yyyy/MM/dd HH:mm:ss").cast(TimestampType).as("timestamp"), current_timestamp().as("current"), $"word")
val windowedWords = streamingWords.withWatermark("timestamp", "30 minutes").groupBy(window($"timestamp", "20 minutes", "10 minutes"), $"word").count()
val query = windowedWords.writeStream.format("console").option("truncate", false).start()

Hozzunk létre egy words nevű könyvtárat, és oda helyezzünk el fájlokat (a fájlnév mindegy) megfelelő tartalommal, kb. így:

2019/04/19 12:10:15, alma
2019/04/19 12:10:20, alma
2019/04/19 12:10:25, barack
2019/04/19 12:10:30, alma
2019/04/19 12:10:35, alma

Az időpont a futáskori időpont legyen. Majd adjunk hozz újabb és újabb fájlokat; a konzolon látni fogjuk az eredményt. (Megjegyzés: önálló alkalmazásként futtatva nálam nem az elvárt módon működött.)

A stream-elésnél a szokásos DataFrame és Dataset függvények nagy része használható (pl. select(), where() ill. filter(), map()), de van pár, amely vagy logikailag nem értelmezhető (pl. count() vagy distinct()) vagy még technikailag nincs megvalósítva (pl. többszörös stream aggregáció).

A fenti példákban a kimenet a konzolon történt (ld. writeStream.format("console")). További lehetőségek:

  • Fájl: pl. writeStream.format("parquet"). A parquet fájl a szokásos adattábla transzponáltja; nagy mennyiségű adatnál bizonyos műveletek végrehajtása így gazdaságosabb, mivel nagyobb eséllyel kerül ugyanarra a fizikai lemezre egy olyan adat, amire szűrünk.
  • Kafka: writeStream.format("kafka"). Opcióként meg kell adni a a szervert, a topic nevet és még opcionálisan pár adatot.
  • Memória (egészen pontosan egy virtuális adattábla a memóriában): .format("memory").queryName("tableName").
  • Foreach: writeStream.foreach(…), ahol paraméterül egy ForeachWriter példányt kell átadni. Itt 3 műveletet kell megvalósítani: open(), process(), close(). Ügyeljünk arra, hogy az egyes lépések nem a vezérlőn hajtódnak végre, így lokálisan kell megvalósítani az adatkapcsolatot. Mivel alapból minden egyes elemre felépíti a teljes kapcsolatot, érdemes egy közös készletet (angolul pool-t) létrehozni, azokat nyitva tartani és onnan választani egy írót.
  • Foreach batch: writeStream.foreachBatch(…). Ez hatékonyabb mint a foreach(), mivel itt nagyobb egységenként hajtódik végre a művelet.

Kafka

Noha a Kafka egyfajta lehetőség a stream-elésre, érdemes külön foglalkozni vele, mivel a valóságban ez a leggyakoribb stream forrás. A használata itt is problémákhoz vezet, így fontosnak tartom leírni, hogy nálam hogyan működött. A legfontosabb dokumentumok, melyeket érdemes elolvasni:

A telepítést és beállítást ld. lejjebb. A lent megadott módon indítsuk el a Kafkát, hozzunk létre egy test topic-ot, majd indítsunk egy producert, amellyel szavakat küldünk a Kafkának. A programunk ezt fogja számolni.

A build.sbt tartalma legyen ez:

build.sbt

Az src/main/scala/KafkaExample.scala tartalma ez:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming._

object KafkaExample {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("KafkaExample").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._
    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test")
      .option("startingOffsets", "latest")
      .load()
    val wordCounts = kafka.select(explode(split($"value".cast("string"), "\\s+")).as("word"))
      .groupBy($"word")
      .count
    val query = wordCounts
      .writeStream
      .outputMode("complete")
      .format("console")
      .trigger(ProcessingTime(1000))
      .start()
    query.awaitTermination()
  }
}

Fordítás:

sbt package

Futtatás:

spark-submit --class "KafkaExample" --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 --master local[4] target/scala-2.11/kafka-example_2.11-1.0.jar

Indítás után írjunk üzeneteket a producerbe. Kicsit várni kell, mire elkezd megjelenni az adat. Példa a bevitelre:

>alma meggy barack alma
>meggy cseresznye
>szilva meggy alma alma

Példa kimenet:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|word|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|barack|    1|
|  alma|    2|
| meggy|    1|
+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-----+
|      word|count|
+----------+-----+
|    barack|    1|
|      alma|    2|
|     meggy|    2|
|cseresznye|    1|
+----------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+-----+
|      word|count|
+----------+-----+
|    szilva|    1|
|    barack|    1|
|      alma|    4|
|     meggy|    3|
|cseresznye|    1|
+----------+-----+

Formátumok

Databricks

A Databricks egy felhő alapú Spark szolgáltatás.

Regisztráció

Áttekintés

DBFS

A big datával kapcsolatos egyéb technológiák

Kafka

Az Apache Kafka egy elosztott folyam rendszer (distributed streaming platform). Alapfunkciója tehát végső soron az, hogy adatokat fogadjon és továbbítson, hasonlóan pl. Az Apache Camelhez, a különbség a kettő között viszont az, hogy az Apache Camel elosztott módon, akármennyi feldolgozóval tud működni párhuzamosan, így lényegesen nagyobb (felső korlát nélküli) az "áteresztő képessége". Emiatt a Camel a big data alapvető építőköve, ráadásul úgy, hogy egymáshoz rendelődnek a Kafka és Spark példányok. Ennek persze ára van: a Kafkának sokkal kevesebb funkciója van, mint a Camelnek.

Az egyes Kafka példányok vezérlését egy Zookeeper nevű alkalmazás hajtja végre. Beállítása még egy számítógépen belül is - hasonlóan a Sparkhoz - nem nyilvánvaló, és a telepítési útmutatók sajnos nem nyújtanak 100%-os támpontot; nekem az alábbi módon sikerült beállítanom.

A Zookeeper telepítése

A Kafka önmagában is tartalmaz egy Zookeepert, amit tesztelési céllal használhatunk, produkciós környezetben viszont mindenképpen szükség van egy önállóan futó Zookeeperre. Sajnos ez egy nehézkesen telepíthető komponens, viszont ingyenes. Töltsük le a Zookeeper weboldaláról kiindulva (https://zookeeper.apache.org). Talán elsőre nem egyértelmű a megtalálása; én a következőt töltöttem le: http://xenia.sote.hu/ftp/mirrors/www.apache.org/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz. A kiterjesztés (.tar.gz) Windows-on nem könnyíti meg az életünket; a kicsomagoláshoz szükség lehet egy külső programra, pl. a 7-zip alkalmazásra (https://www.7-zip.org/). Csomagoljuk ki egy könyvtárba (pl. c:\programs\zookeeper-3.4.14), maj állítsuk be a következő környezeti változókat:

  • ZOOKEEPER_HOME: ez legyen a Zookeeper program gyökere (pl. c:\programs\zookeeper-3.4.14)
  • PATH: adjuk hozzá a Zookeeper bin könyvtárát (pl. c:\programs\zookeeper-3.4.12\bin)

A conf könyvtárban található zoo_sample.cfg fájlt (c:\programs\zookeeper-3.4.14\conf\zoo_sample.cfg) nevezzük át erre: zoo.cfg, és a dataDir-t írjuk át a megfelelő Windows-os könyvtárra, pl. dataDir=c:\\programs\\zookeeper-3.4.12\\data. Ezt követően a következő paranccsal indíthatjuk a Zookeepert:

zkserver

Ha minden rendben ment, a következőt ell látnunk a konzolon:

binding to port 0.0.0.0/0.0.0.0:2181

A Kafka telepítése

Töltsük le a rendszert innen: https://kafka.apache.org/downloads (pl. http://xenia.sote.hu/ftp/mirrors/www.apache.org/kafka/2.2.0/kafka_2.12-2.2.0.tgz), majd csomagoljuk ki (ezzel kapcsolatos infó: ld. fenn), pl. a c:\programs\kafka_2.12-2.2.0 könyvtárba. Ha nem az önálló Zookeepert szeretnénk futtatni, hanem a beépítettet, akkor tegyük a következőt: szerkesszük a config\zookeeper.properties fájlt, azon belül dataDir-t állítsuk be megfelelően, pl. a következőképpen:

dataDir=c:\\programs\\kafka_2.12-2.2.0\\zookeeper-data

Majd indítsuk a beépített Zookeepert a következőképpen:

c:\programs\kafka_2.12-2.2.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

Magának a Kafkának az indításához is pár dolgot be kell állítani. Adjuk hozzá a bin könyvtárban evő windows könyvtárat (pl. c:\programs\kafka_2.12-2.2.0\bin\windows) a PATH környezeti változóhoz. Módosítsuk a config könyvtáron belüli server.properties fájlt c:\programs\kafka_2.12-2.2.0\config\server.properties, megfelelően beállítva a log.dirs értékét, pl.:

log.dirs=c:\\programs\\kafka_2.12-2.2.0\\kafka-logs

Győződjünk meg arról, hogy fut a Zookeeper, majd indítsuk el a Kafkát:

c:\programs\kafka_2.12-2.2.0>bin\windows\kafka-server-start.bat config\server.properties

Ha minden rendben történt, a kéréseket a 9092-es porton fogadja.

Használat

A három legfontosabb művelet a téma (topic) létrehozása, üzenet küldése és üzenet fogadása. Ez utóbbi kettő a valóságban tipikusan külső programmal történik, tesztelési céllal viszont használhatjuk a beépített rót és olvasót is.

Először hozzunk létre egy teszt topic-ot:

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Látható, hogy topic-ot a Zookeeper szintjén hozzuk létre. A témák listázását a következőképpen tudjuk végrehajtani:

kafka-topics.bat --list --zookeeper localhost:2181

A következő utasítással üzeneteket tudunk küldeni a test topic-ra:

kafka-console-producer.bat --broker-list localhost:9092 --topic test

Amit a konzolra írunk, az bekerül a Kafkába. Kiolvasni a következőképpen tudjuk:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

Korábban Zookeeper szintjén tudtunk csak kiolvasni; a legtöbb leírás még ezt tartalmazza. Még működik, de már nem használandó: kafka-console-consumer.bat —zookeeper localhost:2181 —topic test.

TODO: programkód példa

Event hubs

Data Warehouse

BLOB storage

Datalake

Elastic search

Redis

Felhő szolgáltatások

Azure

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License