08 February 2010

Using Spring to Receive JMS Messages



Have you ever had a need to create your own JMS consumer? Or will you have this need in the future? If you answered yes to either one of these questions, this post will simplify your life.

In the previous post, I discussed Using the Spring JmsTemplate to Send JMS Messages. As a follow-on, in this post I will demonstrate how to receive messages using Spring JMS. Although the previously mentioned JmsTemplate can receive messages synchronously, here I will focus on asynchronous message reception using the Spring message listener container architecture, specifically the DefaultMessageListenerContainer.

The DefaultMessageListenerContainer (DMLC) is another wonderful convenience class that is part of the Spring Framework's JMS package. As you can see in the Javadoc, the DMLC is not a single class, but a well-abstracted hierarchy for the purpose of receiving messages. The reason for this is that the DMLC takes its inspiration from Message Driven Beans (MDB).

MDBs were originally defined in the EJB 2.0 spec as stateless, transaction-aware message listeners that use JMS resources provided by the Java EE container. MDBs can also be pooled by the Java EE container in order to scale up. In short, MDBs were designed for asynchronous message reception in a way that the Java EE container could manage them. Although the intention was good, unfortunately the disadvantages of MDBs are numerous, including:
  • MDBs are static in their configuration and creation (they cannot be created dynamically)
  • MDBs can only listen to a single destination
  • MDBs can only send messages after first receiving a message
  • MDBs require an EJB container (and therefore the Java EE container)
Although the Spring DMLC took its inspiration from MDBs, it did not replicate these disadvantages; quite the opposite, in fact. The Spring DMLC is commonly used to create what have become known as Message-Driven POJOs (MDP). MDPs offer all of the same functionality as MDBs but without the disadvantages listed above. The Spring DMLC provides many features including:
  • Various levels of caching of the JMS resources (connections and sessions) and JMS consumers for increased performance
  • The ability to dynamically grow and shrink the number of consumers to concurrently process messages based on load (see setConcurrentConsumers and setMaxConcurrentConsumers) for additional performance
  • Automatically re-establishes connections if the message broker becomes unavailable
  • Asynchronous execution of a message listener using the Spring TaskExecutor
  • Support for local JMS transactions as well as an external transaction manager around message reception and listener execution
  • Support for various message acknowledgement modes, each providing different semantics
For some situations, it is important to understand the additional error handling and the redelivery semantics that are provided by the DMLC. For more information, see the AbstractMessageListenerContainer JavaDoc.

The reason I recommend the DMLC (or even the SimpleMessageListenerContainer) is that writing JMS consumers can be a lot of work. In doing so, you must manually handle and manage the JMS resources and the JMS consumers, any concurrency that is necessary and any use of transactions. If you've ever done such work you know how arduous and error-prone it can be. Certainly MDBs provide some of these features but with all their disadvantages. By creating MDPs using the Spring DMLC, I have seen users save a tremendous amount of time and increase their productivity significantly. This is because the DMLC is flexible, robust and highly configurable, and it has widespread deployment in businesses all over the world (so it has been widely tested).

Compared to MDBs, use of the Spring DMLC is actually surprisingly simple. The easiest way to get started is to use an XML configuration, as the Spring DMLC provides JMS namespace support. Below is a Spring application context that demonstrates the configuration to use the Spring DMLC with Apache ActiveMQ:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
  p:brokerURL="tcp://foo.example.com:61616" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- The Spring message listener container configuration -->
  <jms:listener-container
      container-type="default"
      connection-factory="connectionFactory"
      acknowledge="auto">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
  </jms:listener-container>
</beans>


For folks who are already familiar with the Spring Framework, the XML above is quite straightforward. It defines a connection factory bean for ActiveMQ, a message listener bean and the Spring listener-container. Notice that the jms:listener contains the destination name and not the listener-container. This level of separation is important because it means that the listener-container is not tied to any destination, only the jms:listener is. You can define as many jms:listener elements as is necessary for your application and the container will handle them all.

Below is the message listener implementation:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;

public class SimpleMessageListener implements MessageListener {

  private static final Logger LOG = Logger.getLogger(SimpleMessageListener.class);

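  public void onMessage(Message message) {
      try {
          // This example only expects text messages; anything else is logged and skipped
          if (message instanceof TextMessage) {
              TextMessage msg = (TextMessage) message;
              LOG.info("Consumed message: " + msg.getText());
          } else {
              LOG.warn("Ignoring non-text message: " + message);
          }
      } catch (JMSException e) {
          LOG.error("Failed to read the message payload", e);
      }
  }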

}

The message listener implementation is deliberately simple as its only purpose is to demonstrate receiving the message and logging the payload of the message. Although this listener implements the javax.jms.MessageListener interface, there are a total of three options available for implementing a message listener to be used with the Spring DMLC:
  • The javax.jms.MessageListener - This is what was used in the example above. It is a standardized interface from the JMS spec but handling threading is up to you.
  • The Spring SessionAwareMessageListener - This is a Spring-specific interface that provides access to the JMS session object. This is very useful for request-response messaging. Just be aware that you must do your own exception handling (i.e., override the handleListenerException method so exceptions are not lost).
  • The Spring MessageListenerAdapter - This is a Spring-specific class that allows for type-specific message handling. Use of this class avoids any JMS-specific dependencies in your code.
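
To make the third option more concrete, here is a minimal plain-Java sketch of the idea behind the MessageListenerAdapter: a POJO delegate with type-specific handler methods and no JMS imports, plus a small dispatcher that picks a method by payload type. The OrderHandler and PayloadDispatcher names are hypothetical, and the dispatcher is only a stand-in for what the adapter does once it has extracted the payload; it is not Spring's implementation. The method name handleMessage mirrors the adapter's default listener method name.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the adapter: it is NOT Spring's implementation.
class PayloadDispatcher {

    // Finds a public single-argument handleMessage method whose parameter
    // type accepts the payload, then invokes it on the delegate.
    static Object dispatch(Object delegate, Object payload) {
        for (Method method : delegate.getClass().getMethods()) {
            if (method.getName().equals("handleMessage")
                    && method.getParameterCount() == 1
                    && method.getParameterTypes()[0].isInstance(payload)) {
                try {
                    return method.invoke(delegate, payload);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Handler invocation failed", e);
                }
            }
        }
        throw new IllegalArgumentException(
                "No handleMessage method accepts " + payload.getClass().getName());
    }

    public static void main(String[] args) {
        OrderHandler handler = new OrderHandler();
        Map<String, String> fields = new HashMap<>();
        fields.put("id", "42");
        System.out.println(dispatch(handler, "hello"));
        System.out.println(dispatch(handler, fields));
    }
}

// Hypothetical POJO delegate: note that no JMS types appear anywhere.
class OrderHandler {

    public String handleMessage(String text) {
        return "text:" + text;
    }

    public String handleMessage(Map<?, ?> fields) {
        return "map:" + fields.size();
    }
}
```

The point of the design is that OrderHandler can be unit tested without a broker or any JMS classes on the classpath.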


So not only is the Spring message listener container easy to use, it is also full of options to adapt to many environments. And I've only focused on the DefaultMessageListenerContainer here; I have not talked about the SimpleMessageListenerContainer (SMLC) beyond a simple mention. At a high level the difference is that the SMLC is static (it creates a fixed number of consumers at startup) and provides no support for externally managed transactions.

One very big advantage of the Spring message listener container is that this type of XML config can be used in a Java EE container, in a servlet container or stand alone. This same Spring application context will run in WebLogic, JBoss, Tomcat or in a stand alone Spring container. Furthermore, the Spring DMLC also works with just about any JMS-compliant messaging middleware available. Just define a bean for the JMS connection factory for your MOM, possibly tweak a few properties on the listener-container, and you can begin consuming messages from different MOMs.

I should also note that the XML configuration is certainly not a requirement either. You can go straight for the underlying Java classes in your own code if you wish. I've used each style in various situations, but to begin using the Spring DMLC in the shortest amount of time, I find the Spring XML application context the fastest.

Update: I have made all of the code for these examples available via a GitHub repo.

234 comments:

  1. Great post Bruce. The fact that you can scale the number of consumers on the fly has some serious advantages as you alluded to. One way to scale the number of consumers might be to actually automate this with code or via some sort of JMX instrumentation and external management. All very cool!

  2. I'm not sure what you're referring to automating exactly? You can certainly expose beans in the XML configuration via JMX by using the Spring MBeanExporter. As a quick example of how to do this, just add the following XML hunk to the XML example in the blog entry to register the message listener and listener container with the MBean server:

    <bean id="exporter" class="org.springframework.jmx.export.MBeanExporter">
      <property name="beans">
        <map>
          <entry key="test:name=myMessageListener" value-ref="simpleMessageListener" />
          <entry key="test:name=dmlc" value-ref="org.springframework.jms.listener.DefaultMessageListenerContainer#0" />
        </map>
      </property>
    </bean>

    After adding this to the Spring XML application context, you can browse the DMLC in JConsole and even change any of the configurable properties.

    Although this works, it's kinda hacky. As you can see, I'm using the internal key for the DMLC as it exists in the bean registry. Ideally you'd want to make the MBeanExporter automatically locate and register the DMLC instance instead of doing it manually. I'm not sure at this moment if there's a way to do this automatically, but I'm sure I could find a way given the time.

    Hopefully this helps you see what is possible.

  3. Excellent article, Bruce. Although I am a JEE developer, I like the Spring Framework. Recently I posted about Spring integration with JavaMail on my blog, gananathsun.blogspot.com

  4. Great article, very helpful. I just ordered ActiveMQ In Action MEAP 2 days ago. Great Book. Thanks.

  5. What approach is best when you need to process the messages on the queue in order of arrival? Surely there is no way you can use concurrent consumers? I would love to see a programmatic example of the DMLC rather than one that uses XML config, but in my use case I need to process the messages in order within groups, where some characteristic of the message defines the group. I know I can use selectors, provided the headers are in place to select on, and simply ensure that messages for a given group are pinned to a single consuming thread, but otherwise what else can be done?

  6. One way to process messages in order would be to use total ordering in ActiveMQ (http://activemq.apache.org/total-ordering.html) and probably a single consumer on the destination.

    You might also be able to use ActiveMQ message groups (http://activemq.apache.org/message-groups.html) so that everything is processed by a single consumer. However, this is not as straightforward as using a single consumer.
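
    The idea behind message groups, pinning all messages that share a group key to one consumer so that they are processed in arrival order while different groups proceed in parallel, can be sketched in plain Java. This is only a model of the technique, not ActiveMQ's implementation; the GroupedDispatcher class and its hashing scheme are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model: each "consumer" is a single-threaded lane, and a
// group key always maps to the same lane, so per-group order is preserved.
class GroupedDispatcher {

    private final ExecutorService[] lanes;

    GroupedDispatcher(int consumers) {
        lanes = new ExecutorService[consumers];
        for (int i = 0; i < consumers; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Same group key always yields the same lane index.
    int laneFor(String groupKey) {
        return Math.floorMod(groupKey.hashCode(), lanes.length);
    }

    void submit(String groupKey, Runnable work) {
        lanes[laneFor(groupKey)].execute(work);
    }

    void shutdownAndWait() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        for (ExecutorService lane : lanes) {
            try {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Interleaves work for groups "A" and "B" and returns the order in
    // which group A's messages were actually processed.
    static List<String> demo() {
        GroupedDispatcher dispatcher = new GroupedDispatcher(3);
        List<String> seenInGroupA = Collections.synchronizedList(new ArrayList<String>());
        for (int i = 1; i <= 5; i++) {
            final String name = "A" + i;
            dispatcher.submit("A", () -> seenInGroupA.add(name));
            dispatcher.submit("B", () -> { });
        }
        dispatcher.shutdownAndWait();
        return seenInGroupA;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The trade-off is the one mentioned above: ordering holds only within a group, and a slow group can hold up any other group that hashes to the same lane.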

  7. Hi Bruce

    Thanks for the nice article.

    We have a question.

    Once the PERSISTENT message is consumed by the MessageListener, the message is completely removed from the Queue (so does from Persistent Storage (DB)). Is it default behavior?

    Is there any way to force/tell the MessageListener NOT to delete the message from the Queue until we commit the session (Or do something) in onMessage method?

    What we trying to do is:

    Even though the message is received by the MessageListener, we don't want to remove from the Queue, we want to do some processing in onMessage method and based on the result:

    We want to commit for Success - so the message will be completely removed from the Queue.

    For Failures - we don't commit; so the message will be redelivered (6 times by default) and then goes to Dead letter Queue (DLQ). That’s OK for us.

    Any help would be appreciated.

    Thanks

  8. @Krishna, your questions are relevant for the message broker you are using, not for Spring. With message-oriented middleware that implement the JMS spec, a message won't be removed from persistence until the message has been acknowledged. So in your case, you should be using client acknowledge instead of auto acknowledge. This will allow you more control over when messages are acknowledged.

    The acknowledgement mode is specified when creating the session from the connection. See the following for more information:

    http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Connection.html#createSession(boolean, int)

    http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Session.html#CLIENT_ACKNOWLEDGE

    The acknowledgement mode can be controlled from the Spring DMLC. See the following for more information about acknowledgement modes with Spring:

    http://static.springsource.org/spring/docs/2.5.x/api/org/springframework/jms/listener/AbstractMessageListenerContainer.html

  9. Thanks Bruce

    Now I am clear. As per the doc, with acknowledge="auto" the message is acknowledged before listener execution, so the message is deleted from the queue.

    I have also achieved the DLQ scenario in Spring Application by doing the following changes to your code.

    1. We set acknowledge="transacted" (since we want guaranteed redelivery if an exception is thrown, and transactional acknowledgment on successful listener execution)

    <jms:listener-container container-type="default" connection-factory="connectionFactory" acknowledge="transacted">

    2. Since we want to throw the JMSException, we are implementing SessionAwareMessageListener.

    public class MyMessageQueueListener implements SessionAwareMessageListener {

        public void onMessage(Message message, Session session) throws JMSException {
            // Do something
            if (success) {
                // Do nothing, so the transaction is committed and the message acknowledged
            } else {
                // Throw an exception so the message is redelivered
                throw new JMSException("..exception");
            }
        }

    }

    I have tested this. This seems working fine.

    Do you see any improvements?

    Thank you.

  10. @Krishna, this is a perfect use of a transacted session to guarantee redelivery and a SessionAwareMessageListener for finer control. This illustrates the reason why the SessionAwareMessageListener is so useful -- it provides the most control over the behavior of the session from within the message listener.
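
    The behaviour Krishna describes, where a listener that throws causes a rollback and redelivery until the broker gives up and moves the message to a dead letter queue, can be modelled in a few lines of plain Java. This is a simulation of the semantics only, with no JMS or ActiveMQ classes involved; the RedeliverySimulator name and the maxRedeliveries parameter are hypothetical, with 6 chosen to match the default redelivery count mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of transacted delivery: an exception from the listener
// means "rollback and redeliver", a normal return means "commit".
class RedeliverySimulator {

    final List<String> deadLetterQueue = new ArrayList<>();
    int deliveryAttempts = 0;

    // Returns true if the message was committed, false if it was dead-lettered.
    boolean deliver(String message, Consumer<String> listener, int maxRedeliveries) {
        // One initial delivery plus up to maxRedeliveries redeliveries.
        for (int attempt = 0; attempt <= maxRedeliveries; attempt++) {
            deliveryAttempts++;
            try {
                listener.accept(message);
                return true; // listener returned normally: commit
            } catch (RuntimeException e) {
                // listener threw: roll back and try again
            }
        }
        deadLetterQueue.add(message);
        return false;
    }

    public static void main(String[] args) {
        RedeliverySimulator simulator = new RedeliverySimulator();
        boolean committed = simulator.deliver("order-1", m -> {
            throw new IllegalStateException("processing failed");
        }, 6);
        System.out.println("committed=" + committed
                + " attempts=" + simulator.deliveryAttempts
                + " dlq=" + simulator.deadLetterQueue);
    }
}
```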

  11. Hi,

    what is the recommended use of
    - queuePrefetch
    - maxMessagesPerTask
    - caching/pooled/connectionfactory
    - maxConcurrentConsumers

    when I have a lot of messages coming from an activemq broker (10 per second), where
    - I don't care about the ordering
    - the processing in onMessage() takes about 2 seconds
    - I need to use transactions.

    I would like to make this as highly concurrent as possible, but my CPU is now going to full load when the messages start coming, so I expect that I misconfigured something.

  12. @Leen, I must assume that the CPU spike to which you are referring is on the client-side where the Spring MLC is running. If this is the case, it sounds like you need to do some profiling of the JVM, possibly some thread dumps, etc. (see VisualVM - https://visualvm.dev.java.net/) to see what is happening during those periods of CPU spike. You are either just simply overloading the CPU or you need to tune things so that they perform more optimally.

    The ActiveMQ prefetch is documented here:

    http://activemq.apache.org/what-is-the-prefetch-limit-for.html

    The options for the Spring MLC (and its parent classes) are documented in the Javadoc:

    http://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html

    Regarding the caching/pooling question, the Spring MLC provides tiered caching for the connection, sessions and consumers. So there is no need to use a caching/pooling connection factory with the MLC. The caching in the MLC can really improve performance when used correctly, so I advise that you read up on it and experiment with it in your environment to figure what is going to work best for your application.
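
    As a rough starting point for the concurrency settings, the numbers in Leen's question can be turned into back-of-the-envelope arithmetic: to keep up with a steady arrival rate, the number of busy consumers must be at least the arrival rate multiplied by the per-message processing time. The sketch below is plain Java with a hypothetical ConsumerSizing helper; it ignores bursts, prefetch and transaction overhead, so treat the result as a lower bound to experiment from rather than a recommended value.

```java
// Hypothetical helper: a sizing estimate, not a Spring or ActiveMQ API.
class ConsumerSizing {

    // Minimum number of concurrently busy consumers needed to keep up with
    // a steady stream of messages.
    static int minConsumers(double messagesPerSecond, double secondsPerMessage) {
        return (int) Math.ceil(messagesPerSecond * secondsPerMessage);
    }

    public static void main(String[] args) {
        // 10 messages per second, about 2 seconds of processing each
        System.out.println(minConsumers(10, 2));
    }
}
```

    With 10 messages per second and 2 seconds per message this comes to 20, which suggests that a maxConcurrentConsumers value well below that will let the queue back up no matter how the other options are tuned.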

  13. I am sending a message but am not able to receive it. When I change the queue name, it is able to receive a few messages, and then I have to change the queue name again. But I am able to see every message sent and received in the ActiveMQ admin console. Please help me understand why I have to change the queue name every time.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;

    public class MsgReceiver implements MessageListener {
    private static int ackMode;
    private static String messageQueueName;
    private static String messageBrokerUrl;

    private Session session;
    private boolean transacted = false;
    MessageConsumer consumer = null;
    CreateDS createDS = new CreateDS();

    static {
    messageBrokerUrl = ReadProperty.getValues("messageBrokerUrl");
    messageQueueName = "tmsmevent.messages";
    ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    private void setupMessageQueueConsumer() throws Exception {

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    messageBrokerUrl);
    Connection connection = null;

    try
    {
    connection = connectionFactory.createConnection();
    connection.start();
    session = connection.createSession(transacted, ackMode);
    Destination adminQueue = session.createQueue(messageQueueName);
    // Set up a consumer to consume messages off of the admin queue
    consumer = session.createConsumer(adminQueue);
    MsgReceiver msgReceiver = new MsgReceiver();
    consumer.setMessageListener(msgReceiver);
    }
    catch (Exception e)
    {
    // Handle the exception appropriately
    e.printStackTrace();
    }
    /*
    * finally
    * {
    * try
    * {
    * if (consumer != null)
    * {
    * consumer.close();
    * }
    *
    * if (session != null)
    * {
    * session.close();
    * }
    *
    * if (connection != null)
    * {
    * connection.close();
    * }
    * }
    * catch (Exception e)
    * {
    * e.printStackTrace();
    * }
    * }
    */
    }

    public void onMessage(Message message) {
    try
    {

    if (message instanceof TextMessage)
    {

    TextMessage txtMsg = (TextMessage) message;
    String messageText = txtMsg.getText();

    System.out.println("Job Name: " + messageText);
    }
    }
    catch (Exception e)
    {
    // Handle the exception appropriately
    e.printStackTrace();
    }
    }
    public static void main(String[] args) throws Exception {
    new MsgReceiver().setupMessageQueueConsumer();
    }
    }

  14. @pawan, I see that you are creating a new queue named tmsmevent.messages and registering a message listener so ActiveMQ should be delivering messages asynchronously to the message listener. Are you sure that the message listener thread is staying active and not dying?

    This question would be better addressed by posting to the ActiveMQ user mailing list. Information on this list is available here:

    http://activemq.apache.org/mailing-lists.html

    Bruce

  15. Hi,

    Do you have any suggestions about using a DMLC when the Destination is unknown until runtime? (the destinations are saved into the database)

  16. @tfecw, If you defer the creation of the DMLC until the destinations can be resolved, you have solved the problem. But there is no solution for doing this in Spring, so you would need to do this manually. Here are some suggestions:

    1) After Spring has started and the destinations have been resolved, read in an application context at runtime that is specifically for starting up the DMLC.

    2) Programmatically configure the DMLC (i.e., via Java instead of XML) and start it up after the destinations have been resolved.

    3) Implement your own custom DestinationResolver. In Spring, the DestinationResolver is used to resolve JMS destinations and three implementations are provided -- the BeanFactoryDestinationResolver, the DynamicDestinationResolver and the JndiDestinationResolver. A custom DestinationResolver would perform the necessary operations to resolve JMS destinations in your environment. Here's the Javadoc for the DestinationResolver:

    http://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/jms/support/destination/DestinationResolver.html

    Hope that helps.

    Bruce

  17. Hi Bruce, that was a good article. I am not able to connect to a queue. I don't know how and where to specify my queue name. I am getting a java.io.EOFException.

  18. @pawan, if you are having trouble with the Spring Framework, I suggest you post a message to the Spring JMS forum here:

    http://forum.springsource.org/forumdisplay.php?f=30

  19. Hi Bruce, I tried that link but it was not of much help to me.
    I want to ask where I need to give my credentials, like username and password, in the XML file. And what is "Test.Foo" as a destination in the jms listener tag?

    Right now i am getting an exception ::::


    WARNING: Async exception with no exception listener: java.io.EOFException
    java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:375)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:192)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:184)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
    at java.lang.Thread.run(Thread.java:619)
    Aug 9, 2010 3:51:42 PM org.springframework.web.servlet.FrameworkServlet initServletBean

  20. @pawan, Although you have not mentioned it, based on your question about username/password, I must assume that you have configured authentication in the activemq.xml file. The following page describes how to do this:

    http://activemq.apache.org/security.html

    To specify the username/password from the client-side (that is, the message listener container), you must do so on the JMS connection object. Below is an example of how to do this:



    The TEST.FOO specified in the destination attribute of the listener element is the name of the destination for that particular listener. Because you can specify any number of listener beans, each listener must be associated with a specific destination.

    The exception that you are seeing may be the result of the maxInactivityDuration not being set on the TCP transport. By default, the maxInactivityDuration is set on the broker side to 30 secs. But for it to work correctly, it must also be set to the same duration on the client side. To do this, you must specify it in the broker URL being used on the JMS client. For example, to set the maxInactivityDuration to 30 secs on the JMS client side to match the default on the broker side, specify the property with a value in the broker URL like so:

    tcp://hostname:61616?wireFormat.maxInactivityDuration=30000

    For more info on this, see the following pages:

    http://activemq.apache.org/tcp-transport-reference.html

    http://activemq.apache.org/configuring-wire-formats.html

    Hope that helps.

    Bruce

  21. @pawan, Whoops, I forgot to properly specify the format for the example XML I attempted to provide in my comment above. Below is the example:

    <bean id="connectionFactory"
    class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="tcp://foo.example.com:61616"
    p:userName="bruce"
    p:password="secret" />

  22. Bruce, Nice article, thank you indeed !!

    Can you please direct me to more specific reading about restoring the connections of message processors when ActiveMQ is reconnected? Pardon me, but I am not yet sophisticated in using or understanding MOM or ActiveMQ.

    The problem I am trying to troubleshoot is reestablishing queues when ActiveMQ is reconnected. So far I have partially solved it. I could establish a connection by tweaking ConnectionManager properties, but so far I am unable to have listeners consuming messages again after ActiveMQ is brought back up. (I am not using the failover property due to another problem: if used, it seems the server hangs and cannot be brought down.)

    We are using somewhat old versions here: ActiveMQ 4.1, Spring 2, Jencks. Please let me know if you would like to know more details.

  23. I think tweaking "DefaultMessageListenerContainer" in my spring configuration can help. I have tried some settings but is there a better documentation / tutorial about these classes somewhere?

  24. @Sam, The way to recover connections to ActiveMQ is through the use of the failover transport on the JMS client side as it handles automatic reconnection.

    Yikes, ActiveMQ 4.x is awfully old. I highly suggest that you look into upgrading to 5.x -- a LOT has changed.

    As for your questions about docs, besides the JMS section of the Spring reference doc, the JavaDocs for the DefaultMessageListenerContainer class and its parent classes are the best documentation. If you have any specific questions let me know and I will try to help.

  25. Hi !

    I have scenario as below:
    1. Successfully setup AMQ 5.3 server running on tcp
    Defined Queue and Topics in activemq.xml, AMQ admin shows them correctly

    2. My swing client application which should not create any Queues/Topics in AMQ server, but post and receive messages:
    on some action, class1 creates message and puts in Queue in AMQ so that only single listeners gets message.
    on some action, class2 creates message and puts in Topic in AMQ so that multiple listeners get same message.

    3. In swing client application, I have created spring-config.xml as posted at http://pastebin.org/617311

    I am not sure if I have bind multiple consumers to Topic rightly.
    Also noticed that AMQ admin shows myTopic1 and myTopic2 under “Queues” after running my app !
    Your kind guidence will get me going.

    Thank you very much for your articles.
    Nitin

  26. @Nitin, The Spring config looks good. I'm not sure why topics would show up under the queues page in the web console, but it sounds like perhaps the syntax you used to create them is creating queues with your topic names. Please paste your ActiveMQ configuration so I can take a look.

    BTW, here is some info about creating queues in the ActiveMQ config:

    Configuring Startup Destinations

    Bruce

  27. Hi Bruce,

    Thank you for your time and quick reply.
    AMQ setup is simple and ok, coz before running my app, in admin console it shows myQueue1 and myQueue2 under "Queues" and myTopic1 and myTopic2 under "Topics"
    The only thing I added in activemq.xml which comes with AMQ 5.3 is at http://pastebin.org/619392
    Its only after I run my app, AMQ admin console shows Topics also under Queue.

    I suspect how I configured jms:listener-container. I am not sure how to bind consumers to Queue and Topics ..u rightly said may be it is creating consumers for queue with topic name!

    Please guide me in configuring this in any simpler/better way for my scenario.

    Thanks again,
    Nitin

  28. Hi again !

    Solved this by creating two jms:listener-container one with destination-type queue and other with topic :)

    Thanks,
    Nitin

  29. Bruce,

    I want to replace a stably running active mq with an embedded active mq. It would run in side a JBoss server spring configured.

    Any known pit falls with this approach? I have just started mulling this idea.

    Will I lose any of the flexibility that I have with a stand alone ActiveMQ? For example, will I be able to view its console like I can with a stand alone ActiveMQ?

    How can I be able to start and stop it.

  30. @Sam, It really depends on how you configure the broker. You can embed ActiveMQ in JBoss and still configure it in a manner that it uses the activemq.xml so that you have the utmost in flexibility. I've done this many times. In fact, the ActiveMQ In Action book walks through an embedded ActiveMQ configuration in JBoss.

    As far as the web console goes, you should be able to achieve this. After all, it's just a web application that can be easily deployed in Jetty or Tomcat. Although I have not done it myself, I've seen reports of people doing it.

    Bruce

  31. Bruce, I have experimented and sure it works that way but I have more questions.

    Configuration of Broker: You said, if activemq.xml is made available we can configure it flexibly. How do you supply activemq.xml in spring config file or an external file? I am trying to find what the preferred method to configure broker is. I am on an older version of spring, version2.

    Version of spring: I am on an older version of activemq and spring. If I want to use newer versions of activemq like 5.4, I think I must first upgrade spring first? When I tried to deploy newer version of activemq like 5.4, it failed deploying broker bean. Spring version I use again is 2.0.

    Jar files: Is there difference between Linux jars and windows jars from download? I need to deploy this
    In a Linux based jboss srvr, which jar should I download.

    Disadvantages: It looks like embedded broker is norm? But I want to catch, any disadvantages of performance, beforehand. For example on apache page it refers to some class loader issues if embedded, can you please explain what they are?
    http://activemq.apache.org/should-i-deploy-the-broker-inside-my-jvm-or-appserver.html
    (Please see Disadvantages section.)

    Your forum is always very helpful, Thank you.

  32. @Sam, It depends on how you are configuring ActiveMQ in JBoss. Since you explained that you want ActiveMQ embedded in JBoss (i.e., so that it starts and stops with the JBoss lifecycle), take a look at the following page that I wrote on the ActiveMQ website:

    Integrating Apache ActiveMQ with JBoss

    This page explains how to embed ActiveMQ inside of JBoss using the ActiveMQ resource adapter.

    Regarding the broker configuration, in the example, this configuration lives in a file named broker-config.xml and is read in using the ActiveMQ resource adapter. (BTW, the ActiveMQ In Action book contains an updated version of this guide to embedding in JBoss 5.1.0.)

    As for the version of Spring to use with ActiveMQ, see below:

    * ActiveMQ 5.3.2 and lower uses Spring 2.5.6
    * ActiveMQ 5.4 and higher uses Spring 3.0.3

    Regarding your question about the JARs, the only difference between the distributions for the different platforms is line endings and scripts. The Linux/Unix distribution contains shell scripts and the Windows distribution batch scripts.

    The disadvantages listed on the page you reference are just what the page says. It's easier to manage a cluster of brokers that are running in a stand-alone manner outside of an application server than it is to manage a large number of brokers that are embedded. In a situation where you are running hundreds of instances of the application server, embedding a broker in each instance can be rather costly to manage in terms of time. In such a situation, it is advisable to consider not embedding ActiveMQ. The classloader issues that are mentioned also have a page describing the solution. Below is that page:

    java.lang.NoSuchMethodError

    These errors are rather unlikely to occur, but the page will help in case there are some old JMS 1.0.2 JARs lying around. If you experience other classloading issues, the solution is usually just a matter of getting the correct JARs loaded on the classpath.

    Hope that all helps.

    Bruce

  33. Thanks Bruce.

    Can you also please comment on embedding a broker in a Java application vs. integrating it with the JBoss server: are there any merits or demerits of one over the other? I read your articles and also ActiveMQ in Action, but I don't think there is a comparison of the two approaches, is there?

    Would you say that a broker embedded in an application is just the same as a stand-alone one? How about scalability in the future?

    My need is not to build a heavy message processing system at this point. My main concern is that an embedded broker acts normally (starts, stays up, and handles queues until shutdown), just as a stand-alone one would.

    This is indeed the question I wanted to ask you above all others, thank you.

  34. @Sam, There is really no difference between embedding a broker in an app server vs. embedding it in an application that is deployed to the app server. Here are a couple more considerations:

    * How easy it is to maintain that broker. Is the broker config easily accessible? Is the broker config defined in Java that requires recompilation every time you make a change? This is why I previously recommended the use of an XML configuration file.
    * Also consider the number of messages that may be stored in the broker at any given time. Does the application in which the broker is embedded have adequate disk space for this purpose?

    These are just two considerations. Depending on your environment and your use cases, there may be more.

    Bruce

  35. Bruce, OK.

    I guess there is no difference between embedding it in an application vs. a stand-alone ActiveMQ broker, either.

    Thanks

  36. @Sam, There is no difference as far as the broker configuration is concerned. The differences are mainly with regard to the management and monitoring of the broker. Because an embedded broker may have difficulty exposing the JMX URL (e.g., firewall, etc.), this could present problems in connecting to the broker to monitor its state. But this can usually be worked around by configuring the command agent to accept administrative commands via JMS text messages.

    http://activemq.apache.org/command-agent.html
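
    As a rough sketch of what enabling the command agent can look like in activemq.xml, based on the page above: the brokerUrl value is an assumption for a broker that is reachable over the VM transport, and the rest of the broker configuration is elided. Check the page for the exact syntax in your ActiveMQ version.

    ```xml
    <beans>
      <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true">
        <!-- broker configuration elided -->
      </broker>

      <!-- Declared next to the broker element, not inside it -->
      <commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost"/>
    </beans>
    ```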

    Hope that helps.

    Bruce

  37. Bruce,
    Is there any chance that an embedded broker shuts down automatically if it is idle for a long time? I think I have noticed something like that (not quite sure, though): it shut itself down automatically during a long weekend when it was idle for a long time. It is still in my testing.

    Is this the norm, and can we control it?

  38. Another question I wanted to ask is where I can look for activemq.log in an embedded broker. It is essential to see the log, especially in this kind of problem investigation.

  39. I think logging can be enabled by tapping into the server logger, something like log4j. It could produce debug output for the logger org.apache.activemq.xbean.

    But if you have any comments to improve on this, please let me know. My purpose in logging is to catch the root cause of the automatic shutdown, if it really occurs.

    My broker is wrapped in the following:

    <bean class="org.apache.activemq.xbean.BrokerFactoryBean">
      <property name="config" value="/WEB-INF/activemq.xml"/>
      <property name="start" value="true"/>
    </bean>

  40. @Sam, Regarding the shutdown, if the main thread is not kept active, then the broker can shut down. You may need to enable TCP keep alives. See the TCP transport reference page for more info:

    TCP Transport Reference

    Regarding the activemq.log file for an embedded broker, that log will wind up in the data directory, so make sure that you set the data directory for the embedded broker. You can either use the BrokerService#setDefaultDataDirectory method or the org.apache.activemq.default.directory.prefix system property. If this is not set, then the activemq-data directory will be placed in the directory from which ActiveMQ was started.

    Regarding the Log4J logging, the logging can be adjusted via the conf/log4j.properties file using standard Log4J syntax. For more info, see the website:

    How do I change the logging

    Hope that helps.

    Bruce

  41. Bruce,

    CONNECTIVITY DURING INACTIVITY:
    I have added keepAlive property to transportconnector in activemq.xml.

    transportConnector name="openwire" uri="tcp://localhost:61616&keepAlive=true"

    Keeping the broker alive irrespective of activity is very important to me, because I want it to replace a stand-alone broker. I hope keepAlive can do the trick: even during long weekends it should stay on.


    LOGGING:
    Presently I have added the required JAR files and referenced activemq.xml from the BrokerService bean in the Spring config to enable embedded ActiveMQ. I have not included the config directory or log4j.properties. Should I also include the config directory and the log4j.properties file, or should I enable logging for ActiveMQ in the JBoss log4j.xml file?

    IMPLEMENTATION OF EMBEDDED ACTIVEMQ:
    Apart from ActiveMQ in Action, which other places can I refer to for a detailed discussion of ActiveMQ as an embedded broker? Presently it seems I am doing some guesswork and adjusting it based on your answers and my research.

    Thank You Bruce.

  42. Just wanted to add to my previous question: I notice that no activemq.log is created automatically with the present configuration.

  43. @Sam, Re: logging, there's no need to include the entire config directory just to enable logging. If your application is deployed in JBoss, then you will need to add the appropriate categories to the jboss-log4j.xml file so that the ActiveMQ log statements you are seeking will be output. Without such categories in the JBoss logging config, there will be no logging output from ActiveMQ. If you want a separate log file to be output for ActiveMQ log statements, then you will need to configure a new file appender that filters only log statements from ActiveMQ.
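
    A minimal sketch of such an addition, assuming a log4j 1.x style jboss-log4j.xml. The appender name ACTIVEMQ, the file location, the size limits, and the DEBUG level are illustrative placeholders, not values taken from any particular setup. In a log4j XML file the appender has to be declared before the categories.

    ```xml
    <!-- Separate log file for ActiveMQ output (name and path are placeholders) -->
    <appender name="ACTIVEMQ" class="org.apache.log4j.RollingFileAppender">
      <param name="File" value="${jboss.server.home.dir}/log/activemq.log"/>
      <param name="Append" value="true"/>
      <param name="MaxFileSize" value="10MB"/>
      <param name="MaxBackupIndex" value="5"/>
      <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%d %-5p [%c] %m%n"/>
      </layout>
    </appender>

    <!-- Route everything logged under org.apache.activemq to that appender only -->
    <category name="org.apache.activemq" additivity="false">
      <priority value="DEBUG"/>
      <appender-ref ref="ACTIVEMQ"/>
    </category>
    ```

    Dropping additivity="false" and the appender-ref would instead send the ActiveMQ statements to the appenders already configured for the server log.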

    Re: embedded broker discussion, the best place to discuss ActiveMQ is on the ActiveMQ users mailing list. Information on the list is available here:

    http://activemq.apache.org/mailing-lists.html

    Bruce

  44. Bruce,

    You might want to review these problems.

    JVM is throwing a thread core dump with embedded activeMQ.

    Cause of thread dump : Dump Event "systhrow" (00040000) Detail "java/lang/OutOfMemoryError":"Failed to allocate JNIEnv" received

    I read around the problem and following is suggested in an analysis tool.

    "NOTE: Only for Java 5.0 Service Refresh 4 (build date:February 1st, 2007) and older. When you use delegated class loaders, the JVM can create a large number of ClassLoader objects. On IBM Java 5.0 Service Refresh 4 and older, the number of class loaders that are permitted is limited to 8192 by default and an OutOfMemoryError exception is thrown when this limit is exceeded. Use the -Xmxcl parameter to increase the number of class loaders allowed to avoid this problem, for example to 25000, by setting -Xmxcl25000, until the problem is resolved."

  45. Bruce,

    I have noticed another problem, related to the above: I can no longer connect jconsole to the embedded broker.

    The keepAlive option with TCP/IP seems to be working and I can submit and consume messages, but port 1099 dies. Do I have to do something similar for it, or is it, at a guess, related to the above problem?

    I know you said there would be some differences between the standalone and embedded options, and I am wondering if this is one of them: delegated class loaders.

    It would be difficult for you to guess everything without seeing the actual environment, but let me give some facts here.

    Server is JBoss 4.
    ActiveMQ is 4.11 (I could not deploy a newer one as I need to first upgrade Spring. It is 2).
    OS is Linux.

    Any comments are welcome.

  46. @Sam, Re: the OutOfMemoryError, I'd hazard a guess that you are using the default values for the amount of memory available to ActiveMQ. By default, the classes in ActiveMQ are hard-coded to use 64 MB of memory. This can be changed by editing the <systemUsage> element (specifically the <memoryUsage> child element) in the activemq.xml file. More info about this is available here:

    http://activemq.apache.org/producer-flow-control.html#ProducerFlowControl-Systemusage
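
    For illustration, a <systemUsage> block in an ActiveMQ 5.x activemq.xml might look like the sketch below. The limits are arbitrary example values, not recommendations, and older 4.x releases configure memory limits differently, so check the schema for the version in use.

    ```xml
    <broker xmlns="http://activemq.apache.org/schema/core">
      <!-- other broker configuration elided -->
      <systemUsage>
        <systemUsage>
          <!-- memory available to the broker for messages held in RAM -->
          <memoryUsage>
            <memoryUsage limit="128 mb"/>
          </memoryUsage>
          <!-- disk space for persistent messages -->
          <storeUsage>
            <storeUsage limit="1 gb"/>
          </storeUsage>
          <!-- disk space for spooled non-persistent messages -->
          <tempUsage>
            <tempUsage limit="100 mb"/>
          </tempUsage>
        </systemUsage>
      </systemUsage>
    </broker>
    ```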

    Re: JMX in embedded ActiveMQ, you should be able to configure ActiveMQ to use the same JMX agent that JBoss uses via the <managementContext> element in the activemq.xml. There's some more info on the <managementContext> here:

    http://activemq.apache.org/jmx.html
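
    A sketch of that element, assuming the goal is for ActiveMQ to register its MBeans with an MBean server that already exists in the JVM rather than opening its own connector. Whether this picks up the same agent that JBoss exposes depends on the JBoss JMX setup, so treat it as a starting point only.

    ```xml
    <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true">
      <managementContext>
        <!-- createConnector="false": do not start a separate JMX connector for the broker -->
        <managementContext createConnector="false"/>
      </managementContext>
      <!-- other broker configuration elided -->
    </broker>
    ```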

    Bruce

  47. Hi,

    I would really appreciate it if anyone could guide me on the below:

    I have two WebLogic domains, D1 and D2.
    D1 has cluster C1 (a group of 2 managed servers).
    D2 has cluster C2 (a group of 2 managed servers).

    Two distributed queues, RequestQueue and ResponseQueue, have been deployed in cluster C1.

    I have a JMS client application (APP1), which uses the Spring JmsTemplate, deployed in cluster C2.

    APP1 needs to send messages to RequestQueue and receive messages from ResponseQueue.

    With the current setup, APP1 is able to post and receive messages from only one managed server.

    Please guide me on how to post/receive messages on a distributed WebLogic JMS queue deployed in a cluster.

  48. @jadu14$, My experience with Weblogic is quite out of date. I think you might have more luck asking your question in the Weblogic JMS forum here:

    http://forums.oracle.com/forums/forum.jspa?forumID=831&start=0

    Bruce

  49. Hi Bruce
    In our Spring-based app, we have configured multiple JMS message listeners as described in this blog. All works as expected.

    As per our understanding, once the Spring application context is loaded, the listeners start working.

    While the application is running, is there any way we can programmatically stop one of the listeners, so that the listener stops processing its messages until it is restarted?

    Basically, we want to programmatically stop, start, or pause one of the listeners while the application is running, without affecting the other listeners.
    Is this achievable?

    Thank you,
    Krishna.

  50. @Krishna, Have you tried using the start method and the stop method on the DefaultMessageListenerContainer?

    Bruce

  51. Thanks Bruce,
    That's what we need.
    Krishna

  52. @Krishna, I'm glad I could help!

  53. Hi Bruce, I'm trying to write a plugin for a proprietary app "X" so it receives events from a JMS queue. Using Spring's DMLC, it begins consuming messages before the plugin receives the "go" signal from "X".
    Is it possible to have the DMLC initialized in a stopped state?
    I also need to pass a parameter received with the "go" signal to the MessageListener instance initialized with Spring... any idea on how to do this "cleanly"?

  54. @Matias, In Spring 3, the DMLC class hierarchy includes the AbstractJmsListeningContainer class, which contains a property named autoStartup whose default value is true. You can use the AbstractJmsListeningContainer.setAutoStartup() method to set the value to false to allow for a manual startup. To manually start the DMLC, invoke the start method.
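
    In a Spring XML context the same property can be set on the container bean. A minimal sketch, in which the bean ids, the destination name, and the referenced connection factory and listener beans are all placeholders:

    ```xml
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="jmsConnectionFactory"/>
      <property name="destinationName" value="TEST.FOO"/>
      <property name="messageListener" ref="messageListener"/>
      <!-- Do not begin consuming when the application context starts -->
      <property name="autoStartup" value="false"/>
    </bean>
    ```

    With this in place nothing is consumed until application code looks up the container bean and calls start() on it.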

    Bruce

  55. This comment has been removed by the author.

  56. Hi Bruce, I am using Spring Integration's message-driven channel adapter, which internally uses the DefaultMessageListenerContainer. I am going against WebSphere's Service Integration Bus 7.x. After reading the doc, it seems that the DefaultMessageListenerContainer is polling based, where it sits in an infinite loop to receive messages. Is there an event-driven container? I am working on an app that is real-time in nature.
    Thanks Sri

  57. @meltyourfat, You should probably take a look at the Spring SimpleMessageListenerContainer. The SMLC is not a polling container but a plain old asynchronous JMS listener container with the option to use a Spring task executor to invoke the listener. The SMLC can also handle a Spring SessionAwareMessageListener.
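
    A minimal sketch of an SMLC definition, for comparison with a DMLC one. The bean ids, the destination name, and the number of consumers are placeholders, and the connection factory and listener beans are assumed to be defined elsewhere.

    ```xml
    <bean id="simpleListenerContainer"
          class="org.springframework.jms.listener.SimpleMessageListenerContainer">
      <property name="connectionFactory" ref="jmsConnectionFactory"/>
      <property name="destinationName" value="TEST.FOO"/>
      <property name="messageListener" ref="messageListener"/>
      <!-- Number of JMS sessions/consumers registered with the provider -->
      <property name="concurrentConsumers" value="5"/>
    </bean>
    ```

    The optional taskExecutor property can be added if the listener should be invoked on threads from a Spring task executor instead of the provider's own delivery threads.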

  58. Hi bruce,
    thanks for quick response. I will use smlc and see how it performs.
    One other question regarding DMLC.

    Does this message driven channel adapter require spring's caching connection factory or adding concurrent-consumers is good enough with plain connection factory?
    Similarly for
    what kind of caching is available? or is best?
    Thanks
    sri

  59. @meltyourfat, Per the Spring Integration documentation for the Message-Driven Channel Adapter:

    The "message-driven-channel-adapter" requires a reference to either an instance of a Spring MessageListener container (any subclass of AbstractMessageListenerContainer) or both ConnectionFactory and Destination (a 'destinationName' can be provided in place of the 'destination' reference).

  60. Yes! the autoStartup="false" property on the Spring context worked like a charm. Thanks!

  61. @Matias, Thanks for reporting back your results. I'm happy to hear that it worked well for you!

  62. Hello,

    We packaged a JMX enabled version of the Spring's DefaultMessageListenerContainer at ManagedSpringJmsDefaultMessageListenerContainer. We use it both for monitoring and for operations with stop() and start() actions.

    We associated this with JMX enabled ConnectionFactory wrapper and Spring CachingConnectionFactory, they both have monitoring jsps, Hyperic HQ plugins and Spring XML namespace configuration.
    We tried our best to provide detailed documentation at Google Code - XebiaManagementExtras.

    I hope this can help you,

    Cyrille

  63. Hi bruce,

    We use ActiveMQ with Spring and Spring Integration.

    On the message producer side, we have a pooled connection factory with the maxConnections parameter set to 3.

    On the consumer side, we also maintain a pooled connection factory, with maxConnections set to 1.

    When a failover happens from the active to the passive broker, the senders successfully connect to the second broker, whereas the consumers don't connect 100% successfully.

    On the ActiveMQ console of the passive broker, we see a consumer count of 5 for a queue (we set concurrent consumers to 5), but no messages are getting dequeued.

    Our prefetch is 250. When we look at the active consumers for that queue, we see 250 enqueues but 0 dequeues.

    The only solution we currently have is to restart the consumer. We use message groups and our ack mode is DUPS_OK.

    When we take a thread dump of the consumer, all the MessageListenerContainer threads wait at

    - locked <0x5421ab28> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:428)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:554)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:405)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:977)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:969)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:871)
    at java.lang.Thread.run(Thread.java:619)


    Where are we making a mistake? Is it because of the pooled connection factory? Please help us.


    Thanks,
    D.Radhakrishnan

  64. @Radhakrishnan D, You mention that the consumers don't always automatically reconnect to the passive broker on a failover. Are the consumers using the failover transport? There are many options for tuning retries, etc. of the failover transport.
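
    A sketch of a consumer-side connection factory that uses the failover transport. The host names brokerA and brokerB, the bean id, and the option values are placeholders; initialReconnectDelay, maxReconnectDelay, and randomize are options of the failover transport, but the right values depend on the environment.

    ```xml
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
      <!-- Try brokerA first, then brokerB; back off between reconnect attempts.
           Note that & must be written as &amp; inside an XML attribute. -->
      <property name="brokerURL"
                value="failover:(tcp://brokerA:61616,tcp://brokerB:61616)?randomize=false&amp;initialReconnectDelay=100&amp;maxReconnectDelay=5000"/>
    </bean>
    ```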

    Since messages are not being consumed correctly, I suggest making the scenario as easy as possible and then adding the settings you need one by one in order to narrow down the problem. So as a test, I'm curious to know if messages are consumed correctly if you don't use message groups? If that does not work, try an even easier test and see if the ActiveMQ examples work correctly. The examples are extremely simple and good to use as a smoke test to see if everything is working from end to end.

    Also, instead of using the ActiveMQ web console to monitor the broker, I suggest using JConsole instead. Monitoring ActiveMQ with JConsole gives you access to many more properties on the broker than the ActiveMQ web console.

    The thread dump you provided looks OK to me. But before digging into thread dumps, make sure that the most basic send/receive works as I suggested above.

    Bruce

  65. This comment has been removed by the author.

  66. Nice writeup Bruce.

    I am currently trying to determine whether to use org.springframework.jms.connection.CachingConnectionFactory or org.apache.activemq.pool.PooledConnectionFactory with a Spring 3/Tomcat 6/ActiveMQ 5.4.2 application.

    This application will need to support 20k concurrent users, so even if we bump sessionCacheSize from 1 to a larger number I am still wary of the following statement in the CachingConnectionFactory JavaDoc.

    Note also that MessageConsumers obtained from a cached Session won't get closed until the Session will eventually be removed from the pool. This may lead to semantic side effects in some cases.

    I can see this being a potential issue with an application with a high number of concurrent users as the MessageConsumers will start to pile up. You mention that the CachingConnectionFactory is used in many business applications around the world. Do any of those applications include the scenario of 20k concurrent users? We will be benchmarking the two connection factories via JMeter tests next week but just wanted to see if I could get your take as well.

    Regards,

    Jonathan

  67. @Jonathan, Both the Spring CCF and the ActiveMQ PCF can handle your situation as I have seen large apps using both. However, the number of concurrent users alone does not define which one should be used. It's more a matter of how the app is built and what JMS features it is using.

    For example, one difference between the CCF and the PCF is that the CCF caches message consumers using the JMS selector as part of the hash key. This can be a problem for apps with high concurrency if each consumer is using its own selector because it will not allow for reuse of consumers. This can drive overhead higher and cause memory related issues.

    Some situations (such as the one noted above) require multiple different connection factories (and Spring DMLCs) to be defined for different purposes in an application. This basically comes down to a manual division of labor for consumers so as to disperse the connection load amongst many connection factories and/or Spring DMLCs.

    Another difference is the connection/session model used by the CCF and the PCF. The CCF uses a single connection/many sessions model (which is very standard) whereas the PCF uses many connections/many sessions. Neither one is better than the other; it's just a difference, and it means that tuning each one is unique. This leads to my final point, tuning.

    Tuning the connection factories (or a lack thereof) can make or break testing. So make sure to fully understand the configuration of not only ActiveMQ (highly important, esp. the systemUsage) but also of the connection factory that is being tested.

    Hope that helps.

  68. @Bruce, Thanks for your detailed response. That makes a ton of sense to create separate connection factories and DMLCs for the different JMS usage patterns. So with this it is possible we might use both the CCF and PCF in the same application.

    Keep up the great work with Camel and ActiveMQ. They are a couple of my favorite projects right now.

    -Jonathan

  69. @Jonathan, There's no reason that you can't use both the CCF and the PCF in the same app. Of course, this is dependent upon your app's requirements in conjunction with the unique behavior of each solution. Just make sure to experiment and test to gain a full understanding of the functionality of each before deploying them.

    Also, a quick bit of info about the Spring DMLC and connection caching/pools. The DMLC provides its own tiered caching strategy for JMS resources. With this caching in the DMLC, there is no need to also use the CCF. Using both can result in double connection/session caching, which is probably not what you're seeking.
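
    The DMLC's caching tier is selected with its cacheLevelName property (CACHE_NONE, CACHE_CONNECTION, CACHE_SESSION, CACHE_CONSUMER, or CACHE_AUTO). A minimal sketch, in which the bean ids, the destination name, and the choice of CACHE_CONSUMER are illustrative assumptions:

    ```xml
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <!-- A plain (non-caching) connection factory, since the DMLC caches on its own -->
      <property name="connectionFactory" ref="jmsConnectionFactory"/>
      <property name="destinationName" value="TEST.FOO"/>
      <property name="messageListener" ref="messageListener"/>
      <!-- Cache the connection, session, and consumer inside the DMLC -->
      <property name="cacheLevelName" value="CACHE_CONSUMER"/>
    </bean>
    ```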

  70. @Bruce, I really appreciate the insight concerning the DMLC caching overlap with CCF to help us avoid potential pitfalls as we are somewhat new to these technologies. I have your ActiveMQ book on pre-order so when it gets here next week I won't have to bombard you with as many questions.

    I noticed you are living in Boulder. I am down the road in Denver so I am sure you are as ready as I am for some Spring weather to quit teasing us and finally get here.

    Take care,

    Jonathan

  71. @Jonathan, I'm glad that my experiences are able to help you and other folks who are newer to JMS and ActiveMQ. I hope the book will also be helpful for you.

    A fellow Coloradoan! I'm definitely ready for consistently warm weather so I can get outside on a regular basis. But this is standard springtime weather in Colorado. As the saying goes, if you don't like the weather here, wait 10 minutes ;-).

  72. Hi Bruce,

    It seems that my configuration settings are not displayed properly in my comment.

    Can you look at code which I posted on coderanch ?

    http://www.coderanch.com/t/530341/Spring/DefaultMessageListenerContainer-not-getting-started#2404961

    Regards,
    Anand

  73. @anandloni, By default, the DMLC will start up automatically when the Spring application context is started. This is controlled by the autoStartup property in the AbstractJmsListeningContainer, which defaults to true. There's no need to start up the listeners as you mentioned because they get invoked each time a message arrives in the DMLC.

    Which message listener interface does your TCDataSubscriber bean implement? For more info, see the Spring Reference Doc's section named Receiving a message.

  74. Great article. Do you know anything about compatibility with WebSphere? In the following article, it appears that IBM recommends against using anything other than MDBs for high availability / workload management. They also seem to suggest that only the DMLC would be compatible if you went against their recommendation. Thoughts?

    http://www.ibm.com/developerworks/websphere/techjournal/0609_alcott/0609_alcott.html

  75. @Justin, I know that many people have used Spring JMS with WebSphere Application Server (WAS). The reason that IBM recommends MDBs that run on WAS is because MDBs will utilize resources from WAS such as JMS connection pools, thread pools, etc. The reason for such a recommendation comes down to a sales and marketing strategy. Did you notice that the article is written by six folks who are employed by IBM?

    IBM is a business that makes a lot of money by selling WAS. Being driven by this purpose, it's not in IBM's interest to support other competing app servers or spend time recommending customers use anything other than WAS. It is in their interest to show how to use open source technologies that will run on WAS, but not to circumvent WAS. Furthermore, IBM's expertise is with WAS so this is naturally their recommended and supported solution for deploying applications. It scales well and they know how to support it, but it does require a very steep financial investment. Hence, their recommendation against using the Spring DMLC.

    I can tell you that WAS has some pretty strict requirements about what JMS APIs can be used on the server vs. on the client. That's what the article is referring to when it says, 'Be aware that no other Spring JMS MessageListenerContainer types are supported, as they can start unmanaged threads and might also use JMS APIs that should not be called by applications in a Java EE environment.' But this is nothing that you can't work around with some perseverance.

    The Spring DMLC was created to allow developers to use a MDB-like model without the requirement for a full Java EE application server. Using an app server like Jetty or Tomcat, developers can utilize the Spring DMLC and enjoy the same powerful functionality. This basically provides developers a path to avoid the costly investment in full Java EE, commercial application servers, so of course IBM is going to recommend against it.

  76. Thanks for the quick reply. The IBM spin was definitely understood. I've been attempting to reconcile reality with their marketing statement. I suspect that so long as we use a Task Executor against the WAS Work Manager we'll be at least running in a manner where the threading will work better with WAS (potentially with some other necessary tweaks). Beyond that, based on what you've written, it sounds like we should be able to get a similar level of performance. Sound about right?

    Of course, we'd need to test that all out. It doesn't seem like there is much out there that points someone specifically down the path to MDB freedom on WAS.

  77. @Justin, Spring provides a task executor that delegates to a CommonJ WorkManager for this purpose named WorkManagerTaskExecutor, so check that out.
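
    A sketch of wiring that executor into a DMLC. The JNDI name wm/default is assumed to be the work manager configured on the server, and the bean ids, destination name, and referenced beans are placeholders.

    ```xml
    <!-- Delegates thread creation to a container-managed CommonJ WorkManager -->
    <bean id="taskExecutor"
          class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
      <property name="workManagerName" value="wm/default"/>
      <property name="resourceRef" value="true"/>
    </bean>

    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="jmsConnectionFactory"/>
      <property name="destinationName" value="TEST.FOO"/>
      <property name="messageListener" ref="messageListener"/>
      <!-- Listener threads now come from the work manager instead of unmanaged threads -->
      <property name="taskExecutor" ref="taskExecutor"/>
    </bean>
    ```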

    As you mention, there's not much out there in the way of documenting such an effort on WAS. It's more an exercise for the user type of task. That being said, the fact that Spring provides this convenience class should tell you that folks are actually doing this ;-).

  78. I come from a middleware environment where GUIs led me through creating thread pools for JMS messaging. What I can't figure out is where to find the equivalent within a Spring configuration.

    My simple use case is I want to pull messages off a JMS Topic with x number of subscribers. Each subscriber to be configured with 5-10 threads to receive that data. Can you point me in a direction to achieve this?

  79. (I thought i posted this yesterday. Apologies if it is duplicate)

    Hi Bruce,

    We have the connection factory and listener container configured as below. We need the redelivery policy for re-processing, so some of our messages take a very long time to process.

    So our design requirement is: Any consumer should process/hold ONLY ONE message at any given point of time. Any consumer should NOT even have/hold any messages in to its ‘to be processed’ buffer or in RAM. It should always have one (the ONE which is processing) or zero.
    That’s the reason we set the prefetch size to 1.

    (We are fine with slow processing / low message volumes)

    <!-- connection-factory -->
    <amq:connectionFactory id="xConnectionFactory" alwaysSessionAsync="true" brokerURL="${jms.broker.url}" userName="${jms.queue.username}" password="${jms.queue.password}">

    <amq:prefetchPolicy>
    <amq:prefetchPolicy queuePrefetch="1" />
    </amq:prefetchPolicy>

    <amq:redeliveryPolicy>
    <amq:redeliveryPolicy maximumRedeliveries="10"
    initialRedeliveryDelay="5000" backOffMultiplier="3"
    useExponentialBackOff="true" useCollisionAvoidance="true" />
    </amq:redeliveryPolicy>

    </amq:connectionFactory>


    <!-- Listener container -->
    <jms:listener-container concurrency="3-5"
    connection-factory=" xConnectionFactory" acknowledge="transacted">
    <jms:listener id="xMessageListener" destination="${jms.queue.name}"
    ref="xSystemMessageListener" />
    </jms:listener-container>


    Fyi – Our listener implements SessionAwareMessageListener

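    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import org.springframework.jms.listener.SessionAwareMessageListener;
    import org.springframework.stereotype.Component;

    @Component
    public class SystemMessageListener implements SessionAwareMessageListener {

        public void onMessage(Message message, Session session) throws JMSException {
            // ... message processing that sets isProcessed (elided in the original post) ...
            if (isProcessed) {
                session.commit();   // processing succeeded
            } else {
                session.rollback(); // roll back on any exception so the redelivery policy applies
            }
        }
    }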

    Our problem is: even though we set the prefetch limit to one, we saw more than one message streaming to the same consumer while that consumer was still processing another message or applying the redelivery policy to it (the session was not yet committed). In very rare scenarios it takes 5 to 10 hours to finish processing one message when the redelivery policy applies.

    Do you see any configuration issue in the above code? Why is the prefetch limit not working as expected?

    Also, are we using any connection pool in the above config? (We believe we are not.)
    The FAQ at the following URL warns about using prefetch with pooled connections, so we are confused about whether we are using a connection pool or not. http://activemq.apache.org/what-is-the-prefetch-limit-for.html

    Also, can we consider a prefetch limit of zero? Are there any downsides to it?

    Any help would be appreciated.

    Thank you,
    Krishna

  80. @Krishna, Have you used the JMX API or JConsole to view the prefetch size for the consumer's subscription? In JConsole, when a consumer is connected to the broker, you will find the prefetch size on the consumer's subscription. For example, when I have a consumer using a non-persistent subscription to a queue named TEST.FOO, I can find the subscription under org.apache.activemq//Subscription/Non-Durable/Queue/TEST.FOO//Attributes/PrefetchSize. Use this to verify that the prefetch size is actually being set to 1.

    At a glance, your configuration looks fine to me. However, a prefetch of 1 tells the broker to stream 1 message to the consumer even if it is still processing a message (i.e., has not yet acknowledged the message it has already been sent).

    In situations like this, I will create a stand alone example that excludes everything except only what I want to test. This can be a huge help to me in discovering the problem. I oftentimes build such examples as a JUnit test where I can assert various things such as the prefetch size. This allows me to use an embedded broker so that I can easily make use of the JMX API to fetch various properties (such as the prefetch size) at various times during the test. There are many JUnit tests in ActiveMQ that you can use as an example for building your own test to verify your settings.

    Bruce

  81. Hi Bruce,
    I am checking out JMS with Spring 3.
    I was trying the listener XML snippet and tried something like
    method="onMessageCustom" in place of
    method="onMessage"

    but I am getting a compile-time error in the listener class.
    The point is, I have to have a method named 'onMessage'.
    What I don't get is why we have the 'method' attribute at all?

  82. @Praveen, if you are implementing the javax.jms.MessageListener interface, you are required to implement the onMessage() method. The only way to use a method of a different name is to implement one of the Spring interfaces.
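
    For readers wondering how a listener method with a different name can be invoked at all: the general idea is an adapter that implements the fixed callback and forwards each call to a named method on a plain object via reflection. In Spring JMS, the MessageListenerAdapter (one of the listener styles listed later in this thread) fills that role. The sketch below is plain Java and only illustrates the idea; SimpleListener, OrderHandler, NamedMethodAdapter, and onMessageCustom are hypothetical names, and this is not Spring's implementation.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical stand-in for the fixed callback that a listener container calls.
interface SimpleListener {
    void onMessage(String payload);
}

// Hypothetical POJO whose handler method is NOT named onMessage.
class OrderHandler {
    String lastPayload;

    public void onMessageCustom(String payload) {
        lastPayload = payload;
    }
}

// Implements the fixed callback and forwards each call to a method chosen by name.
class NamedMethodAdapter implements SimpleListener {
    private final Object delegate;
    private final Method target;

    NamedMethodAdapter(Object delegate, String methodName) {
        this.delegate = delegate;
        try {
            // Find a public method with this name that takes one String argument.
            this.target = delegate.getClass().getMethod(methodName, String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such handler method: " + methodName, e);
        }
    }

    @Override
    public void onMessage(String payload) {
        try {
            target.invoke(delegate, payload);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("Handler invocation failed", e);
        }
    }
}
```

    With something like this in place, the container only ever sees onMessage(), while the object doing the work is free to name its method whatever it likes.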

  83. Hi Bruce,

    I have two standalone instances of the Spring container, using maxConcurrentConsumers=10 for both instances. If an unchecked exception occurs, I call System.exit(), but it does not shut down the application and the message that caused the exception remains in the queue; I then get a long-running transaction. I would like to know how we can stop the application if any exception occurs. I heard that in a multithreaded application it is not possible to use System.exit(); it won't kill the daemon thread.

  84. @Rahul Bhandari, If you are trying to shut down the Spring message listener container, you will need to get a reference to it so that you can invoke its stop() method.

    Also, you might think about utilizing the setExceptionListener to register your own custom JMS exception listener as well as the setErrorHandler method to register your own custom exception handler for errors when processing a message. Check out the JavaDoc for each method for more info.

  85. Thanks Bruce, but I need to stop the listener as well as stop the application in a multithreaded environment. I don't have to handle the exception. When I stop both listeners using DefaultMessageListenerContainer.stop(), one instance is still listening for the exception message. After stopping them, I tried to use System.exit(1) to exit the JVM, but I suppose other non-daemon threads are still running. What do you suggest? Please let me know ASAP.

  86. @Rahul, Calling the System.exit() method from a listener thread is not a reliable way to stop the application, and the JVM will not exit on its own while non-daemon threads are still running. The best way to shut down the MLC is to shut down the whole Spring container by calling the ConfigurableApplicationContext.close() method or the ConfigurableApplicationContext.stop() method.

  87. This comment has been removed by the author.

  88. This comment has been removed by the author.

  89. (I apologize for the previous posts ... xml code was blocked. Here is the post again. Sorry!)

    Hi Bruce,

    Can one generic listener bean be used in two jms listener-containers to process the messages from two different queues?

    We have a single Generic Message Listener bean as below.

    <bean id="genericMessageListener" class="com.mycompany.GenericMessageListener" />

    and we have 2 listener containers as below - note that they listen from two different queues
    but use the same generic listener bean to process the message.

    <jms:listener-container container-type="default" connection-factory="connectionFactory" acknowledge="auto">

    <jms:listener destination="QUEUE.ONE" ref="genericMessageListener" method="onMessage" />
    </jms:listener-container>

    <jms:listener-container container-type="default" connection-factory="connectionFactory" acknowledge="auto">

    <jms:listener destination="QUEUE.TWO" ref="genericMessageListener" method="onMessage" />
    </jms:listener-container>

    (We use message properties to distinguish the message type and queue name in the onMessage method of GenericMessageListener.)

    Seems it is working as expected (We are still testing though). But are there any issues/cons in this approach?

    We appreciate your time.

    Thanks,
    Krishna

  90. @Krishna, You probably do not want to use a single instance of a given generic listener bean across two different listeners. When I want to use the same class for more than one listener, I create a separate instance of the generic listener bean for each listener. That way, you won't need to deal with race conditions, etc. because an onMessage() method is not inherently thread-safe.
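
    To make the thread-safety point concrete: when one listener object is shared, any state it keeps is touched by every consumer thread at once. The sketch below is plain Java with a hypothetical CountingListener (not part of Spring or JMS). It shows one way to keep shared per-queue counters safe using ConcurrentHashMap and LongAdder; a separate instance per listener avoids the sharing in the first place.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical listener object shared by several consumer threads.
class CountingListener {
    // Thread-safe map of queue name -> number of messages handled.
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // May be called concurrently; queueName stands in for a message property.
    void onMessage(String queueName, String body) {
        counts.computeIfAbsent(queueName, k -> new LongAdder()).increment();
    }

    long handled(String queueName) {
        LongAdder adder = counts.get(queueName);
        return adder == null ? 0 : adder.sum();
    }

    // Simulates two containers delivering messages to the same listener instance.
    static CountingListener simulate(int messagesPerQueue) {
        CountingListener shared = new CountingListener();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < messagesPerQueue; i++) {
            pool.execute(() -> shared.onMessage("QUEUE.ONE", "a"));
            pool.execute(() -> shared.onMessage("QUEUE.TWO", "b"));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared;
    }
}
```

    Had the counters been a plain HashMap with int values, concurrent deliveries could lose updates, which is the kind of race a shared listener instance invites.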

  91. Hi Bruce,

    I am facing an issue with MBeans. I want to control the lifecycle operations (start/stop) of the listener using JMX. I have defined a message-listener-adapter in my configuration file, along with an xpath-router and a recipient-list-router. After defining context:mbean-export and context:mbean-server in my configuration file, I can see only recipientListRouter and XpathRouter as MBeans in JConsole, but not my listener/adapter. Can you please let me know how to configure message-listener-adapter as an MBean?

  92. Hi Bruce,

    A small correction to my previous post: I want to know how to configure message-driven-channel-adapter as an MBean. Thanks!

  93. Hi Bruce,

    Thanks for the article. We're currently polling queues for messages, using JMS template receive() because we only want to consume when our worker servers have capacity.

    This means we're having to handle a lot of complexity ourselves (transactions, threads etc.). Is there a way we can use a message listener and enable/disable it programmatically?

    Thanks.

  94. @Cornish, Take a look at both the start method and the stop method on the DefaultMessageListenerContainer. These might be useful for your situation.
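
    One way to wire this up is a small gate that starts or stops the container as worker capacity changes. The sketch below is plain Java so it runs without a broker: Startable is a hypothetical interface mirroring the start(), stop(), and isRunning() methods that the container exposes, and CapacityGate, FakeContainer, and the threshold are made-up names for illustration only.

```java
// Hypothetical interface with the same three lifecycle methods the container offers.
interface Startable {
    void start();
    void stop();
    boolean isRunning();
}

// Starts or stops the container depending on how many workers are busy.
class CapacityGate {
    private final Startable container;
    private final int maxBusyWorkers;

    CapacityGate(Startable container, int maxBusyWorkers) {
        this.container = container;
        this.maxBusyWorkers = maxBusyWorkers;
    }

    // Call this whenever the number of busy workers changes.
    void onLoadChanged(int busyWorkers) {
        boolean hasCapacity = busyWorkers < maxBusyWorkers;
        if (hasCapacity && !container.isRunning()) {
            container.start();
        } else if (!hasCapacity && container.isRunning()) {
            container.stop();
        }
    }
}

// Trivial in-memory implementation used to try the gate without a broker.
class FakeContainer implements Startable {
    private boolean running;
    int starts;
    int stops;

    public void start() { running = true; starts++; }
    public void stop() { running = false; stops++; }
    public boolean isRunning() { return running; }
}
```

    In a real application the Startable would be replaced by the listener container itself, and onLoadChanged() would be driven by whatever tracks your worker pool.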

  95. Hello Bruce,

    This is a similar question to one asked before, but with a slight twist. We have a single listener-container which handles multiple listeners in our system, i.e.:

    <jms:listener-container container-type="default" .....
    <jms:listener destination="x" ref="beanA" />
    <jms:listener destination="y" ref="beanB" />
    <jms:listener destination="z" ref="beanC" />
    </jms:listener-container>

    Now, I just want to stop consuming from destination "y" but keep "x" and "z" running. If I call messageListenerContainer.stop(), it will stop everybody. Is there a way to selectively
    stop and restart an individual listener programmatically?

    I can always put each listener in its own listener container to do that, but it seems convoluted.

    Thanks,

    Jalpesh

  96. Hi Bruce,
    I need the same configuration for the Apache Qpid Java broker. Please post the sample code and libs for the producer and consumer.
    Thanks,
    Satya

  97. @satya, I have updated the post with a link to the Github repo. Enjoy!

  98. This comment has been removed by the author.

  99. Hi Bruce,
    This is Siva. I want to develop a Java wrapper to consume messages from MQ queues using Spring JMS. I gathered some information from various sites, completed the implementation, and tested it, but messages are not being consumed.

    Could you please help me with how to start the Spring JMS container so that it invokes onMessage()?

    See the Spring config file and the Java client below.

    com.sun.jndi.fscontext.RefFSContextFactory
    file:///D:/Eclipse/MQjars/lnx34-Bindings/
    CHN.BS.ETL.REQ.QCF
    false
    QUEU.BS.ETL.REQ.QD

    java client:

    ---------------

    AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring-service.xml");
    DefaultMessageListenerContainer defaultMessageListenerContainer = (DefaultMessageListenerContainer)applicationContext.getBean("jmsContainer");
    defaultMessageListenerContainer.initialize();
    defaultMessageListenerContainer.start();

  100. In my post, I am not able to see the XML content. Why?

  101. Sorry, the XML file content in my previous post was blocked, so I am publishing it again.

    <bean id="jobListener" class="com.dnb.batch.legacyenginewrapper.jms.JobListener" />

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
    <props>
    <prop key="java.naming.factory.initial">com.sun.jndi.fscontext.RefFSContextFactory</prop>
    <prop key="java.naming.provider.url">file:///D:/Eclipse/MQjars/lnx34-Bindings/</prop>
    </props>
    </property>
    </bean>

    <bean id="internalJmsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
    <ref bean="jndiTemplate"/>
    </property>
    <property name="jndiName">
    <value>CHN.BS.ETL.REQ.QCF</value>
    </property>
    </bean>

    <bean id="jmsQueueConnectionFactory"
    class="org.springframework.jms.connection.SingleConnectionFactory102">
    <property name="targetConnectionFactory">
    <ref bean="internalJmsQueueConnectionFactory"/>
    </property>
    <property name="pubSubDomain">
    <value>false</value>
    </property>
    </bean>

    <bean id="jmsDestinationResolver"
    class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
    <ref bean="jndiTemplate"/>
    </property>
    <property name="jndiName">
    <value>QUEU.BS.ETL.REQ.QD</value>
    </property>
    </bean>
    <jms:listener-container container-type="default"
    connection-factory="jmsQueueConnectionFactory"
    concurrency="10"
    acknowledge="auto">
    <jms:listener destination="QUEU.BS.ETL.REQ.QD" ref="jobListener" method="onMessage"/>
    </jms:listener-container>
    </beans>

    java client:
    ---------------
    AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring-service.xml");
    DefaultMessageListenerContainer defaultMessageListenerContainer = (DefaultMessageListenerContainer)applicationContext.getBean("jmsContainer");
    defaultMessageListenerContainer.initialize();
    defaultMessageListenerContainer.start();

  102. @Siva, for an example of using Spring JMS to asynchronously receive messages, please see the examples that I have provided in my Github repository here:

    https://github.com/bsnyder/spring-jms-examples/tree/master/async

    You can clone this repository to your local system in order to compile and run the examples using ActiveMQ or any other message-oriented middleware (MOM). To use a MOM other than ActiveMQ, you will need to change the XML configuration for the specific example to point at the correct JMS connection factory.

    These examples provide implementations of the three styles of message listener that are supported by Spring JMS including:

    * javax.jms.MessageListener
    * org.springframework.jms.listener.SessionAwareMessageListener
    * org.springframework.jms.listener.adapter.MessageListenerAdapter

    The Spring documentation provides an explanation of these three styles of asynchronous message reception here:

    Asynchronous Reception - Message-Driven POJOs

    Take a look at these items and compare them to what you are doing in your application. Hopefully this will help you figure out your issues.

  103. Thanks for your reply. The problem here is that when I run my standalone message-driven POJO, it does not throw any error or exception, but messages are not being picked up from the MQ queues.

    I have a .binding file that was created using the JNDI fsContext, and I changed the Spring config.xml to match the binding file, but I still have the same problem.

    To connect to the MQ queue using the binding file, do I need to use an adapter?

    Appreciate your help.

  104. @Siva, Spring does not support a .binding file. Any configuration items in the .binding file need to be translated into the appropriate Spring XML configuration.

  105. Hey Bruce,
    I am facing long-running transactions using the DMLC. The configuration is:
    bean id="txManager" class="org.springframework.jms.connection.JmsTransactionManager"
    bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"
    property name="connectionFactory" ref="pooledConnectionFactory"
    property name="destinationName" value="XYZ_MQ"
    property name="messageListener" ref="messageListener"
    property name="transactionManager" ref="txManager"
    bean
    We are setting the JMSCorrelationID, maxConsumer=2, concurrentConsumer=2, maxConnectionPool=5.
    (using multithreading)
    There are 60 messages. Sometimes 57-58 of them are processed and two get stuck somewhere with no exception; after some time 2-3 more messages are processed. Meanwhile we get many Long Running Transaction alerts.

    Please let me know what else is missing in the Spring configuration.

  106. @Rahul, Based on your description, it doesn't sound like anything is missing in your Spring config. My best guess is that it has something to do with message prefetch or something similar for each JMS client. What message broker are you using?

  107. Hi Bruce,
    I need to subscribe to multiple topics.
    Is it correct to do something like:

    Listner1 & Listner2 both implement MessageListener and use the same factory. I get a lot of "Cannot refresh jms connection" errors as soon as I add the second listener.

    Is this correct, or should I implement only one listener that in turn dispatches messages for Topic1 & Topic2? If this is correct, isn't that slowing down the consumer?

    Hope you can help a newbie. Most examples on the web have only one topic.

    Thanks,
    Ram

  108. Hi Bruce,
    Thanks for all your helpful effort.

    I have one question:

    - If I have 4 clients sending messages to the same queue, I want to mark the messages sent from each client.
    - I tried setting the message ID, but when I call getMessageId at the consumer it gives me a different, unique ID for each message.

    How can I solve this?

  109. @Ramji, Unfortunately I cannot see your configuration. If you are posting XML, you need to convert the < > characters to their HTML ASCII character entity names (see this reference for more info).

    If you are using the Spring DMLC, below is a quick example of how to listen to two topics:

    ...
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="tcp://foo.example.com:61616" />

    <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

    <jms:listener-container
    container-type="default"
    connection-factory="connectionFactory"
    acknowledge="auto">
    <jms:listener id="testFooListener" destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
    <jms:listener id="testBarListener" destination="TEST.BAR" ref="simpleMessageListener" method="onMessage" />
    </jms:listener-container>
    </beans>
    ...

    What Spring actually does in a case where there are multiple listeners defined like this is create a separate DMLC instance for each listener. So there should not be a slowdown to either consumer.

    Regarding your connection, make sure that you are using a pooled connection such as the Spring CachingConnectionFactory (and make sure to set the cache size appropriately for your environment).

  110. @AhmedAbobakr, Below is my description of the JMSMessageID that I wrote for the ActiveMQ In Action book:

    'JMSMessageID —A string that uniquely identifies a message that’s assigned by the JMS provider and must begin with ID. The message ID can be used for message processing or for historical purposes in a message storage mechanism. Because message IDs can cause the JMS provider to incur some overhead, the producer can advise the JMS provider that the JMS application doesn’t depend on the value of this header via the MessageProducer.setDisableMessageID() method. If the JMS provider accepts this advice, the message ID must be set to null. Be aware that a JMS provider may ignore this call and assign a message ID anyway.'

    That's according to the JMS spec.

    In your situation, when you are grabbing the JMSMessageID before the message is sent, you won't get the appropriate ID at that point in time because the JMS provider has not yet sent the message so it has not yet assigned the permanent ID to the message. In your case, instead of trying to grab the JMSMessageID, you will be better off setting your own property on each message that contains your own globally unique ID.
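
    A minimal sketch of that last suggestion, written in plain Java so it runs without a JMS provider: a Map stands in for the message's properties, and the property names clientId and appMessageId are made up for illustration. With a real javax.jms.Message the equivalent calls would be setStringProperty() before sending and getStringProperty() on the consumer side.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class MessageTagger {
    // Hypothetical property names; any names agreed on by producer and consumer will do.
    static final String CLIENT_ID = "clientId";
    static final String APP_MESSAGE_ID = "appMessageId";

    // Producer side: stamp the outgoing message with its sender and a unique ID.
    static Map<String, String> tag(String clientId) {
        Map<String, String> properties = new HashMap<>();
        properties.put(CLIENT_ID, clientId);
        properties.put(APP_MESSAGE_ID, UUID.randomUUID().toString());
        return properties;
    }

    // Consumer side: read back which client sent the message.
    static String senderOf(Map<String, String> properties) {
        return properties.get(CLIENT_ID);
    }
}
```

    Because the application sets these values itself, they are known before the send and arrive unchanged at the consumer, unlike the provider-assigned JMSMessageID.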

  111. Thank you for your quick response. I really appreciate it. In the example you mentioned, suppose class A is interested in TEST.FOO and class B is interested in TEST.BAR. How will they get the messages from simpleMessageListener? Since simpleMessageListener's onMessage gets called for both destinations, it would need to keep track of class A and class B, i.e., it would have to pass the messages on via some subscribe/notify implementation?
    Or is it better to have class A and class B implement MessageListener directly and, in the XML above, have the listener ref for TEST.FOO point to class A and the listener ref for TEST.BAR point to class B?
    I want to understand which is the correct implementation and why.
    Also, for my clarity: the bean ids testFooListener and testBarListener are not really used, are they?

    Thanks again.

  112. @Ramji, You should either have class A and class B implement the MessageListener directly or create one MessageListener implementation that delegates its message to class A and one MessageListener implementation that delegates its messages to class B.

    You are correct, the ids are not used; I see now that adding those was an error. See the example below for corrections and clarification:

    ...
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="tcp://foo.example.com:61616" />

    <bean id="simpleMessageListenerA" class="com.mycompany.ClassASimpleMessageListener" />

    <bean id="simpleMessageListenerB" class="com.mycompany.ClassBSimpleMessageListener" />

    <jms:listener-container
    container-type="default"
    connection-factory="connectionFactory"
    acknowledge="auto">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListenerA" method="onMessage" />
    <jms:listener destination="TEST.BAR" ref="simpleMessageListenerB" method="onMessage" />
    </jms:listener-container>
    </beans>
    ...

    I hope this example is more clear than the previous one.

  113. Bruce,

    I have to programmatically call the doStop/doStart methods on DefaultMessageListenerContainer. I have three queues configured in WebLogic 6.1 using the jms namespace.

    1. How can I get the instance of DefaultMessageListenerContainer from the Spring context, as it doesn't have an ID attribute to retrieve it by?
    2. I created the DefaultMessageListenerContainer using the bean tag, but I would like to associate multiple listeners with the container.

  114. @Prakash, please see my answers below:

    1) You can fetch the DMLC by type using the ApplicationContext.getBean(java.lang.Class) method:

    ...
    DefaultMessageListenerContainer dmlc = appCtx.getBean(DefaultMessageListenerContainer.class);
    ...

    This will work to fetch the DMLC if you only have a single instance in the registry, but if you have more than one DMLC instance in the registry, you need to get a little more creative to find the one you want.

    Here's a bit of background before proceeding: most people are not aware that any single instance of a DMLC actually only handles one listener at a time. The Spring XML style of configuration kinda masks this and handles it automatically by creating a new DMLC instance for each listener that is configured in the Spring XML config. So fetching by type gets more difficult if there is more than one instance in the registry.

    So, the best solution here is to give each DMLC instance in the Spring XML config an id. This is the easiest thing because you can then fetch the bean based on its unique id.

    Another idea is to perhaps use the ListableBeanFactory.getBeansOfType(java.lang.Class) method (http://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/beans/factory/ListableBeanFactory.html#getBeansOfType(java.lang.Class)), which returns a map of bean names to the beans of the matching type.

  115. @Prakash, for some reason, Blogger would not display both answers correctly if they were included in the same reply, so here is my answer to your second question:

    2) When the DMLC is configured programmatically, for every listener that you have, you must manually create an instance of the DMLC. For every instance of the DMLC that you create, then you need to use the setMessageListener() method to set the message listener for that DMLC instance.

    Hopefully this provides the information you are seeking.

    Replies
    1. Hi Bruce,

      Thank you very much for the quick reply and providing insight about how DMLC handles listeners. I will be creating three containers wired to different listeners.

      Using the jms namespace for wiring the DMLC throws the error message below: Setup of JMS message listener invoker failed for destination 'dummy_q' - trying to recover. Cause: MQJMS2008: failed to open MQ queue ; nested exception is com.ibm.mq.MQException: MQJE001: Completion Code 2, Reason 2085.

      However, when I wire it using the bean tag it works perfectly. In the listener I have tried both destination and dummy_q as the destination value, and both throw the exception above. I'm not sure what I am missing. Please find the Spring configuration below for your reference:

      <bean id="taskExecutor"
      class="org.springframework.scheduling.commonj.WorkManagerTaskExecutor">
      <property name="workManagerName" value="java:comp/env/wm/MQManager" />
      <property name="resourceRef" value="true" />
      </bean>

      <bean id="transactionManager"
      class="org.springframework.transaction.jta.WebSphereUowTransactionManager" />

      <jee:jndi-lookup id="jmsConnectionFactory" jndi-name="jms/QueueFactory"></jee:jndi-lookup>
      <jee:jndi-lookup id="jmsDestination" jndi-name="jms/dummy_q"></jee:jndi-lookup>
      <bean id="jmsDestResolver" class="org.springframework.jms.support.destination.DynamicDestinationResolver"/>
      <bean id="jndiDestResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver"/>

      <bean id="messageListener" class="com.jms.MyMessageListener" />
      <!-- This configuration gives me an exception of MQJMS2008: failed to open MQ queue -->
      <jms:listener-container
      container-type="default"
      connection-factory="targetConnectionFactory"
      transaction-manager="transactionManager"
      task-executor="taskExecutor"
      acknowledge="auto"
      destination-resolver="jmsDestResolver"
      >
      <jms:listener destination="destination" ref="messageListener" />
      </jms:listener-container>

      <!-- This configuration works.
      <bean id="msgListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="jmsConnectionFactory" />
      <property name="destination" ref="jmsDestination" />
      <property name="messageListener" ref="messageListener"></property>
      <property name="transactionManager" ref="transactionManager"></property>
      <property name="taskExecutor" ref="taskExecutor"></property>
      </bean>
      -->

    2. I just googled around for these two errors and it seems to be something specific to the WebSphere MQ configuration for the queue. I suggest you do the same and make sure that the queue has been properly defined in the MQ queue manager.

  116. Hello Bruce,

    I'm facing a situation where we need two different redelivery policies because we have two kinds of messages. Our intention is to have only one DMLC but use a different connectionFactory for different messages.

    Is there a way to dynamically load the connectionFactory for the DMLC?

    Thanks in advance.

    Replies
    1. @Huawei, If you want to use different connection factories for different listeners, then you need to configure different instances of the DMLC. There is no way to dynamically load different connection factories.

    2. Understood, thanks very much Bruce.
      Still, I have one question regarding AMQ redelivery. Assume that we have only one consumer and that our redelivery policy allows a message to be redelivered for quite a long time.

      I tried a scenario where I sent two messages (of different types): one is designed to be redelivered and the other can be consumed normally.

      It seems the normal message is blocked if it is delivered later than the redelivered one (meaning we sent A earlier than B). It will not be consumed until the redelivered message A has been retried up to the maximum number of redeliveries. That leads to a situation where an easy-to-consume message must wait a long time to be consumed.

      I understand that AMQ maintains the order on a queue. What I want is that the redelivery delay of A should not block the consumption of message B, even if message A is the first one in the queue to be consumed. I do not want message A's delay to delay any other message either. Even if we define many consumers in the client, there is a chance that all the consumers consume messages of the same type A (designed to be redelivered several times, which according to the redelivery policy will take quite a long time, maybe 1h), and that will lower our app's efficiency.

      How can I handle this situation?

      Many thanks to you.

  117. The guys that work full-time on the ActiveMQ code base would be the best to help you with this because it sounds like a possible bug to me. I suggest that you ask about this situation on the ActiveMQ user mailing list. Information on that list is available here:

    http://activemq.apache.org/mailing-lists.html

  118. Bruce, I am badly stuck trying to run your examples as they are. I am using the ActiveMQ broker, and I have a non-Spring message producer which just fires some text messages at the queue. Now when I start the Spring consumer (the one in asynch-->messagelistener), I get the following error in Maven.

    Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'connectionFactory' defined in class path resource [
    org/bsnyder/spring/jms//META-INF/spring/consumer-jms-context.xml]: Initialization of bean failed; nested exception is java.lang.NoSuchFieldError: NULL

    Any idea what's wrong ?

    Replies
    1. Bruce, my mistake it's a version conflict. My spring-expression was 3.0.3 and spring-jms was 3.1.0. Sorry to bother you, also big thanks for putting things together in this post, pretty useful.

    2. Yes, that's typically the problem, the java.lang.NoSuchFieldError: NULL error indicates a version mismatch with one of the Spring JARs.

  119. Hi Bruce,
    Can you please suggest a way to unsubscribe a durable subscription using the Spring DMLC?

    It's straightforward using a JMS session, but I did not find any such method in the Spring DMLC.

    Thanks,
    Gourab Pattanayak

    Replies
    1. @Gourab, There is no helper method in the DMLC to unsubscribe from a durable subscription. In order to do this, you need to use the SessionAwareMessageListener because it provides access to the raw JMS session so that you can call methods on it directly.

  120. I want to use JMS with Spring in the WAS 6.0 application server. Can I use your code? I am new to JMS.

    thanks
    raj

    Replies
    1. Yes, this should work just fine. Just make sure to configure the connection pooling in WAS correctly.

  121. The DefaultMessageListenerContainer implementation is based on a polling receive loop. If, e.g., three listeners are defined and concurrency is set to 10, that would mean there would constantly be 30 threads in use. Isn't that very inefficient? Wouldn't that lead to performance problems? Wouldn't that impair scalability? Wouldn't a real async implementation be the only realistic option in a heavy-duty production setting?
    Could you please elaborate on this, Bruce? Am I wrong?
    If not, what are the options? The simple MLC? But it doesn't support a transaction manager, or does it?

    Replies
    1. Defining 30 threads in use shouldn't be inefficient as long as there is an appropriate amount of resources (i.e., cpu, memory, network, etc.). Just simply adding more consumers shouldn't inherently cause scalability issues. I've worked on systems that have thousands of concurrent consumers. Like anything, you need to plan for this type of scale, it won't just happen without some work. As for the support of a transaction manager, yes, the DMLC supports a transaction manager, see the following Javadoc:

      http://static.springsource.org/spring/docs/3.1.x/javadoc-api/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.html#setTransactionManager(org.springframework.transaction.PlatformTransactionManager)

  122. Hey, I am new to Spring JMS.
    My problem is this: I will generate some JavaScript code for a client. The client will put this JavaScript code anywhere in their websites (third-party websites). When users view these pages, messages will be continuously generated and sent to our server. How can I receive and handle these messages? Which approach is best?

    Replies
    1. If I understand your description correctly, you will provide your customer a Javascript to be used in a web page so that when someone visits the web page a message will be created and sent to ActiveMQ.

      Since you're generating messages from within a web page and you already mentioned Javascript, you should use a Javascript client for ActiveMQ. When using a Javascript client for ActiveMQ you need to consider how ActiveMQ will receive those messages. This is achieved by enabling a transport in ActiveMQ and there are two options that will support Javascript clients -- either the Stomp transport or the Websocket transport. For the client side Javascript, take a look at the bottom of the Websocket transport page I linked to above and you will find links to two different Javascript client libraries that will work with ActiveMQ. Both can definitely talk to the Stomp transport and I think they might both be able to speak to the Websocket transport (not sure, you'll need to verify this).

  123. Hey Bruce
    great post. I'm a JMS/spring newbie. I'm trying to read N JMS messages in a single transaction using spring. I'm using Atomikos to manage the transaction but I can't find any example in which N messages are read from a queue in a single transaction. It's usually one transaction per message. Is there an out-of-the-box way of doing this? I found an example online using a subclass of DefaultMessageListenerContainer that reads N messages but this doesn't seem to work with Atomikos for some reason. Any idea?
    Thanks!

    Replies
    1. I can't provide an answer to your problem without far more detail about the situation. What I can tell you is about how to consume N messages within a single transaction. You will need to use one or more consumers that are all created from the same transacted session. The consumer(s) need to operate in a synchronous manner so that they're manually calling receive() in the scope of a single transaction. As messages are consumed you'll need to hold them in a batch and once that batch is full (you determine the size) only then do you commit the transaction. Then you can process the batch of messages. Hopefully this will help with your dilemma.
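
      The batching approach described above can be sketched as follows. To keep it runnable without a broker, a BlockingQueue stands in for the JMS consumer and a hypothetical Tx interface stands in for the transacted session; BatchReceiver, Tx, the batch size, and the timeout are all assumptions for illustration. With real JMS, poll() would be MessageConsumer.receive(timeout) and commit() would be Session.commit(). Note one deviation from the description: this sketch also commits a partial batch if the source runs dry before the batch is full.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a transacted session.
interface Tx {
    void commit();
}

class BatchReceiver {
    // Synchronously receives up to batchSize messages, then commits once.
    // Returns the batch; it is empty (and nothing is committed) if no message arrived.
    static List<String> receiveBatch(BlockingQueue<String> source, Tx tx,
                                     int batchSize, long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize) {
                String message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break; // timed out: stop waiting and work with what we have
                }
                batch.add(message);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt; messages received so far are still committed below.
            Thread.currentThread().interrupt();
        }
        if (!batch.isEmpty()) {
            tx.commit(); // one commit covers every message in the batch
        }
        return batch;
    }
}
```

      The caller processes the returned batch after the commit, which matches the order of steps described in the reply.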

  124. Hi - Looks like an extremely useful forum on Spring JMS Listeners. Thank you Bruce for doing this.

    Now for one of my own questions, this is still in the design phase:
    I have a JMS Listener configured on a clustered WebLogic implementation (2 managed servers). When a message comes on to the queue, either Listener could start processing it; that is my understanding. But we want to have only one Listener updating the data at one time. Even if a second message comes on to the queue, we don't want the other Listener to start processing until the first Listener is done. If it's possible to have the Listener deployed to only one managed server, that is fine too.

    Is there a Listener setting that I can use to achieve this?

    Thank you!

    Replies
    1. What is the real requirement behind this idea that one message be fully processed and ack'd before consuming another message? The reason I ask is that this use case is not a normal one for the use of message-oriented middleware.

      By default, the JMS message listeners operate concurrently whereby any number of them register their interest in messages on a given queue and the ActiveMQ message broker then automatically distributes messages to all registered consumers in a round robin manner. Bear in mind that various qualities of service can and do affect the message distribution to consumers.
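      To make the round-robin behaviour concrete, here is a toy model in plain Java. It is only an illustration: RoundRobinModel and the listener names are hypothetical, and a real broker's distribution is also shaped by settings such as prefetch, as noted above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of round-robin dispatch across registered consumers. */
final class RoundRobinModel {
    static Map<String, List<String>> distribute(List<String> messages, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<String>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Each message goes to the next consumer in turn.
            String consumer = consumers.get(i % consumers.size());
            assignment.get(consumer).add(messages.get(i));
        }
        return assignment;
    }
}
```

      With two listeners, consecutive messages land on different listeners and can be processed at the same time, which is why strict one-at-a-time processing conflicts with having multiple consumers.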

    2. The data that is uploaded by one user needs to be fully uploaded to the DB. A second upload by another user should be processed only when the first update has been completed. Dirty reads should not be allowed while processing the second upload. Hence the requirement that only one JMS Listener be running at any given time.

    3. The only way to achieve this requirement is to only ever use a single consumer. There are a couple of reasons for this:

      1) The way that JMS queues work is rather opposed to this requirement unless you only ever use a single consumer. If there are messages in a JMS queue and there are consumers registered to receive messages from the queue, then the message broker will distribute those messages in a round robin fashion across those consumers (depending on the qualities of service that are set on each consumer).

      2) Given that messages will be handed out independently to any consumer registered to receive them, there is no ability to control the actions of one consumer based on another consumer's state.

      The only way that you might be able to achieve something like this is through the use of distributed transactions, which I do not recommend. The trouble with distributed transactions is that they are simply impractical. Not only do they introduce a high level of complexity, but they also result in poor performance, and the applications that use them wind up with a very brittle design. Instead, I suggest that you consider the use of eventual consistency.

      Take a look at Pat Helland's Life Beyond Distributed Transactions paper, which focuses on the limitations of distributed transactions and proposes the use of different concepts to achieve the same end result. This paper has been widely discussed across the internet and is a model for how Amazon.com and many other large-scale businesses have crafted their system architectures.

  125. Hi Bruce, great post, but I'm wondering how to tweak your solution to work against Oracle AQ. I've documented my code on Stack Overflow here: http://stackoverflow.com/questions/15340030/jms-unable-to-consume-messages-from-oracle-queue-using-spring-jms

    I'm not seeing my code consume any messages from the queue. Any hints or tips would be greatly appreciated.

    Thanks

    Christian

    Replies
    1. I have posted an answer to your question on Stack Exchange.

  126. Hi Bruce, this article is very informative.
    I am new to JMS. For one of our existing applications we are using WebSphere and IBM MQ. We are now moving from WebSphere to Tomcat. I am wondering how to achieve connection pooling and session pooling using Spring.
    1) As far as message receiving is concerned, will the DMLC alone suffice, given that you posted in your previous comment that 'The DMLC provides its own tiered caching strategy for JMS resources. With this caching in the DMLC, there is no need to also use the CCF.'?

    2) As far as sending messages using JmsTemplate is concerned, will a CachingConnectionFactory suffice?

    3) Can I use the same CachingConnectionFactory bean wired to both the JmsTemplate and the DMLC?

    4) My existing IBM MQ pooling settings while creating Queue Connection Factory JNDI are:
    Connection pooling -- Max = 20 -- Min = 1
    Session pooling -- Max = 10 -- Min = 1

    Any help/guidance is much appreciated.

    Thanks.

    Replies
    1. Hi Bruce,
      At the moment I have configured a CachingConnectionFactory (with sessionCacheSize set to 20) and wired it to the DMLC and the JmsTemplate.
      Also, for the DMLC, I have set 'concurrentConsumers = 5' and 'maxConcurrentConsumers = 20'.

      Do you see any problem with this config?

      Thanks
      Amit

    2. Amit, here are some answers to your questions:

      1) Because the DMLC provides its own session pooling, there is no need to use the CachingConnectionFactory with the DMLC.

      2) Yes, when using the JmsTemplate, you definitely should be using the CachingConnectionFactory. This is because the JmsTemplate opens and closes the connection for every send or receive. Using the CachingConnectionFactory alleviates the overhead of actually opening and closing a new connection/session for every message operation.

      3) No, I do not recommend doing this. Set up the CachingConnectionFactory to be used with the JmsTemplate. Then set up a second connection to be used by the DMLC.

      4) Begin with those settings and perform some testing in your environment with your message load and monitor the performance. Adjust it as necessary and test again.

      You asked a fifth question in the second reply, to which I offer the same answer: the only way you'll know whether those settings are good enough is to test them in your environment with your message load and monitor the performance. Nothing else will be able to tell you whether this is correct for your situation.

      Also, for more info about using Tomcat but with ActiveMQ, take a look at some articles I wrote here:

      * ActiveMQ and Apache Tomcat: Perfect Partners
      * Integrating ActiveMQ With Apache Tomcat Using Local JNDI
      * Integrating ActiveMQ With Apache Tomcat using Global JNDI

      This may give you some additional info about one way of using the Spring DMLC with Tomcat. This is certainly by no means the only way.

  127. This comment has been removed by the author.

  128. Hi Bruce,
    I am new to MQ.

    Requirement:
    This is a standalone Java application. We need to consume messages continuously, do some processing, store each message in the DB, and acknowledge manually in the code. Speed is a very important factor. Which would be the better approach for doing this?

    Replies
    1. I suggest starting out by using the examples I provided via my Github page as a base for your work. Just simply tweak the example to utilize the CLIENT_ACKNOWLEDGE mode instead of the AUTO_ACKNOWLEDGE mode so that you can manually acknowledge messages. Then you can change anything else that doesn't suit your use case as necessary.

    2. Bruce, thanks for the very quick reply.

      1) If I use CLIENT_ACKNOWLEDGE, it will acknowledge all the messages consumed in that session, right?
      I am looking for something like EXPLICIT_CLIENT_ACKNOWLEDGE in TIBCO EMS.

      2) Also, regarding speed, can I implement asynchronous receiving? Is it suitable for a standalone Java application?

      Thanks in advance.

    3. 1. I think what you want then is the ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE mode, which acknowledges each consumed message individually. This prevents all the messages previously consumed on that session from being acknowledged along with it, but it is also a bit slower.

      2. The focus of this blog post is on asynchronous message consumption.
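      The difference between the two acknowledgement modes discussed in this thread can be modeled in a few lines of plain Java. This is a toy model under stated assumptions, not JMS or ActiveMQ code: AckModel and its methods are hypothetical names, and the list simply tracks which delivered messages are still unacknowledged on one session.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one session's unacknowledged messages. */
final class AckModel {
    enum Mode { CLIENT, INDIVIDUAL }

    private final Mode mode;
    private final List<String> unacked = new ArrayList<>(); // kept in delivery order

    AckModel(Mode mode) {
        this.mode = mode;
    }

    void deliver(String messageId) {
        unacked.add(messageId);
    }

    /** Acknowledges one message, applying the semantics of the configured mode. */
    void acknowledge(String messageId) {
        int index = unacked.indexOf(messageId);
        if (index < 0) {
            return; // unknown or already acknowledged
        }
        if (mode == Mode.CLIENT) {
            // Client acknowledge: every message the session has consumed is acknowledged.
            unacked.clear();
        } else {
            // Individual acknowledge: only this one message is acknowledged.
            unacked.remove(index);
        }
    }

    List<String> unacknowledged() {
        return new ArrayList<>(unacked);
    }
}
```

      In the individual mode, messages that were never acknowledged stay eligible for redelivery, at the cost of one acknowledgement per message.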

    4. Bruce,
      Is there an INDIVIDUAL_ACKNOWLEDGE mode, or something similar, available in IBM WebSphere MQ?

    5. To my knowledge, there is no acknowledgement mode like this in WebSphere MQ. Here's a link to the documentation for the ack modes in WMQ:

      WebSphere MQ Version 7.0 Acknowledgement Modes

  129. I am getting "Configuration problem: Cannot locate BeanDefinitionDecorator for element [listener-container]" even though I have declared the jms namespace in my context XML as shown below.

    Is there anything that I am missing?

    Replies
    1. This comment has been removed by the author.


  130. <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd

  131. Never mind I figured it out.

  132. Bruce,

    Is there any console where I can see ActiveMQ in action while I run my app following your example?

    I am embedding the broker inside the Spring context and then trying to bring up the console by launching http://localhost:8161/admin.

    I can see the queue name but couldn't see messages enqueued or dequeued, even though the Java console showed that both messages were processed successfully.

    vm://localhost

    Replies
    1. There are several ways to monitor ActiveMQ:

      * As long as the administration web app is deployed, you should be able to use the web UI as you pointed out. I'm not sure why you're not seeing messages enqueued/dequeued. I always recommend starting with the ActiveMQ examples just to make sure that basic production and consumption are working. Once that is confirmed, then begin to build your own app, making sure to start very simple at first and moving toward more complexity. Don't just add all the complexity right away at the beginning or it makes troubleshooting difficult.

      * Utilize the built in JMX capabilities in ActiveMQ via JConsole. This will give you access to all information that is happening inside of the message broker. One of the most powerful methods of monitoring is to watch the advisory topics so that you can see all internal activity.

      * Still another way to monitor ActiveMQ is by using the logging interceptor or by increasing the log level. These are two different forms of logging that produce different types of information. Although you get a lot of information by increasing the logging level, it's not all intelligible to someone who doesn't know the ActiveMQ internals. So I suggest looking at the logging interceptor first for information relevant to message production and consumption.

      Hopefully one of these suggestions will help you to figure out what's happening with your situation so that you can fix it.

  133. Hi Bruce,

    I have an application which puts strings in ActiveMQ, and I implemented a listener similar to the one given by you to receive these string messages.
    The problem is that on reading a message, the listener gets it as an ActiveMQObjectMessage and not as a JMS TextMessage.
    Is there a way I can get it as a TextMessage?

    Replies
    1. The example code that I made available via my GitHub repo demonstrates the use of TextMessages. Have you taken a look at that example code? See the link to my GitHub repository at the bottom of the post, labeled Update.

  134. Please share the server or environment setup details to achieve the above demonstration.

    Replies
    1. The server side was an ActiveMQ message broker, but I don't recall the specific configuration as I wrote this blog post about 3.5 years ago. I probably tuned the ActiveMQ memory usage but there are no special requirements of the server side to work with the DMLC as a client. Do you have a particular reason you are asking this question?

  135. Hi Bruce,

    Thanks for the blog. We are currently stuck in a situation where, under load, some transactions are picked up two or three times by concurrent threads in parallel, resulting in duplicate processing of a message. We have some 5 concurrent consumers listening on a WebSphere message queue. We are sure that no duplicate messages were put on the queue. My understanding was that a message is locked once it is read from the queue, but that does not seem to be happening.

    Can you provide some suggestions?

    Many Thanks,
    Shwetank

    Replies
    1. Occurrence is 4-8 in 4000 so less than 1%

    2. Are you sure that the messages that are being processed twice are not redelivered messages?

    3. Yes Bruce. When we make the onMessage method synchronized, we do not see this issue. The reason we cannot synchronize is that it causes a performance hit. It is like setting 0 concurrentConsumers. We tried putting the messages in multiple ways (JMeter, etc.) and the results were the same.

    4. I would really need to see a working example to understand the situation more fully. I suggest you post a message to the Spring JMS forum to get help from the folks who created Spring.

  136. Hi Bruce,
    Is it possible to control the rollback when I have an error handler that, say, just logs the input message? What is happening is that if I use an error handler and an exception is thrown and caught, I am able to log the message, but at the same time the rollback happens, causing the same message to be processed again, and the whole cycle repeats. The transaction manager is Spring's JmsTransactionManager.

    Replies
    1. It sounds like you need some rules around the redelivery of messages. This is commonly handled by the message broker itself, not the Spring DMLC. Check your message broker to see if there's a way to define a policy or settings for message redelivery.
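      As a rough illustration of what such a broker-side redelivery rule does, the sketch below models a capped redelivery loop in plain Java. RedeliveryModel, its maximumRedeliveries field, and the deadLetters list are hypothetical names for this example only, not a broker or Spring API; a listener that throws stands in for a rolled-back transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of capped redelivery with a dead-letter list. */
final class RedeliveryModel {
    final List<String> deadLetters = new ArrayList<>();
    private final int maximumRedeliveries;

    RedeliveryModel(int maximumRedeliveries) {
        this.maximumRedeliveries = maximumRedeliveries;
    }

    /** Delivers the message until the listener succeeds or the cap is reached; returns the attempt count. */
    int deliver(String message, Consumer<String> listener) {
        int attempts = 0;
        while (attempts <= maximumRedeliveries) { // one initial delivery plus the allowed redeliveries
            attempts++;
            try {
                listener.accept(message);
                return attempts; // success: the message is done
            } catch (RuntimeException e) {
                // Failure models a rollback: the message becomes eligible for redelivery.
            }
        }
        deadLetters.add(message); // cap exceeded: stop redelivering and park the message
        return attempts;
    }
}
```

      Without a cap like this, a listener that always fails on a given message sees that message again after every rollback, which matches the repeating cycle described in the question.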

  137. This comment has been removed by the author.

  138. This comment has been removed by the author.

  139. Hi Bruce,

    We have two queues. Please let me know how a listener can be configured in an MDP to listen to two or more queues, the way it's done in an MDB.

    Replies
    1. To make a single bean listen to more than one queue, you will need to define a <jms:listener> element for each queue. Below is an example:

      ...
      <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
      <jms:listener destination="TEST.BAR" ref="simpleMessageListener" method="onMessage" />
      <jms:listener destination="TEST.BLAH" ref="simpleMessageListener" method="onMessage" />
      ...

      Note that for each element above, the reference to the same listener bean is used but each element refers to a different queue name.

  140. This comment has been removed by the author.

  141. This comment has been removed by the author.

  142. This comment has been removed by the author.

  143. I am trying to get the adapter example to work with IBM MQ 7 and am getting the below exception ONLY when I use a message type of "object". For some reason, processMessage is not able to consume the Payload put on the queue. Any help appreciated. Thanks.

    17:25:05,029 WARN listener.DefaultMessageListenerContainer: 694 - Execution of JMS message listener failed, and no ErrorHandler has been set.
    org.springframework.jms.listener.adapter.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with arguments {
    JMSMessage class: jms_object
    JMSType: null
    JMSDeliveryMode: 2
    JMSExpiration: 0
    JMSPriority: 4
    JMSMessageID: ID:414d5120445353444556202020202020858b775229300620
    JMSTimestamp: 1383603904862
    JMSCorrelationID: null
    JMSDestination: queue://DSSDEV/RQA.CALYPSO.OUT
    JMSReplyTo: null
    JMSRedelivered: false
    JMSXAppID: WebSphere MQ Client for Java
    JMSXDeliveryCount: 1
    JMSXUserID: sghorp
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 273
    JMS_IBM_Format:
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 28
    JMS_IBM_PutDate: 20131104
    JMS_IBM_PutTime: 22250493
    messageCount: 39
    class com.ab.springmq.Payload}; nested exception is java.lang.NoSuchMethodException: com.ab.springmq.MessageDelegateImpl.processMessage(com.ibm.jms.JMSObjectMessage)
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:476)
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:355)
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:330)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:535)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:495)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:467)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1058)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1050)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:947)
    at java.lang.Thread.run(Thread.java:662)
    Caused by: java.lang.NoSuchMethodException: com.ab.springmq.MessageDelegateImpl.processMessage(com.ibm.jms.JMSObjectMessage)
    at java.lang.Class.getMethod(Class.java:1605)
    at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:178)
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:462)
    ... 11 more

    Replies
    1. The error states the nature of the problem:

      nested exception is java.lang.NoSuchMethodException: com.ab.springmq.MessageDelegateImpl.processMessage(com.ibm.jms.JMSObjectMessage)

      There is an attempt to call a processMessage method that accepts a com.ibm.jms.JMSObjectMessage argument, and no method with that signature exists in the com.ab.springmq.MessageDelegateImpl class.

  144. Hi Bruce,

    I am using jboss-4.0.5.GA and I have an MDB configured for a queue with a message selector. Everything goes fine, and messages are consumed and processed, for a day or two; then the MDB stops consuming messages while there are still messages in the queue. I looked in the logs for any exception, but they were clean. I browsed through the thread dump and couldn't see anything suspicious. If JBoss is restarted, the messages are consumed and things are back to normal.

    But I am not sure what the trigger is that causes the MDB to stop consuming messages. I'd appreciate any help!

    Thanks,
    Julia

    Replies
    1. Clearly some troubleshooting needs to take place to determine what the issue is here. I recommend that you post a message to the ActiveMQ user mailing list available here:

      http://activemq.apache.org/mailing-lists.html

      There are many folks who work on the ActiveMQ project who will see your message and be able to help you troubleshoot the situation.

  145. This comment has been removed by the author.

  146. Hi Bruce,

    I am having issue with JMS Listeners. Below are the details:
    Details:

    ABC is an application that has JMS listeners and consumes messages from XYZ queues (a different application on a different WebLogic server). We have 32 listeners listening to 3 XYZ queue nodes in PROD. These 32 listeners are evenly distributed across the 3 queues.

    Let’s assume the listeners are assigned in this order.

    Example:
    XYZ node 1: 10 listeners
    XYZ node 2: 11 listeners
    XYZ node 3: 11 listeners

    Issue:

    When one of the XYZ nodes goes down, all of its listeners are reallocated to the other 2 nodes.

    Example:
    XYZ node 1: 0 listeners (Not active)
    XYZ node 2: 16 listeners (Active)
    XYZ node 3: 16 listeners (Active)

    Now when the XYZ node comes back up, the listeners are not reassigned. They stay with the other 2 nodes.

    Example:
    XYZ node 1: 0 listeners (Active)
    XYZ node 2: 16 listeners (Active)
    XYZ node 3: 16 listeners (Active)

    Configuration:

    ${INITIAL_CONTEXT_FACTORY}
    ${PROVIDER_URL}
    ${SECURITY_PRINCIPAL}
    ${SECURITY_CREDENTIALS}
    ${CONCURRENT_CONSUMERS}
    ${CLIENT_ACKNOWLEDGE}

    Can you please let me know how I can configure the listeners to stay evenly distributed at all times, especially after the XYZ queues recover from a failover?

    Thanks,
    VaNa

    Replies
    1. In order to post the XML configuration, you must manually replace all '<' and '>' characters with '&lt;' and '&gt;' respectively.

      You need to force the clients of the broker to rebalance, and the only way to do this that I remember is to enforce a deterministic lifetime on connections. This only works if you are using the PooledConnectionFactory object for the pool and can be achieved via the expiryTimeout property on a pool of connections like this:

      <bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
        <property name="expiryTimeout" value="10000" />
        <property name="connectionFactory">
          <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />
        </property>
      </bean>

      See the Javadoc for the PooledConnectionFactory object for more info.

    2. Hi Bruce,
      Thanks a lot for your response. I am pasting my modified configuration file details:

      Configuration:

      <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
      <property name="environment">
      <props>
      <prop key="java.naming.factory.initial">${INITIAL_CONTEXT_FACTORY}</prop>
      <prop key="java.naming.provider.url">${PROVIDER_URL}</prop>
      <prop key="java.naming.security.principal">${SECURITY_PRINCIPAL}</prop>
      <prop key="java.naming.security.credentials">${SECURITY_CREDENTIALS}</prop>
      </props>
      </property>
      </bean>

      <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate"/>
      <property name="jndiName" value="${CONTEXT_FACTORY_NAME}"/>
      </bean>

      <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate"/>
      <property name="jndiName" value="${XYZ_QUEUE}"/>
      </bean>

      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="defaultDestination" ref="destination"/>
      </bean>

      <bean id="messageReceiver" class="com.jms.MessageReceiver">
      <property name="eventProcessor" ref="eventProcessor"/>
      </bean>

      <bean id="jmsErrorHandler" class="com.JMSErrorHandler"/>

      <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="concurrentConsumers"><value>${CONCURRENT_CONSUMERS}</value></property>
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="destination" ref="destination" />
      <property name="messageListener" ref="messageReceiver" />

      <property name="sessionAcknowledgeMode"><value>${CLIENT_ACKNOWLEDGE}</value></property>
      <property name="recoveryInterval" value="5000"/>
      <property name="errorHandler" ref="jmsErrorHandler"/>
      </bean>

      I used only Spring JMS here, and I see that your configuration uses ActiveMQ. There is no message broker between the ABC and XYZ applications due to a business requirement (to implement Unit of Order). I can change my configuration to use ActiveMQ as you suggested, but I have a few questions:

      1) If expiryTimeout is set, will the connections be refreshed and reload the properties?
      2) Will there be any significant performance issue?

      Note: Sorry for my questions, as I am new to this.

      Thanks,
      VaNa

  147. This comment has been removed by the author.

  148. Following your example:
    public class MyMessageListener implements MessageListener

    Is it possible for a listener to stop consuming messages?

    What I want is to send a signal to the consumer and have it start listening and stop listening to messages.

    Thanks for your blog

    Replies
    1. There is no Spring-specific manner to do this, so you will probably need to write your own consumer so as to start and stop the subscription whenever you like.
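      One way to picture a hand-written consumer that can be switched on and off is the plain-Java sketch below. It is a minimal sketch, not Spring or JMS code: PausableConsumer is a hypothetical name, and a BlockingQueue stands in for the destination (with JMS, the poll would be a timed MessageConsumer.receive call).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical consumer that only takes messages while it has been started. */
final class PausableConsumer<T> {
    private final BlockingQueue<T> source;
    private final Consumer<T> listener;
    private volatile boolean listening = false; // flipped by the external "signal"

    PausableConsumer(BlockingQueue<T> source, Consumer<T> listener) {
        this.source = source;
        this.listener = listener;
    }

    void start() { listening = true; }

    void stop() { listening = false; }

    /** Tries to consume one message; returns true only if a message was handed to the listener. */
    boolean pollOnce(long timeoutMillis) {
        if (!listening) {
            return false; // stopped: leave messages where they are
        }
        try {
            T message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                return false;
            }
            listener.accept(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

      A worker thread would call pollOnce in a loop, while the signal handler calls start() and stop(); messages that arrive while the consumer is stopped simply stay queued.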

  149. Great article and thanks for the code.

  150. Hi, Bruce.. thanx for this great article.

    Do you know some bug related to AMQ session closing after an inactivity period (default: 30 seconds) even when the client set keepAlive=true in PooledConnectionFactory brokerUrl configuration?

    It's happening to me using the AMQ 5.7.0 client connector and server (no custom configuration).

    Replies
    1. wow... found this issue https://issues.apache.org/jira/browse/AMQ-4366

      =/

  151. This comment has been removed by the author.

  152. Hi Bruce, I am implementing an MDP plus JmsTemplate to send and receive messages. The message send mechanism is working fine; however, the MDP is not getting invoked. I tried testing via a plain receiver and was able to retrieve the message, but not via the MDP. What could be the problem?

    Here's the Spring config. The Java classes to send and receive are pretty much standard ones.

    <bean id="cookieRequestListener" class="com.text.jms.mq.mdp.RequestQueueMDP">
    <property name="logger" ref="mqLogger" />
    <property name="scoringEngine" ref="scoringEngine" />
    <property name="mqSender" ref="jmsMQSender" />
    </bean>

    <bean id="CookieRequestContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    <property name="destination" ref="jmsRequestQueue" />
    <property name="messageListener" ref="cookieRequestListener" />
    </bean>

    <bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
    <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName">
    <value>java:/jms/queueCF</value>
    </property>
    </bean>

    <!-- A cached connection to wrap the ActiveMQ connection -->
    <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    <property name="sessionCacheSize" value="10" />
    </bean>


    <!-- jms Request Queue Configuration -->
    <bean id="jmsRequestQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
    <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName">
    <value>java:/jms/cookieReqQ</value>
    </property>
    </bean>

    <!-- jms Response Queue Configuration -->
    <bean id="jmsResponseQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
    <ref bean="jndiTemplate" />
    </property>
    <property name="jndiName">
    <value>java:/jms/cookieResQ</value>
    </property>
    </bean>

    <bean id="jmsJMSTemplate" class="org.springframework.jms.core.JmsTemplate" >
    <property name="connectionFactory" ref="cachedConnectionFactory" />
    </bean>

    <!-- jms MQ Utility -->
    <bean id="jmsMQSender" class="com.text.jms.util.MQSender">
    <property name="jmsTemplate">
    <ref bean="jmsJMSTemplate"></ref>
    </property>
    <property name="defaultDestination">
    <ref bean="jmsRequestQueue" />
    </property>
    <property name="logger" ref="mqLogger" />
    </bean>

    Replies
    1. I see that the request queue and the response queue are using the same name for the jndiTemplate and the jndiName. I'm guessing that this is why it's not working. Clean up the XML so that you use unique names for each object and see if that fixes the problem.

  153. Hi Bruce, the request and response queue names and JNDI mappings are different. Yes, the JndiTemplate is the same, but will that create a problem? I mean, typically all the objects are bound to the same JNDI context, aren't they? Moreover, I tested the same setup by writing a simple JSP that calls createReceiver and receive(). Using this I'm able to get the same message that was put on the queue.

    Replies
    1. My mistake, those are both inside of a bean element; I missed that. I suggest that you ask questions about your problem on the Spring forums at Stack Overflow (http://spring.io/questions). You will get a lot more eyes on your problem and probably find a solution faster.

    2. Hey Bruce, one question. Do we need to explicitly define or add a listener on the JBoss server via the admin console for the DMLC so that it gets bound to the QCF and queue, similar to what we do with an MDB in WebSphere? I ask because when I start the server, I don't see any active listener (on the JBoss console) listening for incoming messages for the MDP.

    3. Unfortunately I have not used the JBoss app server in several years, so I cannot provide an answer to your question. I suggest that you ask your question on the Spring Framework forums here: http://spring.io/questions

  154. Hi Bruce,
    We are using ActiveMQ 5.10.0, Camel 2.13.1, and Spring 3.2.8 to send and receive messages to/from Oracle OSB. We use Spring's JndiTemplate and CachingConnectionFactory to connect to OSB. Under load everything works fine. However, after a day or so we see some threads consuming 100% of a CPU core.
    The thread that is using the most CPU is shown below:
    "Camel (camel-1) thread #91 - JmsConsumer[UL_T2_OSB_MS201JMSSERVER@XXXRsQueue]" id=169 idx=0x2bc tid=2799 prio=5 alive, native_blocked, daemon
    at __lll_lock_wait+36(:0)@0x3d4940d654
    at tsCheckTransitToJava+276(execution.c:105)@0x2adbe09da5f5
    at java/lang/System.currentTimeMillis()J(Native Method)
    at weblogic/jms/client/JMSSession.receiveMessage(JMSSession.java:760)[optimized]
    at weblogic/jms/client/JMSConsumer.receiveInternal(JMSConsumer.java:647)[inlined]
    at weblogic/jms/client/JMSConsumer.receive(JMSConsumer.java:526)[inlined]
    at weblogic/jms/client/WLConsumerImpl.receive(WLConsumerImpl.java:184)[optimized]
    at org/springframework/jms/listener/AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:430)[inlined]
    at org/springframework/jms/listener/AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:310)[optimized]
    at org/springframework/jms/listener/AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)[inlined]
    at org/springframework/jms/listener/DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1101)[inlined]
    at org/springframework/jms/listener/DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1093)[optimized]
    at org/springframework/jms/listener/DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:990)[optimized]
    at java/util/concurrent/ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[optimized]
    at java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java/lang/Thread.run(Thread.java:662)
    at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)
    -- end of trace
    We are also seeing exceptions like this in the ActiveMQ log:
    2014-07-25 12:48:03,454 | INFO | Successfully refreshed JMS Connection | org.apache.camel.component.jms.DefaultJmsMessageListenerContainer | Camel (camel-1) thread #188 - JmsConsumer[TL_D2_OSB_MS01JMSSERVER@XXXDataFeedRsQueue]
    2014-07-25 12:48:03,462 | WARN | Encountered a JMSException - resetting the underlying JMS Connection | org.springframework.jms.connection.CachingConnectionFactory | ExecuteThread: '2' for queue: 'default'
    weblogic.jms.common.LostServerException: java.lang.Exception: weblogic.rjvm.PeerGoneException: ; nested exception is:
    java.io.EOFException
    at weblogic.jms.client.JMSConnection.dispatcherPeerGone(JMSConnection.java:1436)[weblogic.jar:10.3.2.0]
    at weblogic.messaging.dispatcher.DispatcherWrapperState.run(DispatcherWrapperState.java:692)[weblogic.jar:10.3.2.0]
    at weblogic.messaging.dispatcher.DispatcherWrapperState.timerExpired(DispatcherWrapperState.java:617)[weblogic.jar:10.3.2.0]
    at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:273)[wlfullclient-10.3.jar:10.3.3.0]
    at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)[weblogic.jar:10.3.3.0]
    at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:145)[weblogic.jar:10.3.3.0]
    at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:117)[weblogic.jar:10.3.3.0]

    Any direction to resolve this will be appreciated.

    Replies
    1. Based on the exceptions, it looks like Camel reset the JMS connection:

      INFO | Successfully refreshed JMS Connection | org.apache.camel.component.jms.DefaultJmsMessageListenerContainer

      And then Spring's CachingConnectionFactory is throwing a warning that the underlying connection was reset:

      WARN | Encountered a JMSException - resetting the underlying JMS Connection | org.springframework.jms.connection.CachingConnectionFactory

      And then it seems that WebLogic throws an exception because the connection was reset:

      weblogic.jms.common.LostServerException: java.lang.Exception: weblogic.rjvm.PeerGoneException: ; nested exception is:
      java.io.EOFException

      I'm not sure of a solution for this problem, simply because I'm not familiar with using ActiveMQ with WebLogic. I suggest that you ask on the Camel or ActiveMQ mailing lists to get more eyes on your problem. Information on these mailing lists is available at the following URLs:

      http://activemq.apache.org/mailing-lists.html
      http://camel.apache.org/mailing-lists.html
