21 May 2010

Tuning JMS Message Consumption In Spring



In a previous blog post titled Using Spring to Receive JMS Messages, I introduced the use of the Spring default message listener container for asynchronous consumption of JMS messages. One very common discovery that folks make when first using JMS is that producers can send messages much faster than consumers can receive and process them. This is a normal situation with message-oriented middleware (MOM), and it is easy to handle if you are using the Spring message listener container. When using JMS queues, I always recommend using more consumers than producers. (When using JMS topics, you should use only a single consumer to guard against receiving the same message multiple times.)

The Spring DefaultMessageListenerContainer (DMLC) is a highly flexible container for consuming JMS messages that can handle many different use cases via the numerous properties that it provides. For the situation mentioned above, the DMLC offers the ability to dynamically scale the number of consumers. That is, as the number of messages available for consumption rises and falls, the DMLC can automatically increase and decrease the number of consumers. To configure the DMLC to automatically scale the number of message consumers, use the concurrentConsumers property and the maxConcurrentConsumers property. Below is an example JMS namespace style of XML configuration that employs these properties:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
        p:brokerURL="tcp://foo.example.com:61616" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- A JMS namespace aware Spring configuration for the message listener container -->
  <jms:listener-container
      container-type="default"
      connection-factory="connectionFactory"
      acknowledge="auto"
      concurrency="10-50">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
  </jms:listener-container>
</beans>


Notice the concurrency="10-50" property above. This is a simplified configuration for setting the concurrentConsumers=10 and the maxConcurrentConsumers=50 properties of the DMLC. This tells the DMLC to always start up a minimum of 10 consumers. When a new message has been received, if the maxConcurrentConsumers has not been reached and the value of the idleConsumerLimit property has not been reached, then a new consumer is created to process the message. This behavior from the DMLC continues up to the limit set by the maxConcurrentConsumers property. When no messages are being received and the consumers become idle, the number of consumers is automatically decreased.

(NOTE: The idleConsumerLimit property is used to specify the maximum number of consumers that are allowed to be idle at a given time. The use of this property was recently clarified a bit in the Spring 3.x trunk. Increasing this limit causes invokers to be created more aggressively. This can be useful to ramp up the number of consumers faster.)

It is important to be aware of a couple of things related to this dynamic scaling:
  1. You should not increase the number of concurrent consumers for a JMS topic. This leads to concurrent consumption of the same message, which is hardly ever desirable.
  2. The concurrentConsumers property and the maxConcurrentConsumers property can be modified at runtime, e.g., via JMX.
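
As a rough sketch of the JMX route, one option is to register the container with Spring's MBeanExporter. This assumes the container is defined as a regular bean with the id msgListenerContainer (as in the pure Spring configuration later in this post); the exporter bean id and the JMX object name are arbitrary choices made for illustration.

```xml
<!-- Sketch only: exposes the listener container as an MBean so that
     concurrentConsumers and maxConcurrentConsumers can be changed at runtime.
     The bean id "msgListenerContainer" and the object name are assumptions. -->
<bean id="exporter" class="org.springframework.jmx.export.MBeanExporter">
  <property name="beans">
    <map>
      <entry key="com.mycompany:type=JmsListenerContainer,name=msgListenerContainer"
             value-ref="msgListenerContainer" />
    </map>
  </property>
</bean>
```

With something like this in place, a JMX console such as JConsole should show the container's properties as writable attributes. It is worth verifying this against your own Spring version and MBean server.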

The dynamic scaling can be tuned even further through the use of the idleTaskExecutionLimit property. The use of this property is best explained by a portion of the Javadoc:


Within each task execution, a number of message reception attempts (according to the "maxMessagesPerTask" setting) will each wait for an incoming message (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling).

Raise this limit if you encounter too frequent scaling up and down. With this limit being higher, an idle consumer will be kept around longer, avoiding the restart of a consumer once a new load of messages comes in. Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value, which will also lead to idle consumers being kept around for a longer time (while also increasing the average execution time of each scheduled task).


Note the recommendations if you experience dynamic scaling taking place too often. To deal with this situation, you should experiment with increases to one or more of the following properties:
  • idleTaskExecutionLimit - The limit for the number of allowed idle executions of a receive task. The default is 1, which causes idle resources to be closed early once a task does not receive a message.
  • maxMessagesPerTask - The maximum number of messages to process in a single task. This determines how long a task lives before being reaped. The default is unlimited (-1) so you may not need to change this property.
  • receiveTimeout - The timeout to be used for JMS receive operations. The default is 1000 ms.
  • As I noted above, in the Spring 3.x trunk, the idleConsumerLimit property was clarified a bit recently and exposed as a writable property. This is yet another property for tuning for situations where you need to ramp up the number of concurrent consumers faster.

One important thing to note about using these various properties for tuning: they are not usable in the JMS namespace style of XML configuration. To use these properties, you must use either a pure Spring XML configuration or straight Java. Below is an example of how to use the receiveTimeout property and the idleTaskExecutionLimit property:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
        p:brokerURL="tcp://foo.example.com:61616" />

  <!-- The JMS destination -->
  <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"
        p:physicalName="TEST.FOO" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- A pure Spring configuration for the message listener container -->
  <bean id="msgListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        p:connectionFactory-ref="connectionFactory"
        p:destination-ref="destination"
        p:messageListener-ref="simpleMessageListener"
        p:concurrentConsumers="10"
        p:maxConcurrentConsumers="50"
        p:receiveTimeout="5000"
        p:idleTaskExecutionLimit="10"
        p:idleConsumerLimit="5" />

</beans>

In the example configuration above, the receiveTimeout property is set to five seconds, which tells the DMLC's receive operation to wait for a message for five seconds instead of the default one second. Also, the idleTaskExecutionLimit property is set to 10 to allow a task up to 10 idle executions before it is shut down, instead of the default value of 1. Lastly, the idleConsumerLimit property specifies the limit on the number of idle consumers. This property can be used to more aggressively ramp up the number of concurrent consumers.

In addition to tuning these various properties for dynamic consumer scaling, it is also important to understand that the DMLC can provide various levels of caching for JMS resources (i.e., JMS connections, sessions and consumers). By default, the DMLC will cache all JMS resources unless an external transaction manager is configured (because some containers require fresh JMS resources for external transactions). When an external transaction manager is configured, none of the JMS resources are cached by default. The level of caching can be configured using the cacheLevel property. This property allows for tiered caching from connection, to session, to consumer. This allows caching of:
  • The connection
  • The connection and the session
  • The connection, the session and the consumer

Below is an example configuration that uses the cacheLevel property to specify consumer level caching:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
        p:brokerURL="tcp://foo.example.com:61616" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- A JMS namespace aware Spring configuration for the message listener container -->
  <jms:listener-container
      container-type="default"
      connection-factory="connectionFactory"
      acknowledge="auto"
      concurrency="10-50"
      cache="consumer">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
  </jms:listener-container>
</beans>

Caching at the consumer level means that the connection, the session and the consumer are all cached. Notice that the cache level can be set in the Spring JMS namespace style of XML configuration, where the cacheLevel property is exposed as the cache attribute.

The session is cached based on ack-mode and the consumer is cached based on the session, the selector and the destination. It's necessary to know this info to better understand where/when the JMS resources can and cannot be reused. For example, caching consumers that use different selectors and consume from different destinations is only going to be relevant if you partition these items appropriately for reuse. That is, you may need to use a separate connection and listener-container configuration if the cache keys are different and if you want to cache sessions or consumers for reuse.
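
To make the partitioning idea a little more concrete, here is a minimal sketch that gives each destination (and selector) its own listener-container element, so that each container caches consumers for exactly one destination/selector combination. The destination names, the selector expression, the concurrency values and the listener bean ids are all hypothetical, and the fragment assumes the jms namespace and the connectionFactory bean from the examples above.

```xml
<!-- Sketch only: one container per destination/selector combination.
     ORDERS.NEW, ORDERS.CANCELLED, the selector and both listener beans
     are made-up names for illustration. -->
<jms:listener-container connection-factory="connectionFactory"
                        acknowledge="auto"
                        concurrency="5-20"
                        cache="consumer">
  <jms:listener destination="ORDERS.NEW" ref="newOrderListener" />
</jms:listener-container>

<jms:listener-container connection-factory="connectionFactory"
                        acknowledge="auto"
                        concurrency="2-5"
                        cache="consumer">
  <jms:listener destination="ORDERS.CANCELLED" ref="cancelledOrderListener"
                selector="region = 'EU'" />
</jms:listener-container>
```

Whether each container should also get its own connection factory depends on your provider and load, so treat the shared connectionFactory here as just one possible arrangement.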

The overall point of the caching is that it can help to reduce the potential recurring thrash involved in creation and destruction of JMS resources. Reducing the thrash by using caching and employing the appropriate partitioning of these resources so as to allow for reuse can definitely improve the overall performance of the application.

Hopefully this post helps you understand how to tune JMS message consumption in Spring. As you employ Spring JMS in your applications and experiment further, you will discover how much the DMLC is actually doing for you and how many more features it has beyond what you can easily build yourself.

236 comments:

  1. Bruce,

    The ActiveMQ page (http://activemq.apache.org/spring-support.html) suggests the use of org.apache.activemq.pool.PooledConnectionFactory as the connectionFactory for JmsTemplate. Your tutorial suggests the use of CachingConnectionFactory. How do you compare the two solutions?

  2. The difference between the PooledConnectionFactory and the CachingConnectionFactory is a difference in implementation. Below are some of the characteristics that differ between them:

    * Although both the PooledConnectionFactory and the CachingConnectionFactory state that they each pool connections, sessions and producers, the PooledConnectionFactory does not actually create a cache of multiple producers. It simply uses a singleton pattern to hand out a single cached producer when one is requested. Whereas the CachingConnectionFactory actually creates a cache containing multiple producers and hands out one producer from the cache when one is requested.

    * The PooledConnectionFactory is built on top of the Apache Commons Pool project for pooling JMS sessions. This allows some additional control over the pool because there are features in Commons Pool that are not being used by the PooledConnectionFactory. These additional features include growing the pool size instead of blocking, throwing an exception when the pool is exhausted, etc. You can utilize these features by creating your own Commons Pool GenericObjectPool using your own customized settings and then handing that object to the PooledConnectionFactory via the setPoolFactory method. See the following for additional info:

    http://commons.apache.org/pool/api-1.4/org/apache/commons/pool/impl/GenericObjectPoolFactory.html

    * The CachingConnectionFactory has the ability to also cache consumers. You just need to take care when using this feature, keeping in mind that the consumers are cached according to the rules noted in the blog post.

    * But most importantly, the CachingConnectionFactory will work with any JMS compliant MOM. It only requires a JMS connection factory. This is important if you are using more than one MOM vendor which is very common in enterprise organizations (this is mainly due to legacy and existing projects). The important point is that the CachingConnectionFactory works very well with many different MOM implementations, not only ActiveMQ.

  3. Hello Bruce,

    Can you plz help?

    I believe the JMSTemplate is never closing the connection due to which the connection is never getting returned to the pool.
    The application works very well till the moment there are free connections in the pool. But once i start with performance testing, i start getting exception "The JCA runtime failed to allocate a connection. (WAS6.1)" as till this time there are no free connections in the pool.

    Do we need to explicitly close the connection?
    Moreover, i dont see any use of using either SingleConnectionFactory or CachingConnectionFactory as they simply cache the connection and would not solve the root problem and i believe that two transaction cannot be committed on one shared connection.

    Please help, its urgent.

  4. In continuation of my above message, i can see that JMSTemplate is calling a release method from a Util class which is ultimately closing the connection. And this method is getting called again and again (i believe may be for checking..)
    But somehow the connection is not getting returned to the pool.

  5. This comment has been removed by the author.

  6. Found the solution, but still figuring out how it gave problem with Websphere pooling.

    Was having maxConcurrentConsumer=5 (concurrentConsumer=1) in DefaultMessageListenerContainer and the maximum pool size was 10. It usually start well but the moment the load/concurrency level increased, it gallops all connections from the pool and i start receiving the error.

    Modified maxConcurrentConsumer=1 and everything went absolutely fine.

    Dont know but as per my understanding only 5 threads must be using the connection.. Still figuring out.

  7. @Dapinder - Below is some information about your points as well as some questions from me:

    * A JMS transaction spans a session, not a connection. This allows many sessions to be opened from a single connection. My experience is that this model works very well.

    * When using the CachingConnectionFactory, you can increase the sessionCacheSize for high concurrency environments. This may have helped your problem, but I cannot be sure without doing some testing.

    * The JmsTemplate aggressively closes the session and connection every time a message is sent, so there is no need to manually close them. This is why it is imperative to use a connection factory that caches/pools the sessions. What JMS connection cache/pool are you using?

    * It is possible that lowering the concurrency works in your environment because there was not a sufficient number of sessions available for the consumers. But without more information and some testing I cannot say for sure. I also know that the IBM Websphere MQ connection factory can behave very differently from some other connection factories with its own aggressive caching.
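
    As an illustration of the sessionCacheSize suggestion above, a minimal sketch might look like the following. It assumes a provider connection factory bean named connectionFactory and the p namespace; the bean ids and the cache size of 50 are arbitrary example values, not recommendations.

    ```xml
    <!-- Sketch only: wraps the provider connection factory in Spring's
         CachingConnectionFactory and raises the session cache size.
         Bean ids and the value 50 are illustrative assumptions. -->
    <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory"
          p:targetConnectionFactory-ref="connectionFactory"
          p:sessionCacheSize="50" />

    <!-- The JmsTemplate then borrows cached sessions instead of creating new ones -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
          p:connectionFactory-ref="cachingConnectionFactory" />
    ```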

  8. Bruce,

    When I try with CachingConnectionFactory inside J2EE Container it throws below two errors,
    - Method setExceptionListener not permitted
    - Method stop not permitted.
    The problem is SingleConnectionFactory tries to set the exception listener to the target connection object and the container throws errors.
    As per the API for SingleConnectionFactory, "Useful for testing and standalone environments in order to keep using the same Connection for multiple JmsTemplate calls, without having a pooling ConnectionFactory underneath. This may span any number of transactions, even concurrently executing transactions."
    So this should not be used inside a J2EE container.
    From APP server we can pool Connection objects, but I need to pool session objects too.
    Is there a way to do this inside J2EE / Spring container ?

  9. @Dapinder, I hazard a guess that this is due to restrictions in the J2EE spec. See the following for more information specifically regarding Websphere:

    http://www-01.ibm.com/support/docview.wss?uid=swg21114239

    However, you must either be explicitly calling the setExceptionListener method or setting the reconnectOnException to true. These are the two conditions guarding a call to the setExceptionListener method in the SingleConnectionFactory class:

    https://fisheye.springsource.org/browse/spring-framework/tags/spring-framework-3.0.2.RELEASE/org.springframework.jms/src/main/java/org/springframework/jms/connection/SingleConnectionFactory.java?r=HEAD#l355

    If you are using Websphere MQ, I recommend using the Websphere MQ connection factory because it can provide a pool of connections (can't recall if it also pools sessions). As long as the JmsTemplate is using pooled JMS resources, it will be fine.

  10. Thanks a lot Bruce for your prompt response...

    I am now facing a new problem :-).. dont know till when these get resolved.

    Everything works fine on a Non-Clustered WAS 6.1 installed Linux Machine but the moment the code is promoted to Clustered one, it starts giving error.
    Switched on the Websphere logging with a property enabled to track Connection leak problem. (ConnLeakLogic=finest) but didnt get any leakage in my code.
    FYI, all connections and sessions are managed by Spring, i.e. JMSTemplate (no CachedConnectionFactory/SingleConnec... implemented) and DefaultMessageListenerContainer (CACHE_CONSUMER, sessionTransacted).
    I have done enough local testing on WAS 6.1 installed on Linux.
    I and two of my team mates i.e. three of us simultaneously dropped nearly 400 messages on queue in a span of 2-3 minutes and it was a success.

    Using Websphere Default JMS provider and not MQ.

    Found some concerns in the trace.log. I am sharing them below:

    [6/4/10 13:12:07:125 UTC] 0000011f DefaultMessag I org.springframework.jms.listener.DefaultMessageListenerContainer handleListenerSetupFailure Setup of JMS message listener invoker failed for destination 'queue://gimEmailQueueDest?deliveryMode=Persistent&priority=5&busName=11119-te-applicationserverbeta' - trying to recover. Cause: CWSIA0085E: An exception was received during the call to the method JmsMsgConsumerImpl.createCoreConsumer: com.ibm.websphere.sib.exception.SIResourceException: CWSIP0517E: Cannot attach to queue point for destination gimEmailQueueDest on messaging engine s262381ch3el111Node01.11119-te-applicationserverbeta111-11119-te-applicationserverbeta..
    [6/4/10 13:12:07:127 UTC] 0000011f DefaultMessag I org.springframework.jms.listener.DefaultMessageListenerContainer refreshConnectionUntilSuccessful Successfully refreshed JMS Connection
    [6/4/10 13:15:07:185 UTC] 000001a1 ConnLeakLogic 3 MCWrapper id 3c183c18 Managed connection [com.ibm.ws.sib.api.jmsra.impl.JmsJcaManagedConnection@448010932 ]> ]> ]> ]> ]]> ] State:STATE_ACTIVE_INUSE
    in-use for 370233ms
    java.lang.Throwable
    at com.ibm.ejs.j2c.ConnectionManager.allocateConnection(ConnectionManager.java:770)
    at com.ibm.ws.sib.api.jmsra.impl.JmsJcaConnectionFactoryImpl.createConnection(JmsJcaConnectionFactoryImpl.java:255)
    at com.ibm.ws.sib.api.jms.impl.JmsManagedConnectionFactoryImpl.createConnection(JmsManagedConnectionFactoryImpl.java:208)
    at com.ibm.ws.sib.api.jms.impl.JmsManagedConnectionFactoryImpl.createConnection(JmsManagedConnectionFactoryImpl.java:161)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:184)


    I believe that the connection WAS is providing is a SHAREABLE one and calling close() on a shareable connection doesn’t actually return it to the pool.
    But at the same time SHAREABLE connections are having advantage of not getting a new connection each time on createConnection call.
    Going to try this now..... but still not able to understand what exactly the problem is...

  11. I was getting a memory leak when using the latest release version of ActiveMQ (5.3.1) and the latest release version of Spring (3.0.1). The leak shows up as many consumerId objects that are never garbage collected. I was using the org.apache.activemq.pool.PooledConnectionFactory as the connection factory for a jms:listener-container of type default with concurrency set to 3-10. When I switched to a CachingConnectionFactory the memory leak disappeared.

  12. @vascoace, Given that the only thing you changed was the connection factory, it would appear that the issue is with the ActiveMQ PooledConnectionFactory. Did you create a JIRA issue for this? Are you now passing the ActiveMQ connection factory to the Spring CachingConnectionFactory?

    Please note that it's not necessary to use the CachingConnectionFactory with the Spring DefaultMessageListenerContainer (DMLC). The DMLC already caches Connections, Sessions and MessageConsumers by default (unless you have an external transaction manager configured).

  13. Bruce, no I haven't created a JIRA issue but I will create one today. Yes, I'm passing the ActiveMQ connection factory to the Spring CachingConnectionFactory. I will try your suggestion of using DefaultMessageListenerContainer (DMLC) with the PooledConnectionFactory and check if the memory leak is still there. I'll let you know the result.

    Thanks,

    Alberto Acevedo

  14. I tested using the DefaultMessageListenerContainer (DMLC) with the PooledConnectionFactory and it works great with no memory leak. So to summarize, the memory leak happens when using the org.apache.activemq.pool.PooledConnectionFactory as the connection factory inside a jms:listener-container of type default. I will create a JIRA issue for this bug. It took me a while to find the source of the leak.

    Thanks,

    Alberto Acevedo

  15. @Alberto, The DMLC actually is the jms:listener-container of type default. So your statement is confusing. Can you clarify?

  16. Bruce, I get the leak when I use the pooledConnection with ... I get no more leaks.

    I rechecked my configuration and I saw that I was using PooledConnectionFactory for the jms:listener-container but I was using a CachingConnectionFactory for the JmsTransactionManager. I think the use of different factories (that is basically wrong) is what caused the leak. I will test again using the same pooledConnection factory. Below is a section of my configuration that caused the leak:

    Thanks,

    Alberto Acevedo

  17. Hi Bruce!

    I'm using the CachingConnectionFactory as you suggested and it works fine.
    I have now added the concurrency attribute, since I don't want
    to wait until a long process gets completed.
    My problem now is that the "receiveFromQueue" object is the same for all requests.
    I want spring to create a new object (receiveFromQueue) every time I start to process a new queue item.
    I have tried to add the scope="prototype" attribute, but it does not help.
    Do you have any idea what the problem can be?

    Thanks in advance,
    Pelle

  18. Sorry, my code disappeared, here it comes.

    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter" scope="prototype">
    <constructor-arg>
    <ref bean="receiveFromQueue"/>
    </constructor-arg>
    </bean>

    <bean id="receiveFromQueue" class="com.astrazeneca.earchive.transformation.manager.ReceiveFromQueueImpl" scope="prototype"/>
    <jms:listener-container connection-factory="queueConnectionFactory"
    cache="none"
    destination-type="queue"
    client-id="queueConnectionFactory"
    concurrency="4-6"
    prefetch="1">
    <jms:listener
    destination="queue.transformation"
    ref="queueListener"
    subscription="myQueueSubscription"/>
    </jms:listener-container>

  19. Here is a link with the same issue I mentioned above.
    Maybe you understand me better reading that.

    Thanks,
    Pelle

  20. Forgot the link :)

    http://forum.springsource.org/showthread.php?t=55457

  21. @Simning/@Pelle (I think you are the same person), By design, the Spring DMLC will only create one instance of the message listener. The only way I can think to get more than one instance is to register multiple instances of the message listener with the DMLC or by manually injecting a new instance into the DMLC at runtime as necessary (but this would require you to access the DMLC via Java instead of XML config).

    This is not something that I have ever seen a requirement for. Why do you want a unique instance of the message listener created for each receiver?

  22. For the moment we process one queue item at a time. But since it takes a really long time for some queue items to get processed I would like to process more than one queue item concurrently. I can do that by setting the concurrency flag in the xml. But since they are processed concurrently I do not want them to use the same object since they will overwrite each other's instance variables.

    Do you know what I mean?

    One solution could be to remove the instance variables and pass them around as method parameters through all method calls, but there are a bunch of them and I don't think that is a nice solution.

    Kind regards,
    Pelle (yes, it's the same person :))

  23. @Pelle, OK, now that I understand the use case, I can offer a couple of things.

    First, IMO a message listener should be stateless. There are many reasons to design message listeners (and POJOs in general) to be stateless, including the fact that it makes your code more loosely coupled, which lowers the likelihood that changes to it will cause errors. The same is true for errors due to threads overwriting variables and race conditions: statelessness makes them much less likely to occur. I could go on and on with this topic, but you can Google for the benefits of statelessness.

    Second, if you really want to keep the state in the message listener, then there's actually a new scope in Spring that will help you. Check out the link to the thread scope that is within the following section of the Spring 3.0 Bean Scopes doc:

    http://static.springsource.org/spring/docs/3.0.x/spring-framework-reference/html/beans.html#beans-factory-scopes

    There is also some info here:

    http://www.springbyexample.org/examples/custom-thread-scope-module.html

    This will solve your problem because it will restrict the scope of the message listener to the thread. However, note the caveat in the SimpleThreadScope Javadoc (http://static.springsource.org/spring/docs/3.0.x/javadoc-api/org/springframework/context/support/SimpleThreadScope.html) that reads as follows:

    'Note that the SimpleThreadScope does not clean up any objects associated with it.'

    If this is a problem for you, the following information may be helpful to you since it demonstrates how to register a Callback for cleanup of the thread-scoped beans:

    http://springindepth.com/book/in-depth-ioc-scope.html#in-depth-ioc-scope-custom-scopes
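
    For what it's worth, a minimal sketch of the thread-scope approach could look like the following. It assumes the aop namespace is declared in the beans element and that CGLIB is on the classpath for the class-based scoped proxy; the scope name "thread" is simply the key chosen here for registration.

    ```xml
    <!-- Sketch only: registers Spring's SimpleThreadScope under the name "thread" -->
    <bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
      <property name="scopes">
        <map>
          <entry key="thread">
            <bean class="org.springframework.context.support.SimpleThreadScope" />
          </entry>
        </map>
      </property>
    </bean>

    <!-- One instance per consuming thread; the scoped proxy lets the
         singleton MessageListenerAdapter hold a single reference to it -->
    <bean id="receiveFromQueue"
          class="com.astrazeneca.earchive.transformation.manager.ReceiveFromQueueImpl"
          scope="thread">
      <aop:scoped-proxy />
    </bean>
    ```

    Keep in mind that this gives you one instance per thread, not per message, so instance variables would still need to be reset at the start of each message.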

  24. Thanks!

    I will have a closer look at your solutions.

    Sorry for the late response but I have been on vacation.

  25. Hi bruce,

    we use spring integration along with spring JMS with activemq connection factory,

    We have the following doubts,

    if we have concurrent consumer = 5, does the client maintain 5 connections with the activemq broker? We have an Activemq pooled connection factory with max Connections = 8.

    We also have prefetch = 1000, does that mean 1000 for all the 5 concurrent consumers or 1000 for a single consumer thread.

  26. hi bruce,
    I am running into strange error Message driven channel adapter from spring integration.
    More details posted in spring integration forum.
    http://forum.springsource.org/showthread.php?t=98254

    I have seen this on load testing with 180K messages. I am using default jms provider SI bus from websphere v 7.x
    Appreciate, if you can shed some light on this.
    Thanks
    sri

  27. @Radhakrishnan, When you set concurrency=5, that means that there are five consumers executing concurrently to receive messages and each of those consumers will need a connection to the message broker.

    As for the prefetch, that is set on a per consumer basis. So each consumer will have a prefetch set to 1000.

    I hope that helps.

  28. @meltyourfat, Are you sure that you have 10 connections in the pool and they are not being closed? It looks as if the DMLC is caching the connections, but they are not actually alive.

    In the configuration for the JndiObjectFactoryBean, you should add the proxyInterface property. Below is an example of this:

    <beans:bean id="jmsQueueConnectionFactory"
    class="org.springframework.jndi.JndiObjectFactoryBean">
    <beans:property name="jndiTemplate" ref="jndiTemplate"/>
    <beans:property name="jndiName" value="${jms.jndi.connection.factory}"/>
    <beans:property name="lookupOnStartup" value="false" />
    <beans:property name="cache" value="false" />
    <beans:property name="proxyInterface" value="javax.jms.QueueConnectionFactory" />
    </beans:bean>

    Also note the addition of the lookupOnStartup=false and cache=false properties along with the proxyInterface property.

    Give it a try, see if it helps.

  29. Thanks Bruce .

    In our use case, we tried to have 30+ consumers on a persistent ActiveMQ queue.
    When we consume, we find that many threads are waiting in the dequeue() method.
    It looks like they are starved for messages. This problem does not occur when we have 10-20 threads; more than 20-25 threads cause it.
    Is this expected? CPU, memory, and GC are all normal on the broker side.
    We have a prefetch of 1000 and use the JMS CachingConnectionFactory as suggested by you.

    Have you tested with more than 40 to 50 connections to a single persistent ActiveMQ queue?

    Thanks,
    D.Radhakrishnan

  30. Hi bruce, this was the stack trace we got in all of the waiting threads /...
    java.lang.Thread.State: TIMED_WAITING
    at java.lang.Object.wait(Native Method)
    - waiting on <847d1f> (a java.lang.Object)
    at org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:77)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:428)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:554)
    at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:74)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:405)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:978)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:970)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:871)
    at java.lang.Thread.run(Thread.java:619)

  31. @Radhakrishnan D, I have tested with considerably more consumers and had much success. Obviously, supporting a large number of connections cannot happen without tuning the broker configuration and quite possibly the consumer configuration. Here are some questions and suggestions:

    * Have you increased the memory appropriately for the JVM in which ActiveMQ is started?
    * Have you increased the appropriately in the broker config?
    * If your consumers are backing up and/or processing slow, you should consider decreasing the prefetch for each one.
    * You should consider reducing the number of threads used for connections into the broker by using the NIO connector instead of the TCP connector in the broker config.
    * You should also consider changing the internal broker dispatch model by setting the system property org.apache.activemq.UseDedicatedTaskRunner=false so that a thread pool is used.
    * Depending on the version of ActiveMQ you are using (I assume 5.4.x), you should also consider tuning the KahaDB configuration.

    These are all pretty standard items when it comes to tuning ActiveMQ.
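
    To make two of these suggestions concrete, here is a minimal sketch of the NIO connector in activemq.xml. The connector name, host, and port are placeholders, and the rest of the broker configuration is assumed to exist already:

    ```xml
    <!-- Fragment of activemq.xml, placed inside the <broker> element -->
    <transportConnectors>
      <!-- NIO transport in place of the tcp:// transport; host and port are placeholders -->
      <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>
    </transportConnectors>

    <!-- The dispatch model is a JVM system property, not XML config. It is passed
         to the JVM that starts the broker, for example:
         -Dorg.apache.activemq.UseDedicatedTaskRunner=false -->
    ```

    Clients can keep using a tcp:// URL to connect to an NIO connector, since the change only affects how the broker services its connections.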

  32. Hi bruce,

    We have 2048 MB heap allocated to the activemq broker on that machine.
    tab was commented in our xml.

    we use NIO and we have dedicatedTaskRunner= false. we tried with true and false, they produce same results in our case.

    When we have 15 consumers, no threads wait at dequeue.

    The memory limit allocated to the queue in policy entry was 100 mb.

    on the connection factory. we have optimizedAck = false and alwaysSessionAsync= false.

    This is our kahaDB configuration ,

    We use vmQueueCursor as our cursor policy and we have JMSXGroupID enabled.

    There was no pressure on CPU, memory and network on the broker side.

    Thanks,
    D.Radhakrishnan

  33. Config was not displayed in the previous comment.
    Writing it again

  34. @Radhakrishnan D, The XML config that you pasted will not show up if it is pasted as XML. You must convert the < and > symbols to proper HTML character entities for them to show up properly.

  35. Hi Bruce,

    I have to listen to a jms topic which publishes messages for lot of items. I have to only listen to specific items which are in scope in the app. As time goes by, items come and go out of scope.

    I understand that spring DMLC can only listen to an item from topic using selector. Should i create new DMLC for each item that comes in scope? Is it advisable? Items can reach to number of 500-700 at any given point of time in the app. Or should i have only 1 DMLC listening to all the messages from topic and filter it in the app?

    Please help making this decision. I am not aware of dmlc performance and messages it can handle per second.

    Thanks
    Dipesh

  36. @Dipesh, The DMLC is highly tunable because there is no magic bullet configuration for message consumption.

    Use of JMS selectors can be tricky for a couple of reasons. First, message selectors can slow down message consumption because the message broker must apply the selector to every message in a destination to see if it matches the subscription. This can be costly in terms of performance.

    Second, message selectors are cached by the DMLC invoker on the consumer (along with the destination). This is important because it has a direct impact on the amount of overhead from consumer caching and therefore reuse of the consumer. Because the selector and the destination are cached on each consumer, this limits the ability to reuse a consumer that has a selector defined. As stated in the blog post, the way to work around this is to use separate connection and DMLC instances so that those consumers can be reused.

    So it sounds to me like you should probably define separate connection and DMLC instances. My recommendation is to test different configurations to determine what works best in your environment. This is critical because no two environments are the same.

  37. Thanks Bruce for the prompt response.

    JMS topic is publishing messages for items which are out of scope. Hence i have to use selector to only listen to items which are in scope.

    So I have to create a new consumer every time an item comes in scope, i.e. a new DMLC instance with a separate connection and selector. I can only have 1 consumer within the DMLC listening to the topic, as ordering matters for the item messages.

    does it sound ok Bruce? i would need to create around 500 DMLC's to test as there can be these many items in scope.

    How can these be reused? Can you throw some light on it so that i can go ahead and test in my environment?

    Thanks
    Dipesh

  38. @Dipesh, Instead of creating that many DMLC instances, you might want to consider a smaller number of DMLC instances with many listener instances per DMLC instance. Below is an example:

    ...
    <bean id="bookOrderService" class="com.mycompany.orders.BookOrderService" />
    <bean id="ebookOrderService" class="com.mycompany.orders.EBookOrderService" />
    <bean id="musicOrderService" class="com.mycompany.orders.MusicOrderService" />
    <bean id="movieOrderService" class="com.mycompany.orders.MovieOrderService" />

    <jms:listener-container
    connection-factory="mediaConnectionFactory"
    task-executor="mediaTaskExecutor"
    destination-resolver="mediaDestinationResolver"
    transaction-manager="springPlatformTxMgr">
    <jms:listener destination="ORDERS" ref="bookOrderService" method="placeOrder" selector="media = 'book'" response-destination="REPLIES" />
    <jms:listener destination="ORDERS" ref="ebookOrderService" method="placeOrder" selector="media = 'ebook'" response-destination="REPLIES" />
    <jms:listener destination="ORDERS" ref="musicOrderService" method="placeOrder" selector="media = 'music'" response-destination="REPLIES" />
    <jms:listener destination="ORDERS" ref="movieOrderService" method="placeOrder" selector="media = 'movie'" response-destination="REPLIES" />
    ...
    </jms:listener-container>
    ...

    This demonstrates another great advantage of the DMLC: its ability to separate the definition of listeners from the DMLC itself. This way, the distinguishing factor for DMLC instances is the attributes of the DMLC instead of the attributes on each listener. Of course, you will need to experiment to figure out what works best in your environment.

  39. This comment has been removed by the author.

  40. hi,
    I configured a queue listener as you described in the Spring config file, and my listener connects to the specified queue on startup of the JBoss server and listens continuously for messages. However, I want my queue listener to connect to the queue at regular intervals of 5 minutes, consume a message if there is one, and otherwise disconnect from the queue.

    How do I configure my listener to meet this requirement?

    Thanks,
    Anil kumar K

    Replies
    1. Hi Anil,
      how did you deploy the Queue Listener on JBoss?
      Please can share the steps.

      Thanks,
      M.Madhukar

  41. @Anil, The Spring DefaultMessageListenerContainer uses a message-driven approach, i.e., it is designed to poll constantly once it is started up and there is no way to adjust the polling interval. If you want to poll for messages at a set interval as you described, you could do this manually by using the Spring JmsTemplate along with a Spring TaskScheduler to periodically invoke the JmsTemplate.receive() method to synchronously receive messages. But handling all of this manually is a big pain in the neck because Spring Integration provides this very set of features.

    Spring Integration offers a solution to your very scenario using a type of message endpoint called a JMS Inbound Channel Adapter. It uses a JMS connection factory and a JMS destination along with an interval to periodically poll the destination for messages. This is the perfect solution for your scenario because it handles everything I described above but it does so using a very simple configuration.
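
    A minimal sketch of such an adapter follows. It assumes Spring Integration 2.x with the `int` and `int-jms` namespaces declared, and the ids, queue name, and handler bean (`jmsPollingAdapter`, `inboundMessages`, `EXAMPLE.QUEUE`, `messageHandler`) are hypothetical:

    ```xml
    <!-- Channel that carries each polled JMS message to the handler -->
    <int:channel id="inboundMessages"/>

    <!-- Polls the queue every 5 minutes (300000 ms) instead of listening continuously -->
    <int-jms:inbound-channel-adapter id="jmsPollingAdapter"
        connection-factory="connectionFactory"
        destination-name="EXAMPLE.QUEUE"
        channel="inboundMessages">
      <int:poller fixed-rate="300000"/>
    </int-jms:inbound-channel-adapter>

    <!-- messageHandler is a hypothetical bean with a handle(...) method -->
    <int:service-activator input-channel="inboundMessages"
        ref="messageHandler" method="handle"/>
    ```

    How many messages are picked up per poll depends on the poller settings, so the poller's max-messages-per-poll attribute is worth checking if more than one message may be waiting.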

    Hope that helps.

  42. Hi Bruce,

    I am relatively new to Spring and JMS. My app requires at least two connections to a JMS queue at any time. These connections should come from a connection pool.

    I am using spring for configuration, using jms:listener-container with concurrency of 5-10 and activemq PooledConnectionFactory.
    So when i run it, activemq console shows same connection with different session id.
    So i am not sure how to accomplish this.

  43. @changed, The behavior you have observed is exactly how the connection/session model for JMS typically works; that is, many sessions are created from a single connection. According to the JMS spec, a JMS compliant message broker is required to support this model.

    The ActiveMQ PooledConnectionFactory (PCF) does work a bit differently than the Spring CachingConnectionFactory (CCF) in that the CCF strictly adheres to this connection/session model but the PCF actually creates many connections and many sessions from each connection. However, for situations that require separate JMS connections, the best solution is to define two distinct connection pools. This will guarantee that you are using two completely unique connections to the message broker.

    If you don't want to create two connection pools, the Spring SingleConnectionFactory is great for this type of scenario because it wraps a single connection instead of creating a pool of connections.
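
    As an illustration of the SingleConnectionFactory variant, the sketch below defines two factories so that each listener container gets its own connection to the broker. The broker URL, bean ids, queue name, and listener bean are all placeholders:

    ```xml
    <!-- Each SingleConnectionFactory wraps exactly one connection to the broker -->
    <bean id="connectionFactoryA"
          class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
          <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
      </property>
    </bean>

    <bean id="connectionFactoryB"
          class="org.springframework.jms.connection.SingleConnectionFactory">
      <property name="targetConnectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
          <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
      </property>
    </bean>

    <!-- One listener container per factory, so two distinct connections are in use -->
    <jms:listener-container connection-factory="connectionFactoryA" concurrency="5-10">
      <jms:listener destination="EXAMPLE.QUEUE" ref="myListener"/>
    </jms:listener-container>

    <jms:listener-container connection-factory="connectionFactoryB" concurrency="5-10">
      <jms:listener destination="EXAMPLE.QUEUE" ref="myListener"/>
    </jms:listener-container>
    ```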

    Hope that helps.

  44. Bruce,

    I tried the approach of using multiple CachingConnectionFactory and jms:listener-container.

    Now how can i generated more connections to queue on the fly, using the same approach as above.

    Also what is the importance of JmsTemplate for jms consumer.

  45. @changed, Yes, the solution for two separate connections to the JMS message broker is to define two unique connection factories.

    But I'm not following what you mean when you say, 'how can i generated more connections to queue on the fly'. Do you mean that you want to increase the number of consumers that are listening to the queue? If this is what you mean, you need to increase the concurrency of the Spring DMLC. This will cause the DMLC to grow and shrink the number of concurrent consumers within the range that you specify. See my blog entry above for an example of this.

  46. @changed, I'm sorry, I neglected to answer your last question about the JmsTemplate and what it means to a JMS consumer.

    The JmsTemplate provides the ability to send and receive JMS messages in a synchronous manner (as opposed to using a message listener in an asynchronous manner).

    Sending messages in a synchronous manner is very common and rather straightforward. For an example of using the Spring JmsTemplate for sending messages, see my blog post titled Using Spring to Send JMS Messages.

    Receiving messages in a synchronous manner is something that I don't recommend very often. This is simply because there is rarely a case that requires receiving messages in a synchronous manner and because you will eventually wind up placing the call to the receive() method in a loop anyway (and when you reach that point, there's little reason not to move to the use of a message listener so that messages are pushed from the message broker instead of you setting up your own polling).

    I hope that helps answer your question.

  47. I want to use JMS for throttling incoming requests. For example, I only want to consume and process messages once 5 messages have arrived in the queue and then process them all at once, or, if there are fewer than 5 but 2 seconds have elapsed, get them all and process them. How can I do this using JMS? It looks like if I configure a P2P queue, a consumer will process a message as soon as it arrives in the queue. Any ideas on how I should configure JMS using Spring?

  48. @Ujjwal, What you describe is not how message-oriented middleware is intended to work. A message broker is not a database that can be queried. When a consumer registers a subscription on a destination, that consumer will receive messages from that destination regardless of how many messages exist there in total.

    The only way to achieve something similar to what you describe might be by creating a consumer using scheduling (e.g., Spring scheduling) from the consumer side. But that would mean creating/destroying a consumer and a session repeatedly which is very costly in terms of resources.

    Bruce

  49. I was reading about the DefaultMessageListenerContainer and thought maxMessagesPerTask could be set to, say, 5 so that the consumer will read at most 5 messages at one time. Since the DefaultMessageListenerContainer is a polling type of listener (not very sure on this), it will poll to get the messages, and I could configure (not sure how) the polling interval to something like 2 seconds or whatever I want. I thought this would work.

  50. @Ujjwal, The maxMessagesPerTask setting actually determines the maximum number of message reception attempts within a given task execution, including attempts that time out without receiving a message. The receiveTimeout setting controls how long each attempt will wait for an incoming message and is specified in milliseconds. So if maxMessagesPerTask=5 and receiveTimeout=1000, the task will not wait until it receives 5 messages; it will simply make 5 attempts to receive a message, each of which will time out after 1 second. Once the task reaches the number of idle executions specified by the idleTaskExecutionLimit setting, it will shut down.

    On a side note, the idleTaskExecutionLimit setting controls how many executions a task may complete without receiving any messages before it is shut down. This setting is typically tuned only if scaling up/down happens too often.
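
    For reference, here is a sketch of where these three settings live on a DMLC bean definition. The bean ids, queue name, and values are placeholders chosen for illustration, not recommendations:

    ```xml
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="connectionFactory"/>
      <property name="destinationName" value="EXAMPLE.QUEUE"/>
      <property name="messageListener" ref="myListener"/>
      <property name="concurrentConsumers" value="1"/>
      <property name="maxConcurrentConsumers" value="5"/>
      <!-- At most 5 receive attempts per task execution, whether or not
           each attempt actually yields a message -->
      <property name="maxMessagesPerTask" value="5"/>
      <!-- How long each receive attempt waits, in milliseconds -->
      <property name="receiveTimeout" value="1000"/>
      <!-- Number of idle executions (no message received) tolerated
           before a task shuts down -->
      <property name="idleTaskExecutionLimit" value="1"/>
    </bean>
    ```

    None of these settings batches messages: each message is still delivered to the listener individually as soon as a receive attempt returns it.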

    Hope that helps.

    Bruce

  51. Hello,
    did you try tuning JMS consumption in JBoss?
    This is the config of my listener:
    <property name="concurrentConsumers" value="3"/>
    <property name="maxConcurrentConsumers" value="20"/>
    <property name="idleConsumerLimit" value="5"/>

    I discovered that the number of consumers grows up to 20 under heavy load, but afterwards, when the load is low, the number of consumers does not go back down to 5.

  52. @Bruce, Is there a way to prevent Spring container from crashing when remote JMS Server is unavailable, when application is starting to deploy on a server.

    My configuration (Spring 2.5) is as follows: -

    1)Weblogic Foreign Queue pointing to TIBCO.
    2)Spring DMLC connecting to Weblogic QCF and Queues.
    3)Spring DMLC config: -

    <jms:listener-container container-type="simple"
    connection-factory="qcf" concurrency="5" acknowledge="auto"
    destination-type="queue" destination-resolver="destinationResolver">


    Whenever the remote JMS server is down, I am not able to deploy the application, as Spring fails to instantiate the bean correctly. I am using JndiTemplate to look up the QCF directly from WebLogic.

  53. @Mirek, What message broker are you using? JBossMQ?

    Bruce

  54. @Devajyoti, I see in the configuration that you posted you are using the simple message listener container (MLC). The simple MLC does not provide any dynamic behavior such as handling broker reconnections, scaling up/down via the concurrency setting, etc. Try using the default MLC for such behavior as it will automatically attempt reconnects. Then it's just a matter of properly configuring the JNDI lookup of the connection. Below are a couple of examples of the JNDI lookup config:

    Spring 2.5.x:

    <beans:bean id="jmsConnectionFactory"
    class="org.springframework.jndi.JndiObjectFactoryBean">
    <beans:property name="jndiTemplate" ref="jndiTemplate"/>
    <beans:property name="jndiName" value="${jms.jndi.connection.factory.name}"/>
    <beans:property name="lookupOnStartup" value="false" />
    <beans:property name="cache" value="false" />
    <beans:property name="proxyInterface" value="javax.jms.ConnectionFactory" />
    </beans:bean>

    Spring 3.0.x:

    <jee:jndi-lookup id="jmsConnectionFactory"
    jndi-name="${jms.connection.factory.name}"
    cache="false"
    lookup-on-startup="false"
    proxy-interface="javax.jms.ConnectionFactory">
    <jee:environment>
    java.naming.factory.initial=${broker.context.class}
    java.naming.provider.url=${broker.url}
    </jee:environment>
    </jee:jndi-lookup>


    Note that both of these examples use the following settings:

    cache=false - Do not cache the connection. Instead, perform a new lookup every time it is used. This causes more overhead but sometimes it is necessary when the message broker is going up and down.

    lookupOnStartup=false - Do not perform the JNDI lookup upon startup of the MLC. Instead, perform the lookup upon first use of the object. This causes a bit more overhead for the first use of the object.

    proxyInterface=javax.jms.ConnectionFactory - This tells the JndiObjectFactoryBean how to cast the object after lookup and is important for re-establishing a connection.

    Anyway, the Spring MLC and the JNDI lookup configs each require different settings depending on the situation and the message broker.

    I hope this helps.

    Bruce

  55. @Bruce
    I'm using JBOSS 5.1 default JMS provider (JBOSS messaging)

  56. @Mirek, I have used those options with other message brokers and they work correctly. I have no experience with JBoss Messaging myself, but I don't know of any reason why those settings would not work correctly. What connection factory are you using and how did you define it?

  57. @Bruce,
    My connection factory is java:/JmsXA with config
    <tx-connection-factory>
    <jndi-name>JmsXA</jndi-name>
    <xa-transaction/>
    <rar-name>jms-ra.rar</rar-name>
    <connection-definition>org.jboss.resource.adapter.jms.JmsConnectionFactory</connection-definition>
    <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
    <config-property name="JmsProviderAdapterJNDI" type="java.lang.String">java:/DefaultJMSProvider</config-property>
    <max-pool-size>20</max-pool-size>
    <min-pool-size>3</min-pool-size>
    <idle-timeout-minutes>10</idle-timeout-minutes>
    <security-domain-and-application>JmsXARealm</security-domain-and-application>
    <depends>jboss.messaging:service=ServerPeer</depends>
    </tx-connection-factory>

  58. @Bruce,

    Thanks for a very good post on JMS.

    At our client location, we use ActiveMQ JMS. On the consumer side, there is a throttle on how many messages we can process at a given point in time (for instance, only 6 messages at a time). This restriction (ridiculous, isn't it?) is imposed by a third-party system that our JMS consumer talks to. If the queue has 100 messages, the consumer should pick up only 6 messages, and as each message finishes processing we can pick up more and process them.

    We have the logic to know how many messages are currently being processed and how many slots are free (messages already processed).

    But I am wondering how we can implement a solution that picks up messages only when we can process them. For example, if our code determines that the third party can process 3 messages at this time, how can we tell our listener to pick up only three messages and no more?

    I have written a standalone ActiveMQ client that implements MessageListener.onMessage, but it picks up all the messages that are in the queue so far.

  59. @siri, Using a MessageListener to consume messages in an asynchronous manner will not allow you to throttle the number of messages being received. The only way I can think to control the number of messages being received is to consume messages in a synchronous manner using the MessageConsumer.receive() method. This way you could grab one message at a time and just loop six times to grab six messages from the queue.

    Hope that helps.

  60. Hi Bruce,

    We are getting the below error in ActiveMQ logs
    "INFO | Slow KahaDB access: cleanup took 761". After this message, my consumer stops consuming messages from the queue. After restarting my consumer, it started reading from the queue again. On googling it, we found that the above message is caused by KahaDB indexing. But we want to know why the consumer stops consuming messages even after indexing is complete. Is there any configuration that needs to be done for KahaDB?

  61. Hi Bruce,

    Thanks again for the great blog and for all the help you provide answering all the questions posted. You are a great help to the jms + spring community as a whole.

    Bruce, we have deployed JBoss JMS (JBoss AS 5.1.0.GA) in production with a Spring-based web application that consumes messages from queues hosted on the server. Everything is working quite well. We only have one quirk that we're trying to fix. The problem is that our consumers (DMLC) stop consuming messages when the JBoss server is rebooted. As long as the server is up, there are no problems. But if and when we have to restart the JBoss server, we also have to restart our Tomcat application servers to 'reinitialize' the consumers. Thereafter it works fine. Is there a way to have the DMLC 'retry' opening new connections without restarting the application?

    My connectionfactory / jmstemplate / DMLC are configured as follows :

    queue/ResponseQueue

    Any insight into what's wrong with the configuration ?

    @Bruce, if what I'm trying to do is not possible, is it possible to at least 'restart' the consumers without restarting the whole application ? Currently we're restarting tomcat to get the consumers to process the messages. If there's a better way that's less disruptive, it would be great.

    Thanks in advance,
    Gurvinder

  62. Follow up to earlier question - configuration details are as follows :

    <bean id="jmsConFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="jndiName" value="ConnectionFactory"/>
    <property name="lookupOnStartup" value="false"/>
    <property name="cache" value="false"/>
    <property name="proxyInterface" value="javax.jms.ConnectionFactory"/>
    </bean>

    <!-- destination and listener for the response queue -->

    <bean id="ResponseQueue" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="jndiName" value="queue/ResponseQueue"/>
    </bean>

    <bean id="messageListener" class="ws.jms.message.MessageReceiver"/>

    <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="5"/>
    <property name="connectionFactory" ref="jmsConFactory"/>
    <property name="destination" ref="ResponseQueue"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
    </bean>

  63. @groove, It sounds to me like you need to create a JMS exception listener to handle reconnection for you when JBoss goes down. Check out this post on StackOverflow about this very subject with JBoss.

    Hope that helps.

  64. @Bruce, thanks for the quick response.

    I have come across the post you referenced. The problem at our end is that we don't see anything in the logs. There is no activity at all. It seems as though the DMLC does not attempt to 'ping' if the connection to Jboss Messaging is still alive. So no exception is thrown at all.

  65. @Krishna, I have never seen the KahaDB indexing cause a consumer to stop its consumption of messages. I recommend changing the ActiveMQ logging to debug level and seeing if there are any errors or exceptions there. I also encourage you to ask about your issue on the ActiveMQ mailing list. Information on the mailing list is available here:

    http://activemq.apache.org/mailing-lists.html

  66. @groove, My suggestion is to decrease the log level in JBoss to debug so that you can see more of what is happening. This might yield some further details about the problem.

  67. @Radhakrishnan D

    During load testing with non-persistent messages, using ActiveMQ 5.2 and a DMLC with 100 consumers, we had the same strange starvation problem. Many DMLC threads were waiting in #dequeue while messages were still pending on the queue.

    After setting the queue prefetch to 0, everything went fine.

    I believe that when a consumer reads a message, it reads every waiting message and enqueues them in its prefetch buffer. Under substantial load, a consumer may locally buffer many messages, thus delaying their processing and leaving other consumers starving.

  68. @Pierre-Alain, This is a very good point, thank you for chiming in and catching my dropped ball here concerning @Radhakrishnan's problem!

    The symptom that @Pierre-Alain describes is a common problem where competing consumers are starved of messages because the prefetch size is too high. If a consumer's prefetch size is too high, although messages reside in the queue, the fact that the messages have been prefetched by one of the consumers prevents other consumers from consuming the messages.
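
    One way to try a lower prefetch for a single queue is an ActiveMQ destination option. The sketch below assumes the default destination resolver passes the destination name through to ActiveMQ unchanged, and it uses hypothetical names (`connectionFactory`, `myListener`, `EXAMPLE.QUEUE`):

    ```xml
    <jms:listener-container connection-factory="connectionFactory" concurrency="20">
      <!-- consumer.prefetchSize is an ActiveMQ destination option; a value of 1
           keeps any one consumer from buffering a backlog of messages -->
      <jms:listener destination="EXAMPLE.QUEUE?consumer.prefetchSize=1" ref="myListener"/>
    </jms:listener-container>
    ```

    A small prefetch trades some raw throughput for a more even distribution of messages across competing consumers, so it is worth measuring both settings under a realistic load.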

  69. Bruce, thanks a bunch for your comment, which helped me configure my app to handle an inconsistently available message broker. I updated my configuration to use the following:



    <beans:property name="proxyInterface"

    However, I completely missed the part where my MLC was using the simple message listener container, and you rightly pointed out that the default MLC resolves the dynamic reconnection issues. I have yet to test the fix completely, but I was able to start the server when the remote message broker was down.

  70. @Devajyoti, I'm very happy to hear that my blog helped you out! Thank you for coming back to comment.

  71. Hi Bruce,

    Is it recommended to use Spring JMS from within Tomcat container? If I am using ActiveMQ as provider, and Spring DMLC for consuming messages, what is a good deployment model?
    a) Standalone AMQ + Standalone Spring DMLC
    b) AMQ, Spring DMLC operating within Tomcat

    All components in a and b are on same node.

  72. @kcorg, Using Spring JMS from within Tomcat is a perfectly acceptable solution. I have done this myself and it works just the same as outside of Tomcat.

    Regarding the two deployment models you mention above, neither one is better than the other. It's more a matter of what suits your situation better or what you prefer. Obviously solution A will require that Tomcat and ActiveMQ be running as separate processes, so you will need to manage them independently. This is not a disadvantage, just different from solution B, where everything is managed inside the Tomcat container.

    Both of these deployment models are discussed in the ActiveMQ in Action book that was published in March (http://bit.ly/2je6cQ). The stand alone model is very common, especially when more than one application will communicate with ActiveMQ to produce or consume messages. This is not due to any technical reason per se, it's just an easy approach.

    Using some of the content from ActiveMQ in Action and expanding it a bit, I also wrote up a series of articles about embedding ActiveMQ in Tomcat as a JNDI resource. The articles were published over at TomcatExpert.com:

    * ActiveMQ and Apache Tomcat: Perfect Partners
    * Integrating ActiveMQ With Apache Tomcat Using Local JNDI
    * Integrating ActiveMQ With Apache Tomcat using Global JNDI

    Of course, these are not the only way to embed ActiveMQ in Tomcat. Another common solution is to configure it via Spring and have a web app load the Spring config. When Spring loads the config, one of the beans it starts up is the ActiveMQ message broker. For this situation, I recommend using the BrokerFactoryBean. This is a convenience class that is a Spring FactoryBean for embedding ActiveMQ in Spring while still utilizing an activemq.xml config file.
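
    A minimal sketch of that bean definition, assuming an activemq.xml file is available on the web app's classpath (the bean id and file location are placeholders):

    ```xml
    <!-- Starts an embedded ActiveMQ broker when the Spring context loads,
         configured from the referenced activemq.xml -->
    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
      <property name="config" value="classpath:activemq.xml"/>
      <property name="start" value="true"/>
    </bean>
    ```

    Because the broker is an ordinary Spring bean here, it starts and stops together with the application context of the web app.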

    Hopefully this info helps you out with your situation. Let me know if you have any further questions.

  73. Hi Bruce,

    How do you set the prefetch so that consumers are not starved?

    Following is the scenario;

    ActiveMQ runs inside Tomcat configured through global JNDI and the DMLC (for consumer) is activated through a web-application deployed on same Tomcat container.

    1. How can I set the prefetch?
    2. Tomcat starts the JVM. So if I allocate a memory of 4GB to Tomcat then configure ActiveMQ XML to take 2GB through system usage, does that mean AMQ will start with 2GB memory?

  74. @kcorg, Below are my answers:

    1) See the prefetch FAQ entry for info on how to set it for various situations. You can also set it via an XML definition for the connection factory like so:

    <bean id="connectionFactory"
    class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="tcp://localhost:61616"
    p:prefetchPolicy-ref="prefetchPolicy"/>

    <bean id="prefetchPolicy"
    class="org.apache.activemq.ActiveMQPrefetchPolicy"
    p:queuePrefetch="1" />

    Please note that there are other properties besides just the queuePrefetch that are available on the ActiveMQPrefetchPolicy object.

    2) Yes, that's correct. ActiveMQ will be limited to a total of 2 GB of memory by noting that in the <systemUsage> element.
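
    As a sketch of what that element can look like in activemq.xml, using the 2 GB figure from the question (any store and temp limits are left out here):

    ```xml
    <!-- Fragment of activemq.xml, placed inside the <broker> element -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <!-- Upper bound on the memory the broker uses for messages -->
          <memoryUsage limit="2 gb"/>
        </memoryUsage>
      </systemUsage>
    </systemUsage>
    ```

    This caps the memory the broker uses for messages. It does not set the JVM heap, which is still governed by the JVM options Tomcat is started with.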

  75. Thanks Bruce. I have another query.

    Does ActiveMQ by default remove the messages from the persistent store (I am using KahaDB) once the message is delivered?

  76. Yes, that is correct. Once a message has been acknowledged, ActiveMQ will automatically purge it from the persistent store.

  77. Ok thanks Bruce.

    Does it require explicit acknowledge, or is auto acknowledge fine?

    We are seeing another problem.

    I have a message sender load testing application which sends 3000 concurrent messages to AMQ deployed in Tomcat. The payload size is 10KB. On the consumer side I have Spring DMLC with 300 concurrent consumers. The enqueue and dequeue rates are 1000 messages per second. I took 3 samples and all gave identical results.

    I then increased the number of consumers to 600 and continued the tests, but the increase in consumers had no effect on the dequeue rate. It still shows 1000 msgs/s.

    What are the possible options by which I can increase the throughput?

  78. Hi Bruce,

    I am trying to set up an ActiveMQ instance that can handle hundreds of topics. I developed some stress tests using Spring JMS support (JmsTemplate for sending messages and a SimpleMessageListenerContainer on the receiving side).

    On the producer side, I use a connection pool (PooledConnectionFactory rather than CachingConnectionFactory, because the latter can hold only one connection). It seems to work fine.

    On the consumer side I am not using a pool because the documentation of PooledConnectionFactory does not recommend using it for message consumption. But I do not want to use one connection per consumer (I use one SMLC per destination), and using more than one connection for all my consumers would be a good thing. Because I use topics, I have only one concurrent consumer, but I use a thread pool to handle message processing.

    Do you have a recommendation about consumers for this use case?

    Another question about performance: do you think ActiveMQ can easily handle hundreds of topics with multiple subscribers? I did not see many benchmarks for that use case, so I am trying to build my own. I have followed most of the recommendations on the ActiveMQ website. Do you have any more tips?

    Thanks.

  79. @kcorg, How much memory have you given the message broker in the <systemUsage> element? What is the prefetch limit setting for the consumers? Are the consumers all creating sessions from the same connection? This is not inherently a problem, but with 600 consumers, I would begin to create more connections for them. There are many different options that can affect this, but I need some additional info. I suggest that you ask this question on the ActiveMQ user mailing list as it's more appropriate for such back and forth discussion.

  80. @willdit,

    > Do you have a recommendation about consumers for this use case?

    What kind of recommendation do you need? I'm not sure what you're seeing here. One thing I notice, you should not use any concurrency for a topic consumer as it is pointless. Concurrency is for queue consumers.

    Yes, ActiveMQ can handle hundreds of topics with multiple subscribers. Make sure to adjust JVM memory, broker memory and reduce the thread usage for the broker. Also adjust the prefetch limit for consumers. Take a look at the following page for some instructions in this and other areas.

    http://activemq.apache.org/scaling-queues.html

    Another thing that will help with such a configuration is configuring file cursors instead of vm cursors so that messages and references are both spooled to disk:

    http://activemq.apache.org/scaling-the-depth-of-a-queue.html#ScalingtheDepthofaQueue-Topicsubscribers

  81. Hi Bruce,

    Thanks for the info. I think you have given me a hint, and it seems the issue is with the connections.

    Please let me know your thoughts on configuring more connections.

    Here is the additional info.

    Deployment:- AMQ inside Tomcat, consumer web applications in same Tomcat using vm://

    Memory:- Tomcat JVM 4GB, Active MQ System Usage-memoryUsageLimit 2GB,System Usage-storeUsageLimit 10GB, System Usage-tempUsageLimit 1GB

    Prefetch setting for Consumers - Here is the policy entry for the queue.

    I decided to set prefetch to 1, as per the recommendations for tuning AMQ.

    Are all the consumers creating sessions from the same connection? This depends on the default behaviour of the Spring DefaultMessageListenerContainer. It refers to a ConnectionFactory which is not pooled. I set it up this way based on my understanding of the Spring DMLC, which I thought takes care of all caching and pooling.

    Do you see any issues in these configurations?

  82. @kcorg, The Spring DMLC caches connections, sessions and consumers. For each listener that is registered, a new instance of the DMLC is created and a single connection is used by each instance. All the consumers for a given instance will spawn many sessions from this single connection and this model works very well with JMS spec compliant MOMs. The DMLC handles all of this by itself without the need for the CachingConnectionFactory.

    Generally speaking I don't see any issues with your configuration. In fact, you have even given the JVM a considerable amount of headroom above the amount of memory that ActiveMQ is configured to use. In my experience, giving the JVM some headroom is a good thing (vs. allowing apps to consume 100% of available memory). I would also recommend decreasing the thread usage in ActiveMQ as suggested on the following page under the Reducing Threads heading:

    http://activemq.apache.org/scaling-queues.html

  83. Hi Bruce,

    Thank you for the answer.

    For topics, I don't use concurrency for my topic consumers in order to avoid duplicates. Furthermore, I can use a thread pool to handle business message processing with an SMLC (but with a context switching cost).

    My problem is the following: I don't want to have a connection opened per consumer (so a connection per topic). To my understanding, I should create an SMLC with no concurrency to consume from one topic. The SMLC will get a connection and then a session in order to create a consumer. The connection and the session will not be closed/stopped unless I shut down the SMLC (stop the consumption).

    => So if I want to listen to 100 topics, I should create 100 SMLCs.

    The documentation of PooledConnectionFactory says that it should not be used for message consumption since it doesn't cache consumers (from the doc: this implementation is only intended for use when sending messages). At first I thought that getting a connection from the pool and never calling stop or close would never return the connection to the pool. Looking at the connection pool code, I see that a connection is never really removed from the pool; it is put at the end of the queue to be reused by another thread while it is still in use. So I can use a pool of 5 connections to handle 50 SMLCs by using a PooledConnectionFactory and I will never consume more than 5 connections.

    So after some tests, I think we can use PooledConnectionFactory for message consumption despite the warning in the documentation.

    Thanks for the tips about scaling, I will look at them.

    For my use case, I want to use ActiveMQ to dispatch messages very quickly from my producers to my consumers. A consumer on a topic will be created only if a "notification of interest" is received. So some topics may have no consumers registered but some producers may send messages to those topics.

    Messages are sent without a transaction and with an expiration time in order to ensure that they will not be kept in memory too long. Because consumers are designed to be fast, I wanted to use vmCursor.

    File cursors will help to reduce memory consumption, but what is the overhead of this type of cursor compared to vm cursors?

    I can't find a way to see if messages are really expired. I checked the ActiveMQ mailing list and activated advisorySupport and the DLQ as recommended, but no DLQ is created. Is there any JMX attribute or trace that can help me check that the configuration is right?

    Thanks.

    William.

  84. @willdit, Below are answers to your inquiries:

    > The documentation of PooledConnectionFactory says that it should not be
    > used for message consumption since it doesn't cache consumers

    This info in the doc is a recommendation because the overhead involved with the constant creation/destruction of consumers can be quite high.

    > At first I thought that getting a connection from the pool and never calling
    > stop or close would never return the connection to the pool. Looking
    > at the connection pool code, I see that a connection is never really
    > removed from the pool; it is put at the end of the queue to be
    > reused by another thread while it is still in use. So I can use a pool
    > of 5 connections to handle 50 SMLCs by using a PooledConnectionFactory
    > and I will never consume more than 5 connections.

    Although a connection is reused and not closed, the pool actually creates many sessions from a single connection. Your logic is correct.

    > File cursors will help to reduce memory consumption, but what is the
    > overhead of this type of cursor compared to vm cursors?

    The overhead of a file cursor is higher than that of a vm cursor simply because the file cursor writes to disk and, as you probably know, disk I/O is costly.

    > I can't find a way to see if messages are really expired. I checked the
    > ActiveMQ mailing list and activated advisorySupport and the DLQ as
    > recommended, but no DLQ is created. Is there any JMX attribute or trace
    > that can help me check that the configuration
    > is right?

    Notification messages for message expirations are sent out via an advisory message to a topic that is specifically for message expiry notifications. You can certainly check this manually via JMX, but that's not very scalable. The way that I check for expired messages is to create a special consumer to watch for those particular advisory messages. Check out the page about advisory messages for more info. Specifically, the consumer needs to subscribe to the topic whose name is ActiveMQ.Advisory.Expired.Queue for queue message expirations or to the topic whose name is ActiveMQ.Advisory.Expired.Topic for topic message expirations.

    The consumer can be built to do whatever you like, but typically if I need to monitor message expiration, I have made the consumer consume messages from the DLQ and either log a message in a database or reprocess the message (you will need to create a deep copy of the message and change its expiry date, Apache Camel is good for this). You could also choose to make use of the DiscardingDLQ plugin to simply clear the DLQ based on a regex.

  85. Hi,
    I am using a DMLC to listen on a queue. I don't want it to start listening immediately after context initialization; I would like to start it manually. I tried autoStartup="false" as you suggested in previous threads, but it's not working: it is still able to establish the connection. Can you guide me on this issue?

    Thank you

  86. @Bals, the autoStartup property just tells the DMLC whether to start listening. I believe what you are experiencing is the startup of the connection(s) to the message broker, which is normal. The only way to delay creating those connections is to write your own code to control when this happens, because Spring starts those connections when the Spring Framework container starts up and there is no built-in way to delay that.

  87. Hi Bruce,

    I am trying to use a file cursor for my topics like you recommend but I want to use a specific directory (/tmp because it is mapped directly in memory).

    I can't see any configuration property available for that. Is it possible to specify a persistence directory for file cursors ?

    Thanks.

    William.

  88. @William, When using a file cursor, by default, ActiveMQ uses its data directory for this purpose. There is no way to specify a particular directory only for use by the file cursor config. However, you can change the location of the ActiveMQ data directory via the activemq.xml config file.

  89. Hello Bruce,
    In our application we expect to receive 2.5 million messages a day on a queue. We are using plain JMS messaging.
    Can I have multiple queue listeners on that particular queue, so that they keep picking up messages without overloading the JMS queue? We are not using Spring or any other framework, just plain JMS messaging. Presumably there won't be any duplicate processing of the same message, since once a message is picked up by a message listener instance it is removed from the queue.
    Please suggest.

    Thanks
    kapil

  90. @kapil, Answers below:

    * Yes, you can put multiple listeners on a queue so that they compete for messages.

    * You do not need to use Spring; JMS will work with or without it. The advantage of using Spring is that it provides many conveniences that make JMS much easier.

    * There will not be duplicate message processing as long as you use JMS queues. With queues, messages are dispatched in a round-robin fashion across consumers. Also, once a message has been dispatched to a consumer, the message broker won't deliver that same message to another consumer on the queue unless the first consumer's connection dies (this is known as message redelivery).
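
    The competing-consumers behaviour described in the answers above can be imitated without a broker. The sketch below is a plain-JDK analogy, not JMS code: a LinkedBlockingQueue stands in for the JMS queue and a few worker threads stand in for the listeners. The class name CompetingConsumersSketch, the run method and the "__stop__" marker are made up for illustration, and the sketch does not model broker features such as prefetch or redelivery.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for several listeners on one queue.
class CompetingConsumersSketch {
    private static final String STOP = "__stop__"; // marker telling a worker to exit

    /** Returns how many times each message was handled across all workers. */
    static Map<String, Integer> run(int workers, int messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Map<String, Integer> deliveries = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        String msg = queue.take();      // each take() removes the message
                        if (msg.equals(STOP)) {
                            return;
                        }
                        deliveries.merge(msg, 1, Integer::sum);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        for (int m = 0; m < messages; m++) {
            queue.put("msg-" + m);
        }
        for (int w = 0; w < workers; w++) {
            queue.put(STOP);                            // one stop marker per worker
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return deliveries;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> deliveries = run(3, 20);
        System.out.println("distinct messages handled: " + deliveries.size());
    }
}
```

    Because every take() removes the element from the queue, each message is counted once no matter how many workers compete for it, which is the property the queue semantics above rely on.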

  91. Thanks a lot Bruce. Your explanations have been really helpful.

    regards,
    Kapil

  92. Hey @Bruce,
    Hope you are doing great.

    I am facing an issue with the connections and I believe you could guide me : )

    We use AMQ broker 5.5 and Spring 3.0 for configuring the connection factory and other things.
    The connection factory we are using is PooledConnectionFactory and part of my config looks like this:

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="some_url"/>
    </bean>
    </property>
    </bean>

    <!-- Spring JMS Template -->
    <bean id="jmsTemplate"
    class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory">
    <ref local="jmsFactory" />
    </property>

    <property name="explicitQosEnabled" value="true"/>
    <property name="timeToLive" value="86400000"/>
    </bean>

    A few days back our broker crashed and kept restarting with this error:

    # java.lang.OutOfMemoryError: requested
    369384 bytes for Chunk::new. Out of swap space?

    At that point of time, from jconsole, I could not find anything unusual with the broker, except that one of our client application
    which talks with the server (via broker) by sending and listening to messages every *minute* had created ~3000 connections (saw it on jconsole).
    Once we had shut it down, everything was back to normal.

    So, to avoid this I tried closing the connection in a finally block, doing something like this:

    // connection, session and responseConsumer are assumed to be declared (as null) before the try
    try {
        connection = myJmsTemplate.getConnectionFactory().createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the consumer before sending so that the response cannot be missed
        String messageSelector = "JMSCorrelationID='" + correlationId + "'";
        responseConsumer = session.createConsumer(receiveDestination, messageSelector);
        LOG.info("Starting connection");
        connection.start();
        myJmsTemplate.send(sendDestination, new SimpleTextMessageCreator(
            message, receiveDestination, correlationId));
        LOG.info("Waiting for message with " + messageSelector + " for " + DEFAULT_TIMEOUT + " ms");
        TextMessage responseMessage = (TextMessage) responseConsumer.receive(DEFAULT_TIMEOUT);
        // receive() returns null when the timeout elapses
        if (responseMessage != null) {
            LOG.info("Received response : " + responseMessage.getText());
        }
    }
    catch (JMSException e) { /* handle or log the failure */ }
    finally {
        // Guard against nulls in case setup failed part-way through
        if (responseConsumer != null) responseConsumer.close();
        if (session != null) session.close();
        if (connection != null) connection.close();
    }

    But even then I can see the connections floating around in jconsole, and they only go away when the client app which publishes the messages is brought down.
    Can you please help me understand what's happening here and how I can close the connections after each pub/sub cycle?

    Thank you in advance,
    Hari

  93. @Bruce - I even tried printing out the idle timeout property and it said 30000. But none of the connections get closed after 30 seconds.

    FYI, the client app is a web app, and the process which sends and listens for messages to and from the server via the broker is run using scheduledExecutorService.scheduleWithFixedDelay().

    I tried programmatically creating the pooled connection factory, connection, session and so on, and closed the connection in the finally block, and it worked like a charm.

    * So, does it mean that the connections which I see on jconsole will never get closed until the app is shutdown or the destroy method of the connection factory is called?

    * Is it a good practice to handle connection factory and connections programmatically?

  94. Hey @Bruce, I kind of need a solution to this problem urgently as we need to put the fix in prod within next 2 days : (

  95. @Hari, the connections/sessions in the pool are not actually closed until the pool is shut down via the PooledConnectionFactory.stop() method.

    However, you don't want to do all of this work programmatically because then you're not getting any benefit of auto pool management. What you need to do is adjust the number of connections/sessions in the pool to an appropriate level and test it thoroughly to be sure that it is working as you expect in your environment before moving to production.

  96. @Bruce, I tried doing as you suggested and this is what I did.

    I had set:
    max connections = 3 ( Number of connections to be set)
    max active = 1 (Number of sessions per connection)
    idle timeout = default (30 secs)
    my process ping duration = 10 secs.

    and observed this:
    For every ping from my process, 3 connections were opened. During the subsequent pings, the first 2 connections from the previous ping are removed while the 3rd one stays. I saw this in jconsole for each of the pings.

    I was under the impression that with the above setup at most 3 connections would be opened and they would be reused for all subsequent pings, but it seems to be different. Can you kindly explain, with an example, how the broker treats these configurations?

  97. @Bruce,
    I also tried exploring the possibilities of tweaking the idleTimeout. Right now the default idleTimeout of PCF is 30 secs and my application pings every 60 secs. When I reduced the pinging period to 10 or 20 (something less than 30) the connections were getting reused / closed as expected. So, I increased the idleTimeout of PCF to 200 secs and my process started pinging every 60 secs. This did not close / reuse the connections as expected.

    However, when I print out the connection factory's idleTimeout it says the number I had set in the spring config but it does not seem to be working. Is this a known issue with AMQ 5.5?

    Here's the config I am using, for your reference:

  99. @Bruce,
    Here's the config I am using:

    <bean id="tcaHeartBeatConnectionFactory"
    class="org.apache.activemq.pool.PooledConnectionFactory"
    destroy-method="stop">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="idleTimeout" value="200000" />
    </bean>

  100. @Hari, I suggest you take a look at the ConnectionExpiryEvictsFromPoolTest and expand it as necessary to test your use case.

    I have also posted a patch to the activemq-pool module in the ActiveMQ source code that adds some debug logging so that you can see what is going on inside of the PooledConnectionFactory as the test is run or even as your app is running. You can download the patch from GitHub here:

    https://gist.github.com/1291630

    To utilize this patch file, expand it into a known directory. Then you will need to grab the latest ActiveMQ source code from the SVN repo, apply the patch and build the source. Here are the steps to follow:

    1) svn co http://svn.apache.org/repos/asf/activemq/trunk/
    2) cd ./trunk/
    3) mvn -Dtest=false -DfailIfNoTests=false clean install
    4) cd ./activemq-pool
    5) patch -p0 < /path/to/patch/file
    6) mvn -Dtest=false -DfailIfNoTests=false clean install

    After applying the patch and rebuilding the activemq-pool module, all of the ActiveMQ JARs will be available in your local Maven repo (~/.m2/repository/org/apache/activemq/). Just remember that this is the latest trunk version, 5.6-SNAPSHOT. If you are using Maven to manage your Java app build, then you just need to change the version of ActiveMQ on which it depends to 5.6-SNAPSHOT and rebuild your project. If you're not using Maven, you will need to grab the JARs from the local Maven repo manually.

    Again, the point of this patch is to see the internal activities of the ActiveMQ PooledConnectionFactory. My hope is that this will help you sort out your issues because I don't have your app or your environment in front of me to test.

  101. @Hari, I forgot to comment on the error you received:

    # java.lang.OutOfMemoryError: requested
    369384 bytes for Chunk::new. Out of swap space?

    Based on this error and on the fact that your application created ~3000 connections in a short period of time, it seems to me that your app is errantly creating connections. What would cause it to loop like this and create so many connections/sessions in such a short period of time?

  102. @Bruce, thank you for the test case. I checked my app thoroughly once more and finally zeroed in on the code which was leaving the connection open. Closing it solved the issue.

    About the ~3000 connections: the client app, which sends sensitive information to the server to be processed, checks whether the server is up and running by periodically posting a message on a queue and listening for a response from the server on a predefined topic using a message selector. This pub/sub happens every minute to make sure the server is always up and running.

    There were 2 pieces of code which were calling connection.start() separately due to which every ping created 2 connections. Those ~3000 connections were created over a weekend and we noticed only on Monday morning.

    I had closed one and was not aware of the other. Fixed it now and my app is working perfectly.

    Thank you for your suggestions Bruce.

    Regards
    Hari

  103. @Hari, Why is your app explicitly calling connection.start() if it is using the Spring JmsTemplate? Or is this a separate app that does not make use of the JmsTemplate?

  104. @Bruce, so this is the picture.

    My app sends the message to the server via the broker and immediately starts listening for the response in the same class, i.e. it follows a synchronous messaging pattern.

    I began by using JmsTemplate, but it looks like by the time the client app sends the message and makes a call to myJmsTemplate(responseDestination, messageSelector), the response message has already been posted by the server on the response destination and is lost. So the client just hangs and times out. Basically it's a race condition where the producer is faster than the consumer.

    So, for sending messages, I am using JmsTemplate.

    But to receive the response message successfully, I had to get a reference to the connection explicitly, create a message consumer from the session and start the connection before producing, and only then start listening for the response, all to make sure it does not lose the response message.
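
    The ordering problem described here (a reply published on a topic before any consumer exists is simply gone) can be shown with a small JDK-only analogy. This is not JMS code: TinyTopic is a made-up stand-in for a non-durable topic, and sendRequest is a hypothetical "server" that replies immediately. The sketch only illustrates why the consumer has to exist before the request is sent.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only illustration of "subscribe before you send".
class SubscribeBeforeSendSketch {

    /** Stand-in for a non-durable topic: only current subscribers see a message. */
    static class TinyTopic {
        private final List<BlockingQueue<String>> subscribers = new CopyOnWriteArrayList<>();

        BlockingQueue<String> subscribe() {
            BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String message) {
            for (BlockingQueue<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }

    /** Stand-in for the server: it replies on the topic as soon as a request arrives. */
    static void sendRequest(TinyTopic replyTopic, String correlationId) {
        replyTopic.publish("reply-to-" + correlationId);
    }

    /** Sends first, subscribes afterwards: the reply has already been published. */
    static String subscribeAfterSend(long timeoutMs) throws InterruptedException {
        TinyTopic topic = new TinyTopic();
        sendRequest(topic, "42");
        BlockingQueue<String> inbox = topic.subscribe();
        return inbox.poll(timeoutMs, TimeUnit.MILLISECONDS); // times out
    }

    /** Subscribes first, then sends: the reply lands in the waiting inbox. */
    static String subscribeBeforeSend(long timeoutMs) throws InterruptedException {
        TinyTopic topic = new TinyTopic();
        BlockingQueue<String> inbox = topic.subscribe();
        sendRequest(topic, "42");
        return inbox.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("subscribe after send:  " + subscribeAfterSend(100));
        System.out.println("subscribe before send: " + subscribeBeforeSend(100));
    }
}
```

    In the first case the poll times out and returns null; in the second it returns the reply. That mirrors creating the consumer and starting the connection before calling send.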

  105. @Bruce, FYI the client app publishes messages to a queue and listens for the response message from server on a topic.

  107. Hi Bruce,

    I have a problem with JMS consumers.

    My environment: ActiveMQ 5.2.0 (non-persistent mode), JDK 1.6.0_27, JBoss 4.2.0 and Spring 2.5.6.

    We tried a negative test case by making the consumer wait in asynchronous mode,
    using a Spring JMS consumer with a ThreadPoolTaskExecutor.

    I have working simple producer and consumer programs. These are thread pooled.

    Properties used under DefaultMessageListenerContainer are
    concurrentConsumers=5
    maxConcurrentConsumers=10
    cacheLevelName=CACHE_CONSUMER
    sessionTransacted=false
    taskExecutor ref=ListenerThreadPool
    idleTaskExecutionLimit=100
    prefetchSize=1000
    dispatchAsync=false
    retroactive=false
    exclusive=false
    priority=0
    noLocal=false
    maximumPendingMessageLimit=0


    The properties used under org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor are

    corePoolSize=5
    maxPoolSize=10
    queueCapacity=15
    keepAliveSeconds=900
    threadNamePrefix=ListenerThreadPool

    The scenario is:

    When a message is received in the onMessage() API, we put the thread to wait (e.g. Thread.sleep(60*1000*1000)).

    I then continuously publish about 10 messages. Five messages are consumed properly (based on the configuration "concurrentConsumers = 5").

    When we produce the sixth, seventh ... tenth message, the messages get stuck and no consumers are created beyond the first five. As I understand it, the number of consumers should increase up to ten (based on the configuration "maxConcurrentConsumers = 10") and the messages should be consumed.

    The positive test case (without putting the thread to wait) works fine with a load of ~1000 messages per second.

    Please, any help would be appreciated.
    Thank you.

  108. @mahi_reddy54, I'm not sure what the exact issue is without seeing a test case that can be debugged. I suggest that you ask your question on the Spring JMS forum here:

    http://forum.springsource.org/forumdisplay.php?30-JMS

    I highly suggest that you prepare the smallest possible test case that demonstrates the problem because even in the Spring JMS forum someone may ask you for this.
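
    For readers hitting the same symptom, one possibility worth ruling out, given the executor settings quoted in the question (corePoolSize=5, maxPoolSize=10, queueCapacity=15), is the standard java.util.concurrent.ThreadPoolExecutor rule that threads beyond the core size are only started once the work queue is full. Spring's ThreadPoolTaskExecutor delegates to that class, so a request for a sixth listener thread may simply sit in the executor's queue. This is a guess, not a confirmed diagnosis of the case above. The sketch below uses only the JDK; the class name PoolGrowthSketch and the helper threadsStarted are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only demonstration of how the work queue affects pool growth.
class PoolGrowthSketch {

    /** Submits blocking tasks to a 5..10 thread pool and reports how many threads it started. */
    static int threadsStarted(BlockingQueue<Runnable> workQueue, int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 900, TimeUnit.SECONDS, workQueue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await();                // simulate a listener stuck in onMessage()
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();                        // let the blocked tasks finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Bounded queue of 15: tasks 6..10 are queued, so no extra threads are created
        System.out.println("queue capacity 15: "
                + threadsStarted(new LinkedBlockingQueue<>(15), 10) + " threads");
        // No queueing: every task beyond the core size gets a new thread, up to the maximum
        System.out.println("synchronous queue: "
                + threadsStarted(new SynchronousQueue<>(), 10) + " threads");
    }
}
```

    With the bounded queue the pool stays at 5 threads for 10 blocked tasks; with a SynchronousQueue, which is what ThreadPoolTaskExecutor uses when queueCapacity is 0, it grows to 10. If this is the cause, lowering queueCapacity or raising corePoolSize would be the settings to experiment with.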

  109. Hi Bruce,
    Thanks for your prompt response.

    I'm unable to post the small test case. Some HTML tag errors came up, so I deleted it and sent you the previous post. Can you please provide an email address so that I can attach my test case?
    Thank you

  110. @mahi_reddy54, As I suggested previously, it would be best for you to ask your question and post your test case in the Spring JMS forum here:

    http://forum.springsource.org/forumdisplay.php?30-JMS

    You will get many more people looking at your problem there.

  111. Hi Bruce,
    Thanks a lot for your blog, it's really interesting.
    I'm using Spring JMS 3.0 with IBM MQ as the broker in a standalone Java application (the requirement is no app server).
    I use a main class which:
    * loads the Spring XML config file.
    * sends messages to a queue (this provokes a response on 3 different queues).
    So I use 3 DMLCs, one listening on each queue, and for each received message I need to create a new file with the content of the message (IO). It works fine, but sometimes even though I receive the message (I have a trace), the file is not generated and there is no IOException (it's as if the file creation stops and returns). I don't know what the problem can be.
    I'm using a CachingConnectionFactory with a sessionCacheSize of 10. The 3 DMLCs use the same connection and for each of them concurrentConsumers is set to 1, as we want to preserve the order. I can give more info on the code if needed. Please help, we will be in prod soon.

    Thanks,

  112. @silaJava, If the messages are being consumed correctly, then it sounds like a problem with the code that is creating the file.

  113. Hmmm, it looks like Blogger is still not consistently posting comments. I was emailed the following comment by Blogger, but it doesn't appear here for some reason:

    Thanks a lot for your quick reply.
    It's strange. Let me explain the use case when receiving a message (for each message):
    1-Print out the received message.
    2-Create a file with the text message.
    3-Create a new file with formatted text from the file created in step 2.

    The problem is that the same code behaves differently from one message to the next: for one message all steps are performed, but not for the next message (just steps 1 and 2 are performed), and other times just step 1 is performed. Sometimes all 3 steps are performed for all received messages.
    It seems like the JVM ends without letting the DMLC finish its work. Are the threads created by the DMLC daemons? Or should I use a transaction manager (JMS or JTA)? I'm using CLIENT_ACKNOWLEDGE as the sessionAcknowledgeModeName.
    Is there any parameter to set to avoid this? (I tested the file creation code in isolation and it works fine; the proof is that sometimes all the messages are written to the files perfectly.)
    Please advise, any pointers are really welcome.

    Thank you,

  114. @silaJava, Have you put a debugger on the code to watch it walk through the file creation when it receives a message? Either that or add lots of logging throughout the file creation code. This is just standard debugging, nothing special about JMS here.

    1. Hi Bruce,
      Sorry, I was on vacation...
      I think that the problem is caused by some thread (DMLC) that does not complete normally.
      My application isn't running in a JEE container (it is standalone) and runs as an .exe. I have a main method that just loads the Spring application context.
      I use Spring with Quartz to call my sendMessage and it works perfectly, but on reception I can see the files on the queue, yet when I run the program I don't receive all of them. It's as if I share messages with another thread that is still there from my previous run.
      How can I be sure that when the program ends it stops all threads and DMLCs? I added appContext.registerShutdownHook() in the main method to clean everything up when shutting down, but I still have this strange behaviour.
      Here is my main:
      try {
      AbstractApplicationContext appContext = new ClassPathXmlApplicationContext(new String[] {"context_dao.xml","context_mq.xml"});
      appContext.registerShutdownHook();
      } catch (Exception e) {
      e.printStackTrace();
      System.exit(-1);
      }

      Thanks in advance!

    2. @silaJava, Upon the shutdown of the Spring app context, the DefaultMessageListenerContainer.shutdown() method is invoked to stop the connection and interrupt any threads that are running in the DMLC. So that should be closing down without a problem.

      I would make sure that the file creation code works correctly independent of the DMLC. Once that is verified, I would debug what is happening with it while running in the DMLC.

      At this point I honestly have no idea what could possibly be happening because I have not seen a test case.

      Delete
    3. Thanks a lot for the quick reply,
      I will debug and let you know. Sorry, I don't have a test case.
      Just another detail: the program runs as a Windows service, and even if I remove the service and kill the process (pg.exe and java), some TCP connections stay open from my address to the MQ. I'm using JmsTemplate and CachingConnectionFactory with sessionCacheSize set to 2. Should I explicitly close the connection, or will it close when the program stops?

      Thanks again

      Delete
    4. This comment has been removed by the author.

      Delete
  115. Hi Bruce,
    I am using spring DMLC. If the broker from which the DMLC is trying to fetch the message is down, DMLC keeps trying to connect every 5 seconds. Will it keep trying forever? Is it possible to increase and decrease the retry interval?

    ReplyDelete
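  116. @Sunil, The DMLC will keep retrying for as long as the container is running. The retry interval is configurable through the recoveryInterval property (in milliseconds; the default is 5000), but there does not appear to be a way to cap the number of attempts, even in the latest release of the Spring Framework (3.1.x).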
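
    One setting that may be worth checking here is the recoveryInterval property of the DefaultMessageListenerContainer, which the Spring Javadoc describes as the interval between recovery attempts in milliseconds (default 5000). Below is a minimal sketch; the bean ids are hypothetical and the 30-second value is only an illustration:

    ```xml
    <!-- Sketch only: bean ids are placeholders for your own definitions -->
    <bean id="myListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="myQueue" />
        <property name="messageListener" ref="myListenerBean" />
        <!-- Wait 30 seconds between reconnection attempts instead of the default 5000 ms -->
        <property name="recoveryInterval" value="30000" />
    </bean>
    ```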

    ReplyDelete
  117. Hi Bruce, Thanks for a great article.
    I'm using DMLC with CachingConnectionFactory to consume messages from an IBM MQ queue (below is my configuration).
    The problem is when the MQ server is restarted, the CachingConnectionFactory didn't refresh the connections and instead I saw a lot of JMSExceptions.

    org.springframework.jms.listener.DefaultMessageListenerContainer - Setup of JMS message listener invoker failed for destination 'queue:///A.B.C.Queue' - trying to recover.
    Cause: MQJMS2002: failed to get message from MQ queue; nested exception is com.ibm.mq.MQException: MQJE001: Completion Code 2, Reason 2019
    javax.jms.JMSException: MQJMS2002: failed to get message from MQ queue

    Upon talking to MQ admins I found that Reason Code 2019 = The connection handle is not valid.
    This made me think that the connections were stale. Once I restarted my application, everything worked fine.
    I thought the reconnectOnException was true by default for CachingConnectionFactory. Any idea where I went wrong or if I missed something?
    Thank you much.

    ReplyDelete
  118. Here is my spring config

    <bean id="myListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="myQueue" />
    <property name="messageListener" ref="myListenerBean" />
    </bean>

    <bean id="jmsConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="myMQConnectionFactory" />
    <property name="sessionCacheSize" value="2" />
    </bean>

    <bean name="myMQConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
    <property name="hostName" value="${MQ_SERVER}" />
    <property name="port" value="${MQ_PORT}" />
    <property name="channel" value="${MQ_CHANNEL}" />
    <property name="queueManager" value="${MQ_QUEUE_MGR}" />
    <property name="transportType" value="${MQ_TRANSPORT_TYPE}" />
    </bean>

    ReplyDelete
  119. @Muddu, A couple of questions:

    1) Are you using a connection pool on the MQ broker side and is it configured correctly? In WebsphereMQ this needs to be configured on the server-side.

    2) Are you using a Spring JndiTemplate to perform the lookup of the myMQConnectionFactory? If so, you need to make sure to specify the proxyInterface on the JndiTemplate. Please see a previous reply from me on this topic that shows an example of this.

    ReplyDelete
  120. @Bruce - Thanks for the quick reply.
    1) This is a webapp running on Tomcat connecting to WebsphereMQ. I didn't get your question about the connection pool configuration on server-side.
    2) I'm not doing a JNDI lookup, I'm connecting to WebsphereMQ by configuring hostname & port to the MQQueConnectionfactory (please see my spring xml configuration in the above post). I'm passing that bean as targetConnectionFactory for CachingConnectionFactory.

    ReplyDelete
  121. @Muddu, Neither of my points is pertinent to your situation because you're not doing a remote JNDI lookup of a connection factory that is configured to hand out pooled connections from the broker side.

    Try configuring the CachingConnectionFactory as the exception listener in the DMLC like so:

    <bean id="myListenerContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="myQueue" />
        <property name="messageListener" ref="myListenerBean" />
        <property name="exceptionListener" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="myMQConnectionFactory" />
        <property name="sessionCacheSize" value="2" />
    </bean>

    <bean name="myMQConnectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
        <property name="hostName" value="${MQ_SERVER}" />
        <property name="port" value="${MQ_PORT}" />
        <property name="channel" value="${MQ_CHANNEL}" />
        <property name="queueManager" value="${MQ_QUEUE_MGR}" />
        <property name="transportType" value="${MQ_TRANSPORT_TYPE}" />
    </bean>

    The result of this is that every time an exception occurs that is handled by the exception listener, the implementation of the SingleConnectionFactory.onException() method is invoked and it will reset the connection.

    ReplyDelete
  122. hi bruce,
    We are facing a different problem: our JMS default listener container stops polling after some time. It works fine when started again. We are not able to make out what the problem could be. I read on the blog as well as the Spring site that the container obtains a fixed number of sessions for the listener on start-up. Could this lead to a problem?
    What happens if the max number of sessions is reached?
    Note that we are not using any caching mechanism for sessions or connections.
    Please help, we are stuck.

    ReplyDelete
    Replies
    1. Rajendra, If I was dealing with this problem the first thing I would do is enable debug level logging for Spring so that I could watch what is happening in the DMLC when it stops. I also recommend implementing a JMS exception listener to be plugged into the DMLC. The ExceptionListener.onException() method will be invoked if any exceptions are being thrown. Hopefully these two steps will provide you with enough information to determine what is happening.
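
      A minimal sketch of plugging an exception listener into the DMLC follows. The class com.mycompany.LoggingExceptionListener is hypothetical (any class implementing javax.jms.ExceptionListener that logs what it receives would do), and the other bean ids are placeholders:

      ```xml
      <!-- Hypothetical class implementing javax.jms.ExceptionListener; it only needs to log the exception -->
      <bean id="loggingExceptionListener" class="com.mycompany.LoggingExceptionListener" />

      <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
          <property name="connectionFactory" ref="connectionFactory" />
          <property name="destination" ref="myQueue" />
          <property name="messageListener" ref="myListenerBean" />
          <!-- onException() is invoked with any JMSException reported to the container -->
          <property name="exceptionListener" ref="loggingExceptionListener" />
      </bean>
      ```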

      Just randomly trying different solutions without taking the time to understand the problem will only cost you time and have you chasing your tail. Therefore I do not recommend configuring connection/session caching until you have a good enough grasp of the problem to know that this is the solution.

      Delete
  123. Hi Bruce

    This post is very helpful. I was able to set up Tomcat + Spring + ActiveMQ perfectly. Recently, I added a new queue to handle some slow tasks, so I changed prefetch to 0 ("jms.prefetchPolicy.all=0"). If I leave prefetch at 1, two tasks can be assigned to one consumer even when other consumers are idle.

    But I have a new problem: I cannot shut down Tomcat gracefully anymore. I looked at the thread dump, and all queue consumers are in TIME-WAIT status.

    Did I do something wrong in the settings?

    thanks a lot for help

    eric


    "DefaultMessageListenerContainer-1" prio=10 tid=0x00007f6f3c6c4000 nid=0x5c7e in Object.wait() [0x00007f6f3b6f5000]
    java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
    - locked <0x0000000780c59230> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:452)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:575)

    ReplyDelete
    Replies
    1. @eric, This is a common issue and there is an option on the ActiveMQ TCP transport to address this very situation. You need to add the daemon=true option to the broker URI so that the transport thread in ActiveMQ will terminate cleanly. Below is an example of a broker URI with the transport option:

      tcp://somehost:61616?daemon=true

      After you add this option, then Tomcat should be able to shut down correctly.

      Delete
  124. Hi Bruce

    Thanks for the information. I did add it to my broker URL:
    failover:(tcp://10.203.9.201:61616?daemon=true,tcp://10.83.3.126:61616?daemon=true)?randomize=true&jms.prefetchPolicy.all=0

    The failure to shut down only happens with prefetchPolicy.all=0; when I change it to prefetchPolicy.all=1, everything is fine.

    Did I get anything wrong in the broker URL above?

    thanks

    eric

    ReplyDelete
    Replies
    1. @eric, If you are using message listeners then you cannot set prefetch=0. When using a message listener (i.e., an asynchronous consumer) the consumer does not ask for messages, instead the broker is pushing messages so you need to set prefetch to something greater than zero. The prefetch=0 will only work for a polling consumer (i.e., a synchronous consumer) because it is constantly asking the broker for messages.

      Delete
  125. Hi Bruce

    env: ActiveMQ 5.5.1, tomcat 6.0.32
    I am using Springframework 3.0.6 DefaultMessageListenerContainer, which extends from AbstractPollingMessageListenerContainer, i think it is a polling listener, i noticed this piece of code in ActiveMQMessageConsumer.receive(long timeout)

    if (info.getPrefetchSize() == 0) {
    md = dequeue(-1); // We let the broker let us know when we timeout.
    } else {
    md = dequeue(timeout);
    }

    The timeout is ignored and the receiver will wait forever. Could it be related?


    thanks

    eric

    ReplyDelete
    Replies
    1. @eric, Yes, this is exactly what I was referring to in my last reply. Set the prefetch to something greater than zero and you should be fine.

      Delete
  126. hi Bruce

    if i set prefetch=1, then i have another issue.
    I have two machines to consume incoming messages,
    MaxConcurrentConsumers=50, so there are total 100 consumers, there are 90 messages in the queue, i expect all 90 messages are distributed to different consumers (those message may take 1 - 10 minutes to process), but i noticed some consumers have two, some have one, some have 0.

    is there a way to evenly distribute the load?

    thanks

    eric

    ReplyDelete
    Replies
    1. @eric, There is no way to distribute messages in a strict round robin fashion when using message listeners because of the asynchronous nature of message dispatch in the broker. What you're seeing is pretty normal.

      Delete
  127. Hi Bruce,
    How did you deploy the queue listener on JBoss?
    Could you please share the steps?

    I have questions like:

    How will JBoss know how to start/instantiate the Spring JMS listener bean?
    How do I tell JBoss about that?

    Thanks,
    M.Madhukar

    ReplyDelete
    Replies
    1. @Madhukar, In the past, I have used the Spring DMLC from a client-side application running in the Spring container to connect to a remote JBoss instance. Is this what you are asking?

      Delete
    2. This comment has been removed by the author.

      Delete
    3. @Bruce, I wanted to know how to run the Spring DMLC within the JBOSS instance.

      Delete
    4. @Madhukar, from *within* a JBoss instance? Do you mean with the Spring DMLC running on the server side instead of the client side? Well, you should be able to use the DMLC from within any artifact that is deployable to JBoss. But it really depends on what you're trying to achieve. The reason I say this is that if you want to consume and process messages on the server side, then perhaps Spring Integration is better suited to your use case because you can build pipeline-style applications. But this is just a guess.

      Delete
  128. Hi Bruce

    Thanks for the conclusion, now I can stop chasing this issue. :)

    eric

    ReplyDelete
  129. Summary: The thread pool size used by the DMLC is not increasing beyond about 340, even though maxConcurrentConsumers is 1000. The DMLC is initialized mostly with default params, except maxConcurrentConsumers, which is 1000, and the ackMode, which is Session.AUTO_ACKNOWLEDGE. The prefetch limit is set to 1 on the shared connection.

    The following is the scalability we are trying to achieve.

    On receiving message M from the broker, the corresponding consumer thread in DMLC is cloning the former into say 4000 messages (M_1,M_2,....M_4000) and posting them back to the broker.

    Now,we have three consuming nodes (meaning three DMLC, one in each box, configured with the above specs), and we want to get to a point where 1000 consumers(in each of the three nodes) are executing the messages concurrently. The maximum concurrency we have seen so far, by grepping the consumer box logs is around 340

    Can you suggest some pointers on how to achieve this by tweaking the appropriate params?
    The most relevant ones I see are idleTaskExecutionLimit, idleConsumerLimit, etc.

    ReplyDelete
    Replies
    1. @Santosh, You might want to consider creating your own thread pool using the Spring ThreadPoolTaskExecutor and handing it to the DMLC.setTaskExecutor() method. This might allow you to scale further because you have more control. You will need to test this out in your environment to determine if it is an option for you.
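
      A minimal sketch of that suggestion, under stated assumptions: the bean ids and the thread name prefix are hypothetical, the pool sizes simply mirror the maxConcurrentConsumers value from the question, and a queueCapacity of 0 makes the executor hand each task straight to a thread instead of queueing it. Whether this actually lifts the observed ceiling needs to be measured.

      ```xml
      <!-- Dedicated pool for the DMLC; the sizes are illustrative, not a recommendation -->
      <bean id="consumerTaskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
          <property name="corePoolSize" value="1000" />
          <property name="maxPoolSize" value="1000" />
          <!-- 0 = no task queue, so each consumer task gets its own thread -->
          <property name="queueCapacity" value="0" />
          <property name="threadNamePrefix" value="dmlc-consumer-" />
      </bean>

      <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
          <property name="connectionFactory" ref="connectionFactory" />
          <property name="destination" ref="myQueue" />
          <property name="messageListener" ref="myListenerBean" />
          <property name="maxConcurrentConsumers" value="1000" />
          <!-- Equivalent to calling DMLC.setTaskExecutor() in code -->
          <property name="taskExecutor" ref="consumerTaskExecutor" />
      </bean>
      ```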

      Delete
  130. Hi, Bruce.
    How are you? I have the following issue. Can you please point me in the right direction?

    I have configured Spring DefaultMessageListenerContainer as ActiveMQ consumer consuming messages from a queue. Let's call it "Test.Queue" I have this code deployed in 4 different machines and all the machines are configured to the same ActiveMQ instance to process the messages from the same "Test.Queue" queue.

    I set the max consumer size to 20. As soon as all 4 machines are up and running, I see the consumer count against the queue as 80 (4 * max consumer size = 80).

    Everything is fine when the number of messages produced and sent to the queue grows high.

    When there are thousands of messages and one of the 80 consumers gets stuck, ActiveMQ freezes and stops sending messages to the other consumers.

    All messages are stuck in ActiveMQ forever.

    As I have 4 machines with up to 80 consumers , I have no clue as to see which consumer failed to acknowledge.

    I go stop and restart all the 4 machines and when I stop the machine that has the bad consumer which got stuck, then messages starts flowing again.

    I don't know how to configure DefaultMessageListenerContainer to abandon the bad consumer and signal ActiveMQ immediately to start sending messages.

    I was able to create the scenario even without Spring as follows:

    I produced up to 5000 messages and sent them to the "Test.Queue" queue
    I created 2 consumers (Consumer A, B) and in one consumer B's onMessage() method, I put the thread to sleep for a long time ( Thread.sleep(Long.MAX_VALUE)) having the condition like when current time % 13 is 0 then put the thread to sleep.

    Ran these 2 consumers.

    Went to Active MQ and found that the queue has 2 consumers.
    Both A and B are processing messages
    At some point of time consumer B's onMessage() gets called and it puts the Thread to sleep when the condition of current time % 13 is 0 is satisified.
    The consumer B is stuck and it can't acknowledge to the broker
    I went back to Active MQ web console, still see the consumers as 2, but no messages are dequeued.
    Now I created another consumer C and ran it to consume.
    Only the consumer count in ActiveMQ went up to 3 from 2.
    But Consumer C is not consuming anything as the broker failed sending any messages holding them all as it is still waiting for consumer B to acknowledge it.
    Also I noticed Consumer A is not consuming anything
    I go and kill consumer B , now all messages are drained.
    Let's say A, B, C are managed by Spring's DefaultMessageListenerContainer, how do I tweak Spring DefaultMessageListenerContainer to take that bad consumer off the pool (in my case consumer B) after it failed to acknowledge for X number of seconds, acknowledge the broker immediately so that the broker is not holding onto messages forever.

    Thanks for your time.

    Appreciate if I get a solution to this problem.

    ReplyDelete
    Replies
    1. @serverfaces2012, This is just a guess, but what is the prefetchLimit set to on your consumers? If you're just using the default value of 1000, drop it to 1 and see if that helps.
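
      One way to lower the prefetch limit is through the broker URL on the ActiveMQ connection factory. A minimal sketch, assuming a hypothetical host name and bean id:

      ```xml
      <!-- queuePrefetch=1 asks the broker to dispatch one message at a time to each queue consumer -->
      <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
          <property name="brokerURL" value="tcp://somehost:61616?jms.prefetchPolicy.queuePrefetch=1" />
      </bean>
      ```

      With a low prefetch, a consumer that gets stuck holds at most the message it is processing plus one prefetched message, instead of up to 1000 undelivered ones.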

      Delete
  131. Is it possible to define multiple DMLCs all referencing the same connectionfactory bean ? Is it advisable to do so ?

    ReplyDelete
  132. @Edwin Dhondt, sure it's possible to do that and there is no specific reason that you should not do it. As always, I suggest testing it in your environment with your application to measure the results of various tunings that affect performance to find the best combination.
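
    A minimal sketch of two DMLC beans sharing one connection factory follows. All bean ids are hypothetical, and the concurrency values are only there to show that each container can be tuned separately:

    ```xml
    <!-- Both containers reference the same connectionFactory bean -->
    <bean id="fastQueueContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="TEST.FOO" />
        <property name="messageListener" ref="simpleMessageListener" />
        <property name="concurrentConsumers" value="5" />
    </bean>

    <bean id="slowQueueContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="SOME.OTHER.DEST" />
        <property name="messageListener" ref="complexMessageListener" />
        <property name="concurrentConsumers" value="1" />
    </bean>
    ```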

    ReplyDelete
  133. I am using Spring DMLC in web app deployed on WAS 6.1. Following are important configurations:

    1. Using websphere MQ 6.0 with remote queue
    2. Using Spring 3.0.5
    3. Using XA Transactions for Oracle DB and MQ
    4. Using Commonj workmanager with DMLC
    5. Concurrency set to 3.

    With this configuration messages are consumed and processed, but connections are not released. The log file shows below exception. The connections at the queue manager keeps increasing and at max level stops working with fatal exception.

    JMS$JMSConnectionFactory$JMSManagedConnection@1309036038. The exception which was received is javax.jms.JMSException: MQJMS2008: failed to open MQ queue
    [9/30/11 17:36:00:607 CEST] 000028e3 ConnectionEve A J2CA0056I: The Connection Manager received a fatal connection error from the Resource Adapter for resource jms/JMSConnectionFactory. The exception which was received is javax.jms.JMSException: MQJMS2008: failed to open MQ queue
    [9/30/11 17:37:24:863 CEST] 000028f4 MCWrapper E J2CA0081E: Method destroy failed while trying to execute method destroy on ManagedConnection com.ibm.ejs.jms.JMSManagedSession@7d6c7d6c
    physical session = com.ibm.mq.jms.MQXAQueueSession@21ac21ac
    session type = XA_SESSION
    enlisted = false
    open session handles = []
    managed session factory = com.ibm.ejs.jms.JMSManagedSessionFactory@69ac69ac from resource No longer available. Caught exception: javax.resource.spi.ResourceAdapterInternalException: Failed to close session
    at com.ibm.ejs.jms.JMSCMUtils.mapToResourceException(JMSCMUtils.java:176)
    at com.ibm.ejs.jms.JMSManagedSession.destroy(JMSManagedSession.java:557)
    at com.ibm.ejs.j2c.MCWrapper.destroy(MCWrapper.java:1781)
    at com.ibm.ejs.j2c.FreePool.cleanupAndDestroyMCWrapper(FreePool.java:722)
    at com.ibm.ejs.j2c.PoolManager.reclaimConnections(PoolManager.java:4213)
    at com.ibm.ejs.j2c.PoolManager.executeTask(PoolManager.java:4096)
    at com.ibm.ejs.j2c.TaskTimer.run(TaskTimer.java:89)
    Caused by: javax.jms.JMSException: MQJMS2012: XACLOSE failed
    at com.ibm.mq.jms.services.ConfigEnvironment.newException(ConfigEnvironment.java:622)
    at com.ibm.mq.jms.MQXASession.close(MQXASession.java:236)
    at com.ibm.ejs.jms.JMSManagedSession.destroy(JMSManagedSession.java:552)
    ... 5 more
    ---- Begin backtrace for Nested Throwables
    javax.transaction.xa.XAException: XA operation failed, see errorCode
    at com.ibm.mq.MQXAResource.close(MQXAResource.java:179)
    at com.ibm.mq.jms.MQXASession.close(MQXASession.java:212)
    at com.ibm.ejs.jms.JMSManagedSession.destroy(JMSManagedSession.java:552)
    at com.ibm.ejs.j2c.MCWrapper.destroy(MCWrapper.java:1781)
    at com.ibm.ejs.j2c.FreePool.cleanupAndDestroyMCWrapper(FreePool.java:722)
    at com.ibm.ejs.j2c.PoolManager.reclaimConnections(PoolManager.java:4213)
    at com.ibm.ejs.j2c.PoolManager.executeTask(PoolManager.java:4096)
    at com.ibm.ejs.j2c.TaskTimer.run(TaskTimer.java:89)

    I don't understand whether this is an issue with the Spring DMLC or with MQ. Can you suggest some solutions to this issue?

    ReplyDelete
    Replies
    1. @Sharad, why is the number of connections hitting the maximum? Do you have the number of connections specified in the connection pool on the WebSphere side matched up with the number of consumers in the Spring DMLC configuration?

      Delete
    2. @Bruce , On Websphere side connection pool setting are as below.
      Maximum connection =10
      Minimum connection =1
      and
      concurrency="3" in the Spring DMLC configuration

      Please let me know if any thing needs to be changed.

      Delete
    3. On the Spring side, is there anything that could be preventing connections from being closed (i.e., your code or config)?

      On the WebSphere side, are there any other settings on the connection pool that could be prohibiting it from reusing existing connections?

      Delete
    4. I am not able to work out where exactly the problem is. Can you please help identify it?
      - Thanks.

      Delete
  134. Hi Bruce,
    I want to have multiple destinations and, for each destination, one consumer.
    Should I have one listener container for each (destination, consumer) pair, or can I use one listener container for all pairs?

    ReplyDelete
    Replies
    1. Or what should I do to register multiple consumers (listeners) for one destination?

      Delete
    2. To listen to multiple destinations, you will need to register a listener for each destination. Below is a quick example:

      <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />
      <bean id="complexMessageListener" class="com.mycompany.ComplexMessageListener" />
      <jms:listener-container>
      <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
      <jms:listener destination="TEST.FOO.BAR.Q" ref="simpleMessageListener" method="onMessage" />
      <jms:listener destination="SOME.OTHER.DEST" ref="complexMessageListener" method="onMessage" />
      </jms:listener-container>

      If you want to register multiple listeners for a single destination, then you just specify one listener but you set the concurrency on the listener-container to the number of consumers you need. Please note that you are allowed to specify more than one listener-container element. There's no restriction to define just one. When you need to specify different settings at the listener-container level for different listeners, this is a situation where you might need more than one listener-container element.
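
      As a minimal sketch of the single-destination case, the configuration below runs five concurrent consumers for one listener. The value 5 is only an illustration, and the connection-factory bean id is assumed:

      ```xml
      <!-- concurrency="5" starts five consumers for each listener in this container -->
      <jms:listener-container connection-factory="connectionFactory" concurrency="5">
          <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
      </jms:listener-container>
      ```

      A range such as concurrency="2-5" is also accepted, in which case the container scales between the lower and upper limits.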

      Delete
    3. Hi again,
      I'm using jms (with jms provider: HornetQ)
      I have one destination with multiple listeners on it. I want each message listener to get and consume specific messages from the destination. I know I should use a message selector, and I have this sample, but I don't want to create a MessageConsumer:
      String redSelector = "color='red'";

      MessageConsumer redConsumer = session.createConsumer(queue, redSelector);
      redConsumer.setMessageListener(new SimpleMessageListener("red"));

      TextMessage redMessage = session.createTextMessage("Red");
      redMessage.setStringProperty("color", "red");

      producer.send(redMessage);

      what should i do?

      Delete
    4. I want do this with spring beans and properties :D

      Delete
    5. This comment has been removed by the author.

      Delete
    6. This comment has been removed by the author.

      Delete
    7. To post XML, you need to replace the 'greater than' and 'less than' symbols with their equivalent HTML entities &lt; and &gt;

      Delete
  135. To use a message selector with the DMLC you just need to add the selector attribute to the jms:listener element. Below is an example:

    <jms:listener-container
    container-type="default"
    connection-factory="connectionFactory"
    acknowledge="auto">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" selector="SYMBOL = 'AAPL'" />
    </jms:listener-container>

    Hope this helps.

    ReplyDelete
  136. This comment has been removed by the author.

    ReplyDelete
  137. Hello Bruce,

    What is the polling interval of the DMLC ?

    That is, at what basic rate will messages be consumed by a DMLC listener ?

    Let's say I have one DMLC container with one listener listening on queue1.

    Let's say 100 messages are present on that queue.

    What determines how fast these messages are consumed ?

    Will the listener only poll each second ?
    That is, will processing of those messages at least take 100 seconds ?

    Can you elaborate a bit on this please ?
    Thanks.

    ReplyDelete
    Replies
    1. The DMLC's AsyncMessageListenerInvoker class invokes the listener in an ongoing loop, there is no interval at which it is called specifically, and each listener invocation will consume a single message.
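
      The closest thing to a polling interval is the receiveTimeout property: each pass through the loop blocks in a JMS receive call for at most that long (1000 ms by default) and returns as soon as a message is available, so 100 waiting messages do not take 100 seconds. A minimal sketch with hypothetical bean ids:

      ```xml
      <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
          <property name="connectionFactory" ref="connectionFactory" />
          <property name="destinationName" value="queue1" />
          <property name="messageListener" ref="myListenerBean" />
          <!-- Upper bound on how long one receive call waits for a message, in milliseconds -->
          <property name="receiveTimeout" value="1000" />
      </bean>
      ```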

      In general, with JMS queue based messaging, more consumers on a queue will consume messages faster. So if you want to consume messages faster, then you should increase the number of consumers on that queue.

      As for the processing of the message, that is dependent upon what kind of processing is being done to the message. For example, if the processing that's taking place on each message is complex, then the receipt of each message is probably going to take longer than just simply receiving the message and moving on.

      Delete
  138. Hi Bruce,
    Do you have any suggestion for tuning jms message production?

    ReplyDelete
    Replies
    1. Regarding message production, one thing that can be problematic is that the JMS connection tuning can be quite different for producers than it is for consumers. This requires an evaluation of the properties necessary for the JMS connection and the use of sessions that are spawned from each connection.

      One issue concerns consumer connection starvation where a message broker will prefetch a particular number of messages (the default in ActiveMQ is 1000 -- usually too high) and this causes the first few connections in a pool to get many messages prefetched and the subsequent connections to get very few messages. The problem manifests itself when you notice that messages don't seem to be processed by consumers. To remedy the problem you decrease the prefetch limit (and possibly other properties as well) and the side effect could have a detrimental effect on a producer (of course, this depends on the necessary properties for producers).

      This is just one concern when it comes to message production. Oftentimes, the most significant areas where tuning can occur on the producer side don't have anything to do with the actual message send but instead lie with accessing and processing the data that is used to create the message. Some examples include database access for each message that is produced, heavy amounts of data processing prior to message send that, in some cases, can be deferred, etc. Like I said, items outside the scope of the actual message send itself.

      Is there anything specific with regard to message production where you're experiencing trouble?

      Delete
    2. Thanks bruce ;)
      All i want is handling connections, sessions in message production (caching them and closing them when required). Is jmsTemplate a good solution for this? [I'm using jmsTemplate (only for producing messages)]

      Delete
    3. Yes, the Spring JmsTemplate is great for message production. Just make sure to utilize a connection pool with it for the best results. You'll need to experiment to determine the best connection pool settings for your situation.
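
      A minimal sketch of a JmsTemplate wired for production follows. Note the swap: it uses Spring's CachingConnectionFactory, which caches sessions and producers on a single shared connection rather than pooling connections, so treat it as one option among several. The targetConnectionFactory bean, the destination name, and the cache size are assumptions:

      ```xml
      <!-- Wraps the provider's connection factory and caches sessions and producers -->
      <bean id="cachingConnectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
          <property name="targetConnectionFactory" ref="targetConnectionFactory" />
          <property name="sessionCacheSize" value="10" />
      </bean>

      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
          <property name="connectionFactory" ref="cachingConnectionFactory" />
          <property name="defaultDestinationName" value="TEST.FOO" />
      </bean>
      ```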

      Delete
  139. Hi,

    I have set up a JndiTemplate with LDAP as below and I want to get the MQ ConnectionFactory and MQ QueueName after connecting to LDAP. Now the LDAP credentials are good, because I used the exact same thing through a custom adaptor class and that works fine.

    <!-- JNDI template -->
    <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
    <props>
    <prop key="java.naming.factory.initial">com.sun.jndi.ldap.LdapCtxFactory</prop>
    <prop key="java.naming.provider.url">ldap://server.domain.com</prop>
    <prop key="java.naming.security.principal">xxx</prop>
    <prop key="java.namimg.security.credentials">xxx</prop>
    </props>
    </property>
    </bean>

    <!-- ConnectionFactory from JNDI Template -->
    <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="jndiName">
    <value>${msg.queuefactory}</value>
    </property>
    </bean>

    <!-- QueueName from JNDI Template -->
    <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="jndiName">
    <value>${msg.queuename}</value>
    </property>
    </bean>
    ==============================================

    I am getting this error when I try to run ....

    ==============================================
    Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'connectionFactory' defined in class path resource [MwmsMessageListener.xml]: Invocation of init method failed; nested exception is javax.naming.NamingException: [LDAP: error code 1 - 000004DC: LdapErr: DSID-0C0906DD, comment:

    In order to perform this operation a successful bind must be completed on the connection., data 0, v1772
    ==============================================


    What am I doing wrong? Any help appreciated. Thanks.

    ReplyDelete
    Replies
    1. Although this question is quite far off-topic to my blog post, it looks to me like your problem is explained in the comment section of the error code where it states the following:

      In order to perform this operation a successful bind must be completed on the connection., data 0, v1772

      It looks like your LDAP server is configured to disallow anonymous binds, i.e., you must successfully bind as an authenticated user before you can perform a search. It appears that Spring is trying to perform a search without first performing an authenticated bind. I suggest looking into the Spring configuration so that it performs a bind before it attempts the search.
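
      One thing that may be worth checking: in the configuration as posted, the credentials key is spelled java.namimg.security.credentials, whereas the standard JNDI key is java.naming.security.credentials. A misspelled key would not be picked up as the password, which could lead to exactly this kind of failed bind, although that is a guess. Below is a minimal sketch of the environment with the standard key names; the java.naming.security.authentication entry is an assumption and the xxx values are placeholders from the question:

      ```xml
      <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
          <property name="environment">
              <props>
                  <prop key="java.naming.factory.initial">com.sun.jndi.ldap.LdapCtxFactory</prop>
                  <prop key="java.naming.provider.url">ldap://server.domain.com</prop>
                  <!-- Assumption: simple (user name and password) authentication -->
                  <prop key="java.naming.security.authentication">simple</prop>
                  <prop key="java.naming.security.principal">xxx</prop>
                  <!-- Standard key name; note "naming", not "namimg" -->
                  <prop key="java.naming.security.credentials">xxx</prop>
              </props>
          </property>
      </bean>
      ```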

      Delete
  140. Hi Bruce,
    I created multiple instances of a listener to consume from multiple queues and send to multiple queues. It was working fine for multiple queues and a single message listener bean, but when I added multiple listener beans it stopped. Here is a code sample:

    Can you please help ASAP?

    ReplyDelete
  141. Sorry my code disappeared,trying again:-
    <bean id="messageListenerA3" class="com.org.dept.message.client.AMessageListener">
    <property name="jmsSender" ref="jmsSenderA3" />
    <property name="counter" ref="counter" />
    </bean>
    <bean id="messageListenerA1A2" class="com.org.dept.message.client.AMessageListener">
    <property name="jmsSender" ref="jmsSenderA1A2" />
    <property name="counter" ref="counter" />
    </bean>

    <jms:listener-container
    container-type="default"
    connection-factory="jmsQueueConnectionFactory"
    destination-type="queue"
    destination-resolver="jmsReceiveDestinationResolver"
    acknowledge="client"
    concurrency="1-10"
    cache="consumer">
    <jms:listener destination="${jboss.queue.a3.name}" ref="messageListenerA3" method="onMessage" />
    <jms:listener destination="${jboss.queue.a1a2.name}" ref="messageListenerA1A2" method="onMessage" />
    </jms:listener-container>

    Replies
    1. You said that you created multiple instances of a single listener to consume from multiple queues and that this worked correctly, but that it stopped working when you added multiple listeners. I'm guessing you mean you wired multiple listeners to a single Spring DMLC in the XML config, is that correct? Also, what do you mean by "it stopped"? Did message consumption stop? Are messages building up in the queue now?

      Wiring multiple beans to a single Spring DMLC in the XML config is definitely a supported feature. I've used it that way many times myself. What happens behind the scenes is that Spring will start up a new instance of the DMLC for each listener (you'll only see this if you look at the bean instances that are running in memory using VisualVM or something similar).

      In the configuration provided above, I see two separate MessageListener beans wired up to the Spring DMLC to consume from two separate queues. I also see that these listeners are configured to use CLIENT_ACKNOWLEDGE mode and a concurrency range of 1-10, and that you've told the Spring DMLC to cache the consumers. I don't see a problem with the configuration that would prevent messages from being consumed.

  142. Hello Bruce,

    Really good work you are doing here, please keep it up. I am using ActiveMQ 5.6.0 with a Spring application. The issue I have is: how do I prevent the producer from sending duplicate messages to the queue? Is there a configuration for that? Also, there are times when the consumer just stops picking messages from the queue without any error being displayed, and all the messages just remain in the queue. What could cause that? I would appreciate a swift reply. Thanks in advance.

    Replies
    1. Because your questions are not related to this blog post, I encourage you to ask about your issues on the ActiveMQ mailing list. Information on the mailing list is available here:

      http://activemq.apache.org/mailing-lists.html

  143. hi bruce,
    I am looking for a spring jms listener ThreadPoolTaskExecutor sample.

    Replies
    1. The Spring DMLC already allows you to inject your own java.util.concurrent.Executor via the DefaultMessageListenerContainer.setTaskExecutor() method. This is much easier than building your own multi-threaded consumer solution.
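
      A minimal sketch of that wiring in bean-style XML follows. The bean ids, the TEST.FOO destination, the simpleMessageListener reference, and the pool sizes are illustrative assumptions, not values from a real application. The pool is sized so that its core threads can cover maxConcurrentConsumers, because each consumer occupies a thread for a long time.

      ```xml
      <!-- Hypothetical executor bean; pool sizes are assumptions for illustration -->
      <bean id="listenerTaskExecutor"
            class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="10" />
        <property name="maxPoolSize" value="10" />
        <property name="threadNamePrefix" value="jms-listener-" />
      </bean>

      <!-- Bean-style DMLC definition with the executor injected via setTaskExecutor() -->
      <bean id="messageListenerContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destinationName" value="TEST.FOO" />
        <property name="messageListener" ref="simpleMessageListener" />
        <property name="taskExecutor" ref="listenerTaskExecutor" />
        <property name="concurrentConsumers" value="5" />
        <property name="maxConcurrentConsumers" value="10" />
      </bean>
      ```

      When using the JMS namespace instead, the task-executor attribute of the listener-container element can reference the same executor bean.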

  144. Hi,

    Very helpful blog. Thanks.
    I just wanted to ask: if I am using a DefaultMessageListenerContainer, it executes the onMessage method only when it gets a message from the queue, right? So, is there any way to set a particular method to run when the message listener is not receiving any messages from the queue? Basically, I want the user to see that the message listener is polling the queue for messages rather than a blank screen. Also, can I set a sleep time for the message listener so that if it does not get any messages for, say, 1 second, it goes to sleep for a specific amount of time?

    Replies
    1. Correct, the onMessage() method is only called upon receipt of a message.

      There is no way to call another method when the DMLC is not receiving a message.

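      You can control how long the DMLC sleeps between recovery attempts via the recoveryInterval property, which the sleepInbetweenRecoveryAttempts() method uses. Note that this applies to recovery after a failure, not to idle polling; how long each receive call waits for a message is controlled by the receiveTimeout property.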

    2. Thanks Bruce. I would like to know something more from you. Can you give me details on which properties I can configure on a JmsTemplate through the XML file?

    3. You have access to all the setter methods from the entire class hierarchy for the JmsTemplate class. So take a look at the JavaDoc for the class hierarchy and any setters you see are available in an XML config. For example, if you want to use the setDeliveryMode() method, then you would use 'deliveryMode' as the XML property name (dropping the 'set' prefix).
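
      As a rough illustration of that setter-to-property mapping, here is a sketch of a JmsTemplate bean. The bean id, the connectionFactory reference, the TEST.FOO destination, and the values are assumptions chosen for the example.

      ```xml
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- setConnectionFactory(), inherited from the JmsAccessor superclass -->
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- setDefaultDestinationName() -->
        <property name="defaultDestinationName" value="TEST.FOO" />
        <!-- setExplicitQosEnabled(): required for deliveryMode, priority and timeToLive to be applied on send -->
        <property name="explicitQosEnabled" value="true" />
        <!-- setDeliveryMode(int): 2 is javax.jms.DeliveryMode.PERSISTENT -->
        <property name="deliveryMode" value="2" />
        <!-- setReceiveTimeout(long), in milliseconds -->
        <property name="receiveTimeout" value="5000" />
      </bean>
      ```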

    4. Got it Bruce. Thanks :). One more question. I am using a container to get messages from IBM WebSphere MQ. I noticed that something like this appeared when I tried to stop the application while message receiving was in progress: "Rejecting received message because of the listener container having been stopped in the meantime: ". Does it mean that I am losing my messages? I am using client acknowledge mode. Please help me with this. Below are the configuration and the code of the onMessage method:

      Configuration:

      Code:

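      @Override
      public void onMessage(Message msg) {
          // Only text messages are handled; other message types are ignored
          if (msg instanceof TextMessage) {
              try {
                  this.parseMessage(msg);
                  // Acknowledge only after the message was parsed successfully
                  msg.acknowledge();
                  totalCount++;
                  perRunCount++;
              } catch (Exception e) {
                  LOG.error("Failed to process message", e);
                  throw new RuntimeException(e);
              }
          }
      }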

    5. Configuration related to the above question:

      <bean id="connectionFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory">
        <property name="hostName" value="${mq.server.host}" />
        <property name="port" value="${mq.server.port}" />
        <property name="channel" value="${mq.server.channel}" />
        <property name="queueManager" value="${mq.server.qm}" />
        <property name="transportType" value="${mq.server.tt}" />
      </bean>

      <bean id="cachedConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"
          p:targetConnectionFactory-ref="connectionFactory"
          p:sessionCacheSize="10" />

      <bean id="destination" class="com.ibm.mq.jms.MQQueue">

      </bean>

      <!-- Bean that receives message(s) -->
      <bean id="messageReceiver" class="com.verizon.ves.mqadaptor.receiver.MessageReceiver"
          init-method="initialise"
          destroy-method="cleanUp">

      </bean>

      <jms:listener-container
          container-type="default"
          connection-factory="cachedConnectionFactory"
          acknowledge="client">
        <jms:listener id="MessageListener" destination="${msg.queue.name}" ref="messageReceiver" method="onMessage" />
      </jms:listener-container>

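    6. The warning that you're seeing is due to the default setting (false) of the acceptMessagesWhileStopping property, which causes the DMLC to reject any message it receives while it is stopping instead of processing it. A rejected message is not acknowledged, so the broker should redeliver it rather than lose it. If you use the setAcceptMessagesWhileStopping() method to set this property to true, such messages are processed and the warning should disappear.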
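
      A sketch of setting that property in XML, assuming the listener container is declared as a plain bean rather than through the JMS namespace. The messageListenerContainer bean id is hypothetical; the other references reuse the names from the configuration in the question.

      ```xml
      <bean id="messageListenerContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachedConnectionFactory" />
        <property name="destinationName" value="${msg.queue.name}" />
        <property name="messageListener" ref="messageReceiver" />
        <!-- Equivalent of acknowledge="client" in the JMS namespace -->
        <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
        <!-- Keep processing messages that arrive while the container is stopping -->
        <property name="acceptMessagesWhileStopping" value="true" />
      </bean>
      ```

      The trade-off is that messages arriving during shutdown may still be processed, so the listener needs to tolerate running while the rest of the application is going down.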

  145. Bruce,

    Can we use multiple listeners for the same queue destination?
    If so, can you give me a small two-line Spring config example?

    Thanks

    Replies
    1. Sure, you just specify multiple listeners as shown below:

      ...
      <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

      <bean id="xyzMessageListener" class="org.blah.SomeOtherMessageListenerBean" />

      <bean id="asdfBean" class="foo.bar.baz.AsdfBean" />

      <jms:listener-container
      container-type="default"
      connection-factory="connectionFactory"
      acknowledge="auto"
      concurrency="10-50">
      <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
      <jms:listener destination="TEST.FOO" ref="xyzMessageListener" method="someOtherMethod" />
      <jms:listener destination="TEST.FOO" ref="asdfBean" method="doSomethingWithMessage" />
      </jms:listener-container>
      ...

      Please note that each of the three listeners here will utilize the same settings from the <listener-container> element. If you want different settings for different listeners, then you just need to define separate <listener-container> elements.

    2. Hello Bruce,

      I have one DMLC with concurrency one and with one listener with a message selector.

      I'm using Spring 3.0.5 on WebLogic 10.3.5.
      I have configured spring's weblogic jta txn manager: class="org.springframework.transaction.jta.WebLogicJtaTransactionManager"
      with property name="transactionManagerName" value="javax.transaction.TransactionManager"

      In the WebLogic admin console I see the number of consumers and connections increasing to "huge" numbers.

      That leads me to believe that the consumers and connections aren't being reused/cached, but that new ones are created each time?

      Do you have any idea what might be causing this, and is it something I have to worry about?

    3. This is actually a feature of the Spring DMLC so that it maintains compatibility with Java EE containers.

      Search in my blog post above for the following text:

      'When an external transaction manager is configured, none of the JMS resources are cached by default.'

      Read the explanation and see the example.

      For further explanation, see the Javadoc for the DefaultMessageListenerContainer which says:

      '...Note that this listener container will automatically reobtain all JMS handles for each transaction in case of an external transaction manager specified...'
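
      For reference, a sketch of overriding that default through the cache attribute of the JMS namespace. The transactionManager bean name, the TEST.FOO destination, and the simpleMessageListener reference are assumptions for illustration, and whether caching is safe with a particular Java EE container should be verified by testing.

      ```xml
      <!-- transactionManager is a hypothetical name for the external (e.g. JTA) transaction manager bean -->
      <jms:listener-container
          connection-factory="connectionFactory"
          transaction-manager="transactionManager"
          cache="consumer"
          concurrency="1">
        <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
      </jms:listener-container>
      ```

      The cache attribute accepts none, connection, session, consumer, or auto.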

    4. This comment has been removed by the author.

    5. Thanks very much.

      I've read the explanation and the given example.

      Can I conclude that even if I use an external txn manager (see my configuration below), it is advisable to turn ON caching at the DMLC level?

      If so, then the next thing is that I should be very careful when turning it on: "separate connection and listener-container configuration if the cache keys are different and if you want to cache sessions or consumers for reuse".
      Correct?

      Can you confirm that if I turn on caching on the DMLC listed in the example below, it won't work reliably because that DMLC has multiple listeners that listen to the same queue but with slightly different selectors?

      If so, then I'll have to split them up into multiple DMLC beans, correct?

      If so, must each of those DMLC beans then reference a separate queue connection factory, or can they all reference the same one?

      <jms:listener-container connection-factory="kbiJmsConnectionFactory" destination-resolver="kbiJmsDestinationResolver" transaction-manager="${davinci.transaction.manager}" concurrency="1" prefetch="1">

      <jms:listener id="azkbizndiovListener01Id" destination="${azkbizndiovJmsQueueJndiName01}" selector="(eventId = 'kbizndiov' OR eventId = 'probe') OR (eventId = 'kbiexpzndiov' AND tgtListenerId='${application.runtime.id}/azkbizndiovProcessDelegate01')" ref="azkbizndiovListener01"/>

      <jms:listener id="azkbizndimvListener01Id" destination="${azkbizndimvJmsQueueJndiName01}" selector="(eventId = 'kbizndimv' OR eventId = 'probe') OR (eventId = 'kbiexpzndimv' AND tgtListenerId='${application.runtime.id}/azkbizndimvProcessDelegate01')" ref="azkbizndimvListener01"/>

      <jms:listener id="azkbizndlesListener01Id" destination="${azkbizndlesJmsQueueJndiName01}" selector="(eventId = 'kbizndles' OR eventId = 'probe') OR (eventId = 'kbiexpzndles' AND tgtListenerId='${application.runtime.id}/azkbizndlesProcessDelegate01')" ref="azkbizndlesListener01"/>
      </jms:listener-container>

      Thanks

  146. Your assessment is correct. Enabling caching for listeners that are each defined with a unique selector won't really benefit you much because those listeners cannot be used interchangeably by that DMLC. If I recall correctly, it's best to define those listeners in separate DMLCs if you want to use caching. My advice is to test out this config thoroughly in your environment to confirm that the behavior is what you expect.

  147. How can I time out the send call on JmsTemplate?

    It hangs at that point. I am using CachingConnectionFactory with IBM MQ, and I also tried SingleConnectionFactory.

    It works with a correct MQ configuration, but if, say, MQ is not up or is busy, then it should throw some exception. It is not throwing any exception at all!

    Please help me.

    Here is a Stack Overflow question with more information:

    http://stackoverflow.com/questions/18128102/jmstemplate-has-no-send-timeout

    Thanks
    Anshul K

    Replies
    1. To my recollection, the JMS 1.1 spec does not define any requirement for a send timeout. Furthermore, there is no feature in the Spring JmsTemplate for a send timeout either.

      Regarding IBM WebSphere MQ, I am not an expert on this message broker, so I cannot comment on why it is not throwing an exception.

  148. Bruce,
    I am using the Apache CXF SOAP over JMS plugin to send messages to WebSphere MQ. I am using MQQueueConnectionFactory wrapped inside org.springframework.jms.connection.CachingConnectionFactory. I am specifying a targetDestination as well as a replyDestination in the CXF JmsConfiguration. The messages are sent and the responses come back synchronously.
    The solution works fine. However, some messages are being missed. I can see from the MQ log file that a message with the correct correlation ID is sent from WebSphere MQ, but it is not received by the JmsTemplate.doReceive() method and the application throws a timeout error.
    Any idea what could go wrong?
    Thanks.
    Sub

    Replies
    1. Even if the JmsTemplate.doReceive() method is throwing a timeout error, it seems strange to me that the message would disappear from the message broker. When a message is not fully consumed with a proper ack, it should remain in the message broker, unless the message is being redelivered and there is some odd policy in the message broker around redelivered messages.

  149. Attaching my configuration:

    hostname
    37120
    QUEUE.MGR
    1
    SYSTEM.DEF.SVRCONN

    The destination resolver is being used to set the target client property to 1.

  150. Somehow the config details were removed while publishing.

  151. Hi Snyder, how do I specify a custom listener container in the Spring XML configuration? Providing the bean name in the container-class attribute doesn't seem to work.
