04 February 2010

Using Spring to Send JMS Messages



Recently I stumbled upon a number of places in the some docs and mailing lists where claims are made that the Spring JmsTemplate is full of anti-patterns, is horribly inefficient and shouldn't be used. Well I'm here to debunk these erroneous claims by pointing out a class in the Spring Framework that was overlooked entirely.

The Spring JmsTemplate is a convenience class for sending and receiving JMS messages in a synchronous manner. The JmsTemplate was originally designed to be used with a J2EE container where the container provides the necessary pooling of the JMS resources (i.e., connections, consumers and producers). Such requirements came from the EJB spec. But when developers began using the JmsTemplate outside of J2EE containers, and because some JMS providers do not offer caching/pooling of JMS resources, a different solution was necessary. Enter the Spring CachingConnectionFactory.

The CachingConnectionFactory is meant to wrap a JMS provider's connection to provide caching of sessions, connections and producers as well as automatic connection recovery. By default, it uses a single session to create many connections and this model works very well with most MOMs. But if you need to scale further, you can also specify the number of sessions to cache using the sessionCacheSize property.

Below is a snippet from a Spring app context that demonstrates the configuration for the CachingConnectionFactory


...
<!-- A connection to ActiveMQ -->
<bean id="amqConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL='tcp://localhost:61616" />

<!-- A cached connection to wrap the ActiveMQ connection -->
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="amqConnectionFactory"
p:sessionCacheSize="10" />

<!-- A destination in ActiveMQ -->
<bean id="destination"
class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="FOO.TEST" />
</bean>

<!-- A JmsTemplate instance that uses the cached connection and destination -->
<bean id="producerTemplate"
class="org.springframework.jms.core.JmsTemplate"
p:connectionFactory-ref="cachedConnectionFactory"
p:defaultDestination-ref="destination" />
...


As you can see, the configuration for the CachingConnectionFactory along with the JmsTemplate is quite simple. Furthermore, these two classes are also both in the org.springframework.jms package path so they're both included in the spring-jms jar file making their use even easier.

The only thing left to do is utilize the jmsTemplate bean in your Java code to actually send a message. This is shown below:


public class SimpleMessageProducer {

private static final Logger LOG = Logger.getLogger(SimpleMessageProducer.class);

@Autowired
protected JmsTemplate jmsTemplate;

protected int numberOfMessages = 100;

public void sendMessages() throws JMSException {
StringBuilder payload = null;

for (int i = 0; i < numberOfMessages; ++i) {

payload = new StringBuilder();
payload.append("Message [").append(i).append("] sent at: ").append(new Date());

jmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage(payload.toString());
message.setIntProperty("messageCount", i);
LOG.info("Sending message number [" + i + "]");
return message;
}
});
}
}
}


The SimpleMessageProducer class above demonstrates the use of Spring autowiring to resolve the relationship between the jmsTemplate property and the producerTemplate in the app context further above. Then an anonymous MessageCreator instance is used to actually create a message for the jmsTemplate to send.

The JmsTemplate and the CachingConnectionFactory are both very widely used in businesses of all sizes throughout the world. Coupled with one of the Spring message listener containers, they provide an ideal solution.

I'll elaborate on message consumption using the Spring DefaultMessageListenerContainer and the SimpleMessageListenerContainer in a future blog post.

132 comments:

  1. Woooh. this is new to me. Let me explore this technology.. Great post. Learned something new. :D Im hoping that i will understand the query immediately.

    ReplyDelete
  2. Hi Bruce,

    Nice blog. Thanx. Can you comment on the warning that Spring doc throws at us for using this class?

    NOTE: This ConnectionFactory requires explicit closing of all Sessions obtained from its shared Connection. This is the usual recommendation for native JMS access code anyway. However, with this ConnectionFactory, its use is mandatory in order to actually allow for Session reuse.

    ReplyDelete
  3. @truecube - This recommendation from the Spring CachingConnectionFactory is nothing new. JMS resources should be closed when you're done with them, but it's doubly important when you are caching those resources. This is due to the fact that there may be settings on those objects that may not work across all use cases in a given application. For example, if you are caching consumers, the keys for caching those consumers contains (among other things) the JMS selector for that consumer. The problem here is with the reuse of those consumers. JMS selectors are very specific and most definitely not usable across all situations. The best solution for this situation is to use a separate instance of the CachingConnectionFactory for those consumers with JMS selectors.

    Hope that helps,

    Bruce

    ReplyDelete
  4. Hi Bruce,

    Even in a JEE container, the app server is only mandated to cache JMS connections and sessions, not JMS consumers.

    So using the JMSTemplate to do synchronous receives, even in an app server is still an anti-pattern since it will cause the creation of a consumer each time, which will probably involve a network round trip ==> slow.

    Worse still, if the synchronous receives() are on a topic, then the user may see "gaps" in the stream of messages, since a non durable consumer on a topic will miss any messages sent to the topic when it's not in existence.

    When I critice the JMSTemplate I make it clear that it can only be safely used from within an app server and only then for *sending* messages. I don't there is anything untrue about that, so nothing to be debunked :)

    ReplyDelete
  5. How do you close the session when using JMSTemplate and CachingConnectionFactory. JMSTemplate methods like convertAndSend() seem to get the session from the CachingConnectionFactory within the implementation of the method. So in the java class that uses convertAndSend(), how to close the session?

    thanks

    ReplyDelete
  6. @Tim, Thank you for your comments. I have rarely seen anyone use the Spring JmsTemplate for receiving messages. I only recommend using the JmsTemplate for sending messages because of the potential for indefinite blocking with the receive() method. The recommended way to receive JMS messages is asynchronously using the Spring message listener containers (http://bit.ly/dr8Opy).

    ReplyDelete
  7. @Raj, The JmsTemplate handles the closing of the session for you. This takes place in the finally clause of the execute method. See line #472 in the JmsTemplate where the JmsUtils.closeSession() method is called (http://bit.ly/aJLyez).

    If you need to use methods on the underlying JMS session object, (e.g., creating temporary destinations during the send operation, etc.) then you should implement your own SessionCallback to do so.

    ReplyDelete
  8. Thanks Bruce. Your reply helps clarify one aspect. I'm seeing the following in our process when JMSTemplate and CachingConnectionFactory is used :

    2010-05-19 13:27:58,964 [InactivityMonitor WriteCheck] DEBUG (InactivityMonitor.java:99) org.apache.activemq.transport.InactivityMonitor - 9999 ms elapsed since last write check.

    I wished to eliminate the prospect of open connections. Have you ever seen the above message?

    thanks again :-).

    ReplyDelete
  9. @Raj, That log statement is simply a debug message from the thread that executes the write check to report when it last took place. It is only informative for seeing how often the write check is occurring.

    ReplyDelete
  10. @Bruce.

    Thanks for your reply :)

    We actually see people using the JMSTemplate for *receiving* messages quite a lot at JBoss. That's the source of the frustration for us.

    I think we all agree that using JMSTemplate for receiving messages is fundamentally broken, and there are better ways to do this in Spring.

    Considering that, perhaps the receive methods should be deprecated with a view to removing them completely?

    It would certainly save us from more pain ;)

    ReplyDelete
  11. @Tim, There is a place for synchronous message consumption, the problem is that some messaging newbies tend to abuse it. They use it once and it works, so they don't bother to explore asynchronous message consumption (which, IMO, is where the real power of messaging resides). So I don't believe that those methods should be deprecated. Also, I do not work on the Spring Framework team, so it's not my decision.

    FWIW, I always recommend asynchronous consumption to folks who are new to messaging. I only work with synchronous consumption when there is a specific use case for it, which I have found is definitely rarer.

    ReplyDelete
  12. Hi Bruce, thanks for your posts to my earlier questions. The challenge I face now is that when the Spring Context is closed (destroy), the connection to AMQ bus is still open. This prevents a clean closing of the context. The only way to stop the application and stop the connection to AMQ bus is by terminating the application manually. Here is the hanging thread:

    Name: ActiveMQ Transport: tcp://pubd-amq1-path2.path2.dev.ibsys.com/10.102.110.210:61616
    State: RUNNABLE
    Total blocked: 1 Total waited: 0

    Stack trace:
    java.net.SocketInputStream.socketRead0(Native Method)
    java.net.SocketInputStream.read(SocketInputStream.java:129)
    org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    java.io.DataInputStream.readInt(DataInputStream.java:370)
    org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
    org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
    org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
    org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
    java.lang.Thread.run(Thread.java:637)

    Have you seen this before? I'm using CachingConnectionPool with JMSTemplate to enqueue msgs

    ReplyDelete
  13. @Raj, I have seen this before and there is an option on the TCP transport to address this situation. Add the daemon=true option to the transport and the transport thread in ActiveMQ should terminate cleanly. Below is an example of a broker URI with the transport option:

    192.168.1.29:61616?daemon=true

    Bruce

    ReplyDelete
  14. Bruce, thank you very much. My initial tests work fine when I set the daemon option.

    ReplyDelete
  15. Bruce,

    I am trying to use Spring JMS to publish to multiple queues. The queue destination is decided at the time of publishing message onto a queue. Is there anyway that can be achieved?

    Thanks,
    Raj S

    ReplyDelete
  16. @Raj, If you don't know the destination until right before the message is sent, you can inject the destination programmatically instead of via an XML configuration. Is this what you are referring to?

    Bruce

    ReplyDelete
    Replies
    1. Hi Bruce, How do I inject the destination programatically? Also is there a way to change the subscribe to destination programatically?

      Delete
  17. Hi, Bruce,
    Thank you for useful article.
    I need to send several messages within one transaction to ensure that all messages are really delivered. In case of errors no one message should appear in the queue.
    As I see JmsTemplate doesn't provide such ability by default.
    Could you please suggest the best way to achieve my goal.
    As for now I am thinking about extending JmsTemplate and its send method.
    Thank you

    ReplyDelete
  18. @pfen, I haven't ever seen a use case like this before! Maybe you could extend the JmsTemplate to create a CachingJmsTemplate whereby the CachingJmsTemplate allows you to specify how to cache messages (e.g., a certain number of messages, based on a period of time, etc.) and once that cache attribute has been fulfilled, then the messages are sent to the broker in a single batch using a transaction. That way all the messages are sent together and that operation is treated as a single unit of work.

    ReplyDelete
  19. I have found the JMSTemplate useful in my use case for receiving messages. I have a distributed application that comprises a single message producer and multiple consumers. Each message can be handled by one and only one consumer. Each consumer has a certain number of configured threads in a thread pool. The consumer can consume a message only if it has a free thread in the pool to handle that message. The consumers also use a blocking receive call so that they are always connected to the queue.

    I don't think the JmsTemplate.receive() should be deprecated. If you think there is a better solution, I would love to hear about it.

    ReplyDelete
  20. Hi
    I am new to spring and I have a simple question
    I am trying to write a simple main program that will send and recieve messages (using activemq and Spring)
    but how can I tell the spring to load my config file ?
    I tried using "ClassPathXmlApplicationContext" but got nosuchmethod exception
    any idea for how can i load my conf file???
    Many thanks
    Ofir

    ReplyDelete
  21. @ofir, Are you trying to load the activemq.xml file via Spring? You must be embedding ActiveMQ in your application instead of starting it in a stand alone manner, correct?

    If you want to load the activemq.xml file into the Spring config for your application, the BrokerFactoryBean makes this quite easy. Below is a quick example:

    <bean id="broker"
    class="org.apache.activemq.xbean.BrokerFactoryBean">
    <property name="config" value="file:path/to/activemq.xml/>
    <property name="start" value="true" />
    </bean>

    I hope that addresses your question. If not, please clarify with more detail.

    Bruce

    ReplyDelete
  22. This guide has been massively useful, thanks for that! If I'm sending a message in this way, is there a setting I can flick on that will enable local persistence of messages, to guarantee delivery if the endpoint isn't up at the time? And also if the local (sending) machine is rebooted or whatever, for those messages to come back up and start trying to send on restart?

    Thanks for your help either way!

    Rob

    ReplyDelete
  23. Hi, Bruce.

    Thanks for this blog. The explanation is simple and yet informative.

    I have a question with jmsTemplate.send method. If the messages are sent asynchronously to an MQ server, is there a way to capture the MQ return/response code (e.g. MQ Return Code = 1 for successful, MQ return code if there's an exception, etc)? I am looking for a way to capture this code for asynchronous messaging if possible.

    Thanks,
    Rudy

    ReplyDelete
  24. @Robert, The concept of persistence on the client side is not addressed by the JMS spec, so there is no consistent answer across message brokers. This is left as an implementation detail for the message broker implementation. With ActiveMQ, I recommend that people send to a local broker so that it will persist the message quickly and send it along to another broker when there is consumer demand for the message. Oftentimes this is an ideal use of an embedded ActiveMQ broker.

    Bruce

    ReplyDelete
  25. @Rudy, The JmsTemplate#send method is a void method which boils down to an invocation of the MessageProducer#send method which is also a void method. So, per the JMS spec, there is not a return value for the send method. The way to know if a send failed is if a JMSException is thrown. This is not to say that some message broker implementations do not offer their own send method with a return value, i.e., vendor-specific classes.

    Bruce

    ReplyDelete
  26. Burce,

    First and foremost - excellent post. Great to know that there is a way to debunk this issue with Spring and jms anti-pattern. Thanks for that.

    Now I do have one question on how to truely 'wire' the CacheConnectionFactory properly into our application. Our application is structured in such a way that we have 2-3 teirs (depending on the operation) before the calls are made to the database. Our requirement is to send data passed from the upper layers on as messages to a queue. However, the 'application context' is not available at this level.

    So if we would like to use the CacheConnectionFactory that is configured in the main application context, how would we do that ? Would it be ok to set a static reference to the application context at a higher tier and then access that reference at the lower tier ? Would that be safe to do without causing threading issues ? Sorry I'm not that familiar with Spring to understand the negative implication of doing that. I know that's not a desireable approach (we lock the application to spring and do introduce a dependency). But we can't think of any other way to make the Spring JMS objects available at the lower tiers other that hand wire the jms objects using code.

    Any advise would be appreciateed.

    ReplyDelete
  27. @groove, Thank you for the complement.

    As for the questions you are asking, is it not possible to expose the Spring application context via utility class using a getter method? Is this a WAR or an EAR or what? It's difficult to understand the situation without some additional context.

    Bruce

    ReplyDelete
  28. Hi Bruce,

    Sorry for the delayed response. But I was trying to see if I could find a suitable solution my self.

    The application is a web app that deployed
    as a 'war' and runs on Tomcat. It's uses Spring MVC to process web requests. The basic issue is that the 'View' calls lower level components that then perform database updates. It's not a good design, but that's what we inherited and we're trying to make the best of it.

    Now, we are looking to implement a solution similar to this one :

    http://forum.springsource.org/archive/index.php/t-31316.html

    However, my concern is whether this solution is thread safe if we use it to retrieve the cache connection factory from 'AppContextProxy' ?

    Thanks for your help, Bruce.

    ReplyDelete
  29. @groove, As the post in that forum suggests, you can expose the Spring application context via a method call, but it will need to contain hooks into the Spring lifecycle so that the instance can be populated at the right time. But I'm not sure why you're asking about thread safety relative to simply retrieving the CachingConnectionFactory? FWIW, there should not be any inherent thread safety concerns with regard to using the CachingConnectionFactory either.

    Bruce

    ReplyDelete
  30. Thanks Bruce. I was just making sure that there were no inherent risks with using this approach. Appreciate all your help.

    ReplyDelete
  31. Can we use CachingConnectionFactory inside a j2ee container?
    My web app publishes messages to websphere Default jms provider bus (SI). Can i use Spring's cachingconnectionfactory?

    I am also getting strange errors, using caching with Springs' DMLC from a standard app. The error is related executing a commit on a connecttion that is closed.
    Appreciate your valuable comments.
    Thanks
    sri

    ReplyDelete
  32. Hi Bruce,

    Thanks for this and it works very well. However, we encountered some issue when dealing with failover.

    We have 2 jms servers and configured to have HA and they are clustered. Only one server can received topic from 1 topic producer.
    Let's say A and B, and by default A will be the master and B is on standby)

    The producer is using jmstemplace and cachingconnectionfactory to send message to JBoss JMS (the hornetQ).

    Now, when the A died, B will be notified and will assume as the master. But the JMSTemplate is still pointing to A.





    org.jnp.interfaces.NamingContextFactory
    jbossA:1100,jbossB:1100
    org.jboss.naming


















    Any idea why?

    Thanks!

    /Vins

    ReplyDelete
  33. Sorry... here's with xml tag

    Thanks,
    /Vins

    <bean id="jndiTemplateSender" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
    <props>
    <prop key="java.naming.factory.initial">org.jnp.interfaces.NamingContextFactory</prop>
    <prop key="java.naming.provider.url">jbossA:1100,jbossB:1100</prop>
    <prop key="java.naming.factory.url.pkgs">org.jboss.naming</prop>
    </props>
    </property>
    </bean>

    <!-- jboss messaging connection factory -->
    <bean name="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate">
    <ref bean="jndiTemplateSender" />
    </property>
    <property name="jndiName" value="ConnectionFactory" />
    </bean>

    <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
    <property name="sessionCacheSize" value="10"/>
    <property name="cacheProducers" value="false"/>
    </bean>

    ReplyDelete
  34. @Vins, I don't know much about HornetQ, but it sounds to me like the connection has not been re-established (i.e., failed over) from brokerA to brokerB. And I don't see any configuration in the XML you provided for that purpose. Although the Spring CachingConnectionFactory provides for reconnect on exception, I think that you need to utilize the HornetQ client failover/reconnection features. Check out the HornetQ docs for configuring reconnection here:

    HornetQ 2.1 User Manual: Configuring reconnection/reattachment attributes

    Hope that helps!

    ReplyDelete
  35. Bruce,

    Could you please share if you have any examples on how to user Sonic MQ with Spring

    @GG

    ReplyDelete
  36. @gowda, To my knowledge, use of Spring JMS with Sonic MQ is no different with the exception that the JNDI configuration has some additional properties that must be set on a JndiTemplate, e.g.:

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
    <props>
    <prop key="java.naming.factory.initial">com.sonicsw.jndi.mfcontext.MFContextFactory</prop>
    <prop key="java.naming.provider.url">tcp://hostname:2506</prop>
    <prop key="java.naming.security.principal">username</prop>
    <prop key="java.naming.security.credentials">password</prop>
    <!-- Properties for Sonic MQ -->
    <prop key="com.sonicsw.jndi.mfcontext.domain">MyDomain</prop>
    <prop key="com.sonicsw.jndi.mfcontext.idleTimeout">12000</prop>
    </props>
    </property>
    </bean>

    <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate" />
    <property name="jndiName" value="MyConnectionFactory" />
    </bean>

    Hope that helps!

    ReplyDelete
  37. Bruce,
    Using the CachingConnectionFactory, is there a way to configure how long the shared connection remain avail? If I pause even 5 minutes and try to reuse, the CCF closes the cached session (reason code 2009). I can call resetConnection() and reuse but would think the connections would stay open longer. I also seem to lose a message on the first resend after calling resetConnection(). Any clues? Thanks!

    ReplyDelete
  38. @Justin, the CachingConnectionFactory extends the SingleConnectionFactory which is just a wrapper around a connection factory implementation. The CachingConnectionFactory contains no lifecycle management for the wrapped connection.

    However, the DefaultMessageListenerContainer's superclass (the AbstractJmsListeningContainer) does provide some lifecycle management for the connection via the refreshSharedConnection() method. But there is no timeout behavior of the underlying connection, so any kind of timeout that you're experiencing is probably coming directly from the configuration of the underlying connection.

    ReplyDelete
  39. @Bruce
    Thanks! We're using JbossMessaging instead that has full support for clustering, HA and failover. CachingConnectionFactory is not doing reconnect by the Jboss is handling the connection and and failover. Similar to HornetQ that link that you sent.

    ReplyDelete
  40. @Vins, I'm glad it worked out. Just so you know, Apache ActiveMQ also provides support for high availability, clustering and failover with or without the Spring CachingConnectionFactory.

    ReplyDelete
  41. Hello Bruce! Have you got any example about JMS Topic?

    Regards

    ReplyDelete
  42. @ERA, Yes, I do have full examples for using Spring JMS with ActiveMQ. I'm actually polishing them up and I will post them on GitHub very soon. I will try to get this done very soon and post a new blog entry about it when I do.

    ReplyDelete
  43. Excellent! I'll wait for them! I built a service with JMS to send messages to a queue. In this case, one provider sends messages to one consumer, it's ok? I want to have one provider that sends messages to N consumer. In this case, I need use JMS Topic, it's ok? I have not found examples to build a client spring to do that.

    Regards

    ReplyDelete
  44. Hi Bruce, this is good article for me , who want to start using Spring JMSTemplate. One quick question however: Since Spring JMSTemplate integrated with cachingConnectionFactory does pooling of resources. Does CachingConnectionFactory provide any means of eagerly refereshing connection. i.e. we have reconnectOnException , but in our case a network device chops all connections that are idle for 2 minutes. So I need a facility that re-establishes a connection before exception occurs. (I am assuming that any exception in Spring JMS world will only occur when we are sending or receiving messages .. please correct me if I am worong).

    ReplyDelete
  45. @Khurram, The CachingConnectionFactory itself does not refresh connections. But if you are fetching the JMS connection via JNDI using the Spring JndiObjectFactoryBean, then you need to use the proper configuration. Below are a couple of examples of a JNDI lookup configuration:

    Spring 2.5.x:

    <beans:bean id="jmsConnectionFactory"
    class="org.springframework.jndi.JndiObjectFactoryBean">
    <beans:property name="jndiTemplate" ref="jndiTemplate"/>
    <beans:property name="jndiName" value="${jms.jndi.connection.factory.name}"/>
    <beans:property name="lookupOnStartup" value="false" />
    <beans:property name="cache" value="false" />
    <beans:property name="proxyInterface" value="javax.jms.ConnectionFactory" />
    </beans:bean>

    Spring 3.0.x

    <jee:jndi-lookup id="jmsConnectionFactory"
    jndi-name="${jms.connection.factory.name}"
    cache="false"
    lookup-on-startup="false"
    proxy-interface="javax.jms.ConnectionFactory">
    <jee:environment>
    java.naming.factory.initial=${broker.context.class}
    java.naming.provider.url=${broker.url}
    </jee:environment>
    </jee:jndi-lookup>


    Note that both of these examples use the following settings:

    cache=false - Do not cache the connection. Instead, perform a new lookup every time it is used. This causes more overhead but sometimes it is necessary when the message broker is going up and down.

    lookupOnStartup=false - Do not perform the JNDI lookup upon startup of the MLC. Instead, perform the lookup upon first use of of the object. This causes a bit more over for the first use of the object.

    proxyInterface=javax.jms.ConnectionFactory - This tells the JndiObjectFactoryBean how to cast the object after lookup and is important for re-establishing a connection.

    I'm not sure if you are fetching the connection from JNDI, but if you are, hopefully these examples will help you.

    ReplyDelete
  46. Hi Bruce,

    We expect a lot of smaller to medium sized messages to our broker. The load can indeed become high. We do have options for horizontal scaling, but on a single node do you think disabling producer flow control is a good idea?
    AMQ in action recommends doing that to prevent chocking of TCP transports. We use only persistent messaging to queues. Your thoughts???

    ReplyDelete
  47. @kcorg, Disabling producer flow control is a highly debated topic. I suggest experimentation with it enabled as well as disabled to find the optimal setting for your environment. There are some interesting options for the producer flow control settings that are outlined here:

    http://activemq.apache.org/producer-flow-control.html

    Utilizing a destination policy with both a memoryLimit and producer flow control definitely wields some power. But also note that using producer flow control in conjunction with message cursors is another very viable strategy.

    ReplyDelete
  48. This is really good post about JMS Spring.
    Good to start & know this thing. :)

    ReplyDelete
  49. Hi Bruce

    When using a JMSTemplate with an ActiveMQ connection factory configured for fail-over (as shown below) - is there anyway for the code to discover which one of the broker URLs was actually used?

    <amq:connectionFactory id="queueConnectionFactory" brokerURL="failover:(
    failover:(tcp://m3001.uat.something.net:61616,tcp://m3002.uat.something.net:61716)?randomize=false,
    failover:(tcp://m3002.uat.something.net:61616,tcp://m3003.uat.something.net:61716)?randomize=false,
    failover:(tcp://m3003.uat.something.net:61616,tcp://m3001.uat.something.net:61716)?randomize=false
    )?randomize=true"/>

    thanks, asankha

    ReplyDelete
  50. @Asankha, When I need to know to which broker a JMS client is connected, I use the following code snippet from within the JMS client to grab the broker URL:

    String brokerURL = ((ActiveMQConnectionFactory) connection).getBrokerURL();

    ReplyDelete
  51. Bruce, is it really possible to cast a connection to a connection factory?

    I would have thought that the connection factory can only return the broker url as defined during creation time (so possibly including multiple master and slave urls as part of a compound url as in the example of Asankha).

    Looking at the JavaDoc I think the actual url might be available when casting the connection to an ActiveMQConnection and retrieving the BrokerInfo object which seems to store information about the broker node the actual connection is using.

    I must admit I did not try it out and was just reading through the JavaDoc as I found this question interesting.

    ReplyDelete
  52. @Eric, Whoops, you are correct, my mistake. I just quickly typed out that code snippet from memory and fucked it up. The real snippet should look like this:

    String brokerURL = ((ActiveMQConnection) connection).getBrokerURL();

    Thanks for catching my mistake.

    ReplyDelete
  53. Thanks Bruce! That works :)

    ((ActiveMQSession) session).getConnection().getBrokerInfo().getBrokerURL()

    ReplyDelete
  54. I am trying to get this code work with TIBCO EMS. How will the Spring app context config look like specifically the destination config below? I have the destination Queue name but where does it fit below?

    ReplyDelete
  55. Oops the code snippet doesn't show up for some reason but it the bean definition for the destination in your app context file.

    ReplyDelete
  56. @Rahul, with Tibco EMS it appears that you are required to perform JNDI lookups of objects. I found an example of doing this with Spring JMS here:

    http://allenpaulsblog.blogspot.com/2010/10/integrating-spring-jms-with-tibco-ems.html

    Regarding the code snippet, you cannot include < or > characters in the comments on Blogger, you must substitute them with the appropriate HTML/XML character entity for them to appear correctly.

    ReplyDelete
  57. Can someone show me how to do batch message sending with JmsTransactionManager or other ways?
    Thank you.

    ReplyDelete
  58. @liny, In order to achieve batch JMS sends within a single transaction, you will need to handle this programmatically instead of declaratively. There is a topic on programmatic transactions in the Spring Framework documentation. In encourage you to read through that documentation. I believe that wherever you see the example method called whose name is updateOperation1(), this is where you would put a call to your method to handle the batch JMS sends. But I would need to build an example to verify this.

    If you have any further questions on this topic, please post your questions in the Spring JMS forum here:

    http://forum.springsource.org/forumdisplay.php?30-JMS

    This is much more appropriate place for your questions to be best addressed by may Spring experts.

    ReplyDelete
  59. Hi Bruce ,

    Have been following your blogs for a while now and think can ask a few questions of mine .If a exception listener is autowired into the Cached Connection Factory then the onException method of the exception listener is called(provided the listener Container has the Cached Connection Factory targeted) whenever a Connection is lost (due to JMS server down or network causes).However when using jmsTemplate.receiveSelected() if a connection is lost how do we configure an exception listener so that it can keep retrying for the connection (the way the DMLC does ).Please let me know if am clear in explaining the scenario .

    ReplyDelete
  60. @Rittik, There is no facility for configuring an exception listener on the Spring JmsTemplate. The best alternative here is to use the javax.jms.Connection.setExceptionListener() method to set an exception listener on the underlying JMS connection. Then you need to handle such exceptions in your custom exception listener.

    ReplyDelete
  61. This comment has been removed by the author.

    ReplyDelete
  62. Thanks Bruce ,

    Any pointers as to how to fetch the underlying javax.jms.Connection object .Have my jmsTemplate configured to a CachedConnectionFactory .(Have the custom exception listener with the logic present with me).


    Thanks again

    ReplyDelete
  63. @Rittik, My mistake, you did mention that you are using the Spring CachingConnectionFactory. In that case, see the CachingConnectionFactory.setExceptionListener() method. This will allow you to set your custom exception listener on any connections that are created.

    ReplyDelete
  64. Bruce , have already tried that approach .The thing is with the jmsTemplate configured to the SpringCachingConnectionFactory(which in turn is wired with the custom exception Listener) somehow it doesnt retry(it just throws a JMSException ) , however if we use the default listener container (targeted to the same CachingConnectionFactory)then the exception Listener gets invoked appropiately .Please let me know if am clear .Thanks again .

    ReplyDelete
  65. @Rittik, The DMLC has automatic retry built into it, but the JmsTemplate does not have automatic retry. If you want your sends and/or receives with the JmsTemplate to automatically retry, you will need to create that in your own class.

    ReplyDelete
  66. Thanks Bruce ..created that .

    ReplyDelete
  67. Bruce,
    have you ever seen the following error:
    weblogic.jms.frontend.FEConnectionFactoryImpl cannot be cast to weblogic.rmi.internal.StubInfoIntf

    I am using spring jms template to connect to a weblogic distributed queue (clustered - two nodes). I use jmstemplate to put messages on the queue, and then use a listener to receive messages from queue. below is a bit of info from my messaging context.

    ....



    weblogic.jndi.WLInitialContextFactory
    @JNDI_PROVIDER_URL@

































    we seem to get the error when our app gets redeployed (but only on 1 server). it seems as if the apps need to be started up in a certain order (slave 1st, then master). after a redeploy of the app, the queue will work on one, but not the other server. we then need to restart the master to get messages to work and the error to go away.

    any info you could shed would be greatly appreciated.

    regards,
    Jim

    ReplyDelete
    Replies
    1. sorry...here is my code.

      <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
      <property name="environment">
      <props>
      <prop key="java.naming.factory.initial">weblogic.jndi.WLInitialContextFactory</prop>
      <prop key="java.naming.provider.url">@JNDI_PROVIDER_URL@</prop>
      </props>
      </property>
      </bean>

      <bean id="jmsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate"/>
      <property name="jndiName" value="@JNDI_QUEUE_FACTORY@"/>
      </bean>

      <bean id="queueName" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate"/>
      <property name="jndiName" value="@JNDI_QUEUE_NAME@"/>
      </bean>

      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory">
      <bean class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory">
      <ref bean="jmsQueueConnectionFactory"/>
      </property>
      </bean>
      </property>
      <property name="defaultDestination" ref="queueName"/>
      </bean>

      <bean id="pupsMsgContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="jmsQueueConnectionFactory"></property>
      <property name="destination" ref="queueName"></property>
      <property name="messageListener">
      <bean class="foo.bar.JmsListener" />
      </property>
      </bean>

      thanks.
      Jim

      Delete
  68. @Jim, I have not seen that error specifically, but what you describe sounds like it could related to the Spring beans being created before the Weblogic services (on which they depend) are created. I'm not sure if this is possible in your environment, but you could create a separate bean to poke the Weblogic services until they are alive and making other beans depend on that bean using the Spring depends-on attribute. Just a thought.

    ReplyDelete
    Replies
    1. Bruce,
      thank you very much for your reply. and you are absolutely correct. when I refer to my jndi url:

      <prop key="java.naming.provider.url">@JNDI_PROVIDER_URL@</prop>

      I need to refer to both nodes in the cluster. what was happening is essentially both my apps deployed to both servers were pointing to only one of the servers in the cluster, but because the server being referenced was not avail (due to the order is which they were getting started), all attempts to put messages on the queue would fail ( with that really useless weblogic error ). what I needed to do was something like the following:

      <prop key="java.naming.provider.url">t3://dev-weblogic1:7001,dev-weblogic2:7002</prop>

      one of our application integrators figured it out.

      could I ask one quick follow up question. I am using the org.springframework.jms.connection.SingleConnectionFactory versus the org.springframework.jms.connection.CachingConnectionFactory that you mention in your article above. is there any explicit (or performance based) reason to use the cachingconnection? I typically use my queue to send off 100 or so emails at a time. but this would only occur a dozen or so times a day.

      thanks again for your help.

      cheers,
      Jim

      Delete
  69. @Jim, the SingleConnectionFactory only provisions a single connection/session to the message-oriented middleware and was meant mainly for testing purposes. The CachingConnectionFactory is an extension of the SingleConnectionFactory that provisions a single connection but multiple sessions and caches them (i.e., does not close them). This results in lower overhead for resources and less time spent creating and destroying the same objects over and over.

    ReplyDelete
  70. Hello Bruce -

    This is a great article! Thanks for sharing.

    I have a couple of questions.

    1) I am sending messages to ActiveMQ asynchronously using "jms.useAsyncSend=true" option. In case activeMQ goes down or an exception occurs, is there a way to capture and log the error with MQ callback asynchronously? BTW, I am using Spring's JMSTemplate (convertAndSend method).


    2) When I try to use the failover along with async, in the event the broker goes down my requests are hung.

    failover:(tcp://localhost:61616?jms.useAsyncSend=true)

    Is there any way to fix it?

    Regards,
    AVP

    ReplyDelete
    Replies
    1. @AVP, Below are answers to your questions:

      1) You should look into creating your own custom ExceptionListener that is registered with the Connection via the Connection.setExceptionListner method.

      2) Instead of the URI that you pasted above, try the following URI:

      failover:(tcp://localhost:61616)?jms.useAsyncSend=true

      That slight change in the URI should do the trick.

      Delete
    2. Looks like Blogger didn't post the following comment for some reason:

      AVP has left a new comment on your post "Using Spring to Send JMS Messages":

      Thanks Bruce for your earlier comment.

      I tried the failover URL. It did not work. The requests are still hung when the broker is down.

      I tried these two:
      failover:(tcp://localhost:61616)?jms.useAsyncSend=true

      failover:(tcp://localhost:61616)?jms.useAsyncSend=true&wireFormat.maxInactivityDuration=0

      When I use it without prepending failover, it works. For example:
      tcp://localhost:61616?jms.useAsyncSend=true

      Any idea why it would fail when I add "failover"?

      Thanks,
      AVP

      Delete
    3. @AVP, You must follow this syntax exactly:

      failover:(tcp://localhost:61616)?jms.useAsyncSend=true

      Note the use of parenthesis and the option is outside of them.

      Bruce

      Delete
  71. Hi Bruce,

    Have a question. I tried running your producer code in spring. Everything works fine with tcp as transport connector. When I use stomp as transport connector and try to run the producer client by using the protocol stomp://localhost:61616, the producer code just hangs. I believe it hangs while creating the session. Any ideas/thoughts on it?

    ReplyDelete
  72. I debugged it from the eclipse . I found that it hangs in this line of code
    jmsTemplate.send(new MessageCreator().

    Any suggestion/pointers would be really helpful.

    ReplyDelete
    Replies
    1. @Lakshman, I'm not seeing a problem using the Stomp transport instead of the TCP transport. I configured the Stomp transport in the conf/activemq.xml file like so:

      <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
      </transportConnectors>

      Then I changed the the producer-jms-context.xml file to use the Stomp transport like so:

      <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
      p:brokerURL="stomp://localhost:61613" />

      Then I sent 100 messages successfully.

      Delete
  73. a quick look on JMS programming using Topic and Queue in JMS Provider (ACTIVEMQ)

    ReplyDelete
  74. Hi Bruce,The following exception was received from an activeMq broker usage:

    org.springframework.jms.UncategorizedJmsException : Peer (vm://xxx-broker) disposed

    This was a random exception that occured. After restarting server this exception was gone. Till the time we restarted, further execution of business logic was stopped.

    I want to know why this exception occured in the first place? Secondly, why does this vanish upon server restart?

    I am using activemq - 5.4.2 jar.

    How can I handle this without restarting server?

    ReplyDelete
    Replies
    1. @prejula p p, I honestly cannot even begin to speculate as to why this exception occurred because I have very little context to your situation. I'm not even sure what the underlying exception is because only the Spring wrapper exception was noted here. I suggest you email the ActiveMQ user mailing list with more information on your situation. Information on the mailing lists is available here:

      http://activemq.apache.org/mailing-lists.html

      Delete
  75. Hi Bruce ,

    In one of our environments we are getting the below exception "org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is weblogic.jms.common.JMSException: Only one thread may use a JMS Session at a time.:commit failure; nested exception is weblogic.jms.common.JMSException: Only one thread may use a JMS Session at a time.:commit failure"

    Am using a cached connection faction with a session cache size of 10 .Ours is a custom implementation of JmsTemplate .This is happening when it is trying to manually acknowledge the message .

    ReplyDelete
    Replies
    1. @Rittik, I have no idea what version of WebLogic you're using, but I just did a quick Google search and found this:

      http://docs.oracle.com/cd/E13222_01/wls/docs81/notes/resolved_sp02.html#1792843

      Perhaps it has something to do with your issue? Of course, this is just a wild guess and quite possibly not applicable to the version of WebLogic you're using. But there is very little information here about the error and the environment.

      Delete
  76. hi bruce,

    how to reconnect while using a jmstemplate to send messages, m using a cachingconnectionfactory with setreconnect(true), but if the MQ(IBM) Qmanager goes down and than comes up ,the jmstemplate is unable to reconnect.

    can u please help me with this?

    ReplyDelete
    Replies
    1. Sunit, I assume you're doing a JNDI lookup of the JMS connection from WebSphere MQ, so you must make sure to configure it properly for reconnection. Below are two examples of a proper JNDI lookup config:

      Spring 2.5.x:

      <beans:bean id="jmsConnectionFactory"
      class="org.springframework.jndi.JndiObjectFactoryBean">
      <beans:property name="jndiTemplate" ref="jndiTemplate"/>
      <beans:property name="jndiName" value="${jms.jndi.connection.factory.name}"/>
      <beans:property name="lookupOnStartup" value="false" />
      <beans:property name="cache" value="false" />
      <beans:property name="proxyInterface" value="javax.jms.ConnectionFactory" />
      </beans:bean>

      Spring 3.0.x:

      <jee:jndi-lookup id="jmsConnectionFactory"
      jndi-name="${jms.connection.factory.name}"
      cache="false"
      lookup-on-startup="false"
      proxy-interface="javax.jms.ConnectionFactory">
      <jee:environment>
      java.naming.factory.initial=${broker.context.class}
      java.naming.provider.url=${broker.url}
      </jee:environment>
      </jee:jndi-lookup>

      Note that both of these examples use the following properties:

      cache=false - Do not cache the connection. Instead, perform a new lookup every time it is used. This causes more overhead but sometimes it is necessary when the message broker is going up and down.

      lookupOnStartup=false - Do not perform the JNDI lookup upon startup of the MLC. Instead, perform the lookup upon first use of of the object. This causes a bit more over for the first use of the object.

      proxyInterface=javax.jms.ConnectionFactory - This tells the JndiObjectFactoryBean how to cast the object after lookup and is important for re-establishing a connection.

      If this information does not address your problem, then I will need more information.

      Delete
  77. This comment has been removed by the author.

    ReplyDelete
  78. Hi Bruce

    Thanks for the details in your blog.
    I want the spring to retry JNDI lookup when MQ server is not available.
    I have used the lookupOnStartup and cache properties.
    Please see the config














    Throws the below exception on startup.

    Exception in thread "ServiceMonitor" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'topicObserverContainer' defined in class path resource [common/applicationContext-jms.xml]: Initialization of bean failed; nested exception is org.springframework.beans.ConversionNotSupportedException: Failed to convert property value of type '$Proxy6 implementing javax.jms.ConnectionFactory,org.springframework.aop.SpringProxy,org.springframework.aop.framework.Advised' to required type 'javax.jms.Destination' for property 'destination'; nested exception is java.lang.IllegalStateException: Cannot convert value of type [$Proxy6 implementing javax.jms.ConnectionFactory,org.springframework.aop.SpringProxy,org.springframework.aop.framework.Advised] to required type [javax.jms.Destination] for property 'destination': no matching editors or conversion strategy found
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:527)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:456)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:291)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:288)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:194)
    at org.springframework.context.support.DefaultLifecycleProcessor.getLifecycleBeans(DefaultLifecycleProcessor.java:273)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:125)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:108)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:908)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:428)
    at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:139)
    at org.springframework.context.support.ClassPathXmlApplicationContext.(ClassPathXmlApplicationContext.java:83)


    Kindly help.

    Thanks
    l

    ReplyDelete
    Replies
    1. Unfortunately I cannot see your config because Blogger does not allow the posting of XML without manually converting the > symbols to &gt; and the < to &lt;.

      But given your question, I believe that this may be the configuration you need:

      <jee:jndi-lookup id="jmsConnectionFactory"
      jndi-name="${jms.connection.factory.name}"
      cache="false"
      lookup-on-startup="false"
      proxy-interface="javax.jms.ConnectionFactory">
      <jee:environment>
      java.naming.factory.initial=${broker.context.class}
      java.naming.provider.url=${broker.url}
      </jee:environment>
      </jee:jndi-lookup>


      Note the use of the following properties:

      cache=false - This tells Spring not to cache the connection. Instead, perform a new lookup every time it is used. This causes more overhead but sometimes it is necessary when the message broker is going up and down.

      lookupOnStartup=false - This tells Spring not to perform the JNDI lookup upon startup of the DMLC. Instead, perform the lookup upon first use of the object. This causes a bit more overhead for the first use of the object.

      proxyInterface=javax.jms.ConnectionFactory - This tells the Spring JndiObjectFactoryBean how to cast the object after lookup and is important for re-establishing a connection.

      Hopefully this helps your situation.

      Delete
  79. Hi Bruce

    Many thanks for your response.

    Please see the configuration:

    <bean id="defaultTopicDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate" />
    <property name="jndiName" value="${jms.topic.name}" />
    <property name="lookupOnStartup" value="false" />
    <property name="cache" value="false" />
    <property name="proxyInterface" value="javax.jms.ConnectionFactory" />
    </bean>

    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
    <props>
    <prop key="java.naming.factory.initial">${jms.naming.connection.factory}</prop>
    <prop key="java.naming.provider.url">${jms.provider.url}</prop>
    <prop key="java.naming.security.principal">${jms.username}</prop>
    <prop key="java.naming.security.credentials">${jms.password}</prop>
    <!-- MQ implementation specific properties -->
    <prop key="com.sonicsw.jndi.mfcontext.domain">${jms.domain}</prop>
    <prop key="com.sonicsw.jndi.mfcontext.idleTimeout">${jms.idle.timeout}</prop>
    </props>
    </property>
    </bean>

    As I have mentioned, I am getting the below exception:
    Exception in thread "ServiceMonitor" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'topicObserverContainer' defined in class path resource [common/applicationContext-jms.xml]: Initialization of bean failed; nested exception is org.springframework.beans.ConversionNotSupportedException: Failed to convert property value of type '$Proxy5 implementing javax.jms.ConnectionFactory,org.springframework.aop.SpringProxy,org.springframework.aop.framework.Advised' to required type 'javax.jms.Destination' for property 'destination'; nested exception is java.lang.IllegalStateException: Cannot convert value of type [$Proxy5 implementing javax.jms.ConnectionFactory,org.springframework.aop.SpringProxy,org.springframework.aop.framework.Advised] to required type [javax.jms.Destination] for property 'destination': no matching editors or conversion strategy found
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:527)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:456)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:291)


    Can you please tell me how should I resolve the
    "Caused by: org.springframework.beans.ConversionNotSupportedException: Failed to convert property value of type '$Proxy5 implementing javax.jms.ConnectionFactory,org.springframework.aop.SpringProxy,org.springframework.aop.framework.Advised' to required type 'javax.jms.Destination' for property 'destination'; nested exception is java.lang.IllegalStateException: Cannot convert value of type [$Proxy5 implementing javax.jms.ConnectionFactory,org.springframework.aop.SpringProxy,org.springframework.aop.framework.Advised] to required type [javax.jms.Destination] for property 'destination': no matching editors or conversion strategy found".

    Can you explain how to cast the object after lookup?
    Appreciate your help.

    Thanks
    l

    ReplyDelete
    Replies
    1. In looking at your configuration, I see that you have defined a JMS Destination that is being looked up from JNDI, but the type for the Destination in the proxyInterface property is set as javax.jms.ConnectionFactory instead of javax.jms.Destination. Try changing that and see what happens.

      Delete
    2. Hi Bruce

      Many thanks for your reply.

      Tried your suggestion and the ConversionNotSupportedException is resolved.
      However with the new properties in configuration like lookupOnStartup, cache set to false, on the log the below Warn is continuously occurring.
      [topicObserverContainer-3] WARN o.s.j.l.DefaultMessageListenerContainer - Setup of JMS message listener invoker failed for destination ''- trying to recover. Cause: Unknown destination type - $Proxy5
      javax.jms.InvalidDestinationException: Unknown destination type - $Proxy5

      And the destination it is reporting in the Warn message is a topic.
      I have included the pubSubDomain property in "org.springframework.jms.listener.DefaultMessageListenerContainer" configuration and set it as true. Still it did not resolve the issue.

      configuration:
      <bean id="topicObserverContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="pubSubDomain" value="true" />
      <property name="connectionFactory" ref="cachingTopicConnectionFactory" />
      <property name="destination" ref="defaultTopicDestination" />
      <property name="messageListener" ref="jmsMessageListener" />
      </bean>

      <bean id="defaultTopicDestination" class="org.springframework.jndi.JndiObjectFactoryBean">
      <property name="jndiTemplate" ref="jndiTemplate" />
      <property name="jndiName" value="${jms.topic.name}" />
      <property name="lookupOnStartup" value="false" />
      <property name="cache" value="false" />
      <property name="proxyInterface" value="javax.jms.Destination" />
      </bean>


      Thanks in advance.

      l

      Delete
    3. It appears that the lookup of the destination upon startup of your Spring app is failing for some reason. What happens if you remove the following property?:

      <property name="proxyInterface" value="javax.jms.Destination" />

      Delete
  80. Hi Bruce.
    Thank you for such a nice blog. In case of Tibco EMS, will daemon=true close the JMS connections after the spring context is destroyed?

    ReplyDelete
    Replies
    1. Yes, that is how to get threads to shut down properly.

      Delete
  81. Hi Bruce

    Thanks for your reply.
    I assumed the destination it tries to lookup is a topic which not available at the time Spring does the lookup. But I dont know why setting pubSubDomain to true does not work.

    I cannot remove proxyInterface, results in the following exception.
    Exception in thread "ServiceMonitor" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'defaultTopicDestination' defined in class path resource [common/applicationContext-jms.xml]: Invocation of init method failed; nested exception is java.lang.IllegalStateException: Cannot deactivate 'lookupOnStartup' without specifying a 'proxyInterface' or 'expectedType'

    Any thoughts will be appreciated.

    Thanks
    l

    ReplyDelete
    Replies
    1. Well that was what I thought initially, but your config had lookupOnStartup=false. So that kinda stumped me a bit.

      Yeah, you won't be able to remove the proxyInterface property.

      I suggest posting a question on the Spring JMS forum to get more eyes on the problem.

      Delete
  82. Hi Bruce

    Thanks for your reply. I have posted the query on Spring JMS forum as you suggested.

    Thanks
    l

    ReplyDelete
  83. Hi Bruce,

    We have a requirement of synchronous messaging where message send on one WMQ queue and excepting response on another WMQ queue. In both cases our application acts as client only. Now we have two option

    Option-1: Use DMLC and configure concurrentconsumer etc parameters and handle the request and response message using wait and notify on same co-relation id on request and response queue messages

    Option-2: We use receiveSelected of jmstemplate and wait for some time to pick the response from response queue

    We tested both these options and working, but we need to know these two options from performance perspective. May you please throw some light on these 2 options based on your expertize?

    Thanks
    //Tomer

    ReplyDelete
  84. The best advice I can provide is twofold:

    1) Pay attention to the following documentation from the Javadoc of the JmsTemplate.receiveSelected() method:

    'This method should be used carefully, since it will block the thread until the message becomes available or until the timeout value is exceeded.'

    This blocking call can really, really slow things down so beware of it.

    2) Test these two options, and not just for functionality. You must test using real world data, real world load in a real world environment. There is no substitute for this type of exercise. It requires an investment in time and resources but it will pay dividends toward building a robust application that is ready to handle a live load from a real user base.

    ReplyDelete
  85. Hi, I am using spring jmstemplate with cachingConnectionFactory and ActiveMQConnectionFactory. I am been seeing that after some time activemq stop processing messages from queue. And when it does that any new producer who tries to send message jmsTemplate will hang and never return on send() method and causes an application to hang. anything I can do so that if it cannot send message, it come out of that method.

    ReplyDelete
    Replies
    1. It sounds like a problem with the connection exhaustion or something similar, but I can't be sure without diving deeper. I suggest that you ask your question on the ActiveMQ user mailing list/discussion forum where some ActiveMQ experts can put more eyes on your issue. Information on the mailing lists and discussion forums can be found here:

      http://activemq.apache.org/mailing-lists.html
      OR
      http://activemq.apache.org/discussion-forums.html

      Delete
  86. Hi, I am using jmsTemplate but in the mean time I need message ordering. For this reason, I use grouping mechanism of JMS (JMSXGrouping). Although everything must work perfect, after a while my message order is breaking. I think we should not use connectionfactory used for consumers in jmsTemplate. Can you give any hints about why ordering is not working?

    ReplyDelete
    Replies
    1. In general, it is true that you should use a different connection pool for producing messages than you use for consuming messages. The difference is mainly the configuration for one versus the other.

      Regarding why the ordering is not working using ActiveMQ, this could be due to the consumer explicitly being closed or the consumer dying for some reason (e.g., network loss, etc.). I'm not sure what the problem could be without going through some troubleshooting and for this I suggest that you post a message to the ActiveMQ user mailing list. Information on subscribing can be found here:

      http://activemq.apache.org/mailing-lists.html

      Delete
  87. Thank you for this solution:

    In looking at your configuration, I see that you have defined a JMS Destination that is being looked up from JNDI, but the type for the Destination in the proxyInterface property is set as javax.jms.ConnectionFactory instead of javax.jms.Destination. Try changing that and see what happens.

    I have straggle last two days for resolving the destination issues.

    ReplyDelete
  88. I am trying to post messages from my local tomcat to remote Topic using WS
    My activemq.xml










    My spring bean configuration





    true















    My application code

    @Autowired
    @Qualifier(value="stompMsgProducerTemplate")
    JmsTemplate lqmStompJMSSender;


    lqmStompJMSSender.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    TextMessage message = session.createTextMessage(triggerMessage.toString());
    return message;
    }
    });

    Getting exception

    createTransport() method must be implemented!

    ReplyDelete
  89. If I have multiple JMSTemplates, should I create a CachingConnectionFactory for each one?

    ReplyDelete
    Replies
    1. The CachingConnectionFactory manages a pool of JMS connections so there is no need to create more than one pool. The only exception to this is if you need different connection pools with different configurations or connections to different servers.

      Delete
  90. Currently I am using synchronous send and receive mechanism provided by the class javax.jms.MessageProducer and javax.jms.MessageConsumer to publish and consume messages from ActiveMQ. They are created over connections created using org.apache.activemq.jms.pool.PooledConnectionFactory. The requirement is to create persistent connections to broker over ssl protocol through load balancer. However while testing, it was that there is idleTimeout which default to 30 seconds. Hence after 30 seconds, the connection is removed from the pool, which forces to recreate a new connection doing ssl handshake steps taking about 100+ ms for sending a message over ssl. The loadbalancer is currently terminates the connections every 2.5 hours.

    Hence my question is how we can create a persistent connection to this loadbalancer using activemq or spring JMS API's.

    ReplyDelete
    Replies
    1. There could be a way to do this using the failover protocol, but since I've been away from ActiveMQ for some time now, it would be best for you to ask on the ActiveMQ users mailing list. Information on the mailing lists is available here:

      http://activemq.apache.org/mailing-lists.html

      Delete
  91. Hi Bruce,

    Thanks for the nice blog.
    I am trying to understand the behavior of cachingConnectionFactory. Does it cache connections if so is there a way for us to define max connections to cache.

    we are observing a behavior where a new connection is being created for every message we send to queue(IBM MQ) and the connection gets inactive after a while. I was under the impression that cachingConnectionFactory will create only one connection and reuse that.

    will you able to give some pointers on this?

    Thanks
    Ravi

    ReplyDelete
    Replies
    1. Are you wrapping the target connection factory in the CachingConnectionFactory? Are you also setting a sessionCacheSize? Also, what is the cacheLevel that you have set?

      It is normal to utilize the calls to open/close a connection for every message send as this is mitigated through the use of the CachingConnectionFactory where it gets/puts connections from a pool rather than actually creating/destroying a connection for every message send.

      Delete
    2. Yes I am.Below is my configuration.












      We are seeing a lot of connections from our app in the MQserver of which most of them are inactive..

      Also CachingConnectionFactory extends SingleConnectionFactory which always uses singleConnection so how CachingConnectionFactory is managing pool of connections...

      Delete
  92. jee:jndi-lookup id="mqfactory" jndi-name="jndiName"
    cache="true" lookup-on-startup="false" proxy-interface="javax.jms.QueueConnectionFactory"

    bean id="tpCredentialsConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter"
    <property name="targetConnectionFactory" ref="mqfactory"
    <property name="username" value="${username}"
    <property name="password" value="${password}"
    bean

    bean id="jmsConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory"
    property name="targetConnectionFactory" ref="tpCredentialsConnectionFactory"
    bean

    ReplyDelete
    Replies
    1. The CachingConnectionFactory will create many sessions from a single connection. This is how the JMS spec was designed to work and most implementation scale very well using this model. So make sure to set the sessionCacheSize appropriately for your environment. You will need to experiment to determine what size works best.

      Additionally, don't use a connection pool for both receiving messages. The Spring DMLC caches connections, sessions and consumers so there's no need need to use a connection pool underneath it.

      Delete
  93. Hi,
    Can I use the same thing to log Jboss Session Timeout Event ?

    ReplyDelete
    Replies
    1. I'm not understanding your question, could you please explain with more detail?

      Delete
  94. Hi Bruce,
    I am using WebSphere 7.0. Whenever I receive a message I am getting the following error.

    An illegal attempt to commit a one phase capable resource with existing two phase capable resources has occurred.
    [5/27/15 16:38:19:224 PDT] 00000045 RegisteredRes E WTRN0086I: XAException encountered during prepare phase for transaction InternalEAR#InternalWeb.war 0000014D97BDA564000000010000021DCC1C1FA8241A61441F563966DC20AD8908A567A40000014D97BDA564000000010000021DCC1C1FA8241A61441F563966DC20AD8908A567A400000001. Local resources follow.
    [5/27/15 16:38:19:224 PDT] 00000045 JTAResourceBa I WTRN0089I: LocalTransactionWrapper@:38233823 localTransaction:com.ibm.ws.rsadapter.spi.WSRdbSpiLocalTransactionImpl@4ca74ca7 enlisted:trueHas Tran Rolled Back = false registeredForSynctruemcWrapper.hashcode()539631658: Vote: none.
    [5/27/15 16:38:19:224 PDT] 00000045 JTAResourceBa I WTRN0089I: XATransactionWrapper@ c700c70 XAResource: com.ibm.ejs.jms.JMSManagedSession$JMSXAResource@c870c87 enlisted: trueHas Tran Rolled Back = false mcWrapper.hashCode()1430279488: Vote: commit.
    [5/27/15 16:38:19:347 PDT] 00000045 DefaultMessag W org.springframework.jms.listener.DefaultMessageListenerContainer handleListenerSetupFailure Setup of JMS message listener invoker failed for destination 'queue://I1U/OUTBD.REQUESTS?priority=3&targetClient=1&CCSID=819&persistence=1' - trying to recover. Cause: JTA transaction unexpectedly rolled back (maybe due to a timeout); nested exception is javax.transaction.RollbackException
    [5/27/15 16:38:19:348 PDT] 00000045 DefaultMessag I org.springframework.jms.listener.DefaultMessageListenerContainer refreshConnectionUntilSuccessful Successfully refreshed JMS Connection

    The message is being processed and this error is causing the message to be redelivered. Is there any way to overcome this?

    ReplyDelete
    Replies
    1. I just googled for the error and I found the following blog post where someone got a solution from IBM for this error:

      http://extremeportal.blogspot.com/2010/08/wtrn0086i-xaexception-encountered.html

      Delete
  95. Very nice post Bruce. A question from my side - The spring docs says this about the SingleConnectionFactory class which is the super class of CachedConnectionFactory class.

    "A JMS ConnectionFactory adapter that returns the same Connection from all createConnection() calls, and ignores calls to Connection.close(). According to the JMS Connection model, this is perfectly thread-safe (in contrast to e.g. JDBC). The shared Connection can be automatically recovered in case of an Exception."

    My questions :

    1 - So does this mean if there is a connection failure between the JMS client application and a remote JMS provider due to network failures or if the JMS provider goes down for some time then the same old connection object will be re-used by the client when the connection is re-established?

    2 - If client is using CachingConnectionFactory to cache the sessions , then what happens to the cached session object in the above scenario? Will they be created newly from the same connection object?

    3 - I am seeing the below error while trying to send messages from my Mule ESB application (Client App) to a remote Weblogic JMS broker. Session caching is enabled in Mule & it uses Spring's CachingConnection factory & Weblogic's "wlthint3client-10.3.3.jar" client driver class to interact with the JMS broker.

    Error Stack :

    Exception stack is:
    1. Producer not found (java.lang.Exception)
    weblogic.messaging.dispatcher.InvocableManager:107 (null)
    2. java.lang.Exception: Producer not found (weblogic.messaging.dispatcher.DispatcherException)
    weblogic.messaging.dispatcher.Request:1169 (null)
    3. weblogic.messaging.dispatcher.DispatcherException: java.lang.Exception: Producer not found (weblogic.messaging.dispatcher.DispatcherException)
    weblogic.messaging.dispatcher.DispatcherWrapperState:345 (null)
    4. weblogic.messaging.dispatcher.DispatcherException: weblogic.messaging.dispatcher.DispatcherException: java.lang.Exception: Producer not found(JMS Code: null) (weblogic.jms.common.JMSException)
    weblogic.jms.dispatcher.DispatcherAdapter:116 (null)

    ReplyDelete
  96. Ayaskant, it's been a while since I've worked with all of this but if I recall correctly a new connection is created from the Session object when necessary.

    I'm not familiar with the Weblogic JMS client so I have no idea what the 'Product not found' error is exactly.

    Bruce

    ReplyDelete
  97. This comment has been removed by the author.

    ReplyDelete
  98. Hello Bruce, thanks for the blog. However, I don't understand why everybody is scared of receive*() methods on JMSTemplate. Also, I didn't see many use cases for Request-Reply that is much more common in real life. All the clients I worked with used only Request-Reply except one.
    Also, all clients were using IBM MQ. And setting files only allowed me to use org.springframework.jms.connection.SingleConnectionFactory with org.springframework.jndi.JndiObjectFactoryBean. Most client I worked didn't have high concurrency requirement. But I wonder whether reply can be missed if JMSTemplate creates connection/session/consumer for each receive. I tried to see code and it did seem to check Session etc before creating new Connection. I am not using any Session (Transaction). I suspect JNDI resource will cache connection. But what about Session and Consumer? Is it going to create new one for each receive*() method post send method? Please note that so far we never saw a ticket for missed replies. But then as I said the load was never high. Maybe 100 transactions per minute max.

    ReplyDelete
    Replies
    1. You should read up on the differences between the SingleConnectionFactory vs. the CachingConnectionFactory. Basically the CachingConnectionFactory is much more efficient, even if you don't need a large pool of connections. The SingleConnectionFactory opens/closes connections upon *every request to the message broker*, i.e., lots of additional and unnecessary overhead.

      I have no idea what your JNDI implementation will do with the sessions/connections. You will need to investigate the docs for the implementation to find out.

      The client won't miss any messages due to creation/destruction of connections. The client and broker are designed to handle this well.

      Delete
  99. Hi Bruce, I am using websphere application server and following is my Producer context xml :








    com.ibm.websphere.naming.WsnInitialContextFactory
    iiop://localhost:2809
























    i have same code as yours as below but the message is not going to the queue and there are no errors, using similar configuration receive works but Send doesn't. There is no issue with the queue as such as simple non spring JMS works and can send message :


    //following doesnt seem to throw any error but mesage doesnt go to the queue
    jmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    TextMessage message = session.createTextMessage(payload);
    message.setIntProperty("messageCount", count);
    LOG.info("Sending message number '{}'", count);

    return message;
    }
    });

    ReplyDelete
  100. Hello Bruce. Many thanks for the wonderful article. I am using the similar stuff but the problem is my application restarts every day through cron and it seems after some days the MQ reaches its maximum number of connections. This means connections are not getting closed somehow.














    xxx.request=queue:///xxx.CREDIT.REQ?targetClient=1

    Your help will be highly appreciated.

    ReplyDelete
    Replies
    1. This comment has been removed by the author.

      Delete

    2. bean id="MQQueueConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"
      p:hostName="${queue.host}"
      p:port="${queue.port}"
      p:queueManager="${queue.manager}"
      p:channel="${queue.channel}">



      /bean

      bean id="JmsTemplate" class="org.springframework.jms.core.JmsTemplate"
      p:connectionFactory-ref="JMSConnectionFactory"
      p:messageConverter-ref="MessageConverter"
      p:defaultDestinationName="${xxx.request}"/>


      bean id="JMSConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
      p:targetConnectionFactory-ref="MQQueueConnectionFactory"/>
      /beans

      Delete
    3. Sounds like a connection factory may not be closing connections properly. I would profile the app to determine exactly what is happening. If it is indeed the connections not being closed, then you will need to understand why and from where this behavior is originating.

      Delete
    4. Thank you Bruce for your prompt reply. I was able to gather some more details from MQ. The connections are closed when the application is restarted but with every request throughout the day, it keeps on adding a new connection and suspending the previous connections without closing them.

      CONNAME(10.20.100.xx) ASTATE(SUSPENDED)
      CONNAME(10.20.100.xx) ASTATE(SUSPENDED)
      CONNAME(10.20.100.xx) ASTATE(SUSPENDED)
      CONNAME(10.20.100.xx) ASTATE(ACTIVE)
      CONNAME(10.20.100.xx) ASTATE(SUSPENDED)

      Out of 5 connections to which the queue is connected, 4 are in suspended state and 1 remains active. Is there a way to close the suspended ones instead of changing it to suspended.

      The connection is established through jmsTemplate.receiveSelectedAndConvert(String destinationName, String messageSelector) and the destination name is provided with each request.

      Thanks again in advance.

      Delete
    5. The suspended states on those connections is due to WebsphereMQ, not Spring. So, I would investigate why this is happening from the WebsphereMQ side. A quick Google search revealed the following document with some information about why this is occurring:

      https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.5.0/com.ibm.mq.explorer.doc/e_properties_appconn.htm

      This document makes it clear that the issue is coming from WebsphereMQ.

      Delete