Üzenetkezelés Javában

Kategória: Java külső könyvtárak.

Table of Contents

JMS

A programozás oldalon vázolt elv, sőt, a Java-ban maga az interfész is szabványos. Ez utóbbit a Java üzenetkezelő szolgáltatás (Java Messaging Service, JMS) definiálja, és a Java szabvány része. Viszont ahogyan többféle adatbázis kezelő rendszer van, és nincs egy kitüntetett, ugyanúgy üzenetorientált köztes rétegből is több megvalósítás van. Ebben a szakaszban az ActiveMQ alapjaival ismerkedünk meg. Másik népszerű megoldás a JBoss által gondozott HornetQ, ami viszont az ActiveMQ részévé vált, Artemis néven.

Lássuk először azt, hogy programból hogyan tudjuk indítani a brókert! Ehhez az alábbi függőségekre van szükségünk a pom.xml fájlban:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.10.1</version>
        </dependency>

(A Jackson nélkül nálam kivételt dobott induláskor.) A program lényege mindössze 3 sor:

import org.apache.activemq.broker.BrokerService;

public class ActiveMqEmbeddedBroker {
    public static void main(String[] args) throws Exception {
        BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:61616");
        broker.start();
    }
}

A másik lehetőség az önálló futtatás. Ehhez töltsük le az ActiveMQ-t erről az oldalról: http://activemq.apache.org/components/classic/download/, és csomagoljuk ki egy tetszőleges könyvtárba. A bin könyvtárát tegyük bele a PATH környezeti változóba, majd indítsuk el a programot a következőképpen:

activemq start

Az ActiveMQ alapértelmezett portja a 61616, amit megadtunk a fenti programban, ill. az önállóan futó mód ezt ki is naplózza.

A fogadó ill. a küldő kódjához adjuk hozzá a org.apache.activemq % activemq-all % 5.15.11 függőséget. (Ott a Jackson nem kell. Egyébként létezik külön csak a küldő ill. külön a fogadó számára létrehozott csomag; mi most azt használjuk, amelyben minden benne van).

Az első példa egy témára (topic) ír szöveget, mégpedig egymás után tízet:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqTopicProducer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("My Topic");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (char c = 'A'; c <= 'I'; c++) {
            String text = "Hello from Topic Producer! " + c;
            TextMessage message = session.createTextMessage(text);
            producer.send(message);
        }
        System.out.println("Message sent");
        session.close();
        connection.close();
    }
}

A kódból látható, hogy csak az ActiveMQConnectionFactory az ActiveMQ specifikus, minden más általános. A példa jóval egyszerűbb mint ahogy azt egy valódi program esetén meg kell valósítani: ott szükség van megfelelő kivételkezelésre, tranzakció kezelésre, ill. célszerű névszolgáltatást (JNDI) használni az üzenetkezelő rendszer megtalálásához, vagy legalább paraméter fájlból beolvasva. A kódban megadtuk még a kézbesítés módját, ami lehet PERSISTENT (amikor nem szabad, hogy elvesszen) vagy NON_PERSISTENT (ebben az esetben nem baj, ha elveszik).

A fogadó 3 szálon olvassa ki az üzeneteket:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqTopicConsumer extends Thread {
    public static void main(String[] args) throws JMSException {
        for (int i = 1; i <= 3; i++) {
            new ActiveMqTopicConsumer(i).start();
        }
    }

    private int n;

    public ActiveMqTopicConsumer(int n) {
        this.n = n;
    }

    @Override
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createTopic("My Topic");
            MessageConsumer consumer = session.createConsumer(destination);
            System.out.println("[" + n + "] Waiting for topic messages...");
            while (true) {
                Message message = consumer.receive();
                TextMessage textMessage = (TextMessage) message;
                System.out.println("[" + n + "] Received: " + textMessage.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Először a fogadót indítsuk el, utána a küldőt. Látni fogjuk, hogy mindegyik szál feldolgozza mindegyik üzenetet, az üzenetek sorrendje viszont csak szálon belül garantált:

[2] Waiting for topic messages...
[3] Waiting for topic messages...
[1] Waiting for topic messages...
[3] Received: Hello from Topic Producer! A
[2] Received: Hello from Topic Producer! A
[3] Received: Hello from Topic Producer! B
[3] Received: Hello from Topic Producer! C
[3] Received: Hello from Topic Producer! D
[3] Received: Hello from Topic Producer! E
[3] Received: Hello from Topic Producer! F
[3] Received: Hello from Topic Producer! G
[3] Received: Hello from Topic Producer! H
[3] Received: Hello from Topic Producer! I
[1] Received: Hello from Topic Producer! A
[1] Received: Hello from Topic Producer! B
[2] Received: Hello from Topic Producer! B
[1] Received: Hello from Topic Producer! C
[2] Received: Hello from Topic Producer! C
[1] Received: Hello from Topic Producer! D
[2] Received: Hello from Topic Producer! D
[1] Received: Hello from Topic Producer! E
[2] Received: Hello from Topic Producer! E
[1] Received: Hello from Topic Producer! F
[2] Received: Hello from Topic Producer! F
[1] Received: Hello from Topic Producer! G
[1] Received: Hello from Topic Producer! H
[2] Received: Hello from Topic Producer! G
[1] Received: Hello from Topic Producer! I
[2] Received: Hello from Topic Producer! H
[2] Received: Hello from Topic Producer! I

A sorba (queue) küldő komponens alig különbözik a fentitől:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqQueueProducer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("My Queue");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for (char c = 'A'; c <= 'I'; c++) {
            String text = "Hello from Queue Producer! " + c;
            TextMessage message = session.createTextMessage(text);
            producer.send(message);
        }
        System.out.println("Messages sent");
        session.close();
        connection.close();
    }
}

A feliratkozás jellege itt minden esetben tartós, hiszen egy üzenetet pontosan egyszer dolgoznak fel. A fogadó itt is nagyon hasonlíthat a fentire, elég csak a createTopic()-ot createQueue()-re átírni, most viszont egy kicsit jobban átdolgozzuk, hogy a MessageListener interfésszel is megismerkedjünk (ami egyébként a fenti direkt olvasásnál jóval gyakoribb):

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMqQueueConsumer extends Thread implements MessageListener {
    public static void main(String[] args) throws JMSException {
        for (int i = 1; i <= 3; i++) {
            new ActiveMqQueueConsumer(i).start();
        }
    }

    private int n;

    public ActiveMqQueueConsumer(int n) {
        this.n = n;
    }

    @Override
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("My Queue");
            MessageConsumer consumer = session.createConsumer(destination);
            System.out.println("[" + n + "] Waiting for queue messages...");
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("[" + n + "] Message received: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

A lefutásnál mindegyik üzenetet pontosan egy fogadó dolgozza fel, de azt, hogy pontosan melyik, nem garantált. A lenti példában a 3-as sorszámú szál szóhoz sem jutott:

[2] Waiting for queue messages...
[1] Waiting for queue messages...
[3] Waiting for queue messages...
[1] Message received: Hello from Queue Producer! B
[2] Message received: Hello from Queue Producer! A
[2] Message received: Hello from Queue Producer! C
[1] Message received: Hello from Queue Producer! D
[1] Message received: Hello from Queue Producer! F
[2] Message received: Hello from Queue Producer! E
[1] Message received: Hello from Queue Producer! H
[2] Message received: Hello from Queue Producer! G
[2] Message received: Hello from Queue Producer! I

További információk:

Útválasztás

Az Apache Camel egy szabály alapú útválasztós motor. Ezzel a beérkező adatokat tudjuk feldolgozni és továbbítani. A Camel számos komponenst támogat; a teljes listát itt találjuk: https://camel.apache.org/components/latest/. Néhány példa:

  • file: ezzel egy adott könyvtárat lehet megadni, és a fájlokat olvassa be.
  • ftp, sftp: FTP szerverről történő olvasás ill. oda történő feltöltés.
  • jms: segítségével JMS queue-król és topicokról lehet olvasni ill. írni rá.
  • activemq: ha specifikusan az ActiveMQ-t szeretnénk használni.
  • kafka: adatkapcsolat a Kafka üzenetkezelővel.
  • direct: ezzel Camelen belül tudja egyik komponens küldeni a másiknak az adatot.
  • timer: időzítés, pl. másodpercenkénti üzenet indítás.
  • quartz: hasonló mint az előző, de sokkal szofisztikáltabb módon.
  • mock: egységtesztelésnél hasznos.

Az üzenet formája az Exchange. Ennek van bementi és kimeneti fejléce ill. törzse. Ha például fájlról van szó, akkor a fájl neve egy bemeneti fejléc információ, melyet a következőképpen tudunk lekérdezni: exchange.getIn().getHeader(Exchange.FILE_NAME, String.class). Erről bővebben itt olvashatunk: https://camel.apache.org/manual/latest/faq/using-getin-or-getout-methods-on-exchange.html.

A Camel megvalósítja az nagyvállalati integrációs mintákat (Enterprise Integration Patterns, EIP), melyről itt olvashatunk: https://camel.apache.org/manual/latest/enterprise-integration-patterns.html. Ennek segítségével számos műveletet végre lehet hajtani, melyek közül ismét csak párat elmítünk:

  • process: ennek segítségével fel tudjuk dolgozni, át tudjuk alakítani az üzenetet.
  • filter: ezzel tudjuk megszűrni az üzeneteket, akár XPath kifejezések, akár logikai értéket visszaadó függvény segítségével.
  • delay: ennek hatására valamennyi ideig vár mielőtt folytatja.
  • choice…when…otherwise: a klasszikus switch-case-default Cameles formája.
  • multicast: több végpontnak tudjuk elküldeni az üzenetet.
  • log: naplózni tudjuk az üzenetet a segítségével
  • loop: az üzenet bizonyos részein végig tudunk iterálni. Pl. végig tudunk lépkedni a fejlécben található kulcs-érték párokon, vagy akár fix számú lépést is tehetünk. Ezzel egyúttal fel is bontottuk az üzenetet több részre.
  • split: az üzenetet több részre tudjuk bontani, pl. akkor, ha az üzenet XML, és egy tömböt tartalmaz.
  • aggregate: segítségével több üzenetet tudunk egybe fűzni. Logikailag az előző inverze.
  • bean: egy osztály eljárását tudjuk segítségével meghívni.

Az útválasztás szabályait többféleképpen megadhatjuk. A teljes listát itt találjuk: https://camel.apache.org/manual/latest/languages.html. Néhány példa:

  • Java: magában a kódban adjuk meg a szabályokat.
  • Spring: a szabályokat XML fájlokban írjuk le. Így ez egy nem invazív megoldás, viszont XML-t kell hozzá szerkeszteni.
  • XPath: a szabályokat magát nem, a kiválasztást viszont megadhatjuk a segítségével.

A következő példa Java-ban definiálja a szabályt, a kimenet és a bemenet is könyvtár, és közben egy átalakítást hajt végre: a fájl nevét átnevezi úgy, hogy hozzácsapja az aktuális időbélyeget. A pom.xml-hez adjuk hozzá a következő függőséget:

        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>3.0.0</version>
        </dependency>

A kód az alábbi:

import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

class FileProcessor implements Processor {
    public void process(Exchange exchange) throws Exception {
        String originalFileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
        Date date = new Date();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
        String changedFileName = dateFormat.format(date) + " " + originalFileName;
        exchange.getIn().setHeader(Exchange.FILE_NAME, changedFileName);
    }
}

public class CamelExample {
    public static void main(String[] args) throws Exception {
        CamelContext camelContext = new DefaultCamelContext();
        camelContext.addRoutes(new RouteBuilder() {
          @Override
          public void configure() throws Exception {
            from("file://source?delete=true")
                .process(new FileProcessor())
                .to("file://destination");
          }
        });
        camelContext.start();
        Thread.sleep(10000);
        camelContext.stop();
        camelContext.close();
    }
}

A futtatáshoz hozzuk létre a source és destination könyvtárakat, a source-ba tegyük néhány példafájlt, majd futtassuk le a programot. Ha minden jól ment, akkor a forrás könyvtárból eltűntek a fájlok (ld. delete=true), a cél könyvtárban pedig megváltozott fájlnévvel megjelentek. A példában is látható, hogy a Camel a fluent API megközelítést használja.

További források:

Unless otherwise stated, the content of this page is licensed under Creative Commons Attribution-ShareAlike 3.0 License