вторник, 24 июля 2012 г.

Настройка очередей в JBoss 7

В этой статье описывается настройка очередей в JBoss AS 7.1.1

Последний раз мне приходилось работать с JBoss 5 и с тех пор есть определенный прогресс. Сервер запускается и останавливается быстро, появились средства конфигурации и мониторинга. В 7й ветке есть тенденция отказа от отдельных xml файлов для настроек очередей и источников данных, и рекомендованный способ - добавлять их непосредственно в конфигурационный файл сервера. Хотя старый способ все еще работает, добавленные таким образом очереди не будут видны в админской консоли как очереди и по ним нельзя будет посмотреть статистику.

Итак, для начала нам нужно просто подключить поддержку очередей. По умолчанию в standalone.xml она не подключена. Для включения поддержки копируем из standalone-full.xml все, связанное с messaging, а именно:

...

<extension module="org.jboss.as.messaging"/>

...

<subsystem xmlns="urn:jboss:domain:messaging:1.1">
            <hornetq-server>
                <persistence-enabled>true</persistence-enabled>
                <journal-file-size>102400</journal-file-size>
                <journal-min-files>2</journal-min-files>

                <connectors>
                    <netty-connector name="netty" socket-binding="messaging"/>
                    <netty-connector name="netty-throughput" socket-binding="messaging-throughput">
                        <param key="batch-delay" value="50"/>
                    </netty-connector>
                    <in-vm-connector name="in-vm" server-id="0"/>
                </connectors>

                <acceptors>
                    <netty-acceptor name="netty" socket-binding="messaging"/>
                    <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </netty-acceptor>
                    <in-vm-acceptor name="in-vm" server-id="0"/>
                </acceptors>
                <security-settings>
                    <security-setting match="#">
                        <permission type="send" roles="guest"/>
                        <permission type="consume" roles="guest"/>
                        <permission type="createNonDurableQueue" roles="guest"/>
                        <permission type="deleteNonDurableQueue" roles="guest"/>
                    </security-setting>
                </security-settings>

                <address-settings>
                    <address-setting match="#">
                        <dead-letter-address>jms.queue.DLQ</dead-letter-address>
                        <expiry-address>jms.queue.ExpiryQueue</expiry-address>
                        <redelivery-delay>1000</redelivery-delay>
                        <max-size-bytes>10485760</max-size-bytes>
                        <address-full-policy>BLOCK</address-full-policy>
                        <message-counter-history-day-limit>10</message-counter-history-day-limit>
                    </address-setting>
                </address-settings>
               <jms-connection-factories>
                    <connection-factory name="InVmConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/ConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    <connection-factory name="RemoteConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="netty"/>
                        </connectors>
                        <entries>
                            <entry name="RemoteConnectionFactory"/>
                            <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    <pooled-connection-factory name="hornetq-ra">
                        <transaction mode="xa"/>
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/JmsXA"/>
                        </entries>
                    </pooled-connection-factory>
                </jms-connection-factories>
          <jms-destinations>
                    <jms-queue name="testQueue">
                        <entry name="queue/test"/>
                        <entry name="java:jboss/exported/jms/queue/test"/>
                    </jms-queue>
                    <jms-topic name="testTopic">
                        <entry name="topic/test"/>
                        <entry name="java:jboss/exported/jms/topic/test"/>
                    </jms-topic>
          </jms-destinations>
</hornetq-server>
...
        <socket-binding name="messaging" port="5445"/>
        <socket-binding name="messaging-throughput" port="5455"/>

После этого можно добавить непосредственно сами очереди:

<jms-destinations>
                    <jms-queue name="defaultQueue">
                        <entry name="queue/default">
                        <entry name="java:jboss/defaultQueue">
                    </entry></entry></jms-queue>
                    <jms-queue name="beelineQueue">
                        <entry name="queue/beeline">
                        <entry name="java:jboss/beelineQueue">
                    </entry></entry></jms-queue>
                    <jms-queue name="mtsQueue">
                        <entry name="queue/mts">
                        <entry name="java:jboss/mtsQueue">
                    </entry></entry></jms-queue>
                    <jms-queue name="mfQueue">
                        <entry name="queue/mf">
                        <entry name="java:jboss/mfQueue">
                    </entry></entry></jms-queue>
                    <jms-queue name="ExpiryQueue">
                        <entry name="queue/expiry">
                        <entry name="java:jboss/ExpiryQueue">
                    </entry></entry></jms-queue>
                    <jms-queue name="DLQ">
                        <entry name="queue/dead">
                        <entry name="java:jboss/deadQueue">
                    </entry></entry></jms-queue>
                </jms-destinations>

Как можно заметить, в конфигурации есть отдельные очереди для "мертвых" и просроченных сообщений. Сообщение попадает в "мертвую" очередь после того, как его доставка окончилась неудачей определенное количество раз. Этот параметр можно настроить, как и интервал между повторными попытками отправки сообщения.

Добавляем в конфигурацию описание этих очередей. Тут есть магия с префиксами jms.queue. где-то их нужно указывать, где-то нет.

После этого можно приступать к настройке обработчиков сообщений. Это обычные Message Driving Bean (MDB). Можно описать их с помощью аннотаций:

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
        @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/defaultQueue"),
        @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "10")
        })
public class DefaultQueueHandler implements MessageListener {

При деплое они подцепятся автоматически. В принципе это уже рабочая конфигурация, но если используется spring или hibernate может потребоваться создание контекста, еще до того, как очереди начнут обрабатывать сообщения. В этом случае можно поступить следующим образом: создать EJB, ответственный за инициализацию контекста, перед инициализацией остановить обработку сообщений во всех очередях, а после инициализации возобновить ее. Это позволить избежать включения отдельного war модуля только для того, чтобы поднять контекст.

@Startup
@Singleton(name = "InitContextEJB")
public class InitContextEJB{

    private String jconsolePrefix = "jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=";

    @PostConstruct
    public void atStartup() {
        stopQueues();
        System.out.println("+++++++++++++++++++ CONTEXT +++++++++++++++++++");
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(
                "applicationContext.xml",
                "data-source.xml"
                );
        startQueues();
    }

    @PreDestroy
    public void atDestroy(){
        stopQueues();
    }

    private void startQueues() {
        try {
            setPaused(new ObjectName(jconsolePrefix + "defaultQueue"), false);
            setPaused(new ObjectName(jconsolePrefix + "beelineQueue"), false);
            setPaused(new ObjectName(jconsolePrefix + "mtsQueue"), false);
            setPaused(new ObjectName(jconsolePrefix + "mfQueue"), false);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
        }

    }

    private void stopQueues() {
        try {
            setPaused(new ObjectName(jconsolePrefix + "defaultQueue"), true);
            setPaused(new ObjectName(jconsolePrefix + "beelineQueue"), true);
            setPaused(new ObjectName(jconsolePrefix + "mtsQueue"), true);
            setPaused(new ObjectName(jconsolePrefix + "mfQueue"), true);
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
        }

    }

Для остановки и запуска очередей можно вспомнить, что они по-сути MDB, а значит доступны через JMX консоль и javax.management.MBeanServer. Приведу вариант с MBeanServer, как более независимый от реализации и стандартный.

protected void setPaused(final ObjectName objectName, final Boolean method) {
        try {
            MBeanServer mbeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer();
            System.out.println(objectName + " paused before= " + mbeanServer.getAttribute(objectName, "paused"));

            mbeanServer.invoke(objectName, method ? "pause" : "resume",new Object[]{}, new String[]{});

            System.out.println(objectName + " paused after= " + mbeanServer.getAttribute(objectName, "paused"));

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Перехват и глотание исключений при остановке желательно, поскольку сам HornetQ сервер может остановиться раньше (например при остановке JBoss)

Полезные ссылки: