В этой статье рассказывается, как внедрить в свое приложение замечательный
HornetQ сервер. И это оказалось проще, чем я предполагал вначале.
Мне требовалась асинхронность обработки сообщений, потокобезопастность и возможность мониторинга, и желательно не писать много своего кода. Так что я остановился на HornetQ как уже знакомой мне реализации, которая удовлетворяем моим запросам.
Возьмем версию HornetQ 2.2.5 как последнюю стабильную на данный момент.
Зависимоcти для maven:
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-core</artifactId>
<version>2.2.5.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms</artifactId>
<version>2.2.5.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-logging</artifactId>
<version>2.2.5.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.3.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.spec.javax.jms</groupId>
<artifactId>jboss-jms-api_1.1_spec</artifactId>
<version>1.0.0.Final</version>
<scope>compile</scope>
</dependency>
Конфигурация сервера и очередей.
В classpath должны находиться файлы hornetq-configuration.xml, содержащий настройки сервера, hornetq-jms.xml с настройками очередей, и hornetq-users.xml, содержащий настройки пользователей.
hornetq-configuration.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<persistence-enabled>false</persistence-enabled>
<!-- Connectors -->
<connectors>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
</connector>
</connectors>
<acceptors>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
<retry-interval>1000</retry-interval>
<retry-interval-multiplier>1.5</retry-interval-multiplier>
<max-retry-interval>60000</max-retry-interval>
<reconnect-attempts>1000</reconnect-attempts>
</connection-factory>
<!--the queue used by the example-->
<queue name="paymentQueue">
<entry name="queue/paymentQueue"/>
</queue>
<queue name="statusQueue">
<entry name="queue/statusQueue"/>
</queue>
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
<!-- the default user. this is used where username is null-->
<defaultuser name="guest" password="guest">
<role name="guest"/>
</defaultuser>
</configuration>
Старт встроенного сервера очень прост:
import org.hornetq.jms.server.embedded.EmbeddedJMS;
EmbeddedJMS server = new EmbeddedJMS();
server.start();
Далее создаем нужное число потребителей сообщений:
QueueConnectionFactory cf = (QueueConnectionFactory) server.lookup("ConnectionFactory");
Queue queue = (Queue) server.lookup("queue/paymentQueue");
QueueConnection conn = cf.createQueueConnection("guest", "guest");
for(int i = 0; i< Config.getConsumerCount(); i++){
Session consumerSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new PaymentSender());
}
Класс PaymentSender должен реализовывать интерфейс javax.jms.MessageListener
Запускаем обработчики:
conn.start();
Теперь нужно создать класс для отсылки сообщений.
private static QueueConnectionFactory qconFactory = null;
private static QueueConnection qcon = null;
public static void init() throws JMSException {
qconFactory = (QueueConnectionFactory) HornetqListener.server.lookup("ConnectionFactory");
qcon = qconFactory.createQueueConnection("guest", "guest");
}
public static void destroy() throws JMSException {
qcon.close();
}
public static void addPaymentToQueue(Transaction t) throws JMSException {
QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = (Queue) HornetqListener.server.lookup("queue/paymentQueue");
QueueSender qsender = qsession.createSender(queue);
qcon.start();
TextMessage msg = qsession.createTextMessage();
msg.setText(String.valueOf(t.getId()));
logger.debug("message {} putted in queue 'queue/paymentQueue'", msg.getText());
qsender.send(msg);
qsession.close();
}
HornetqListener.server - это статическая переменная, в которую мы сохранили созданный экземпляр объекта EmbeddedJMS
Мониторинг очередей реализуем через JMX
private static int max_queue = 200;
QueueView paymentqueue = jmxQuery("paymentQueue");
if(paymentqueue.getMessagesInQueue() > max_queue)
max_queue = (int) paymentqueue.getMessagesInQueue();
String percentPayment = (new Double(paymentqueue.getMessagesInQueue()))/max_queue * 100d + "%";
private QueueView jmxQuery(String queueName) throws Exception {
MBeanServer mbeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer();
QueueView queueView = new QueueView();
ObjectName name = new ObjectName("org.hornetq:module=JMS,type=Queue,name=\""+ queueName +"\"");
queueView.setConsumersCount((Integer) mbeanServer.getAttribute(name, "ConsumerCount"));
queueView.setMessagesDelivering((Integer) mbeanServer.getAttribute(name, "DeliveringCount"));
queueView.setMessagesInQueue((Long) mbeanServer.getAttribute(name, "MessageCount"));
queueView.setMessagesAdded((Long) mbeanServer.getAttribute(name, "MessagesAdded"));
queueView.setName((String) mbeanServer.getAttribute(name, "Name"));
name = new ObjectName("org.hornetq:module=Core,type=Server");
queueView.setConnectionCount((Integer) mbeanServer.getAttribute(name, "ConnectionCount"));
queueView.setThreadPoolMaxSize((Integer) mbeanServer.getAttribute(name, "ThreadPoolMaxSize"));
queueView.setServerVersion((String) mbeanServer.getAttribute(name, "Version"));
return queueView;
}
Конфигурационные файлы взяты
отсюда