Párhuzamosítás

Áttekintés

A konkurens végrehajtás egy rendkívül szerteágazó téma, amiről itt egy átfogó képet szeretnék nyújtani. Az elvek nyelvfüggetlenek, a konkrét példák viszont Pythonban készültek.

Az alapprobléma a következő: hogyan tudunk függvényeket párhuzamosan futtatni? A párhuzamos futtatás igényének nagyon sok oka lehet, ahogy a többféle megoldás közül válogathatunk. Néhány gyakorlati példa:

  • A webes alkalmazásoknál célszerű, hogy egyszerre, párhuzamosan sokan tudjuk használni. Most képzeljük el, hogy a Facebook alkalmazásban addig nem tudjuk megnézi az üzenőfalunkat, amíg egyetlen valaki a világon éppen bejegyzést fogalmaz.
  • A processzoridő drága, és gyakran előfordul, hogy vár valamilyen erőforrásra. Például rendkívül gazdaságtalan egy webes letöltésnél várakoztatni a processzort; helyette jobb, ha más csinál, és visszatér, ha az adatok megérkeztek.
  • A többmagos processzorok korában ki szeretnénk használni a tényleges párhuzamosítás lehetőségét: egy jól optimalizált program kétmagos processzoron közel fele annyi ideig fut, mintha egymagos rendszeren futtatnánk.

Érdemes még az elején tisztázni pár fogalmat. A szűkebb értelembe vett párhuzamosítás valójában a tényleges párhuzamos végrehajtást jelenti, ami a többmagos processzorok kihasználásával valósul meg. Erre a leírásban multiprocessingként hivatkozunk. A témát általánosan talán a konkurens végrehajtás kifejezés vagy a konkurencia szó fejezi ki legjobban. Viszont mivel a témáról elsősorban a párhuzamosítás jut eszünkbe, ill. fordítva, a leginkább a párhuzamosításról jut eszünkbe az,a mi itt le van írva, a nem teljesen precíz megnevezés mellett döntöttem.

Ezen az oldalon ezt a témát járjuk körbe. A téma programozási nyelv független; itt a Pythont használjuk.

Aszinkron programozás

Áttekintés

Első körben megnézzük, milyen párhuzamosítási lehetőségeink vannak akkor, ha összesen egy processzormagunk van, és egy programszálat használhatunk. Ilyen esetben természetesen nincs szó valódi párhuzamosításról, hanem annak szimulálásáról: egy ideig az egyik fut, utána a másik, majd ismét az egyik, így folytatva, amíg be nem fejeződnek.

Ennek jelentősége leginkább az input-output műveletek esetében van. Tegyük fel, hogy le szeretnénk tölteni 3 weboldalat. Alapértelmezésben sorra, egyesével letöltjük mindhármat. Viszont a letöltés indítása gyors, maga a letöltés lassú, és a feldolgozás szintén gyors. Pazarló tehát az a megoldás, hogy egyesével végrehajtjuk háromszor az említett lépéseket, hiszen egy processzormagunk van, ami az idő nagy részében vár. Sokkal hatékonyabb azonnal elindítani mindhárom letöltést, majd visszatérni a feldolgozásukra, miután megérkeztek. A kettő között pedig esetleg további hasznos műveleteket tudunk végrehajtani.

Az aszinkron futtatás pont erről szól. Az aszinkron függvények képesek egy hosszabb, processzort nem igénylő művelet előtt átadni a futási lehetőséget, majd visszatérni, ha ismét lehetőséget kapnak és a művelet véget ért. Ebben az alfejezetben ezt a lehetőséget nézzük meg. A részleteket lent olvashatjuk; elöljáróban annyit, hogy ezt két kulcsszó (async és await), egy belső könyvtár (asyncio) és számos külső könyvtár (aiofiles, aiohttp) támogatja.

Kitérő: generátorok

A generátorok nem tartoznak szorosan a témához, egy rövid kitérő erejéig viszont érdemes őket megemlíteni, ugyanis bizonyos szintű párhuzamos futás hatást ezekkel is el lehet érni, ls valójában ebből "nőtte ki magát" Pythonban az aszinkron futtatás.

Röviden: a generátorok olyan függvények, amelyek az eredményt nem a return, hanem yield kulcsszó segítségével adják vissza. Lényeges eltérés a két megközelítés között az, hogy mígy a return befejezi a függvény futását, a yield később folytatja. Így egy programon belül több yield is lehet, ugyanabban a szekvenciában, vagy akár cikluson belül.

A generátorok technikailag átmenetet képeznek a függvények és az osztályok között. Kinézetre szinte egy az egyben olyanok mint a függvények (a fenti kivételtől eltekintve), viszont a futtatáshoz példányosítani kell mint az osztályokat, és a next() globális függvény hívásával vagy cikluson belül tudjuk elérni az eredményeket. Ha a generátor véget ért, azaz már nincs további yield, akkor a következő next() hívás eredménye a StopIteration kivétel. Ezt a ciklus automatikusan lekezeli.

Az alábbi példa egy kicsit mesterkélt, ám jól mutatja a generátorok lényegét:

def generátor(n):
    print(f'generátor {n} eleje')
    yield 10 * n + 1
    print(f'generátor {n} közepe')
    yield 10 * n + 2
    print(f'generátor {n} vége')
    yield 10 * n + 3
 
generátorok = [generátor(1), generátor(2), generátor(3)]
while True:
    vége = 0
    for generátor in generátorok:
        try:
            részeredmény = next(generátor)
            print(részeredmény)
        except StopIteration:
            vége += 1
    if vége == len(generátorok):
        break

A példában a generátornak 3 yield-je van, ami visszatér valamilyen értékkel, és közben kiír valamit. A hívó oldalon példányosítjuk háromszor, és sorra egyesével olvassuk ki a next() hívással az eredményeket. A program futásának eredménye az alábbi:

generátor 1 eleje
11
generátor 2 eleje
21
generátor 3 eleje
31
generátor 1 közepe
12
generátor 2 közepe
22
generátor 3 közepe
32
generátor 1 vége
13
generátor 2 vége
23
generátor 3 vége
33

Látható tehát, hogy azt a hatást kelti, mintha a 3 függvényhívás párhuzamosan futna.

