пятница, 17 августа 2012 г.

Отложенная доставка JMS сообщений в HornetQ

Иногда при обработке JMS сообщения возникла ситуация, когда нужно повторить операцию через какое-то время. Можно вместо того, чтобы висеть в Thread.sleep(), занимая ценные системные ресурсы, просто повторно послать сообщение обратно в очередь, указав время, когда его следует начать обрабатывать.
Стандарт JMS никак не регламентирует возможность доставки сообщений с определенной задержкой, тем не менее, поскольку это весьма полезная возможность, многие производители включают ее в реализацию.
Для HornetQ произвести задержку в доставке сообщения можно просто установив нужное значение в свойство _HQ_SCHED_DELIVERY (или Message.HDR_SCHEDULED_DELIVERY_TIME)
// задержка в 5 секунд относительно текущего момента.
message.setLongProperty("_HQ_SCHED_DELIVERY", System.currentTimeMillis() + 5000);
Источники:
HornetQ User Manual
Sending delayed JMS Messages

понедельник, 6 августа 2012 г.

Внедрение HornetQ JMS 2.2.5 сервера и клиента в приложение

В этой статье рассказывается, как внедрить в свое приложение замечательный HornetQ сервер. И это оказалось проще, чем я предполагал вначале.
Мне требовалась асинхронность обработки сообщений, потокобезопастность и возможность мониторинга, и желательно не писать много своего кода. Так что я остановился на HornetQ как уже знакомой мне реализации, которая удовлетворяем моим запросам.
Возьмем версию HornetQ 2.2.5 как последнюю стабильную на данный момент.
Зависимоcти для maven:
<dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-core</artifactId>
   <version>2.2.5.Final</version>
   <scope>compile</scope>
</dependency>
   <dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-jms</artifactId>
   <version>2.2.5.Final</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-logging</artifactId>
   <version>2.2.5.Final</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>org.jboss.netty</groupId>
   <artifactId>netty</artifactId>
   <version>3.2.3.Final</version>
</dependency>
<dependency>
   <groupId>org.jboss.spec.javax.jms</groupId>
   <artifactId>jboss-jms-api_1.1_spec</artifactId>
   <version>1.0.0.Final</version>
   <scope>compile</scope>
</dependency>
Конфигурация сервера и очередей.

В classpath должны находиться файлы hornetq-configuration.xml, содержащий настройки сервера, hornetq-jms.xml с настройками очередей, и hornetq-users.xml, содержащий настройки пользователей.
hornetq-configuration.xml

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

    <persistence-enabled>false</persistence-enabled>
    <!-- Connectors -->

    <connectors>
        <connector name="in-vm">
            <factory-class>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</factory-class>
        </connector>
    </connectors>

    <acceptors>
        <acceptor name="in-vm">
            <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
        </acceptor>
    </acceptors>

    <!-- Other config -->

    <security-settings>
        <!--security for example queue-->
        <security-setting match="#">
            <permission type="createDurableQueue" roles="guest"/>
            <permission type="deleteDurableQueue" roles="guest"/>
            <permission type="createNonDurableQueue" roles="guest"/>
            <permission type="deleteNonDurableQueue" roles="guest"/>
            <permission type="consume" roles="guest"/>
            <permission type="send" roles="guest"/>
        </security-setting>
    </security-settings>
</configuration>

hornetq-jms.xml 
<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

    <connection-factory name="ConnectionFactory">
        <connectors>
            <connector-ref connector-name="in-vm"/>
        </connectors>
        <entries>
            <entry name="ConnectionFactory"/>
        </entries>
        <consumer-window-size>0</consumer-window-size>
        <retry-interval>1000</retry-interval>
        <retry-interval-multiplier>1.5</retry-interval-multiplier>
        <max-retry-interval>60000</max-retry-interval>
        <reconnect-attempts>1000</reconnect-attempts>
    </connection-factory>

    <!--the queue used by the example-->
    <queue name="paymentQueue">
        <entry name="queue/paymentQueue"/>
    </queue>
    <queue name="statusQueue">
        <entry name="queue/statusQueue"/>
    </queue>

</configuration>

