Wednesday, February 3, 2010

A few recipes for reprocessing messages in Dead-Letter-Queue using ActiveMQ

A few recipes for reprocessing messages in Dead-Letter-Queue using ActiveMQ




Messaging based asynchronous processing is a key component of any complexed software especially in transactional environment. There are a number of solutions that provide high performance and reliable messaging in Java space such as ActiveMQ, FUSE broker, JBossMQ, SonicMQ, Weblogic, Websphere, Fiorano, etc. These providers support JMS specification, which provides abstraction for queues, message providers and message consumers. In this blog, I will go over some recipes for recovering messages from dead letter queue when using ActiveMQ.


What is Dead Letter Queue


Generally, when a consumer fails to process a message within a transaction or does not send acknowledgement back to the broker, the message is put back to the queue. The message is then delivered upto certain number of times based on configuration and finally the message is put to dead letter queue when that limit is exceeded. The ActiveMQ documentation recommends following settings for defining dead letter queues:



 <broker...>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">

<deadLetterStrategy>
<individualDeadLetterStrategy
queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>

</destinationPolicy>
...
</broker>

and you can control redlivery policy as follows:


 RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

It is important that you create dlq per queue, otherwise ActiveMQ puts them into a single dead letter queue.



Getting the QueueViewMBean Handle


ActiveMQ provides QueueViewMBean to invoke administration APIs on the queues. The easiest way to get this handle is to use BrokerFacadeSupport class, which is extended by RemoteJMXBrokerFacade and LocalBrokerFacade. You can use RemoteJMXBrokerFacade if you are connecting to remote ActiveMQ server, e.g. here is Spring configuration for setting it up:


     <bean id="brokerQuery" class="org.apache.activemq.web.RemoteJMXBrokerFacade" autowire="constructor" destroy-method="shutdown">
<property name="configuration">
<bean class="org.apache.activemq.web.config.SystemPropertiesConfiguration"/>
</property>

<property name="brokerName"><null/></property>
</bean>

Alternatively, you can use LocalBrokerFacade if you are running embedded ActiveMQ server, e.g. below is Spring configuration for it:


     <bean id="brokerQuery" class="org.apache.activemq.web.LocalBrokerFacade" autowire="constructor" scope="prototype"/>


Getting number of messages from the queue


Once you got handle to QueueViewMBean, you can use following API to find the number of messages in the queue:


 1     public long getQueueSize(final String dest) {

2 try {
3 return brokerQuery.getQueue(dest).getQueueSize();
4 } catch (Exception e) {
5 throw new RuntimeException(e);

6 }
7 }
8


Copying Messages using JMS APIs


The JMS specification provides APIs to browse queue in read mode and then you can send the messages to another queue, e.g.


  1 import java.util.Enumeration;

2

3 import javax.jms.Connection;
4 import javax.jms.ConnectionFactory;
5 import javax.jms.JMSException;

6 import javax.jms.Message;
7 import javax.jms.Queue;
8 import javax.jms.QueueBrowser;

9 import javax.jms.Session;
10 import javax.jms.TextMessage;
11 import javax.management.openmbean.CompositeData;
12

13 import org.apache.activemq.broker.jmx.QueueViewMBean;
14 import org.apache.activemq.web.BrokerFacadeSupport;
15 import org.springframework.beans.factory.annotation.Autowired;
16 import org.springframework.jms.core.BrowserCallback;

17 import org.springframework.jms.core.JmsTemplate;
18 import org.springframework.jms.core.MessageCreator;
19

