Vert.x

Kategória: Java keretrendszerek.

Áttekintés

A Vert.x a reaktív programozási modellt (reactive programming) megvalósító, JVM-en futó rendszer.

A reaktív programozási modell

A reaktív programozás fogalma független a Vert.x-től, valójában nem is ez számít a referencia megvalósításának, de a Vert.x megismeréséhez nélkülözhetetlen ennek a technikának legalább a magas szintű ismerete. Néhány szempont:

  • A reaktív programozási módszert követő nyelvek tipikusan deklaratív programozási paradigmát követik. Hogy ezt megértsük, meg kell értenünk azt, hogy minek a tagadására épül. Számos programozási paradigma létezik; most vegyük az "alapot", mégpedig az imperatív programozási paradigmát. Azt gondolom, hogy ez áll legközelebb az "alap" programozó gondolkodásához: ebben az utasításoknak egy jól meghatározott lefutása van, szépen sorban hajtódnak végre és értékelődik ki az eredmény. Elvileg bármit meg lehet valósítani ebben a paradigmában, de egy bizonyos komplexitáson túl nehezen kezelhetővé válik a dolog, és képbe jön a többi paradigma: a strukturális, a procedurális, az objektumorientált, a funkcionális stb., és most a deklaratív. A deklaratív programozási paradigma a program logikájára fókuszál: azt adjuk meg, hogy mi történjen, a vezérlés viszont nincs a fókuszban.
  • Az előzőből következik, hogy a reaktív modell eseményvezérelt: valójában azt adjuk meg deklaratív módon, hogy milyen esemény hatására mi történjen. A tényleges lefutást viszont maga az esemény váltja ki. Egy elnagyolt példa: a hagyományos technikával két számot úgy adunk össze, hogy valahogy megkapjuk az egyiket, majd a másikat, végül az a = b + c hatására eredményül az összeget kapjuk. A reaktív programozási modellben ellenben - ezt a példát tovább gondolva - megadjuk azt, hogy amikor megérkezik a két összeadandó értéke, akkor számolja ki azt. Ebben a példában az esemény az érték megérkezése. Vagy egy másik példa: figyelje a két összeadandót (a példában a b és a c változók értékeit), és ha azok megváltoztak (az esemény tehát itt a változó megváltozása), akkor automatikusan módosítsa az eredmény (azaz az a változó) értékét.
  • Egy reaktív program tipikusan aszinkron. A szálak nem blokkolódhatnak. Az összeadásos példát folytatva: tegyük fel, hogy a művelet elvégzése hosszú időt vesz igénybe. Ezt nehéz egy egyszerű összeadásnál elképzelni, de gondoljunk mondjuk arra, hogy van egy web szolgáltatás valahol az interneten, és közvetlen összeadás helyett elküldjük a kérést a távoli szervernek. Az a szerver lehet aktuálisan túlterhelt, lehet lassú a hálózati kapcsolat stb., szóval egy ilyen modellben nem lehetünk biztosak a gyors válaszban. A deklaratív (ill. lényegében tetszőleges más) paradigmában elküldenénk a kérést, megvárnánk a választ és visszatérnénk az eredménnyel. A reaktív program ellenben elküldi a kérést, majd azonnal visszatér, és a kliensnek ad egy olyan objektumot, amely a visszatérés pillanatában még üres, és az eredmény akkor kerül bele, ha megérkezett. A kliens eldönti, hogy addig vár, amíg meg nem érkezik, vagy időközönként lekérdezi, hogy megérkezett-e már, és ha nem, más feladatot hajt végre.
  • A reaktív programozásra igen gyakran jellemző a folyamok (stream) használata. A hagyományos gyűjtemény (collection) típusokkal ellentétben, melyekben egyszerre elérhető a teljes adathalmaz, a folyamok során egyszerre kevés, tipikusan csak egy elemet "látunk", és deklaratív módon megadjuk, hogy mi történjen azzal. Egy külső rendszer "húzza el előttünk" a folyamban levő adatokat, a programunk pedig végrehajtja a műveletet. Ez is egyébként tipikusan deklaratív: csak azt mondjuk meg, hogy egy-egy elemmel mi történjen, és a program szempontjából az egy esemény, hogy jött egy konkrét elem, amin végre kell hajtani valamit.
  • Bár nem része közvetlenül a reaktív programozásnak, mégis igen gyakori a fluent API használata: tipikusan egyszerre egész sok mindent meg szoktunk adni, és ahelyett, hogy ezeket egymás után írnánk külön utasításként, ponttal elválasztva egymás után írjuk.

