В этой статье описывается настройка очередей в JBoss AS 7.1.1
Последний раз мне приходилось работать с JBoss 5 и с тех пор есть определенный прогресс. Сервер запускается и останавливается быстро, появились средства конфигурации и мониторинга. В 7й ветке есть тенденция отказа от отдельных xml файлов для настроек очередей и источников данных, и рекомендованный способ - добавлять их непосредственно в конфигурационный файл сервера. Хотя старый способ все еще работает, добавленные таким образом очереди не будут видны в админской консоли как очереди и по ним нельзя будет посмотреть статистику.
Итак, для начала нам нужно просто подключить поддержку очередей. По умолчанию в standalone.xml она не подключена. Для включения поддержки копируем из standalone-full.xml все, связанное с messaging, а именно:
... <extension module="org.jboss.as.messaging"/> ... <subsystem xmlns="urn:jboss:domain:messaging:1.1"> <hornetq-server> <persistence-enabled>true</persistence-enabled> <journal-file-size>102400</journal-file-size> <journal-min-files>2</journal-min-files> <connectors> <netty-connector name="netty" socket-binding="messaging"/> <netty-connector name="netty-throughput" socket-binding="messaging-throughput"> <param key="batch-delay" value="50"/> </netty-connector> <in-vm-connector name="in-vm" server-id="0"/> </connectors> <acceptors> <netty-acceptor name="netty" socket-binding="messaging"/> <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput"> <param key="batch-delay" value="50"/> <param key="direct-deliver" value="false"/> </netty-acceptor> <in-vm-acceptor name="in-vm" server-id="0"/> </acceptors> <security-settings> <security-setting match="#"> <permission type="send" roles="guest"/> <permission type="consume" roles="guest"/> <permission type="createNonDurableQueue" roles="guest"/> <permission type="deleteNonDurableQueue" roles="guest"/> </security-setting> </security-settings> <address-settings> <address-setting match="#"> <dead-letter-address>jms.queue.DLQ</dead-letter-address> <expiry-address>jms.queue.ExpiryQueue</expiry-address> <redelivery-delay>1000</redelivery-delay> <max-size-bytes>10485760</max-size-bytes> <address-full-policy>BLOCK</address-full-policy> <message-counter-history-day-limit>10</message-counter-history-day-limit> </address-setting> </address-settings> <jms-connection-factories> <connection-factory name="InVmConnectionFactory"> <connectors> <connector-ref connector-name="in-vm"/> </connectors> <entries> <entry name="java:/ConnectionFactory"/> </entries> </connection-factory> <connection-factory name="RemoteConnectionFactory"> <connectors> <connector-ref connector-name="netty"/> </connectors> <entries> <entry name="RemoteConnectionFactory"/> <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/> </entries> </connection-factory> <pooled-connection-factory name="hornetq-ra"> <transaction mode="xa"/> <connectors> <connector-ref connector-name="in-vm"/> </connectors> <entries> <entry name="java:/JmsXA"/> </entries> </pooled-connection-factory> </jms-connection-factories> <jms-destinations> <jms-queue name="testQueue"> <entry name="queue/test"/> <entry name="java:jboss/exported/jms/queue/test"/> </jms-queue> <jms-topic name="testTopic"> <entry name="topic/test"/> <entry name="java:jboss/exported/jms/topic/test"/> </jms-topic> </jms-destinations> </hornetq-server> ... <socket-binding name="messaging" port="5445"/> <socket-binding name="messaging-throughput" port="5455"/>
После этого можно добавить непосредственно сами очереди:
<jms-destinations> <jms-queue name="defaultQueue"> <entry name="queue/default"> <entry name="java:jboss/defaultQueue"> </entry></entry></jms-queue> <jms-queue name="beelineQueue"> <entry name="queue/beeline"> <entry name="java:jboss/beelineQueue"> </entry></entry></jms-queue> <jms-queue name="mtsQueue"> <entry name="queue/mts"> <entry name="java:jboss/mtsQueue"> </entry></entry></jms-queue> <jms-queue name="mfQueue"> <entry name="queue/mf"> <entry name="java:jboss/mfQueue"> </entry></entry></jms-queue> <jms-queue name="ExpiryQueue"> <entry name="queue/expiry"> <entry name="java:jboss/ExpiryQueue"> </entry></entry></jms-queue> <jms-queue name="DLQ"> <entry name="queue/dead"> <entry name="java:jboss/deadQueue"> </entry></entry></jms-queue> </jms-destinations>
Как можно заметить, в конфигурации есть отдельные очереди для "мертвых" и просроченных сообщений. Сообщение попадает в "мертвую" очередь после того, как его доставка окончилась неудачей определенное количество раз. Этот параметр можно настроить, как и интервал между повторными попытками отправки сообщения.
Добавляем в конфигурацию описание этих очередей. Тут есть магия с префиксами jms.queue. где-то их нужно указывать, где-то нет.
После этого можно приступать к настройке обработчиков сообщений. Это обычные Message Driving Bean (MDB). Можно описать их с помощью аннотаций:
@MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"), @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/defaultQueue"), @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10") }) public class DefaultQueueHandler implements MessageListener {
При деплое они подцепятся автоматически. В принципе это уже рабочая конфигурация, но если используется spring или hibernate может потребоваться создание контекста, еще до того, как очереди начнут обрабатывать сообщения. В этом случае можно поступить следующим образом: создать EJB, ответственный за инициализацию контекста, перед инициализацией остановить обработку сообщений во всех очередях, а после инициализации возобновить ее. Это позволить избежать включения отдельного war модуля только для того, чтобы поднять контекст.
@Startup @Singleton(name = "InitContextEJB") public class InitContextEJB{ private String jconsolePrefix = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue="; @PostConstruct public void atStartup() { stopQueues(); System.out.println("+++++++++++++++++++ CONTEXT +++++++++++++++++++"); ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext( "applicationContext.xml", "data-source.xml" ); startQueues(); } @PreDestroy public void atDestroy(){ stopQueues(); } private void startQueues() { try { setPaused(new ObjectName(jconsolePrefix + "defaultQueue"), false); setPaused(new ObjectName(jconsolePrefix + "beelineQueue"), false); setPaused(new ObjectName(jconsolePrefix + "mtsQueue"), false); setPaused(new ObjectName(jconsolePrefix + "mfQueue"), false); } catch (MalformedObjectNameException e) { e.printStackTrace(); } } private void stopQueues() { try { setPaused(new ObjectName(jconsolePrefix + "defaultQueue"), true); setPaused(new ObjectName(jconsolePrefix + "beelineQueue"), true); setPaused(new ObjectName(jconsolePrefix + "mtsQueue"), true); setPaused(new ObjectName(jconsolePrefix + "mfQueue"), true); } catch (MalformedObjectNameException e) { e.printStackTrace(); } }
Для остановки и запуска очередей можно вспомнить, что они по-сути MDB, а значит доступны через JMX консоль и javax.management.MBeanServer. Приведу вариант с MBeanServer, как более независимый от реализации и стандартный.
protected void setPaused(final ObjectName objectName, final Boolean method) { try { MBeanServer mbeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer(); System.out.println(objectName + " paused before= " + mbeanServer.getAttribute(objectName, "paused")); mbeanServer.invoke(objectName, method ? "pause" : "resume",new Object[]{}, new String[]{}); System.out.println(objectName + " paused after= " + mbeanServer.getAttribute(objectName, "paused")); } catch (Exception e) { e.printStackTrace(); } }
Перехват и глотание исключений при остановке желательно, поскольку сам HornetQ сервер может остановиться раньше (например при остановке JBoss)
Полезные ссылки: