понедельник, 6 августа 2012 г.

Внедрение HornetQ JMS 2.2.5 сервера и клиента в приложение

В этой статье рассказывается, как внедрить в свое приложение замечательный HornetQ сервер. И это оказалось проще, чем я предполагал вначале.
Мне требовалась асинхронность обработки сообщений, потокобезопастность и возможность мониторинга, и желательно не писать много своего кода. Так что я остановился на HornetQ как уже знакомой мне реализации, которая удовлетворяем моим запросам.
Возьмем версию HornetQ 2.2.5 как последнюю стабильную на данный момент.
Зависимоcти для maven:
<dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-core</artifactId>
   <version>2.2.5.Final</version>
   <scope>compile</scope>
</dependency>
   <dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-jms</artifactId>
   <version>2.2.5.Final</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-logging</artifactId>
   <version>2.2.5.Final</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.jboss.netty</groupId>
   <artifactId>netty</artifactId>
   <version>3.2.3.Final</version>
</dependency>
<dependency>
   <groupId>org.jboss.spec.javax.jms</groupId>
   <artifactId>jboss-jms-api_1.1_spec</artifactId>
   <version>1.0.0.Final</version>
   <scope>compile</scope>
</dependency>
Конфигурация сервера и очередей.

В classpath должны находиться файлы hornetq-configuration.xml, содержащий настройки сервера, hornetq-jms.xml с настройками очередей, и hornetq-users.xml, содержащий настройки пользователей.
hornetq-configuration.xml

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

    <persistence-enabled>false</persistence-enabled>
    <!-- Connectors -->

    <connectors>
        <connector name="in-vm">
            <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
        </connector>
    </connectors>

    <acceptors>
        <acceptor name="in-vm">
            <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
        </acceptor>
    </acceptors>

    <!-- Other config -->

    <security-settings>
        <!--security for example queue-->
        <security-setting match="#">
            <permission type="createDurableQueue" roles="guest"/>
            <permission type="deleteDurableQueue" roles="guest"/>
            <permission type="createNonDurableQueue" roles="guest"/>
            <permission type="deleteNonDurableQueue" roles="guest"/>
            <permission type="consume" roles="guest"/>
            <permission type="send" roles="guest"/>
        </security-setting>
    </security-settings>
</configuration>

hornetq-jms.xml 
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

    <connection-factory name="ConnectionFactory">
        <connectors>
            <connector-ref connector-name="in-vm"/>
        </connectors>
        <entries>
            <entry name="ConnectionFactory"/>
        </entries>
        <consumer-window-size>0</consumer-window-size>
        <retry-interval>1000</retry-interval>
        <retry-interval-multiplier>1.5</retry-interval-multiplier>
        <max-retry-interval>60000</max-retry-interval>
        <reconnect-attempts>1000</reconnect-attempts>
    </connection-factory>

    <!--the queue used by the example-->
    <queue name="paymentQueue">
        <entry name="queue/paymentQueue"/>
    </queue>
    <queue name="statusQueue">
        <entry name="queue/statusQueue"/>
    </queue>

</configuration>

hornetq-jms.xml
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
    <!-- the default user.  this is used where username is null-->
    <defaultuser name="guest" password="guest">
        <role name="guest"/>
    </defaultuser>
</configuration>


Старт встроенного сервера очень прост:
  
import org.hornetq.jms.server.embedded.EmbeddedJMS;

  EmbeddedJMS server = new EmbeddedJMS();
  server.start();
Далее создаем нужное число потребителей сообщений:
 
            QueueConnectionFactory cf = (QueueConnectionFactory) server.lookup("ConnectionFactory");
            Queue queue = (Queue) server.lookup("queue/paymentQueue");
            QueueConnection conn = cf.createQueueConnection("guest", "guest");
            for(int i = 0; i< Config.getConsumerCount(); i++){
                Session consumerSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                MessageConsumer consumer = consumerSession.createConsumer(queue);
                consumer.setMessageListener(new PaymentSender());
            }
Класс PaymentSender должен реализовывать интерфейс javax.jms.MessageListener
Запускаем обработчики:
 
conn.start();
Теперь нужно создать класс для отсылки сообщений.
 
    private static QueueConnectionFactory qconFactory = null;
    private static QueueConnection qcon = null;


    public static void init() throws JMSException {
        qconFactory = (QueueConnectionFactory) HornetqListener.server.lookup("ConnectionFactory");
        qcon = qconFactory.createQueueConnection("guest", "guest");

    }

    public static void destroy() throws JMSException {
        qcon.close();
    }

    public static void addPaymentToQueue(Transaction t) throws JMSException {

        QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue) HornetqListener.server.lookup("queue/paymentQueue");
        QueueSender qsender = qsession.createSender(queue);
        qcon.start();
        TextMessage msg = qsession.createTextMessage();
        msg.setText(String.valueOf(t.getId()));
        logger.debug("message {} putted in queue 'queue/paymentQueue'", msg.getText());
        qsender.send(msg);
        qsession.close();
    }
HornetqListener.server - это статическая переменная, в которую мы сохранили созданный экземпляр объекта EmbeddedJMS
Мониторинг очередей реализуем через JMX
private static int max_queue = 200;

QueueView paymentqueue = jmxQuery("paymentQueue");
if(paymentqueue.getMessagesInQueue() > max_queue)
  max_queue = (int) paymentqueue.getMessagesInQueue();
String percentPayment = (new Double(paymentqueue.getMessagesInQueue()))/max_queue * 100d + "%";

private QueueView jmxQuery(String queueName) throws Exception {
        MBeanServer mbeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer();
        QueueView queueView = new QueueView();

        ObjectName name = new ObjectName("org.hornetq:module=JMS,type=Queue,name=\""+ queueName +"\"");

        queueView.setConsumersCount((Integer) mbeanServer.getAttribute(name, "ConsumerCount"));
        queueView.setMessagesDelivering((Integer) mbeanServer.getAttribute(name, "DeliveringCount"));
        queueView.setMessagesInQueue((Long) mbeanServer.getAttribute(name, "MessageCount"));
        queueView.setMessagesAdded((Long) mbeanServer.getAttribute(name, "MessagesAdded"));
        queueView.setName((String) mbeanServer.getAttribute(name, "Name"));

        name = new ObjectName("org.hornetq:module=Core,type=Server");

        queueView.setConnectionCount((Integer) mbeanServer.getAttribute(name, "ConnectionCount"));
        queueView.setThreadPoolMaxSize((Integer) mbeanServer.getAttribute(name, "ThreadPoolMaxSize"));
        queueView.setServerVersion((String) mbeanServer.getAttribute(name, "Version"));
        return queueView;
    }
Конфигурационные файлы взяты отсюда

Комментариев нет:

Отправить комментарий