A reaktív kiáltvány (reactive manifesto, https://www.reactivemanifesto.org/) az alábbi módon határozza meg a reaktív rendszereket:

  • Válaszoló (responsive): a kérésre a válasz gyorsan megérkezik.
  • Rugalmas (resilient): a rendszer hiba esetén is válaszoló marad.
  • Rugalmas (elastic): a rendszer megváltozott terhelés esetén is válaszoló marad.
  • Eseményvezérelt (message-driven): a reaktív rendszerek aszinkron eseményeket dolgoznak fel.

Reaktív rendszert nem egyszerű létrehozni, melynek leginkább az imperatív programozáshoz szokott programozói agy számára szokatlan módszer az elsődleges oka. Mégis, ha vesszük a fáradtságot, akkor tömör kódot és jól skálázható rendszert kapunk, melyben nem kell törődnünk pl. a többszálúsággal.

A Vert.x rendszer

A Vert.x tehát a fenti modellt valósítja meg. Az Eclipse gondozásában fejlesztik. Felfoghatjuk egyfajta alkalmazásszerverként is (letöltjük a rendszert, elkészítjük a kódot, majd a rendszer segítségével indítjuk), de felfoghatjuk úgy is, mint egy hagyományos Java könyvtár. Jól modularizált, így ha a rendszernek csak egy kis szeletét használjuk, akkor az eredmény is kicsi marad. Több programozási nyelvet támogat (az írás pillanatában: Java, Kotlin, JavaScript, Groovy, Ruby és Scala), és a komponenseket akár külön nyelven is megírhatjuk. Az alap rendszer mellett tartalmaz kiegészítéseket is (pl. támogatja a legelterjedtebb adatbázis rendszereket), valamint kiterjeszthető.

A Vert.x rendszer alapvető komponense az ún. verticle. A Vert.x keretrendszer esemény cikluson keresztül (event loop) hajtja végre az utasításokat, mellyel a Multi-Reator mintát valósítja meg.

Néhány fontosabb, a neten fellelhető anyag:

Hello, Vert.x world!

Ennyi filozófiai bevezető után lássunk is valamit! A példaprogram vertxwebserver rögtön egy web szervert tartalmaz, amelynek a lényegi része - amint azt látni fogjuk, mindössze egyetlen sor. A példákhoz a Maven rendszert használom. A példában a pom.xml tartalmazza a vertx-core függőséget:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>hu.faragocsaba</groupId>
    <artifactId>vertxwebserver</artifactId>
    <version>1.0</version>
 
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
    </dependencies>    
</project>

A webszervert megvalósító verticle az alábbi:

package hu.faragocsaba.vertxwebserver;
 
import io.vertx.core.AbstractVerticle;
 
public class HelloVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.createHttpServer().requestHandler(request -> request.response().end("Hello Vert.x world!")).listen(8080);
    }
}

Egyetlen sorban megvalósítottunk egy HTTP szervert! Imperatív módban ennél sokkal bonyolultabb lenne: gondoskodni kellene ugyanis arról, hogy párhuzamosan több klienst is ki tudjon szolgálni, ne legyen blokkoló (ha nem egy fix stringgel térnénk vissza, akkor azt célszerűen egy másik szálon hajtanánk végre, azaz azonnal a nyakunkban lenne a párhuzamosítás minden nyűgje); a hosszú kódban elveszne a lényeg.

Némi magyarázatot fűzök a kódhoz:

  • A verticle osztály az AbstractVerticle osztályból származik; ez a tipikus megvalósítása a verticle-nek.
  • Az AbstractVerticle osztálynak a következő két metódusát tudjuk felülírni: void start() és void stop(). A példában a start() metódust írtuk felül.
  • A példában lambda kifejezéssel adtuk meg a kezelő paraméterét, ami egy funkcionális interfészt várt. Ott azt adtuk meg, hogy kérés esetén mi történjen: fix válasszal térjen vissza.
  • Az imperatív paradigmához szokott programozónak furcsa lehet, hogy előbb adjuk meg a visszatérési szöveget, és csak utána a port számát. Nem azt adtuk meg ugyanis, hogy hajtsd végre ezt a lépést, majd azt, végül amazt, hanem azt adtuk meg neki, hogy figyelje a 8080-as portot, és megmondtuk, hogy mi történjen akkor, ha jön egy kérés.
  • A vertx referencia az ősosztály egy protected hivatkozása. Ezen keresztül érhetjük el magát a Vert.x rendszert. A másik ilyen referencia a context.

Fordítsuk le a szokásos módon: mvn clean install. Érdemes szemügyre venni az eredmény méretét: alig egy-két kilobájt.

A programot többféleképpen is elindíthatjuk.

Indítás parancssorból

A Vert.x rendszerre tekinthetünk úgy, mint alkalmazás szerver. Ehhez le kell töltenünk magát a rendszert: a https://vertx.io/download/ oldalon válasszunk egy disztribúciót. Én a legfrissebb (az írás pillanatában ez a 3.9.0) teljes (full) verzió zip tömörített változatát töltöttem le (nincs jelentős különbség a minimális és a teljes között). Nem igényel különleges telepítést, csak ki kell csomagolni egy tetszőleges könyvtárba. A bin könyvtárból adjuk ki a következő parancsot:

vertx run hu.faragocsaba.vertxwebserver.HelloVerticle -cp vertxwebserver-1.0.jar

A -cp kapcsolóval a classpath-t tudjuk megadni, és az adott operációs rendszeren szokásos elérési útvonalat kell megadnunk. Ha mindent jól csináltunk, akkor pár másodperc elteltével kapunk egy ilyen üzenetet: Succeeded in deploying verticle. Ezt követően egy tetszőleges böngészőből nyissuk meg a http://localhost:8080/ oldalt; a Hello Vert.x world! feliratot kell látnunk.

A parancssorban még számos más paramétere van, mellyel meg tudjuk adni a párhuzamosan futó szálakat, fürtöket tudunk létrehozni stb. A parancssori lehetőségek részletes ismertetése túlmutat az oldal keretein. A https://vertx.io/docs/vertx-shell/java/ oldalon olvashatunk erről részletesebben.

Indítás programból

Nincs szükségünk a Vert.x alkalmazás szerverre, anélkül is el tudjuk indítani a programot. Ehhez persze kell egy main függvény, ahol példányosítjuk a verticle-t. A fenti példát is folytathatjuk; én külön programot hoztam létre vertxstandalone néven, hogy mindegyik lépés megmaradjon, melybe belemásoltam a fenti program tartalmát, a megfelelő módosításokkal. A főprogram a következő:

package hu.faragocsaba.vertxstandalone;
 
import io.vertx.core.Vertx;
 
public class Main {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(HelloVerticle.class.getName());
    }
}

A deployVerticle() eljárás telepíti a verticle-t. A fejlesztőkörnyezetből elindítva (Eclipse-t használok) ugyanúgy fut. (Parancssorból össze kellene gyűjteni a függőségeket, és megfelelően felparaméterezve elindítani, de pár perc kínlódást követően feladtam. A megoldást ld. lejjebb.)

Futtatható jar készítése

A parancssoros indításhoz célszerű egy olyan jar fájlt készíteni, ami mindent tartalmaz a futtatáshoz. Ehhez hozzá kell adni egy build beépülőt. Folytathatjuk a fenti példát; én az említett ok miatt erre is egy újat hoztam létre, vertxselfcontained néven. A program lényegi része maradjon a fenti (a Main osztállyal együtt), a pom.xml pedig a következő:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>hu.faragocsaba</groupId>
    <artifactId>vertxselfcontained</artifactId>
    <version>1.0</version>
 
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>3.9.0</version>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                     <manifestEntries>
                                        <Main-Class>hu.faragocsaba.vertxselfcontained.Main</Main-Class>
                                        <Main-Verticle>hu.faragocsaba.vertxselfcontained.HelloVerticle</Main-Verticle>
                                    </manifestEntries>
                                </transformer>
                            </transformers>
                            <artifactSet/>
                            <outputFile>${project.build.directory}/${project.artifactId}-${project.version}-fat.jar</outputFile>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
 
</project>

Nem egyszerű, az biztos! A <Main-Class> és <Main-Verticle> részeket állítsuk be megfelelően. Fordítás után a java -jar vertxselfcontained-1.0-fat.jar paranccsal indíthatjuk. Ez sajnos nem ír ki semmit (egy kiírás sort beszúrhatunk, ha ez zavar), de ha mindent jól csináltunk, elindul, és a fenti oldalt betöltve megjelenik az üdvözlő szöveg.

Web

HTTP szerver

A bevezető példa pont egy HTTP szervert tartalmazott. Hogy legyen itt is valami, de ne csak ismételjük a fentit, a fenti példát kicsit átformáztam és egy ponton tovább alakítottam. Folytathatjuk a fenti programot is, én újat hoztam létre vertxfuture néven; mindjárt látjuk, hogy miért. A verticle kódja a következő:

package hu.faragocsaba.vertxfuture;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
 
public class HelloVerticle extends AbstractVerticle {
    @Override
    public void start(Promise<Void> promise) {
        vertx
            .createHttpServer()
            .requestHandler(request -> request.response().end("Hello Vert.x world!"))
            .listen(8080, result -> {
                if (result.succeeded()) {
                    System.out.println("Success");
                    promise.complete();
                } else {
                    System.out.println("Failure: " + result.cause().getMessage());
                    promise.fail(result.cause());
                }
            });
    }
}

Mivel a Vert.x aszinkron, a legtöbb esetben Future objektumokon keresztül kommunikál, egészen pontosan annak Vert.x megvalósításán, amit Promise-nak neveztek el. A példában a listen() hívás második paraméterét illusztrálom. Valójában ott is azonnal visszatér, blokkolás mentesen, és a tényleges port figyelés egy picivel később kezdődik. A figyelés lehet sikeres vagy sikertelen, és erre a visszahívó (callback) mechanizmust használja. A példában ezt egy lambda függvénnyel adtuk meg helyben.

Milyen hibák léphetnek fel? A leggyakoribb az, hogy az előző programot elfelejtjük leállítani, majd a következő ugyanazt a portot próbálja használni, és a Address already in use: bind hibaüzenettel leáll.

A fenti példa egyben a fluent API szokásos formázását is illusztrálja: a bevezetőben megadott egysoros megoldás csak rövid esetekben használható, általában az újabb hívás a forrásban új sorba kerül.

Láthattuk a listen() függvény két formáját: az egy ill. a két paraméterest. A Vert.x rendszert egyébként a túlterhelt metódusok sokasága jellemzi.

Még ezzel a kibővített példával is a lehetőségek felszínét épp hogy karcoltuk. A HTTP beállításokat a HttpServerOptions osztály segítségével tudjuk beállítani, amit a createHttpServer() függvény paramétereként tudunk átadni (még egy túlterhelt függvény!). A témáról részletesen olvashatunk az API-ban, valamint a https://vertx.io/docs/vertx-core/java/#_writing_http_servers_and_clients oldalon.

Router

Web szerver létrehozásának van egy másik módszere, ami más megközelítést takar: az útválasztó (router) technológia alkalmazása. Ez az integrációs mintákra hajaz. Lássunk egy példát (vertxrouter)! A pom.xml-nek tartalmaznia kell a vertx-web függőséget (a vertx-core-ra ebben a példában nincs szükség):

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <version>3.9.0</version>
        </dependency>

A routert tartalmazó verticle az alábbi:

package hu.faragocsaba.vertxrouter;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.Router;
 
public class RouterVerticle extends AbstractVerticle {
    @Override
    public void start() {
        HttpServer httpServer = vertx.createHttpServer();
        Router router = Router.router(vertx);
        router.route().handler(routingContext -> {
            HttpServerResponse response = routingContext.response();
            response.putHeader("content-type", "text/plain");
            response.end("Hello Vert.x-web world!");
        });
        httpServer.requestHandler(router).listen(8080);
    }
}

A főprogramot írjuk át megfelelően (a verticle neve RouterVerticle). Ebben a programban arra is láthatunk példát, hogy hogyan tudunk fejléc információt beállítani.

További információt erről a módszerről (ill. a vertx-web könyvtárról) a https://vertx.io/docs/vertx-web/java/ oldalon olvashatunk.

HTTP kliens

Felmerülhet a kérdés, hogy miért van egyáltalán szükség külön HTTP kliensre a Vert.x világban, miért nem jó a hagyományos. Ne feledjük: a Vert.x-ben semmi sem blokkolódhat! Márpedig egy HTTP lekérdezés ideje eléggé kiszámíthatatlan. A megoldás a következő: elküldjük a kérést, de a választ nem várjuk meg, hanem egyből visszatérünk. Ha megérkezik az eredmény, akkor arról kapunk értesítést.

Lássuk a példát (webxhttpclient)!

package hu.faragocsaba.vertxhttpclient;
 
import io.vertx.core.AbstractVerticle;
 
public class HttpClientVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx
            .createHttpClient()
            .get(8080, "localhost", "/", httpClientResponse -> {
                System.out.println("Response: " + httpClientResponse.statusMessage());
                httpClientResponse.bodyHandler(buffer -> System.out.println("Data: " + buffer.toString()));
            })
            .end();
    }
}

A válasz egy pufferen keresztül (buffer) érkezik, amit kiírunk a képernyőre. A fő függvényben a megfelelő verticle-t kell telepítenünk:

package hu.faragocsaba.vertxhttpclient;
 
import io.vertx.core.Vertx;
 
public class Main {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(HttpClientVerticle.class.getName());
    }
}

Ha valami módon elindítjuk a fenti szerver bármelyikét, majd a klienst, akkor ha mindent jól csináltunk, a kliens konzolán a következő felirat jelenik meg:

Response: OK
Data: Hello Vert.x world!

Web kliens

A fenti megoldással van egy kis probléma: a fordító figyelmeztet arra, hogy ez a módszer elavult, bár még a hivatalos példáknak is része. A javasolt módszer a WebClient. Lássunk erre is egy példát (vertxwebclient)! Ehhez bele kell nyúlnunk a függőségekbe; a vertx-core helyére írjuk be ezt (a vertx-core-ra ebben a példában nincs szükség):

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web-client</artifactId>
            <version>3.9.0</version>
        </dependency>

A kliens kód az alábbi:

package hu.faragocsaba.vertxwebclient;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
 
public class ClientVerticle extends AbstractVerticle {
    @Override
    public void start() {
        WebClient.create(vertx)
            .get(8080, "localhost", "/")
            .send(asyncResult -> {
                if (asyncResult.succeeded()) {
                    HttpResponse<Buffer> response = asyncResult.result();
                    System.out.println("Status code: " + response.statusCode());
                    System.out.println("Body: " + response.bodyAsString());
                } else {
                    System.out.println("Error occurred: " + asyncResult.cause().getMessage());
                }
            });
    }
}

Az eredménynek ugyanannak kell lennie mint a fenti példában, de más a szintaxis. Az aszinkron módszer itt is jelen van.

A WebClient osztályról további hasznos dolgokat olvashatunk a https://vertx.io/docs/vertx-web-client/java/ oldalon.

Integrációs teszt

A fejlesztői tesztek alapvető fontosságú része az egységtesztelés, valamint külső keretrendszerek használata esetén az integrációs tesztelésre való képesség. Lássuk, hogyan működik ez a Vert.x esetében!

HTTP kliens példa

Valamelyik alap webszervert tartalmazó példát bővítsük egy egy egységteszttel! Ehhez én egy új projektet hoztam létre vertxunittest néven, melybe bemásoltam az alap példát. A teszteléshez két új függőségre is szükség lesz a vertx-unit-ra, valamint a junit-ra is. Ezt írjuk bele a pom.xml-be:

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-unit</artifactId>
            <version>3.9.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

A megfelelő teszt osztályt a @RunWith(VertxUnitRunner.class) annotációval kell ellátni. Ezt a programot a példában a neten fellelhető példák alapján hoztam létre:

package hu.faragocsaba.vertxunittest;
 
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
 
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
 
@RunWith(VertxUnitRunner.class)
public class HelloVerticleTest {
 
    private Vertx vertx;
 
    @Before
    public void setUp(TestContext context) {
        vertx = Vertx.vertx();
        vertx.deployVerticle(HelloVerticle.class.getName(), context.asyncAssertSuccess());
    }
 
    @After
    public void tearDown(TestContext context) {
        vertx.close(context.asyncAssertSuccess());
    }
 
    @Test
    public void testMyApplication(TestContext context) {
        Async async = context.async();
        vertx.createHttpClient().getNow(8080, "localhost", "/", response -> {
            response.bodyHandler(body -> {
                context.assertEquals("Hello Vert.x world!", body.toString());
                async.complete();
            });
        });
    }
 
}

Web kliens példa

A fenti példa problémája az, hogy tartalmaz elavult kódot. Átírtam (vertxunittestenhanced) úgy, hogy a WebClient osztályt használja. Ehhez már kell a vertx-web-client függőség is, de most test scope-pal.

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-unit</artifactId>
            <version>3.9.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web-client</artifactId>
            <version>3.9.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

A főprogram:

package hu.faragocsaba.vertxunittestenhanced;
 
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
 
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
 
@RunWith(VertxUnitRunner.class)
public class HelloVerticleTest {
 
    private Vertx vertx;
    private WebClient webClient;
 
    @Before
    public void setUp(TestContext context) {
        vertx = Vertx.vertx();
        vertx.deployVerticle(HelloVerticle.class.getName(), context.asyncAssertSuccess());
        webClient = WebClient.create(vertx);
    }
 
    @After
    public void tearDown(TestContext context) {
        webClient.close();
        vertx.close(context.asyncAssertSuccess());
    }
 
    @Test
    public void testMyApplication(TestContext context) {
        Async async = context.async();
        webClient
            .get(8080, "localhost", "/")
            .send(asyncResult -> {
                context.assertTrue(asyncResult.succeeded());
                HttpResponse<Buffer> response = asyncResult.result();
                context.assertEquals(200, response.statusCode());
                context.assertEquals("Hello Vert.x world!", response.bodyAsString());
                async.complete();
            });
    }
 
}

Üzenetküldés

Az eddigi példák mindegyikében csak egy verticle szerepelt. Egy tipikus program természetesen többet is tartalmaz. Ezek viszont a reaktív megközelítés miatt közvetlenül nem hívhatják egymást, egymással üzenetküldő rendszeren keresztül kommunikálhatnak. Az esemény busz (event bus) a Vert.x egyik legalapvetőbb komponense. Támogatja a legfontosabb üzenetküldő modelleket, melyekre ássunk egy-egy példát!

Topic

Ez az ún. publish-subscribe modell: a kliensek feliratkoznak egy témára (topic), és mindent megkapnak, ami oda érkezik. Lássunk egy példát (vertxtopic)! A pom.xml-nek csak a vertx-core függőségeket kell tartalmaznia. A példa 3 verticle-t tartalmaz. Az egyik a fent megismert web szerver kiterjesztése:

package hu.faragocsaba.vertxtopic;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import java.util.logging.Logger;
 
public class HelloVerticle extends AbstractVerticle {
 
    private Logger logger = Logger.getLogger("HelloVerticle");
 
    @Override
    public void start() {
        logger.info("HelloVerticle started.");
        EventBus eventBus = vertx.eventBus();
        vertx.createHttpServer().requestHandler(request -> {
            eventBus.publish("hello.verticle.requests", "Request from " + request.getHeader("User-Agent"));
            request.response().end("Hello Vert.x world!");
        }).listen(8080);
    }
 
}

Vegyünk észre pár fontos dolgot:

  • Az esemény buszt az EventBus osztályon keresztül használhatjuk.
  • Az EventBus osztály publish() metódusával tudunk üzenetet küldeni, melyben megadjuk a topicot (a példában "hello.verticle.requests"; ez bármi lehet) és magát az üzenetet. A példában a HTTP fejléc információból kiolvassuk az ún. user stringet, és ezt küldjük el a megadott topicra.
  • A Vert.x tartalmaz egy beépített naplózó rendszert is, melynek használatára láthatunk ebben a pár sorban példát.

Két kliens is megkapja az üzeneteket. Az egyik közvetlenül kiírja a kapott üzenetet a konzolra, természetesen aszinkron módon (tehát a kiírás idejére sem blokkolódik a szál):

package hu.faragocsaba.vertxtopic;
 
import java.util.logging.Logger;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
 
public class AuditVerticle extends AbstractVerticle {
 
    private Logger logger = Logger.getLogger("AuditVerticle");
 
    @Override
    public void start() {
        logger.info("AuditVerticle started.");
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("hello.verticle.requests", message -> System.out.println("[AuditVerticle] " + message.body()));
    }
 
}

A másik a naplózó rendszeren keresztül írja ki:

package hu.faragocsaba.vertxtopic;
 
import java.util.logging.Logger;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
 
public class LoggerVerticle extends AbstractVerticle {
 
    private Logger logger = Logger.getLogger("LoggerVerticle");
 
    @Override
    public void start() {
        logger.info("LoggerVerticle started.");
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("hello.verticle.requests", message -> logger.info("[LoggerVerticle] " + message.body().toString()));
    }
 
}

A főprogram mindhárom verticle telepítését tartalmazza:

package hu.faragocsaba.vertxqueue;
 
import io.vertx.core.Vertx;
 
public class Main {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(AuditVerticle.class.getName());
        vertx.deployVerticle(LoggerVerticle.class.getName());
        vertx.deployVerticle(HelloVerticle.class.getName());
    }
}

Indítsuk el, és próbáljuk ki Firefox böngészővel. Amiatt Firefox-szal, mert a tapasztalatom szerint a többi kétszer küldi el a kérést, ez viszont csak egyszer. Eredményül azt kell kapnunk, hogy mindegyik lekérdezéskor kétszer íródik ki a böngésző információ a konzolra: egyszer közvetlenül és egyszer a naplózón keresztül.

Queue

A queue megoldásban egyetlen dolgot kell módosítanunk a fentihez képest: küldéskor a publish() helyett a send() függvényt kell meghívnunk (vertxqueue).

            eventBus.send("hello.verticle.requests", "Request from " + request.getHeader("User-Agent"));

Ha Firefox böngészőt használunk, és többször végrehajtjuk a lekérdezést, akkor egyszer az egyik, egyszer a másik írja ki.

Kérés-válasz

A fogadó válaszolni is tud a küldőnek, melyre lássunk egy példát (vertxresponse)!

A küldő most a request() hívással küldi az üzenetet, melynek harmadik paramétere a válasz feldolgozása:

package hu.faragocsaba.vertxresponse;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import java.util.logging.Logger;
 
public class HelloVerticle extends AbstractVerticle {
 
    private Logger logger = Logger.getLogger("HelloVerticle");
 
    @Override
    public void start() {
        logger.info("HelloVerticle started.");
        EventBus eventBus = vertx.eventBus();
        vertx.createHttpServer().requestHandler(request -> {
            eventBus.request(
                "hello.verticle.requests",
                "Request from " + request.getHeader("User-Agent"),
                message -> {
                    if (message.succeeded()) {
                        System.out.println("Message succeeded! " + message.result().body());
                    } else {
                        System.out.println("Message failed! " + message.cause());
                    }
                }
            );
            request.response().end("Hello Vert.x world!");
        }).listen(8080);
    }
 
}

A fogadó az alábbi módon változik:

package hu.faragocsaba.vertxresponse;
 
import java.util.logging.Logger;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
 
public class AuditVerticle extends AbstractVerticle {
 
    private Logger logger = Logger.getLogger("AuditVerticle");
 
    @Override
    public void start() {
        logger.info("AuditVerticle started.");
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("hello.verticle.requests", message -> {
            String messageBody = message.body().toString();
            System.out.println(messageBody);
            if (messageBody.contains("Trident")) {
                message.fail(1, "This is most likely an Internet Explorer. Do not use it!");
            } else {
                message.reply("Good browser, accepted!");
            }
        });
    }
 
}

A message.reply() ill. message.fail() függvényekkel tudunk választ küldeni. Próbálgattam a böngészőket, és a legtöbb probléma még ezzel az egyszerű programmal is az Internet Explorerrel volt, melynek user agent stringje tartalmazta azt, hogy Trident. A példában hibát jelzünk Internet Explorer esetén, minden más esetben pozitív választ adunk.

Próbáljuk ki különböző böngészőkkel!

Időzítő

Aszinkron eseményként felfogható az is, hogy egy idő eltelt. A Vert.x is tartalmaz időzítőt.

Késleltetés

Lássunk először egy példát késleltetésre (vertxtimer)! A feladat az, hogy miután betöltődött a verticle, várjon egy másodpercet, mielőtt elkezdi a működését. A hagyományos megoldás a Thread.sleep(1000) lenne, de ezt a reaktív programozásban nem szabad, mivel nem tartalmazhat blokkoló utasításokat. Helyette használhatjuk a setTimer() utasítást:

package hu.faragocsaba.vertxtimer;
 
import io.vertx.core.AbstractVerticle;
 
public class TimerVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.setTimer(1000, id -> System.out.println("Timer " + id + " fired."));
        System.out.println("Timer started");
    }
}

Ha lefuttatjuk, először a Timer started jelenik meg, majd egy másodperc elteltével a következő: Timer 0 fired.

Periodikus időzítés

Az időzítő hasznosabb tulajdonsága a rendszeres, periodikus időzítés, tehát pl. az, hogy valami másodpercenként egyszer lefut. Az eltérés a fentihez képest mindössze annyi, hogy a setTimer() helyett a setPeriodic() függvényt hívjuk (a példaprogram neve nálam vertxperiodic):

package hu.faragocsaba.vertxperiodic;
 
import io.vertx.core.AbstractVerticle;
 
public class PeriodicVerticle extends AbstractVerticle {
 
    @Override
    public void start() {
        vertx.setPeriodic(1000, id -> System.out.println("Periodic " + id + " fired."));
        System.out.println("Periodic started");
    }
 
}

Egyszerre több feladat párhuzamosan

Lássunk most egy olyan példát, melyben több időzítő megy párhuzamosan (vertxparallel)! A webszervert is beletettem, hogy még érdekesebb legyen:

package hu.faragocsaba.vertxparallel;
 
import io.vertx.core.AbstractVerticle;
 
public class ParallelVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.createHttpServer().requestHandler(request -> request.response().end("Hello Vert.x world!")).listen(8080);
        vertx.setPeriodic(300, id -> System.out.println("Periodic " + id + " fired."));
        vertx.setPeriodic(1000, id -> System.out.println("Periodic " + id + " fired."));
        vertx.setTimer(5000, id -> System.out.println("Timer " + id + " fired."));
   }
}

Az alábbiak miatt hoztam létre ezt a példaprogramot:

  • Ebben láthatjuk a Vert.x, ill. általában a deklaratív módon megadott reaktív programozás előnyét: egyszerre végez több feladatot párhuzamosan anélkül hogy szálakat kellett volna létrehozni.
  • Ha a késleltetést a Thread.sleep() hívással oldottuk volna meg, akkor közben nem tudnánk mát is csinálni. A fenti példában az első 5 másodpercben is történik kiírás, ill. működik a HTTP szerver.
  • Mivel több időzítő van, különböző azonosítókat kapnak, amit futáskor láthatunk.

Blokkoló kód

Blokkoló kód jelenléte elkerülhetetlen; a reaktív programozásban nem azok teljes eliminálására, hanem azok csökkentésére és megfelelő kezelésére szorítkozunk. Az, hogy mi számít blokkolónak, nem egzakt; a Vert.x figyeli a verticle-eket, és 2 másodpercnél "szól". Ebben a szakaszban azt nézzük meg, hogy hogyan lehet "helyesen" a blokkoló kódot kezelni.

Végrehajtó komponens készítése

Az eddigi hagyományos verticle típus mellett van még egy lehetőség: lehet végrehajtó (worker), ami blokkolódhat. Ez viszont nem az esemény cikluson (event loop) belül hajtódik végre, hanem külön szálon. Ehhez létezik egy alkalmi piaci csoport (na jó ez vicc; igazából pool, aminek nem találtam megfelelő magyar megnevezést, és a szótáram első találata a nagyon sok közül ez), adott számú szállal, és azt tudják használni. Lássunk egy példát (vertxlongadder)! Ebben egy összeadót valósítunk meg: a program két számot vár, amit összead. Az összeadást hosszasan késleltetjük.

Az összeadó kódja ez:

package hu.faragocsaba.vertxlongadder;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;
 
public class AdderVerticle extends AbstractVerticle {
 
    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("add", message -> {
            JsonObject jsonObject = new JsonObject(message.body().toString());
            int a = jsonObject.getInteger("a");
            int b = jsonObject.getInteger("b");
            int result = a + b;
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            message.reply(result);
            vertx.undeploy(context.deploymentID());
        });
    }
 
}

A szerver hozza létre az összeadót, és küldi el az üzenetet:

package hu.faragocsaba.vertxlongadder;
 
import java.util.HashMap;
import java.util.Map;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;
 
public class ServerVerticle extends AbstractVerticle {
 
    @Override
    public void start() {
        EventBus eventBus = vertx.eventBus();
        vertx.createHttpServer().requestHandler(request -> {
            vertx.deployVerticle(AdderVerticle.class.getName(), new DeploymentOptions().setWorker(true));
            int a = Integer.parseInt(request.getParam("a"));
            int b = Integer.parseInt(request.getParam("b"));
            Map<String, Object> addMap = new HashMap<>();
            addMap.put("a", a);
            addMap.put("b", b);
            JsonObject jsonObject = new JsonObject(addMap);
            eventBus.request(
                "add",
                jsonObject,
                response -> {
                    if (response.succeeded()) {
                        request.response().end("Calculation successful: " + a + " + " + b + " = " + response.result().body());
                    } else {
                        request.response().end("Calculation failed: " + response.cause());
                    }
                }
            );
        }).listen(8080);
    }
 
}

A főprogram csak a szervert hozza létre:

package hu.faragocsaba.vertxlongadder;
 
import io.vertx.core.Vertx;
 
public class Main {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        vertx.deployVerticle(ServerVerticle.class.getName());
    }
}

Indítsuk el, és töltsük be a localhost:8080/?a=2&b=3 oldalt.

Azt, hogy az összeadó végrehajtó módban induljon el, a new DeploymentOptions().setWorker(true) paraméterrel adtuk meg. A példában a blokkoló az 5 másodperces késletetetés. Ha hagyományosan futtatnánk, akkor kapunk pár figyelmeztetést:

WARNING: Thread Thread[vert.x-eventloop-thread-0,5,main]=Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2471 ms, time limit is 2000 ms

A példában az összeadó a végén törli magát, hogy nem foglaljon helyet. Ugyanis ez a pool-ban maradna, ami amúgy ebben a példában pont nem lenne probléma, de általában célszerű törölni.

Egyben példát láthattunk a következőkre:

  • Verticle konfigurálása.
  • HTTP kérés paraméterek kezelése.
  • JSON használata.

Fontos megértenünk, hogy itt a worker paraméter megadásával nem egyszerűen egy bosszantó figyelmeztetés elhallgattatásáról van szó, hanem ennek hatására egészen másképp fut a program: az event loop helyett egy fix méretű pool-ban.

Blokkoló szakasz végrehajtása

A valóságban annál sokkal sűrűbben váltakoznak a blokkoló és nem blokkoló kódok, hogy kényelmes legyen minden egyes blokkolóra külön verticle-t írni. A nem végrehajtó verticle-ben is lehet blokkoló kód az alábbi struktúrát alkalmazva:

package hu.faragocsaba.vertxblocking;
 
import io.vertx.core.AbstractVerticle;
 
public class BlockingVerticle extends AbstractVerticle {
    @Override
    public void start() {
        vertx.createHttpServer().requestHandler(request -> {
            vertx.<String>executeBlocking(promise -> {
                System.out.println("Calculating the result...");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Result calculated.");
                promise.complete("Hello blocking Vert.x world!");
            }, asyncResult -> {
                if (asyncResult.succeeded()) {
                    request.response().end(asyncResult.result());
                } else {
                    System.out.println("Execution failed.");
                    System.out.println(asyncResult.cause().getMessage());
                }
            });
        }).listen(8080);
    }
}

Az executeBlocking() a háttérben ugyanúgy a végrehajtó pool-ban hajtja végre, mintha végrehajtó verticle lenne. És jól látható, hogy maga a verticle ténylegesen nem blokkolódik, ugyanis az eredmény egy Promise osztályba kerül.

Adatbázis kapcsolat

Az adatbázis kapcsolatok tipikusan blokkolóak lehetnek, és reaktívvá alakításának az ötlete itt is az, hogy a lekérdezés után azonnal visszatér a program, az eredmény pedig később kerül bele egy Promise-ba. A Vert.x számos adatbázist támogat alapból, most a MySQL-t nézzük meg. Ehhez szükség van az alábbi függőségre:

        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-mysql-client</artifactId>
            <version>3.9.0</version>
        </dependency>

A példa futtatásához hozzuk létre a teszt adatbázist az adatbázisok oldalon leírtak alapján. Az adatbázis kapcsolatot megvalósító verticle az alábbi:

package hu.faragocsaba.vertxmysql;
 
import io.vertx.core.AbstractVerticle;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
 
public class MySqlVerticle extends AbstractVerticle {
    @Override
    public void start() {
        MySQLConnectOptions connectOptions = new MySQLConnectOptions()
            .setHost("localhost")
            .setPort(3306)
            .setDatabase("testdb")
            .setUser("csaba")
            .setPassword("farago");
        PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
        MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
        client
            .query("SELECT * FROM person WHERE age < 40")
            .execute(asyncResult -> {
                if (asyncResult.succeeded()) {
                    RowSet<Row> result = asyncResult.result();
                    System.out.println("Got " + result.size() + " persons:");
                    for (Row row : result) {
                        System.out.println("- " + row.getString("name") + " (" + row.getInteger("age") + ")");
                    }
                } else {
                    System.out.println("Failure: " + asyncResult.cause().getMessage());
                }
            });
        client.close();
    }
}

További részleteket a https://vertx.io/docs/vertx-mysql-client/java/ oldalon, ill. az adott más adatbázis leírását tartalmazó oldalakon találunk.

Egyéb lehetőségek

Ez az oldal nem tartalmazza a Vert.x minden részét, számos dolog kimaradt. Ezek közül a fontosabbak:

És még nagyon sok egyéb.

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License