Kategória: Java külső könyvtárak.
JMS
A programozás oldalon vázolt elv, sőt, a Java-ban maga az interfész is szabványos. Ez utóbbit a Java üzenetkezelő szolgáltatás (Java Messaging Service, JMS) definiálja, és a Java szabvány része. Viszont ahogyan többféle adatbázis kezelő rendszer van, és nincs egy kitüntetett, ugyanúgy üzenetorientált köztes rétegből is több megvalósítás van. Ebben a szakaszban az ActiveMQ alapjaival ismerkedünk meg. Másik népszerű megoldás a JBoss által gondozott HornetQ, ami viszont az ActiveMQ részévé vált, Artemis néven.
Lássuk először azt, hogy programból hogyan tudjuk indítani a brókert! Ehhez az alábbi függőségekre van szükségünk a pom.xml fájlban:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
(A Jackson nélkül nálam kivételt dobott induláskor.) A program lényege mindössze 3 sor:
import org.apache.activemq.broker.BrokerService;
public class ActiveMqEmbeddedBroker {
public static void main(String[] args) throws Exception {
BrokerService broker = new BrokerService();
broker.addConnector("tcp://localhost:61616");
broker.start();
}
}
A másik lehetőség az önálló futtatás. Ehhez töltsük le az ActiveMQ-t erről az oldalról: http://activemq.apache.org/components/classic/download/, és csomagoljuk ki egy tetszőleges könyvtárba. A bin könyvtárát tegyük bele a PATH környezeti változóba, majd indítsuk el a programot a következőképpen:
activemq start
Az ActiveMQ alapértelmezett portja a 61616, amit megadtunk a fenti programban, ill. az önállóan futó mód ezt ki is naplózza.
A fogadó ill. a küldő kódjához adjuk hozzá a org.apache.activemq % activemq-all % 5.15.11 függőséget. (Ott a Jackson nem kell. Egyébként létezik külön csak a küldő ill. külön a fogadó számára létrehozott csomag; mi most azt használjuk, amelyben minden benne van).
Az első példa egy témára (topic) ír szöveget, mégpedig egymás után tízet:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMqTopicProducer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("My Topic");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (char c = 'A'; c <= 'I'; c++) {
String text = "Hello from Topic Producer! " + c;
TextMessage message = session.createTextMessage(text);
producer.send(message);
}
System.out.println("Message sent");
session.close();
connection.close();
}
}
A kódból látható, hogy csak az ActiveMQConnectionFactory az ActiveMQ specifikus, minden más általános. A példa jóval egyszerűbb mint ahogy azt egy valódi program esetén meg kell valósítani: ott szükség van megfelelő kivételkezelésre, tranzakció kezelésre, ill. célszerű névszolgáltatást (JNDI) használni az üzenetkezelő rendszer megtalálásához, vagy legalább paraméter fájlból beolvasva. A kódban megadtuk még a kézbesítés módját, ami lehet PERSISTENT (amikor nem szabad, hogy elvesszen) vagy NON_PERSISTENT (ebben az esetben nem baj, ha elveszik).
A fogadó 3 szálon olvassa ki az üzeneteket:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMqTopicConsumer extends Thread {
public static void main(String[] args) throws JMSException {
for (int i = 1; i <= 3; i++) {
new ActiveMqTopicConsumer(i).start();
}
}
private int n;
public ActiveMqTopicConsumer(int n) {
this.n = n;
}
@Override
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("My Topic");
MessageConsumer consumer = session.createConsumer(destination);
System.out.println("[" + n + "] Waiting for topic messages...");
while (true) {
Message message = consumer.receive();
TextMessage textMessage = (TextMessage) message;
System.out.println("[" + n + "] Received: " + textMessage.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
Először a fogadót indítsuk el, utána a küldőt. Látni fogjuk, hogy mindegyik szál feldolgozza mindegyik üzenetet, az üzenetek sorrendje viszont csak szálon belül garantált:
[2] Waiting for topic messages...
[3] Waiting for topic messages...
[1] Waiting for topic messages...
[3] Received: Hello from Topic Producer! A
[2] Received: Hello from Topic Producer! A
[3] Received: Hello from Topic Producer! B
[3] Received: Hello from Topic Producer! C
[3] Received: Hello from Topic Producer! D
[3] Received: Hello from Topic Producer! E
[3] Received: Hello from Topic Producer! F
[3] Received: Hello from Topic Producer! G
[3] Received: Hello from Topic Producer! H
[3] Received: Hello from Topic Producer! I
[1] Received: Hello from Topic Producer! A
[1] Received: Hello from Topic Producer! B
[2] Received: Hello from Topic Producer! B
[1] Received: Hello from Topic Producer! C
[2] Received: Hello from Topic Producer! C
[1] Received: Hello from Topic Producer! D
[2] Received: Hello from Topic Producer! D
[1] Received: Hello from Topic Producer! E
[2] Received: Hello from Topic Producer! E
[1] Received: Hello from Topic Producer! F
[2] Received: Hello from Topic Producer! F
[1] Received: Hello from Topic Producer! G
[1] Received: Hello from Topic Producer! H
[2] Received: Hello from Topic Producer! G
[1] Received: Hello from Topic Producer! I
[2] Received: Hello from Topic Producer! H
[2] Received: Hello from Topic Producer! I
A sorba (queue) küldő komponens alig különbözik a fentitől:
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMqQueueProducer {
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("My Queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (char c = 'A'; c <= 'I'; c++) {
String text = "Hello from Queue Producer! " + c;
TextMessage message = session.createTextMessage(text);
producer.send(message);
}
System.out.println("Messages sent");
session.close();
connection.close();
}
}
A feliratkozás jellege itt minden esetben tartós, hiszen egy üzenetet pontosan egyszer dolgoznak fel. A fogadó itt is nagyon hasonlíthat a fentire, elég csak a createTopic()-ot createQueue()-re átírni, most viszont egy kicsit jobban átdolgozzuk, hogy a MessageListener interfésszel is megismerkedjünk (ami egyébként a fenti direkt olvasásnál jóval gyakoribb):
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMqQueueConsumer extends Thread implements MessageListener {
public static void main(String[] args) throws JMSException {
for (int i = 1; i <= 3; i++) {
new ActiveMqQueueConsumer(i).start();
}
}
private int n;
public ActiveMqQueueConsumer(int n) {
this.n = n;
}
@Override
public void run() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("My Queue");
MessageConsumer consumer = session.createConsumer(destination);
System.out.println("[" + n + "] Waiting for queue messages...");
consumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("[" + n + "] Message received: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
A lefutásnál mindegyik üzenetet pontosan egy fogadó dolgozza fel, de azt, hogy pontosan melyik, nem garantált. A lenti példában a 3-as sorszámú szál szóhoz sem jutott:
[2] Waiting for queue messages...
[1] Waiting for queue messages...
[3] Waiting for queue messages...
[1] Message received: Hello from Queue Producer! B
[2] Message received: Hello from Queue Producer! A
[2] Message received: Hello from Queue Producer! C
[1] Message received: Hello from Queue Producer! D
[1] Message received: Hello from Queue Producer! F
[2] Message received: Hello from Queue Producer! E
[1] Message received: Hello from Queue Producer! H
[2] Message received: Hello from Queue Producer! G
[2] Message received: Hello from Queue Producer! I
További információk:
- https://activemq.apache.org/, https://activemq.apache.org/hello-world
- https://www.tomitribe.com/blog/5-minutes-or-less-activemq-with-jms-queues-and-topics/
- https://codenotfound.com/jms-hello-world-activemq-maven.html
- https://javahonk.com/activemq-hello-world/
- https://examples.javacodegeeks.com/enterprise-java/jms/
- https://www.tomitribe.com/blog/5-minutes-or-less-activemq-with-jms-queues-and-topics/
- http://activemq.apache.org/components/artemis/ (ez az egykori HornetQ mostani oldala)
- https://www.javatpoint.com/jms-tutorial (általános JMS áttekintés, ami példaként a Glassfish beépített brókerét használja)
Útválasztás
Az Apache Camel egy szabály alapú útválasztós motor. Ezzel a beérkező adatokat tudjuk feldolgozni és továbbítani. A Camel számos komponenst támogat; a teljes listát itt találjuk: https://camel.apache.org/components/latest/. Néhány példa:
- file: ezzel egy adott könyvtárat lehet megadni, és a fájlokat olvassa be.
- ftp, sftp: FTP szerverről történő olvasás ill. oda történő feltöltés.
- jms: segítségével JMS queue-król és topicokról lehet olvasni ill. írni rá.
- activemq: ha specifikusan az ActiveMQ-t szeretnénk használni.
- kafka: adatkapcsolat a Kafka üzenetkezelővel.
- direct: ezzel Camelen belül tudja egyik komponens küldeni a másiknak az adatot.
- timer: időzítés, pl. másodpercenkénti üzenet indítás.
- quartz: hasonló mint az előző, de sokkal szofisztikáltabb módon.
- mock: egységtesztelésnél hasznos.
Az üzenet formája az Exchange. Ennek van bementi és kimeneti fejléce ill. törzse. Ha például fájlról van szó, akkor a fájl neve egy bemeneti fejléc információ, melyet a következőképpen tudunk lekérdezni: exchange.getIn().getHeader(Exchange.FILE_NAME, String.class). Erről bővebben itt olvashatunk: https://camel.apache.org/manual/latest/faq/using-getin-or-getout-methods-on-exchange.html.
A Camel megvalósítja az nagyvállalati integrációs mintákat (Enterprise Integration Patterns, EIP), melyről itt olvashatunk: https://camel.apache.org/manual/latest/enterprise-integration-patterns.html. Ennek segítségével számos műveletet végre lehet hajtani, melyek közül ismét csak párat elmítünk:
- process: ennek segítségével fel tudjuk dolgozni, át tudjuk alakítani az üzenetet.
- filter: ezzel tudjuk megszűrni az üzeneteket, akár XPath kifejezések, akár logikai értéket visszaadó függvény segítségével.
- delay: ennek hatására valamennyi ideig vár mielőtt folytatja.
- choice…when…otherwise: a klasszikus switch-case-default Cameles formája.
- multicast: több végpontnak tudjuk elküldeni az üzenetet.
- log: naplózni tudjuk az üzenetet a segítségével
- loop: az üzenet bizonyos részein végig tudunk iterálni. Pl. végig tudunk lépkedni a fejlécben található kulcs-érték párokon, vagy akár fix számú lépést is tehetünk. Ezzel egyúttal fel is bontottuk az üzenetet több részre.
- split: az üzenetet több részre tudjuk bontani, pl. akkor, ha az üzenet XML, és egy tömböt tartalmaz.
- aggregate: segítségével több üzenetet tudunk egybe fűzni. Logikailag az előző inverze.
- bean: egy osztály eljárását tudjuk segítségével meghívni.
Az útválasztás szabályait többféleképpen megadhatjuk. A teljes listát itt találjuk: https://camel.apache.org/manual/latest/languages.html. Néhány példa:
- Java: magában a kódban adjuk meg a szabályokat.
- Spring: a szabályokat XML fájlokban írjuk le. Így ez egy nem invazív megoldás, viszont XML-t kell hozzá szerkeszteni.
- XPath: a szabályokat magát nem, a kiválasztást viszont megadhatjuk a segítségével.
A következő példa Java-ban definiálja a szabályt, a kimenet és a bemenet is könyvtár, és közben egy átalakítást hajt végre: a fájl nevét átnevezi úgy, hogy hozzácsapja az aktuális időbélyeget. A pom.xml-hez adjuk hozzá a következő függőséget:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>3.0.0</version>
</dependency>
A kód az alábbi:
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
class FileProcessor implements Processor {
public void process(Exchange exchange) throws Exception {
String originalFileName = exchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
Date date = new Date();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
String changedFileName = dateFormat.format(date) + " " + originalFileName;
exchange.getIn().setHeader(Exchange.FILE_NAME, changedFileName);
}
}
public class CamelExample {
public static void main(String[] args) throws Exception {
CamelContext camelContext = new DefaultCamelContext();
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("file://source?delete=true")
.process(new FileProcessor())
.to("file://destination");
}
});
camelContext.start();
Thread.sleep(10000);
camelContext.stop();
camelContext.close();
}
}
A futtatáshoz hozzuk létre a source és destination könyvtárakat, a source-ba tegyük néhány példafájlt, majd futtassuk le a programot. Ha minden jól ment, akkor a forrás könyvtárból eltűntek a fájlok (ld. delete=true), a cél könyvtárban pedig megváltozott fájlnévvel megjelentek. A példában is látható, hogy a Camel a fluent API megközelítést használja.
További források:
- https://camel.apache.org/, https://camel.apache.org/manual/latest/getting-started.html (rövid bevezető)
- https://www.tutorialspoint.com/apache_camel/
- https://www.baeldung.com/apache-camel-intro (jó áttekintő, de bár ne lenne benne az a sok easy meg simple, mert a Camel szerintem se nem könnyű se nem egyszerű)
- https://www.toptal.com/apache/apache-camel-tutorial
- https://www.javainuse.com/camel