PySpark

Kategória: Python.
Alkategória: Adatkezelés Pythonban.

Áttekintés

A Big Data témáról a linkelt oldalon van részletesen szó, így itt nem foglalkozunk ezzel részletesen.

A lényeg röviden: big datáról akkor beszélünk, ha egy számítógép nem elég az adatok feldolgozására. Ennek leggyakoribb oka az adatok keletkezési sebessége. Ilyenkor a feladatot párhuzamosan elvégezhető részekre kell osztani, majd az eredményt összefűzni.

Az Apache Spark egy ilyen megvalósítás. Ez olyan adatszerkezeteket definiál, amelyen a műveletek párhuzamosan hajtódnak végre. Magát a Sparkot Scala-ban írták, de támogatja a Java, az R és a Python programozási nyelveket is. Ez utóbbihoz a PySpark modulra van szükség.

A gyakorlatban a legritkább esetben használjuk közvetlenül a Sparkot, annak valóban párhuzamos konfigurálása nem egyszerű feladat. Ehelyett tipikusan olyan eszközöket használunk, amelyek automatikusan elvégzik a konfigurálást, és ezt felhő szolgáltatásként vesszük igénybe. Egy ilyen módszer a Databricks, és ez megtalálható a saját szervereiken, valamint az ismert külső felhőszolgáltatókban is. Használatáért fizetni kell, ám van ingyenes kipróbálási lehetőség is.

Amint arról fent volt szó, több olyan adatszerkezet létezik, amely támogatja az elosztott végrehajtást. Ezek közül talán a legfontosabb az adat keret, angolul Data Frame. Ezen az oldalon azt vizsgáljuk meg, hogy hogyan tudjuk ezt használni Pythonban. Az oldalnak nem célja a különböző big data technológiák részletes ismertetése.

Akik a Python leírásokat lineáris sorrendben olvassák, találkozhattak már a Pythonban data kerettel: a Pandas könyvtár valósítja ezt meg. A Spark adat keret és a Pandas adat keret nem összetévesztendő. Ám mivel a Pandas nagyon elterjedt, van lehetőség az ott megtanult módszereket is használni elosztott környezetben is, de nem közvetlenül, hanem átalakítással. Erről is lesz szó.

Előkészületként tehát:

  • Regisztráljunk egy Databricks példányt, akár a https://databricks.com/try-databricks linken, akár más felhő szolgáltatónál. (Vagy használhatjuk a community változatot: https://community.cloud.databricks.com/.)
  • Hozzunk létre egy fürtöt (clustert). (Az ingyenes verzióban csak egyet tudunk létrehozni.)
  • Hozzunk létre olyan Notebook-ot, ami a Python nyelvet támogatja.

A témával kapcsolatos hasznos olvasnivalók az alábbi oldalon találhatóak:

Spark Dataframe

DataFrame létrehozása

Spark adat keretet sokféleképpen létrehozhatunk: beolvasással, konverzióval stb. Mi most itt a spark.createDataFrame() függvényt használjuk:

gyumolcsok_df = spark.createDataFrame([
  ('alma', 5), 
  ('körte', 8), 
  ('szilva', 3), 
  ('alma', 7), 
  ('banán', 2), 
  ('körte', 1)
]).toDF('nev', 'darab')

DataFrame megjelenítése

Ezen használhatjuk a szokásos megjelenítő műveleteket, pl. a DataBricks-ben a display(gyumolcsok_df) hívást, vagy a karakteres eredményt nyújtó show() függvényt:

gyumolcsok_df.show()

melynek eredménye:

+------+-----+
|   nev|darab|
+------+-----+
|  alma|    5|
| körte|    8|
|szilva|    3|
|  alma|    7|
| banán|    2|
| körte|    1|
+------+-----+

A sémát is a Scala-ban megszokott módon jeleníthetjük meg:

gyumolcsok_df.printSchema()
root
 |-- nev: string (nullable = true)
 |-- darab: long (nullable = true)

Spark SQL

SQL lekérdezést szintén a Scala-hoz hasonlóan hajthatunk végre. Itt és az összes további példában is ugyanazon az inputon ugyanazt a műveletsort hajtjuk végre:

  • Szűrés: a példában csak azokat a gyümölcsöket vesszük figyelembe, melynek darabszáma nagyobb mint 2.
  • Csoportosítás: az ugyanolyan gyümölcsnévhez tartozó darabszámot összegezzük.
  • Rendezés: darabszám szerint.

Spark SQL lekérdezés formájában ez az alábbi:

gyumolcsok_df.createOrReplaceTempView('gyumolcsok')
gyumolcsok_grouped = spark.sql('SELECT nev, sum(darab) AS darab FROM gyumolcsok WHERE darab > 2 GROUP BY nev ORDER BY darab ASC')
gyumolcsok_grouped.show()
+------+-----+
|   nev|darab|
+------+-----+
|szilva|    3|
| körte|    8|
|  alma|   12|
+------+-----+

Ha lefuttatjuk, akkor láthatjuk, hogy a futás párhuzamos.

Spark DataFrame műveletek

A fentivel ekvivalens (és az eredmény is az) az alábbi:

from pyspark.sql.functions import sum, col, asc
 
gyumolcsok_df\
  .select('nev', 'darab')\
  .where(col('darab') > 2)\
  .groupBy('nev')\
  .agg(sum('darab').alias('darab'))\
  .sort(asc('darab'))\
  .show()

Ennek a szintaxisa valamelyest eltér a Scala-tól (személyes véleményem: a hasznára). Figyeljük meg azt, hogy importálni kell függvényeket. Nagyon fontos ez a sum függvény esetén: nem az alapkönyvtár sum függvényét kell hívni, hanem a párhuzamosan futni képes Spark SQL változatot.

Koalas

Ha valaki hozzászokott a Pandas adatkeret szintaxisához, azt Sparkban is használhatja, a Koalas könyvtár segítségével.

Pandas kód

Lássuk először, hogyan valósítanánk meg a fenti programot Pandas segítségével!

import pandas as pd
 
gyumolcsok_pandas = pd.DataFrame({
  'nev': ['alma', 'körte', 'szilva', 'alma', 'banán', 'körte'],
  'darab': [5, 8, 3, 7, 2, 1],
})
gyumolcsok_pandas_filtered = gyumolcsok_pandas[gyumolcsok_pandas['darab'] > 2]
gyumolcsok_pandas_grouped = gyumolcsok_pandas_filtered.groupby('nev').sum().reset_index()
gyumolcsok_pandas_sorted = gyumolcsok_pandas_grouped.sort_values('darab')
print(gyumolcsok_pandas_sorted)
      nev  darab
2  szilva      3
1   körte      8
0    alma     12

Látható, hogy a szintaxis egészen más, mint a Spark adatkeret esetén, így nem szabad abba a hibába esni, hogy ha ismerjük a Pandast, akkor egyúttal ismerjük a PySpark-ot is. Ha lefuttatjuk a fenti programot, akkor az eredmény jónak tűnik. Csakhogy a futás nem párhuzamosan, worker node-okon futott futott, hanem hagyományosan, a vezérlő komponensen. Ekkora méretnél természetesen ennek nincs jelentősége, de most olyan méreteket képzeljünk el, amelyhez nem lenne elég a vezérélő komponens.

Koalas kód

Töltsük be a databricks.koalas könyvtárat! Itt is számos módon hozhatunk létre adat keretet; a példában a fenti Pandas adatkeretből konvertálunk a from_pandas() hívással. Minden más lépés egy az egyben ugyanaz, és az eredmény is:

import databricks.koalas as ks
 
gyumolcsok_koalas = ks.from_pandas(gyumolcsok_pandas)
gyumolcsok_koalas_filtered = gyumolcsok_koalas[gyumolcsok_koalas['darab'] > 2]
gyumolcsok_koalas_grouped = gyumolcsok_koalas_filtered.groupby('nev').sum().reset_index()
gyumolcsok_koalas_sorted = gyumolcsok_koalas_grouped.sort_values('darab')
print(gyumolcsok_koalas_sorted)

Megfigyelhetjük, hogy futáskor a Spark párhuzamosítja a feladatokat.

Koalas átalakítása Spark adatkeretté

A Koalas adatkeretet a to_spark() hívással tudjuk Spark adatkeretté konvertálni:

from pyspark.sql.functions import sum, col, asc
 
gyumolcsok_spark = gyumolcsok_koalas.to_spark()
gyumolcsok_spark.select('nev', 'darab').where(col('darab') > 2).groupBy('nev').agg(sum('darab').alias('darab')).sort(asc('darab')).show()

Itt tehát Pandas adatkeretből indultunk, és konvertáltuk 2 lépésben Spark adatkeretté, majd hajtottunk végre Spark műveleteket. Az eredmény ugyanaz, mint a fenti.

Koalas műveletek Spark adatkereten

Ha az adat Spark adatkeret formájában adott, de Pandas szintaxist szeretnénk használni, akkor a to_koalas() hívással tudjuk Koalas adatkeretté konvertálni, azon Pandas szintaxisban programozni, amit párhuzamosítva hajt végre, majd a végén visszaalakítani ismét Spark adatkeretté a to_spark() hívással:

gyumolcsok_spark = spark.createDataFrame([
  ('alma', 5), 
  ('körte', 8), 
  ('szilva', 3), 
  ('alma', 7), 
  ('banán', 2), 
  ('körte', 1)
]).toDF('nev', 'darab')
gyumolcsok_koalas = gyumolcsok_spark.to_koalas()
gyumolcsok_koalas_filtered = gyumolcsok_koalas[gyumolcsok_koalas['darab'] > 2]
gyumolcsok_koalas_grouped = gyumolcsok_koalas_filtered.groupby('nev').sum().reset_index()
gyumolcsok_koalas_sorted = gyumolcsok_koalas_grouped.sort_values('darab')
gyumolcsok_spark_result = gyumolcsok_koalas_sorted.to_spark()
gyumolcsok_spark_result.show()

Az eredmény megegyezik az összes fent bemutatott módszer eredményével.

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License