A Python nyelv a 3-as főverzió első alverzióiban ebbe az irányba indult el (ld. pl. a yield from struktúrát), ám később inkább a coroutine megoldást választották (ld. következő alfejezetet), és a generátorokból egyes részeket ki is vettek.

Coroutine létrehozása

A coroutine technikailag egy függvény, ami elé a fejlécben odaírjuk azt, hogy async. Az ilyen függvényt (tehát a coroutine-t) nem lehet hagyományos módon meghívni. Meghívásakor a függvény elé kell írni az await kulcsszót. Az await kulcsszót viszont csak olyan függvénybe tudjuk írni, ami maga is async. A róka fogta csuka, csuka fogta róka problémát az asyncio.run() függvény segítségével oldhatjuk fel.

Az aszinkron függvények hívásának akkor van értelme, ha egyszerre többet hívunk. Ehhez taszkokat hozhatunk létre az asyncio.create_task() függvény segítségével..

Lássunk egy példát! Itt egy lassú függvényt valósítunk meg, ami az elején kiír valamit, után 1 másodpercig nem csinál semmit, és a végén is kiír valamit. Ezt hívjuk meg háromszor. Hagyományos módon a következőképpen valósítanánk meg:

import time
 
def lassú_függvény(n):
    print(f'Lassú függvény {n} eleje')
    time.sleep(1)
    print(f'Lassú függvény {n} vége')
 
for i in range(3):
    lassú_függvény(i)
    time.sleep(0.1)

Az eredmény:

Lassú függvény 0 eleje
(1 másodperc szünet)
Lassú függvény 0 vége
Lassú függvény 1 eleje
(1 másodperc szünet)
Lassú függvény 1 végezz
Lassú függvény 2 eleje
(1 másodperc szünet)
Lassú függvény 2 vége

Tehát a teljes lefutás kb. 3 másodpercig tart, miközben az idő jelentős hányadűban a processzor blokkolódik.

A fenti program szinkron megvalósítása:

import asyncio
 
async def lassú_függvény(n):
    print(f'Lassú függvény {n} eleje')
    await asyncio.sleep(1)
    print(f'Lassú függvény {n} vége')
 
async def main():
    feladatok = []
    for i in range(3):
        feladatok.append(asyncio.create_task(lassú_függvény(i)))
    for feladat in feladatok:
        await feladat
 
asyncio.run(main())

A futás eredménye:

Lassú függvény 0 eleje
Lassú függvény 1 eleje
Lassú függvény 2 eleje
(1 másodperc szünet)
Lassú függvény 0 vége
Lassú függvény 1 vége
Lassú függvény 2 vége

Tehát az 1 másodperces várakozást párhuzamosította. Vegyük észre, hogy a várakozás nem a blokkoló time.sleep(1), hanem az await asyncio.sleep(1) nem blokkoló hívással történt. Így a program kb. 1 másodperc alatt befejeződött.

Visszatérési érték

Az async függvénynek lehet visszatérési értéke, amit értékül kap az await hívás. Példaként tekintsünk egy lassú összeadást, ahol a függvénynek két paramétere van, annak összegéve tér vissza, de előtte nem blokkoltan egy másodperc szüntet tart:

import asyncio
 
async def lassú_összead(n, a, b):
    print(f'lassú összead {n} eleje')
    await asyncio.sleep(1)
    print(f'lassú összead {n} vége')
    return a + b
 
async def main():
    összeadások = [
        (3, 2),
        (1, 5),
        (2, 6),
    ]
    feladatok = []
    for i, összeadás in enumerate(összeadások):
        feladatok.append(asyncio.create_task(lassú_összead(i, *összeadás)))
    eredmények = []
    for feladat in feladatok:
        eredmények.append(await feladat)
    print(eredmények)
 
asyncio.run(main())

Az eredmény:

lassú összead 0 eleje
lassú összead 1 eleje
lassú összead 2 eleje
(1 másodperc szünet)
lassú összead 0 vége
lassú összead 1 vége
lassú összead 2 vége
[5, 6, 8]

A gather() függvény

Az aszinkron programozásban gyakori, hogy több függvényt szeretnénk szimuláltan párhuzamosan futtatni, majd összevárni mindegyik eredményét. Fent láthattunk egy módszert: egy listába tesszük a taszkokat, majd egy másik ciklusban mindegyikre meghívjuk az await-et. Ezt egyszerűsíti a gather() függvény: paraméterül tetszőleges számú aszinkron függvényt kell átadni, és az gondoskodik az összevárásról.

A fenti feladat megoldása gather() segítségével:

import asyncio
 
async def lassú_összead(n, a, b):
    print(f'lassú összead {n} eleje')
    await asyncio.sleep(1)
    print(f'lassú összead {n} vége')
    return a + b
 
async def main():
    összeadások = [
        (3, 2),
        (1, 5),
        (2, 6),
    ]
    feladatok = []
    for i, összeadás in enumerate(összeadások):
        feladatok.append(lassú_összead(i, *összeadás))
    eredmények = await asyncio.gather(*feladatok)
    print(eredmények)
 
asyncio.run(main())

Az eredmény ugyanaz.

Aszinkron fájlkezelés

A fenti példákban a hosszú műveletet aszinkron altatással szimuláltuk. A valóságban nyilván nem erről van szó, hanem valóban olyan, tipikusan input-output műveletekről, ahol a CPU blokkolva van. A két leggyakoribb ilyen művelet a fájlkezelés és a netes letöltés. Most ezt a két lehetőséget nézzük meg. Mindkettőhöz külső könyvtárat kell telepíteni.

Ebben az alfejezetben az aszinkron fájlkezelésről lesz szó: a fájl megnyitása és az olvasás befejezése között a processzor tud mást is csinálni. Ehhez fel kell telepíteni az aiofiles könyvtárat:

pip install aiofiles

A példában 5 fájlt fogunk kezelni, egyenként tízmillió véletlen számmal, és az összeget számoljuk ki. Először generáljuk le a fájlokat:

import random
 
random.seed(12345)
for i in range(5):
    fájlnév = f'számok{i}.txt'
    print(fájlnév)
    with open(fájlnév, 'w') as fájl:
        for _ in range(10_000_000):
            print(random.randint(1, 100), file=fájl)

Az aszinkron fájlkezelés kódja:

import asyncio
import aiofiles
 