hornetq-jms.xml
<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
    <!-- the default user.  this is used where username is null-->
    <defaultuser name="guest" password="guest">
        <role name="guest"/>
    </defaultuser>
</configuration>


Старт встроенного сервера очень прост:
  
import org.hornetq.jms.server.embedded.EmbeddedJMS;

  EmbeddedJMS server = new EmbeddedJMS();
  server.start();
Далее создаем нужное число потребителей сообщений:
 
            QueueConnectionFactory cf = (QueueConnectionFactory) server.lookup("ConnectionFactory");
            Queue queue = (Queue) server.lookup("queue/paymentQueue");
            QueueConnection conn = cf.createQueueConnection("guest", "guest");
            for(int i = 0; i< Config.getConsumerCount(); i++){
                Session consumerSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                MessageConsumer consumer = consumerSession.createConsumer(queue);
                consumer.setMessageListener(new PaymentSender());
            }
Класс PaymentSender должен реализовывать интерфейс javax.jms.MessageListener
Запускаем обработчики:
 
conn.start();
Теперь нужно создать класс для отсылки сообщений.
 
    private static QueueConnectionFactory qconFactory = null;
    private static QueueConnection qcon = null;


    public static void init() throws JMSException {
        qconFactory = (QueueConnectionFactory) HornetqListener.server.lookup("ConnectionFactory");
        qcon = qconFactory.createQueueConnection("guest", "guest");

    }

    public static void destroy() throws JMSException {
        qcon.close();
    }

    public static void addPaymentToQueue(Transaction t) throws JMSException {

        QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = (Queue) HornetqListener.server.lookup("queue/paymentQueue");
        QueueSender qsender = qsession.createSender(queue);
        qcon.start();
        TextMessage msg = qsession.createTextMessage();
        msg.setText(String.valueOf(t.getId()));
        logger.debug("message {} putted in queue 'queue/paymentQueue'", msg.getText());
        qsender.send(msg);
        qsession.close();
    }
HornetqListener.server - это статическая переменная, в которую мы сохранили созданный экземпляр объекта EmbeddedJMS
Мониторинг очередей реализуем через JMX
private static int max_queue = 200;

QueueView paymentqueue = jmxQuery("paymentQueue");
if(paymentqueue.getMessagesInQueue() > max_queue)
  max_queue = (int) paymentqueue.getMessagesInQueue();
String percentPayment = (new Double(paymentqueue.getMessagesInQueue()))/max_queue * 100d + "%";

private QueueView jmxQuery(String queueName) throws Exception {
        MBeanServer mbeanServer = java.lang.management.ManagementFactory.getPlatformMBeanServer();
        QueueView queueView = new QueueView();

        ObjectName name = new ObjectName("org.hornetq:module=JMS,type=Queue,name=\""+ queueName +"\"");

        queueView.setConsumersCount((Integer) mbeanServer.getAttribute(name, "ConsumerCount"));
        queueView.setMessagesDelivering((Integer) mbeanServer.getAttribute(name, "DeliveringCount"));
        queueView.setMessagesInQueue((Long) mbeanServer.getAttribute(name, "MessageCount"));
        queueView.setMessagesAdded((Long) mbeanServer.getAttribute(name, "MessagesAdded"));
        queueView.setName((String) mbeanServer.getAttribute(name, "Name"));

        name = new ObjectName("org.hornetq:module=Core,type=Server");

        queueView.setConnectionCount((Integer) mbeanServer.getAttribute(name, "ConnectionCount"));
        queueView.setThreadPoolMaxSize((Integer) mbeanServer.getAttribute(name, "ThreadPoolMaxSize"));
        queueView.setServerVersion((String) mbeanServer.getAttribute(name, "Version"));
        return queueView;
    }
Конфигурационные файлы взяты отсюда

четверг, 2 августа 2012 г.

Загрузка диапазонов номеров мобильных операторов с сайта Россвязи

На сайте Россвязи есть замечательный, периодически обновляющийся список, содержащий номерные ёмкости и телефонные компании, к которым эти диапазоны относятся. В данной статье описывается процедура получения и разбора этого файла с помощью апачевского HttpClient и обычного SAX парсера.

  • Скачиваем html со списком кодов

