Мне требовалась асинхронность обработки сообщений, потокобезопастность и возможность мониторинга, и желательно не писать много своего кода. Так что я остановился на HornetQ как уже знакомой мне реализации, которая удовлетворяем моим запросам.
Возьмем версию HornetQ 2.2.5 как последнюю стабильную на данный момент.
Зависимоcти для maven:
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-core</artifactId>
<version>2.2.5.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-jms</artifactId>
<version>2.2.5.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.hornetq</groupId>
<artifactId>hornetq-logging</artifactId>
<version>2.2.5.Final</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
<version>3.2.3.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.spec.javax.jms</groupId>
<artifactId>jboss-jms-api_1.1_spec</artifactId>
<version>1.0.0.Final</version>
<scope>compile</scope>
</dependency>
Конфигурация сервера и очередей.
В classpath должны находиться файлы hornetq-configuration.xml, содержащий настройки сервера, hornetq-jms.xml с настройками очередей, и hornetq-users.xml, содержащий настройки пользователей.
hornetq-configuration.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<persistence-enabled>false</persistence-enabled>
<!-- Connectors -->
<connectors>
<connector name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
</connector>
</connectors>
<acceptors>
<acceptor name="in-vm">
<factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="#">
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
<consumer-window-size>0</consumer-window-size>
<retry-interval>1000</retry-interval>
<retry-interval-multiplier>1.5</retry-interval-multiplier>
<max-retry-interval>60000</max-retry-interval>
<reconnect-attempts>1000</reconnect-attempts>
</connection-factory>
<!--the queue used by the example-->
<queue name="paymentQueue">
<entry name="queue/paymentQueue"/>
</queue>
<queue name="statusQueue">
<entry name="queue/statusQueue"/>
</queue>
</configuration>
hornetq-jms.xml
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
<!-- the default user. this is used where username is null-->
<defaultuser name="guest" password="guest">
<role name="guest"/>
</defaultuser>
</configuration>
Старт встроенного сервера очень прост:
import org.hornetq.jms.server.embedded.EmbeddedJMS; EmbeddedJMS server = new EmbeddedJMS(); server.start();Далее создаем нужное число потребителей сообщений:
QueueConnectionFactory cf = (QueueConnectionFactory) server.lookup("ConnectionFactory"); Queue queue = (Queue) server.lookup("queue/paymentQueue"); QueueConnection conn = cf.createQueueConnection("guest", "guest"); for(int i = 0; i< Config.getConsumerCount(); i++){ Session consumerSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); MessageConsumer consumer = consumerSession.createConsumer(queue); consumer.setMessageListener(new PaymentSender()); }Класс PaymentSender должен реализовывать интерфейс javax.jms.MessageListener
Запускаем обработчики:
conn.start();Теперь нужно создать класс для отсылки сообщений.
private static QueueConnectionFactory qconFactory = null; private static QueueConnection qcon = null; public static void init() throws JMSException { qconFactory = (QueueConnectionFactory) HornetqListener.server.lookup("ConnectionFactory"); qcon = qconFactory.createQueueConnection("guest", "guest"); } public static void destroy() throws JMSException { qcon.close(); } public static void addPaymentToQueue(Transaction t) throws JMSException { QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = (Queue) HornetqListener.server.lookup("queue/paymentQueue"); QueueSender qsender = qsession.createSender(queue); qcon.start(); TextMessage msg = qsession.createTextMessage(); msg.setText(String.valueOf(t.getId())); logger.debug("message {} putted in queue 'queue/paymentQueue'", msg.getText()); qsender.send(msg); qsession.close(); }HornetqListener.server - это статическая переменная, в которую мы сохранили созданный экземпляр объекта EmbeddedJMS
Мониторинг очередей реализуем через JMX
private static int max_queue = 200; QueueView paymentqueue = jmxQuery("paymentQueue"); if(paymentqueue.getMessagesInQueue() > max_queue) max_queue = (int) paymentqueue.getMessagesInQueue(); String percentPayment = (new Double(paymentqueue.getMessagesInQueue()))/max_queue * 100d + "%"; private QueueView jmxQuery(String queueName) throws Exception { MBeanServer mbeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer(); QueueView queueView = new QueueView(); ObjectName name = new ObjectName("org.hornetq:module=JMS,type=Queue,name=\""+ queueName +"\""); queueView.setConsumersCount((Integer) mbeanServer.getAttribute(name, "ConsumerCount")); queueView.setMessagesDelivering((Integer) mbeanServer.getAttribute(name, "DeliveringCount")); queueView.setMessagesInQueue((Long) mbeanServer.getAttribute(name, "MessageCount")); queueView.setMessagesAdded((Long) mbeanServer.getAttribute(name, "MessagesAdded")); queueView.setName((String) mbeanServer.getAttribute(name, "Name")); name = new ObjectName("org.hornetq:module=Core,type=Server"); queueView.setConnectionCount((Integer) mbeanServer.getAttribute(name, "ConnectionCount")); queueView.setThreadPoolMaxSize((Integer) mbeanServer.getAttribute(name, "ThreadPoolMaxSize")); queueView.setServerVersion((String) mbeanServer.getAttribute(name, "Version")); return queueView; }Конфигурационные файлы взяты отсюда
Комментариев нет:
Отправить комментарий