async def aszinkron_fájlolvasás(fáljnév):
    print(f'Aszinkron fájlolvasás eleje: {fáljnév}')
    async with aiofiles.open(fáljnév) as f:
        tartalom = await f.read()
    print(f'Aszinkron fájlolvasás vége: {fáljnév}')
    összeg = ([int(sor) for sor in tartalom.split()])
    print(f'Összeg kiszámolva: {fáljnév}')
    return sum(összeg)
 
async def main():
    feladatok = []
    for i in range(5):
        feladatok.append(asyncio.create_task(aszinkron_fájlolvasás(f'számok{i}.txt')))
    eredmény = await asyncio.gather(*feladatok)
    print(eredmény)
 
asyncio.run(main())

Az eredmény:

Aszinkron fájlolvasás eleje: számok0.txt
Aszinkron fájlolvasás eleje: számok1.txt
Aszinkron fájlolvasás eleje: számok2.txt
Aszinkron fájlolvasás eleje: számok3.txt
Aszinkron fájlolvasás eleje: számok4.txt
Aszinkron fájlolvasás vége: számok0.txt
Összeg kiszámolva: számok0.txt
Aszinkron fájlolvasás vége: számok2.txt
Összeg kiszámolva: számok2.txt
Aszinkron fájlolvasás vége: számok4.txt
Összeg kiszámolva: számok4.txt
Aszinkron fájlolvasás vége: számok3.txt
Összeg kiszámolva: számok3.txt
Aszinkron fájlolvasás vége: számok1.txt
Összeg kiszámolva: számok1.txt
[505060112, 505119193, 505059688, 505007236, 504869347]

Olyan szempontból ez egy jó lefutás, hogy a befejezés sorrendje nem ugyanaz mint a kezdésé: mivel pl. a később kezdett számok2.txt beolvasása előbb fejeződött be mint a számok1.txt fájlé, a feldolgozása is előbb kezdődött, azaz a lassúbb nem blokkolta a gyorsabbat.

Aszinkron netes letöltés

A netes letöltéshez az aiohttp könytárat kell telepíteni:

pip install aiohttp

Az alábbi példában 5 weboldalt töltünk le párhuzamosan, és az egyes oldalak hosszát írjuk ki:

import asyncio
import aiohttp
 
async def weboldal_méret(url):
    print(f'Letölt: {url}')
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as válasz:
            tartalom = await válasz.text()
            print(f'{url} letöltése kész')
            return len(tartalom)
 
async def main():
    weboldalak = [
        'http://faragocsaba.hu/python',
        'https://www.python.org/',
        'https://www.w3schools.com/python/',
        'https://www.tutorialspoint.com/python/',
        'https://www.pythontutorial.net/',
    ]
    feladatok = []
    for weboldal in weboldalak:
        feladatok.append(asyncio.create_task(weboldal_méret(weboldal)))
    eredmények = []
    for feladat in feladatok:
        eredmények.append(await feladat)
    print(eredmények)
 
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())

Az eredmény:

Letölt: http://faragocsaba.hu/python
Letölt: https://www.python.org/
Letölt: https://www.w3schools.com/python/
Letölt: https://www.tutorialspoint.com/python/
Letölt: https://www.pythontutorial.net/
https://www.pythontutorial.net/ letöltése kész
https://www.tutorialspoint.com/python/ letöltése kész
https://www.python.org/ letöltése kész
https://www.w3schools.com/python/ letöltése kész
http://faragocsaba.hu/python letöltése kész
[39850, 50791, 96749, 29844, 69392]

Majd később az összehasonlításnál látni fogjuk, hogy itt lényeges különbség van futási időben a szinkron és az aszinkron megvalósítás között. (A fájlkezelés esetén nincs túl nagy eltérés.)

A kódban van egy "csúnya" utasítás: asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()). Windows operációs rendszereken jön elő egy hiba, amit sajnos csak ezzel a sorral tudunk kezelni.

Többszálú programozás

Áttekintés

A fentiekben egy processzor magon mi magunk szimuláltuk a párhuzamosítást. Valójában operációs rendszer szinten, úgynevezett szálak (thread) segítségével is tudunk párhuzamos hatást elérni, szintén egyetlen processzormagon futtatva. Ez kezdetben - pl. a DOS vagy a Windows 3.1 világában - nem volt így, Windows 95-től kezdve viszont már létezik; Linux esetén a kezdetektől fogva. A többszálú operációs rendszerek tehát az 1990-es évek első felében jelentek meg.

Működési elve nagyjából a következő: rövid ideig fut az egyik szál, utána a másik, stb., majd ismét az első. Ha kellő gyakorisággal változik az, hogy melyik szál fut éppen, akkor párhuzamos hatás érhető el, ugyanakkor túl nagy gyakorisággal sem érdemes váltani a szálak között, mert akkor aránytalanul sok erőforrás megy el magára az egyébként improduktív váltásra. Ez persze az operációs rendszer feladata; a fejlesztőnek csak annyit kell tudnia, hogy hogyan lehet a szálakat indítani, kezelni.

A többmagos processzorok megjelenésével lehetővé vált a valódi többszálúság. A Pythonban viszont van egy olyan fogalom, hogy Global Interpreter Lock (GIL), ami megakadályozza a tényleges párhuzamos futtatást.

A szálakra jellemző, hogy közös memóriaterületet használnak. Ennek vannak előnyei és hátrányai is. Az előnye nyilvánvalóan az, hogy az egyik szál látja a másik szál eredményét. A hátránya ugyanez: ha egyszerre kettő szál is szeretné írni az adott memóriaterületet, akkor inkonzisztens állapot jöhet létre. Például ha egy memóriaérték úgy változik, hogy kiolvassuk a régi értéket, módosítjuk, majd visszaírjuk, és ezt két külön szálon tesszük, akkor ha a szálváltás a kiolvasás és a kiírás között történik, akkor inkonzisztens állapot alakulhat ki. Emiatt az ilyen kódrészeket jól el kell határolni, hogy elkerüljük az ilyen állapotokat. Ez viszont számos további problémához vezet; a többszálúságból adódó hibák nagyon nehezen felderíthetőek és nehezen javíthatóak.

A szálak létrehozásának van valamekkora erőforrás igénye; ezt is figyelembe kell venni, amikor szálakat indítunk.

Ebben a fejezetben azt nézzük meg, hogy a Python milyen lehetőségeket rejt a szálkezeléssel kapcsolatban.

Szálak indítása

A szálak kezeléséhez a threading belső könyvtárra van szükség. Szálat a threading.Thread() segítségével tudunk létrehozni. Számunkra lényeges paraméterek:

  • target: a függvény neve;
  • args: az argumentumok, tuple formátumban.

Ha létrehozunk egy szálat, az alapból nem fut; elindítani a start() függvény hívásával tudjuk.

A fenit lassú függvényes példa szálak segítségével az alábbi:

import threading
import time
 
def lassú_függvény(n):
    print(f'Lassú függvény {n} eleje')
    time.sleep(1)
    print(f'Lassú függvény {n} vége')
 
for i in range(3):
    szál = threading.Thread(target=lassú_függvény, args=(i,))
    szál.start()
    time.sleep(0.01)

A time.sleep(0.01) nélkül a kiírás összekuszálódik, amiről később lesz még szó. Így az eredmény:

Lassú függvény 0 eleje
Lassú függvény 1 eleje
Lassú függvény 2 eleje
(kb. 1 másodperc szünet)
Lassú függvény 0 vége
Lassú függvény 1 vége
Lassú függvény 2 vége

Daemon szál

Normál esetben egy program futása akkor fejeződik be, amikor mindegyik szál futása befejeződött. Ez alól egyetlen kivétel van, az ún. daemon szál, ami tipikusan egy végtelen ciklusban megvalósított függvény, és automatikusan leáll az utolsó nem daemon szál befejeződésével.

Daemon szálat a daemon=True paraméter megadásával lehet létrehozni:

import threading
import time
 
def véges_függvény():
    print('Véges függvény eleje')
    for i in range(5):
        print(f'Véges függvény: {i}')
        time.sleep(0.5)
    print('Véges függvény vége')
 
def végtelen_függvény():
    print('Végtelen függvény eleje')
    i = 0
    while True:
        print(f'Végtelen függvény: {i}')
        i += 1
        time.sleep(0.5)
 
végtelen_függvény_szál = threading.Thread(target=végtelen_függvény, daemon=True)
véges_függvény_szál = threading.Thread(target=véges_függvény)
végtelen_függvény_szál.start()
time.sleep(0.25)
véges_függvény_szál.start()

A futáskor látható, hogy amikor a véges függvényt futtató szál befejeződik, a damenon szál automatikusan leáll:

Végtelen függvény eleje
Végtelen függvény: 0
Véges függvény eleje
Véges függvény: 0
Végtelen függvény: 1
Véges függvény: 1
Végtelen függvény: 2
Véges függvény: 2
Végtelen függvény: 3
Véges függvény: 3
Végtelen függvény: 4
Véges függvény: 4
Végtelen függvény: 5
Véges függvény vége

Visszatérési érték

Ha a szálaknak van eredménye, azt úgy tudjuk kezelni, hogy egy olyan változóba tesszük, amit a többi szál is elér. Az alábbi példában a lassú összeadó programot valósítjuk meg szálak segítségével. Ebben az egyes szálak megkapnak egy listát, és a neki megfelelő cellába írja az eredményt.

A példában megismerünk egy újabb szál-specifikus függvényt: a join()-t. Ez azt jelenti, hogy az aktuális szál megvárja annak a szálnak a befejezését, amelyiken ezt a függvényt meghívjuk. A szálak referenciáit egy listába gyűjtjük, és sorra mindegyiken meghívjuk a join() metódust:

import threading
import time
 
def lassú_összead(eredmények, n, a, b):
    print(f'lassú összead {n} eleje')
    time.sleep(1)
    print(f'lassú összead {n} vége')
    eredmények[n] = a + b
 
összeadások = [
    (3, 2),
    (1, 5),
    (2, 6),
]
összeadások_száma = len(összeadások)
eredmények = összeadások_száma * [None]
szálak = []
for i, összeadás in enumerate(összeadások):
    szál = threading.Thread(target=lassú_összead, args=(eredmények, i, *összeadás))
    szálak.append(szál)
    szál.start()
    time.sleep(0.01)
for szál in szálak:
    szál.join()
print(eredmények)

Az eredmény:

lassú összead 0 eleje
lassú összead 1 eleje
lassú összead 2 eleje
(kb. 1 másodperc szünet)
lassú összead 0 vége
lassú összead 1 vége
lassú összead 2 vége
[5, 6, 8]

Thread Pool

A szálak futtatását egyszerűsíti az ún. thread pool. Ez bizonyos számú szálat menedzsel. Végrehajtót a concurrent.futures.ThreadPoolExecutor() példányosításával tudunk létrehozni. Feladatot a submit() függvénnyel tudunk küldeni, ahol meg kell adnunk a függvény nevét és a paraméterlistát, itt tvábbi paraméterekként (nem tuple-ként).

Az eredmény egy ún. future, azaz jövőbeli eredmény. Ennek két fontos függvénye a következő:

  • done(): segítségével lekérdezzük, hogy meg van-e már az eredmény.
  • result(): maga az eredmény.

Az alábbi példa kicsit bonyolult: a lassú összeadást valósítjuk meg úgy, hogy elküldjük a ThreadPoolExecutor-nak az összeadandókat, tizedmásodpercenként leellenőrizzük, hogy kész van-e már, és ha mindegyik kész, akkor kiolvassuk és kiírjuk az eredményt:

import concurrent.futures
import time
 
def lassú_összead(n, a, b):
    print(f'lassú összead {n} eleje')
    time.sleep(1)
    print(f'lassú összead {n} vége')
    return a + b
 
összeadások = [
    (3, 2),
    (1, 5),
    (2, 6),
]
végrehajtó = concurrent.futures.ThreadPoolExecutor()
jövőbeli_eredmények = []
for i, összeadás in enumerate(összeadások):
    jövőbeli_eredmények.append(végrehajtó.submit(lassú_összead, i, *összeadás))
    time.sleep(0.1)
kész = False
while not kész:
    kész = True
    for jövőbeli_eredmény in jövőbeli_eredmények:
        if not jövőbeli_eredmény.done():
            kész = False
    time.sleep(0.1)
eredmények = [jövőbeli_eredmény.result() for jövőbeli_eredmény in jövőbeli_eredmények]
print(eredmények)
végrehajtó.shutdown()

Az eredmény:

lassú összead 0 eleje
lassú összead 1 eleje
lassú összead 2 eleje
(kb. 1 másodperc szünet)
lassú összead 0 vége
lassú összead 1 vége
lassú összead 2 vége
[5, 6, 8]

A fenti kód bonyolult. Első körben egyszerűsítsünk kettő ponton:

  • A ThreadPoolExecutor-t with szerkezettel használva nem kell lezárni a végén, azaz nem kell a shitdown().
  • A concurrent.futures.wait() megvárja az összes eredményt.

A ThreadPoolExecutor paramétere azt jelenti, hogy hány szál futhat egyszerre. Az alábbi példában ezt kettőre csökkentjük. Csak a főprogram rész változott:

with concurrent.futures.ThreadPoolExecutor(2) as végrehajtó:
    jövőbeli_eredmények = []
    for i, összeadás in enumerate(összeadások):
        jövőbeli_eredmények.append(végrehajtó.submit(lassú_összead, i, *összeadás))
        time.sleep(0.1)
    concurrent.futures.wait(jövőbeli_eredmények)
    eredmények = [jövőbeli_eredmény.result() for jövőbeli_eredmény in jövőbeli_eredmények]
    print(eredmények)

A lefutása:

lassú összead 0 eleje
lassú összead 1 eleje
(kb. 1 másodperc szünet)
lassú összead 0 vége
lassú összead 2 eleje
lassú összead 1 vége
(kb. 1 másodperc szünet)
lassú összead 2 vége
[5, 6, 8]

A végrehajtónak van egy map() függvénye, ami tovább egyszerűsíti a dolgot. Itt megadhatunk egy függvényt, valamint egy vagy több iterable objektumot, és elvégzi a teljes műveletet. Hátránya, hogy a fenti struktúra ebben a formában nem használható. Az így módosított kód az alábbi:

import concurrent.futures
import time
 
def lassú_összead(n, a, b):
    time.sleep(1)
    return a + b
 
with concurrent.futures.ThreadPoolExecutor() as executor:
    eredmények = executor.map(lassú_összead, range(3), [3, 1, 2], [2, 5, 6])
print([eredmény for eredmény in eredmények])

A kiírásokat kivettem, mert itt kijön a később részletezett probléma. Önmagában az eredmények értékét - meglepő módon - nem lehet kiírni, csak így végiglépkedve.

Lockolás

A fentiekben óvatosan kezeltük a kiírást: több esetben kerültük, ill. rövid várakozásokkal tettük "biztonságossá" a dolgot. Most nézzük a következő példát:

import time
import threading
 
szál_kulcs = threading.Lock()
 
def függvény(n):
    print('Függvény',  n, 'eleje')
    for i in range(3):
        time.sleep(0.1)
        print('Függvény',  n, 'lépés', i)
    print('Függvény',  n, 'vége')
 
print('Eleje')
szálak = []
for i in range(3):
    szál = threading.Thread(target=függvény, args=(i,))
    szálak.append(szál)
    szál.start()
for szál in szálak:
    szál.join()
print('Vége')

A függvény háromszor vár egy-egy tizedmásodpercet, miközben kiír adatokat. Ezt futtatjuk párhuzamosan háromszor. A futás eredménye egészen döbbenetes:

Eleje
Függvény Függvény0  1 Függvényelejeeleje

 2 eleje
FüggvényFüggvényFüggvény   0 21  lépéslépés 0
 lépés 00

FüggvényFüggvény Függvény 0 1 lépés 2 lépés 1 1
lépés
 1
Függvény Függvény 2Függvény0   lépés1 lépés 2
Függvénylépés 2
 2Függvény
 1 vége0 vége

Függvény 2 vége
Vége

A probléma az, hogy a print() utasítás nem atomi. Ha vesszővel választunk el dolgokat, akkor azok "alul" külön utasításokként futnak le, és ugyanez történek az új sor kiírással is. Mivel a művelet nem atomi, az operációs rendszer megteheti, hogy közben váltson szálat.

Ha azt szeretnénk, hogy a kiírás atomi legyen, akkor az ún. lockolási technikát kell alkalmazni. Ennek módja az alábbi:

  • Létrehozunk egy kulcsot a kulcs threading.Lock() segítségével.
  • Amikor belépünk a kritikus szakaszba, akkor magunkhoz vesszük a lockot: kulcs.acquire().
  • Elengedjük a kritikus szakasz után: kulcs.release().

Valójában az összes kiírásnak ki kell zárnia az összes többit, így ugyanazt a kulcsot kell mindegyiknél alkalmazni:

import time
import threading
 
szál_kulcs = threading.Lock()
 
def függvény(n):
    szál_kulcs.acquire()
    print('Függvény',  n, 'eleje')
    szál_kulcs.release()
    for i in range(3):
        time.sleep(0.1)
        szál_kulcs.acquire()
        print('Függvény',  n, 'lépés', i)
        szál_kulcs.release()
    szál_kulcs.acquire()
    print('Függvény',  n, 'vége')
    szál_kulcs.release()
 
print('Eleje')
szálak = []
for i in range(3):
    szál = threading.Thread(target=függvény, args=(i,))
    szálak.append(szál)
    szál.start()
for szál in szálak:
    szál.join()
print('Vége')

Az eredmény:

Eleje
Függvény 0 eleje
Függvény 1 eleje
Függvény 2 eleje
Függvény 1 lépés 0
Függvény 2 lépés 0
Függvény 0 lépés 0
Függvény 2 lépés 1
Függvény 0 lépés 1
Függvény 1 lépés 1
Függvény 1 lépés 2
Függvény 1 vége
Függvény 2 lépés 2
Függvény 2 vége
Függvény 0 lépés 2
Függvény 0 vége
Vége

Az eredmény szép, a kód viszont nem.

A with kulcsszóval nem kell elengednünk a kulcsot, ráadásul azzal, hogy egyetlen utasítás tartozik mindegyik esetben a kritikus szakaszban, a következőképpen is hazsnálhatjuk, ami nem sokkal rondább mint az alap kód:

import time
import threading
 
szál_kulcs = threading.Lock()
 
def függvény(n):
    with szál_kulcs: print('Függvény',  n, 'eleje')
    for i in range(3):
        time.sleep(0.1)
        with szál_kulcs: print('Függvény',  n, 'lépés', i)
    with szál_kulcs: print('Függvény',  n, 'vége')
 
print('Eleje')
szálak = []
for i in range(3):
    szál = threading.Thread(target=függvény, args=(i,))
    szálak.append(szál)
    szál.start()
for szál in szálak:
    szál.join()
print('Vége')

Az eredmény ugyanaz.

A lockolást egyébként elsősorban nem a szép kiírásnál alkalmazzuk, hanem azért, hogy a változók konzisztensek maradjanak. Konkrétan a Pythonnál erre egyébként nincs szükség, ugyanis az ún. GIL (Global Interpreter Lock) gondoskodik erről.

Deadlock

A többszálúság nagyon alattomos hibákhoz vezethet, az egyik ilyen az ún. deadlock. A lockolást sok esetben nem tudjuk elkerülni. Az alábbi példában két kulcs van, az első függvény lefoglalja az egyiket, a második a másikat, majd belül mindkét függvény megpróbálja lefoglalni azt a kulcsot, amit már a másik lefoglalt. Mivel egymásra várnak, ún. deadlock alakul ki:

import time
import threading
 
szál_kulcs_1 = threading.Lock()
szál_kulcs_2 = threading.Lock()
 
def függvény1():
    print('függvény1() eleje')
    print('függvény1() várakozás szál_kulcs_1-re')
    with szál_kulcs_1:
        print('függvény1() szál_kulcs_1 lefoglalva')
        time.sleep(1)
        print('függvény1() várakozás szál_kulcs_2-re')
        with szál_kulcs_2:
            print('függvény1() szál_kulcs_2 lefoglalva')
            time.sleep(1)
        print('függvény1() szál_kulcs_2 elengedve')
    print('függvény1() szál_kulcs_1 elengedve')
    print('függvény1() vége')
 
def függvény2():
    print('függvény2() eleje')
    print('függvény2() várakozás szál_kulcs_2-re')
    with szál_kulcs_2:
        print('függvény2() szál_kulcs_2 lefoglalva')
        time.sleep(1)
        print('függvény2() várakozás szál_kulcs_1-re')
        with szál_kulcs_1:
            print('függvény2() szál_kulcs_1 lefoglalva')
            time.sleep(1)
        print('függvény2() szál_kulcs_1 elengedve')
    print('függvény2() szál_kulcs_2 elengedve')
    print('függvény2() vége')
 
szál_1 = threading.Thread(target=függvény1)
szál_2 = threading.Thread(target=függvény2)
szál_1.start()
time.sleep(0.1)
szál_2.start()

A futás eredménye:

függvény1() eleje
függvény1() várakozás szál_kulcs_1-re
függvény1() szál_kulcs_1 lefoglalva
függvény2() eleje
függvény2() várakozás szál_kulcs_2-re
függvény2() szál_kulcs_2 lefoglalva
függvény1() várakozás szál_kulcs_2-re
függvény2() várakozás szál_kulcs_1-re
(végtelen ciklus)

Ez most egy erőltetett és szándékolt hiba, ám nagyobb programok esetében ez könnyen előfordulhat. Felderítése nagyon nehéz. Emiatt egyébként léteznek olyan keretrendszerek (elsősorban más programozási nyelvekben), amelyek eleve lehetetlenné teszik az "ős gonosz" szálak létrehozását.

Szemafor

Sokszor előfordul az is, hogy egy bizonyos szakaszban csak korlátozni szeretnénk a szálak számát, de nem szeretnénk a párhuzamos futtatást megakadályozni. Ilyet már láttunk a thread pool executor részben; most ugyanezt az alacsonyabb szintű- ezáltal jobban finomhangolható - szemafor segítségével valósítjuk meg.

Szemafort a threading.Semaphore() segítségével tudunk létrehozni, paraméterként megadva azt, hogy egyszerre hány szál futhat. Használata hasonló a kulcshoz. Az alábbi példában öt szálon indítunk egy függvényt, de a szemafor egyszerre csak hármat enged:

import time
import threading
 
szemafor = threading.Semaphore(3)
 
def függvény(n):
    print('Függvény',  n, 'eleje')
    with szemafor:
        time.sleep(0.5)
        print('Függvény',  n, 'közepe 1')
        time.sleep(0.5)
    print('Függvény',  n, 'vége')
 
print('Eleje')
szálak = []
for i in range(5):
    szál = threading.Thread(target=függvény, args=(i,))
    szálak.append(szál)
    szál.start()
    time.sleep(0.1)
for szál in szálak:
    szál.join()
print('Vége')

A lefutás szemaforral:

Eleje
Függvény 0 eleje
Függvény 1 eleje
Függvény 2 eleje
Függvény 3 eleje
Függvény 4 eleje
Függvény 0 közepe 1
Függvény 1 közepe 1
Függvény 2 közepe 1
Függvény 0 vége
Függvény 1 vége
Függvény 2 vége
Függvény 3 közepe 1
Függvény 4 közepe 1
Függvény 3 vége
Függvény 4 vége
Vége

Szemafor nélkül az alábbi lenne:

Eleje
Függvény 0 eleje
Függvény 1 eleje
Függvény 2 eleje
Függvény 3 eleje
Függvény 4 eleje
Függvény 0 közepe 1
Függvény 1 közepe 1
Függvény 2 közepe 1
Függvény 3 közepe 1
Függvény 4 közepe 1
Függvény 0 vége
Függvény 1 vége
Függvény 2 vége
Függvény 3 vége
Függvény 4 vége
Vége

Multiprocessing

Áttekintés

A Python a Global Interpreter Lock (GIL) által megakadályozza, hogy a többszálú programok fizikailag is több szálon, több processzormagon fussanak. Ez az egyik legtöbbet támadott tulajdonsága a Python programozási nyelvnek, és tervbe van véve a kivezetése. Amíg ez nem történik meg, és azt szeretnénk, hogy a programunk fizikailag is több processzormagon fusson, a multiprocessing belső könyvtárat használhatjuk. Ez hasonlóan működik mint a multithreading, lényeges eltérs viszont, hogy itt az egyes processzek futhatnak különböző processzormagon.

Lényeges különbség még a többszálúság és a multiprocessing között az, hogy ez utóbbi esetében a memória nem osztott, hanem mindegyik processznek sajt memóriaterülete van. Ez megnehezíti az adatátadást; majd látni fogjuk, hogy ez miként történik.

Másik fontos eltérés az, hogy a multiprocessingnek nagyobb a plusz erőforrásigénye, tehát az a processzoridő, ami ahhoz szükséges, hogy a processzek egyáltalán elinduljanak. Emiatt tényleg csak indokolt esetben célszerű a multiprocessinget használni, tehát ha valóban ki szeretnénk használni a fizikai több processzor adta lehetőségeket.

Processzek indítása

Technikailag nagyon hasonlóan működik a processzek indítása, mint a szálaké:

import multiprocessing
import time
 
def lassú_függvény(n):
    print(f'Lassú függvény {n} eleje')
    time.sleep(1)
    print(f'Lassú függvény {n} vége')
 
if __name__ == '__main__':
    for i in range(3):
        folyamat = multiprocessing.Process(target=lassú_függvény, args=(i,))
        folyamat.start()
        time.sleep(0.01)

Két lényeges eltérés van:

  • threading.Thread helyett multiprocessing.Process.
  • Kell az if name == 'main':. Ez a többszálú esetben sem árt, itt viszont a hiánya hibás futást eredményez.

Visszatérési érték

Ha egy eljárásnak van eredménye, akkor hasonlóan kezelhetjük mint a többszálú esetben. Viszont mivel a memória alapból nem osztott, azt vezérelt adatszerkezetekkel tudjuk a processzek között átadni. Ehhez a multiprocessing.Manager()-t kell példányosítani. Menedzselt listát az alábbi módon tudunk létrehozi:

import multiprocessing
import time
 
def lassú_összead(eredmények, n, a, b):
    print(f'lassú összead {n} eleje')
    time.sleep(1)
    print(f'lassú összead {n} vége')
    eredmények[n] = a + b
 
if __name__ == '__main__':
    összeadások = [
        (3, 2),
        (1, 5),
        (2, 6),
    ]
    összeadások_száma = len(összeadások)
    manager = multiprocessing.Manager()
    eredmények = manager.list(összeadások_száma * [None])
    processzek = []
    for i, összeadás in enumerate(összeadások):
        processz = multiprocessing.Process(target=lassú_összead, args=(eredmények, i, *összeadások[i]))
        processzek.append(processz)
        processz.start()
        time.sleep(0.01)
    for processz in processzek:
        processz.join()
    print(eredmények)

Process Pool

A concurrent.futures.ThreadPoolExecutor() mintájára a concurrent.futures.ProcessPoolExecutor()-t is létrehozhatjuk, ami technikailag mindenben megegyezik a szál társával, a mögöttes működésben tér csak el.

Összehasonlítás

A módszerek összehasonlítása

Lássuk a 3 lehetőség összefoglalását:

  • Aszinkron programozás: egy processzormagon, egy szálon szimuláljuk a párhuzamos futtatást úgy, hogy a nem blokkoló várakozó utasításokat használjuk. Input/output műveletek esetén érdemes használni, CPU erőforrási igényes feladatok esetén nem.
  • Többszálúság: operációs rendszer szintű szálak segítségével történik a párhozamos futtatás, viszont a GIL megakadályozza a több processzormag használatát. Ezt is input/output műveletek esetén érdemes használni, CPU erőforrási igényes feladatok esetén nem.
  • Multiprocessing: ennek segítségével ki tudjuk használni a több processzormag adta lehetőségeket. Számításigényes feladatok esetén érdemes használni, input/output műveletek esetén nem, mert nagy a rezsiköltsége.

IO igényes függvények

Az alábbi példákban 5 weboldal főoldalát olvassuk be és visszaadjuk annak hosszát. Négyféle megoldást nézünk:

  • Az alapot, melyben sorban egymás után letöltjük az oldalakt.
  • Az aszinkront, melyben nem blokkoló letöltést használunk.
  • A többszálú módszert.
  • A multiprocesszinget.

Mindegyik esetben kiírjuk az eltelt időt, amit egy későbbi táblázatban összesítünk.

Alap megvalósítás:

import urllib.request
import time
 
def weboldal_méret(url):
    print(f'Letölt: {url}')
    tartalom = urllib.request.urlopen(url).read()
    print(f'{url} letöltése kész')
    return len(tartalom)
 
előtte = time.perf_counter()
weboldalak = [
    'http://faragocsaba.hu/python',
    'https://www.python.org/',
    'https://www.w3schools.com/python/',
    'https://www.tutorialspoint.com/python/',
    'https://www.pythontutorial.net/',
]
eredmények = []
for weboldal in weboldalak:
    eredmények.append(weboldal_méret(weboldal))
print(eredmények)
print(f'{time.perf_counter() - előtte:.3f}')

Aszinkron módszer:

import asyncio
import aiohttp
import time
 
async def weboldal_méret(url):
    print(f'Letölt: {url}')
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as válasz:
            tartalom = await válasz.text()
            print(f'{url} letöltése kész')
            return len(tartalom)
 
async def main():
    weboldalak = [
        'http://faragocsaba.hu/python',
        'https://www.python.org/',
        'https://www.w3schools.com/python/',
        'https://www.tutorialspoint.com/python/',
        'https://www.pythontutorial.net/',
    ]
    feladatok = []
    for weboldal in weboldalak:
        feladatok.append(asyncio.create_task(weboldal_méret(weboldal)))
    eredmények = []
    for feladat in feladatok:
        eredmények.append(await feladat)
    print(eredmények)
 
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
előtte = time.perf_counter()
asyncio.run(main())
print(f'{time.perf_counter() - előtte:.3f}')

Többszálú megvalósítás:

import threading
import urllib.request
import time
 
def weboldal_méret(url, eredmények, n):
    print(f'Letölt: {url}')
    tartalom = urllib.request.urlopen(url).read()
    print(f'{url} letöltése kész')
    eredmények[n] = len(tartalom)
 
előtte = time.perf_counter()
weboldalak = [
    'http://faragocsaba.hu/python',
    'https://www.python.org/',
    'https://www.w3schools.com/python/',
    'https://www.tutorialspoint.com/python/',
    'https://www.pythontutorial.net/',
]
eredmények = len(weboldalak) * [None]
szálak = []
for i, weboldal in enumerate(weboldalak):
    szál = threading.Thread(target=weboldal_méret, args=(weboldal, eredmények, i))
    szálak.append(szál)
    szál.start()
    time.sleep(0.01)
for szál in szálak:
    szál.join()
print(eredmények)
print(f'{time.perf_counter() - előtte:.3f}')

Multiprocessing:

import multiprocessing
import urllib.request
import time
 
def weboldal_méret(url, eredmények, n):
    print(f'Letölt: {url}')
    tartalom = urllib.request.urlopen(url).read()
    print(f'{url} letöltése kész')
    eredmények[n] = len(tartalom)
 
if __name__ == '__main__':
    előtte = time.perf_counter()
    weboldalak = [
        'http://faragocsaba.hu/python',
        'https://www.python.org/',
        'https://www.w3schools.com/python/',
        'https://www.tutorialspoint.com/python/',
        'https://www.pythontutorial.net/',
    ]
    manager = multiprocessing.Manager()
    eredmények = manager.list(len(weboldalak) * [None])
    processzek = []
    for i, weboldal in enumerate(weboldalak):
        processz = multiprocessing.Process(target=weboldal_méret, args=(weboldal, eredmények, i))
        processzek.append(processz)
        processz.start()
    for processz in processzek:
        processz.join()
    print(eredmények)
    print(f'{time.perf_counter() - előtte:.3f}')

CPU igényes függvények

Ebben az esetben egymillió darab, 0 és 10 közötti véletlen számot adunk össze, mindezt ötször. Ugyancsak a fenti 4 módszerrel valósítjuk meg.

Alap megvalósítás:

import time
import random
 
def véletlen_összead(n):
    print(f'Véletlen összead {n} eleje')
    összeg = 0
    for _ in range(1_000_000):
        összeg += random.randint(0, 10)
    print(f'Véletlen összead {n} vége')
    return összeg
 
előtte = time.perf_counter()
random.seed(12345)
eredmények = []
for i in range(5):
    eredmények.append(véletlen_összead(i))
print(eredmények)
print(f'{time.perf_counter() - előtte:.3f}')

Az aszinkron módszer esetében a párhuzamosítást úgy szimuláljuk, hogy a százezredik, háromszázezredik stb. lépésben átadjuk a vezérlést:

import asyncio
import time
import random
 
async def véletlen_összead(n):
    print(f'Véletlen összead {n} eleje')
    összeg = 0
    for j in range(1_000_000):
        összeg += random.randint(0, 10)
        if j % 200_000 == 100_000:
            await asyncio.sleep(0)
            print(f'Véletlen összead {n} ...')
    print(f'Véletlen összead {n} vége')
    return összeg
 
async def main():
    eredmények = []
    feladatok = []
    for i in range(5):
        feladatok.append(asyncio.create_task(véletlen_összead(i)))
    for feladat in feladatok:
        eredmények.append(await feladat)
    print(eredmények)
 
előtte = time.perf_counter()
random.seed(12345)
asyncio.run(main())
print(f'{time.perf_counter() - előtte:.3f}')

Többszálúság:

import threading
import time
import random
 
def véletlen_összead(eredmények, n):
    print(f'Véletlen összead {n} eleje')
    összeg = 0
    for _ in range(1_000_000):
        összeg += random.randint(0, 10)
    print(f'Véletlen összead {n} vége')
    eredmények[n] = összeg
 
előtte = time.perf_counter()
random.seed(12345)
futások_száma = 5
eredmények = futások_száma * [None]
szálak = []
for i in range(futások_száma):
    szál = threading.Thread(target=véletlen_összead, args=(eredmények, i))
    szálak.append(szál)
    szál.start()
    time.sleep(0.01)
for szál in szálak:
    szál.join()
print(eredmények)
print(f'{time.perf_counter() - előtte:.3f}')

Multiprocessing:

import multiprocessing
import time
import random
 
def véletlen_összead(eredmények, n):
    print(f'Véletlen összead {n} eleje')
    összeg = 0
    for _ in range(1_000_000):
        összeg += random.randint(0, 10)
    print(f'Véletlen összead {n} vége')
    eredmények[n] = összeg
 
if __name__ == '__main__':
    előtte = time.perf_counter()
    futások_száma = 5
    manager = multiprocessing.Manager()
    eredmények = manager.list(futások_száma * [None])
    folyamatok = []
    for i in range(futások_száma):
        folyamat = multiprocessing.Process(target=véletlen_összead, args=(eredmények, i))
        folyamatok.append(folyamat)
        folyamat.start()
    for folyamat in folyamatok:
        folyamat.join()
    print(eredmények)
    print(f'{time.perf_counter() - előtte:.3f}')

Sebesség összefoglaló táblázat

Az alábbi táblázat az egyes futási időket tartalmazza, másodpercben kifejezve, 3 tizedes pontosságra kerekítve:

Alap Aszinkron Többszálúság Multiprocessing
IO 1.840 0.489 0.549 0.899
CPU 2.151 2.433 2.294 0.964

Az IO műveleteknél messze a legrosszabb az egymás után végrehajtott alap megvalósítás teljesített legrosszabbul, ahol a processzor pazarló módon várt az adatok megérkezésére. Itt az aszinkron megvalósítás a legjobb; sikerült a lehető legjobban optimalizálni A többszálú megvalósításnak van valamekkora extra költsége az szinkronhoz képest; egy fél másodpercnyivel lett lassúbb. A többszálú megvalósítás tehát IO-esetben egész jól teljesít. A multiprocessing érdemben javított az alaphoz képest, viszont nagyon jól kirajzolódik a processzek indításának pluszköltsége: IO-intenzív műveletek esetén nem érdemes azt használni.

A CPU igényes műveletek esetében az alap megvalósítás lett a második legjobb. Az aszinkron és a többszálú rosszabb lett az alapnál, ugyanis azon felül, hogy a processzor folyamatosan dolgozik, még a váltások extra költségét is bele kell számolni. A multiprocessing viszont határozottan jobban teljesített, ahol ki tudnuk használni a tényleges párhuzamos futást.

Weboldalak

Az alábbi weboldalak segítettek a legtöbbet az leírás elkészítésében:

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License