20 public class DlqReprocessor {

21 @Autowired
22 private JmsTemplate jmsTemplate;
23

24 @Autowired
25 BrokerFacadeSupport brokerQuery;
26

27 @Autowired
28 ConnectionFactory connectionFactory;
29

30
31 @SuppressWarnings("unchecked")
32 void redeliverDLQUsingJms(final String brokerName, final String from,

33 final String to) {
34 Connection connection = null;
35 Session session = null;
36

37 try {
38 connection = connectionFactory.createConnection();
39 connection.start();
40 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
41 Queue dlq = session.createQueue(from);

42 QueueBrowser browser = session.createBrowser(dlq);
43

44 Enumeration<Message> e = browser.getEnumeration();
45
46 while (e.hasMoreElements()) {

47 Message message = e.nextElement();
48 final String messageBody = ((TextMessage) message).getText();
49 jmsTemplate.send(to, new MessageCreator() {
50 @Override

51 public Message createMessage(final Session session)
52 throws JMSException {
53 return session.createTextMessage(messageBody);

54 }
55 });
56 }
57 } catch (Exception e) {
58 throw new RuntimeException(e);

59 } finally {
60 try {
61 session.close();
62 } catch (Exception e) {

63 }
64 try {
65 connection.close();
66 } catch (Exception e) {

67 }
68 }
69 }
70 // . . .

71 }
72

The downside of above approach is that it leaves the original messages in the dead letter queue.


Copying Messages using Spring’s JmsTemplate APIs


You can effectively do the same thing with JmsTemplate provided by Spring with a bit less code, e.g.


  1    void redeliverDLQUsingJmsTemplateBrowse(final String from, final String to) {

2 try {
3 jmsTemplate.browse(from, new BrowserCallback() {
4

5 @SuppressWarnings("unchecked")

6 @Override
7 public Object doInJms(Session session, QueueBrowser browser)
8 throws JMSException {
9 Enumeration<Message> e = browser.getEnumeration();

10 while (e.hasMoreElements()) {
11 Message message = e.nextElement();
12 final String messageBody = ((TextMessage) message)
13 .getText();
14 jmsTemplate.send(to, new MessageCreator() {

15 @Override
16 public Message createMessage(final Session session)
17 throws JMSException {
18 return session.createTextMessage(messageBody);

19 }
20 });
21 }
22 return null;
23 }

24 });
25 } catch (Exception e) {
26 throw new RuntimeException(e);
27 }

28 }
29


Moving Messages using receive/send APIs


As I mentioned, the above approaches leave messages in the DLQ, which may not be what you want. Thus, another simple approach would be to consume messages from the dead letter queue and send it to another,e.g.


  1   public void redeliverDLQUsingJmsTemplateReceive(final String from,

2 final String to) {
3 try {
4 jmsTemplate.setReceiveTimeout(100);
5 Message message = null;

6 while ((message = jmsTemplate.receive(from)) != null) {
7 final String messageBody = ((TextMessage) message).getText();
8 jmsTemplate.send(to, new MessageCreator() {

9 @Override
10 public Message createMessage(final Session session)
11 throws JMSException {

12 return session.createTextMessage(messageBody);
13 }
14 });
15 }
16 } catch (Exception e) {

17 throw new RuntimeException(e);
18 }
19 }
20


Moving Messages using ActiveMQ’s API


Finally, the best approach I found waas to use ActiveMQ’s APIs to move messags, e.g.


  1     public void redeliverDLQUsingJMX(final String brokerName, final String from,

2 final String to) {
3 try {
4 final QueueViewMBean queue = brokerQuery.getQueue(from);

5 for (int i = 0; i < 10 && queue.getQueueSize() > 0; i++) {
6 CompositeData[] compdatalist = queue.browse();

7 for (CompositeData cdata : compdatalist) {
8 String messageID = (String) cdata.get("JMSMessageID");
9 queue.moveMessageTo(messageID, to);
10 }

11 }
12 } catch (Exception e) {
13 throw new RuntimeException(e);
14 }

15 }
16


I have been using this approach and have found to be reliable for reprocessing dead letter queue, though these techniques an also be used for general queues. I am sure there are tons of alternatives including using full-fledged enterprise service bus route. Let me know if you have interesting solutions to this problem.