Для этого вполне подойдет org.apache.http.impl.client.DefaultHttpClient

DefaultHttpClient client = new DefaultHttpClient();
HttpGet httpget = new HttpGet("http://rossvyaz.ru/docs/articles/DEF-9x.html");

HttpResponse resp = client.execute(httpget);
if (resp.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
    InputStream is = resp.getEntity().getContent();
    List registry = new ArrayList();
    parseHTML(is, registry);
}

Но вот незадача, контент сжат gzip'ом. В принципе это даже неплохо, уменьшается время загрузки и сетевой трафик, так что просто добавим поддержку gzip в HttpClient. Делается это путем добавления перехватчиков на запрос и ответ:

private DefaultHttpClient prepareHttpClient() {
        DefaultHttpClient client = new DefaultHttpClient();
        client.addRequestInterceptor(new HttpRequestInterceptor() {

            public void process(
                    final HttpRequest request,
                    final HttpContext context) throws HttpException, IOException {
                if (!request.containsHeader("Accept-Encoding")) {
                    request.addHeader("Accept-Encoding", "gzip");
                }
            }

        });

        client.addResponseInterceptor(new HttpResponseInterceptor() {
            public void process(
                    final HttpResponse response,
                    final HttpContext context) throws HttpException, IOException {
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    Header ceheader = entity.getContentEncoding();
                    if (ceheader != null) {
                        HeaderElement[] codecs = ceheader.getElements();
                        for (int i = 0; i < codecs.length; i++) {
                            if (codecs[i].getName().equalsIgnoreCase("gzip")) {
                                response.setEntity(
                                        new GzipDecompressingEntity(response.getEntity()));
                                return;
                            }
                        }
                    }
                }
            }

        });
        return client;
    }

Теперь осталось разобрать полученный html и вытащить из него DEF коды.

  • Парсим полученный html

Воспользуется стандартным потоковым java SAX парсером (javax.xml.stream.XMLEventReader), поскольку полученный файл большой, и к тому же является невалидным XML файлом, так что придется произвести небольшие ухищрения, чтобы его разобрать. Разбор производится "на лету", то есть начинается уже в процессе скачивания html, за счет чего достигается приличная скорость при небольшом потреблении памяти. Весь процесс занимает порядка трех секунд.


static final String TR = "tr";
static final String TD = "td";

...

private void parseHTML(InputStream is, List<DEFCode> result) {
        try {
            XMLInputFactory inputFactory = XMLInputFactory.newInstance();
            XMLEventReader eventReader = inputFactory.createXMLEventReader(is, "windows-1251");
            String[] buff = new String[6];
            int count = 0;
            int ind = 0;
            while (eventReader.hasNext()) {
                try {
                    XMLEvent event = eventReader.nextEvent();

                    if (event.isStartElement()) {
                        StartElement startElement = event.asStartElement();
                        // start new row
                        if (TR.equals(startElement.getName().getLocalPart())) {
                            buff = new String[6];
                            count++;
                            ind = 0;
                        }

                        if (TD.equals(event.asStartElement().getName().getLocalPart())) {
                            event = eventReader.nextEvent();
                            buff[ind++] = event.asCharacters().getData();
                            continue;
                        }
                    }

                    if (event.isEndElement()) {
                        EndElement endElement = event.asEndElement();
                        if (TR.equals(endElement.getName().getLocalPart())) {
                            if(count != 1){ // пропускаем первую строку с заголовком
                                result.add(validateRow(count, buff));
                            }
                        }
                    }
                } catch (XMLStreamException e) { // вероятнее всего это незакрытый тег, игнорируем ошибку
                    logger.error("skip error");
                } catch(Exception ex){ // вероятнее всего файл просто закончился, завершаем обработку
                    logger.error("skip error, break");
                    break;
                }
            }
            eventReader.close();

        } catch (XMLStreamException e) { // что-то не так с кодировкой или структурой файла
            e.printStackTrace();
        }
    }

private DEFCode validateRow(int counter, String[] nextLine) throws ValidationException {
        // здесь проводится проверка строки на валидность и если 
        // строка не валидна, выбрасывается эксепшн с номером строки 
    }