21 May 2010

50% Off ActiveMQ In Action!



The ActiveMQ In Action book has made available a new early access release before going into final review and copy editing. All 14 chapters are included in this MEAP release.

For a limited time you can get the definitive book on ActiveMQ at a 50% discount! Just use the coupon code *activemq50* at the time of checkout. Hurry and get this discounted price while it lasts because this offer expires on Monday, May 31, 2010.

I want to purchase ActiveMQ in Action today!

Tuning JMS Message Consumption In Spring



In a previous blog post titled Using Spring to Receive JMS Messages, I introduced the use of the Spring default message listener container for asynchronous consumption of JMS messages. One very common discovery that folks make when first using JMS is that producers can send messages much faster than consumers can receive and process them. This is a normal situation with message-oriented middleware (MOM), and it is easy to handle if you are using the Spring message listener container. When using JMS queues, I always recommend using more consumers than producers. (When using JMS topics, you should use only a single consumer to guard against receiving the same message multiple times.)

The Spring DefaultMessageListenerContainer (DMLC) is a highly flexible container for consuming JMS messages that can handle many different use cases via the numerous properties that it provides. For the situation mentioned above, the DMLC offers the ability to dynamically scale the number of consumers. That is, as the number of messages available for consumption rises and falls, the DMLC can automatically increase and decrease the number of consumers. To configure the DMLC to automatically scale the number of message consumers, the concurrentConsumers property and the maxConcurrentConsumers property are used. Below is an example of the JMS namespace style of XML configuration that employs these properties:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
        p:brokerURL="tcp://foo.example.com:61616" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- A JMS namespace aware Spring configuration for the message listener container -->
  <jms:listener-container
      container-type="default"
      connection-factory="connectionFactory"
      acknowledge="auto"
      concurrency="10-50">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
  </jms:listener-container>
</beans>


Notice the concurrency="10-50" attribute above. This is shorthand for setting the concurrentConsumers=10 and the maxConcurrentConsumers=50 properties of the DMLC. This tells the DMLC to always start up a minimum of 10 consumers. When a new message has been received, if the maxConcurrentConsumers limit has not been reached and the value of the idleConsumerLimit property has not been reached, then a new consumer is created to process the message. This behavior from the DMLC continues up to the limit set by the maxConcurrentConsumers property. When no messages are being received and the consumers become idle, the number of consumers is automatically decreased.

(NOTE: The idleConsumerLimit property is used to specify the maximum number of consumers that are allowed to be idle at a given time. The use of this property was recently clarified a bit in the Spring 3.x trunk. Increasing this limit causes invokers to be created more aggressively. This can be useful to ramp up the number of consumers faster.)

It is important to be aware of a couple of things related to this dynamic scaling:
  1. You should not increase the number of concurrent consumers for a JMS topic. This leads to concurrent consumption of the same message, which is hardly ever desirable.
  2. The concurrentConsumers property and the maxConcurrentConsumers property can be modified at runtime, e.g., via JMX.
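
As a rough illustration of the first point, a listener container for a topic could pin the concurrency to a single consumer. This is only a sketch: it is a fragment meant to sit inside the <beans> element of the configuration above, it reuses the connectionFactory and simpleMessageListener beans from that example, and the topic name TEST.FOO.TOPIC is made up for illustration.

```xml
<!-- Fragment: assumes the connectionFactory and simpleMessageListener beans
     defined in the example above. TEST.FOO.TOPIC is a hypothetical topic name. -->
<jms:listener-container
    container-type="default"
    connection-factory="connectionFactory"
    destination-type="topic"
    acknowledge="auto"
    concurrency="1">
  <jms:listener destination="TEST.FOO.TOPIC" ref="simpleMessageListener" method="onMessage" />
</jms:listener-container>
```

With a single number, the concurrency attribute sets only the upper limit, so this container should not scale beyond one consumer.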

The dynamic scaling can be tuned even further through the use of the idleTaskExecutionLimit property. The use of this property is best explained by a portion of the Javadoc:


Within each task execution, a number of message reception attempts (according to the "maxMessagesPerTask" setting) will each wait for an incoming message (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling).

Raise this limit if you encounter too frequent scaling up and down. With this limit being higher, an idle consumer will be kept around longer, avoiding the restart of a consumer once a new load of messages comes in. Alternatively, specify a higher "maxMessagesPerTask" and/or "receiveTimeout" value, which will also lead to idle consumers being kept around for a longer time (while also increasing the average execution time of each scheduled task).


Note the recommendations if you experience dynamic scaling taking place too often. To deal with this situation, you should experiment with increases to one or more of the following properties:
  • idleTaskExecutionLimit - The limit for the number of allowed idle executions of a receive task. The default is 1, which causes idle resources to be closed early once a task does not receive a message.
  • maxMessagesPerTask - The maximum number of messages to process in a single task. This determines how long a task lives before being reaped. The default is unlimited (-1) so you may not need to change this property.
  • receiveTimeout - The timeout to be used for JMS receive operations. The default is 1000 ms.
  • As I noted above, in the Spring 3.x trunk, the idleConsumerLimit property was clarified a bit recently and exposed as a writable property. This is yet another property for tuning for situations where you need to ramp up the number of concurrent consumers faster.

One important thing to note about these tuning properties is that they are not usable in the JMS namespace style of XML configuration. To use these properties, you must use either a pure Spring XML configuration or straight Java. Below is an example of how to use the receiveTimeout property and the idleTaskExecutionLimit property:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
        p:brokerURL="tcp://foo.example.com:61616" />

  <!-- The JMS destination -->
  <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"
        p:physicalName="TEST.FOO" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- A pure Spring configuration for the message listener container -->
  <bean id="msgListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"
        p:connectionFactory-ref="connectionFactory"
        p:destination-ref="destination"
        p:messageListener-ref="simpleMessageListener"
        p:concurrentConsumers="10"
        p:maxConcurrentConsumers="50"
        p:receiveTimeout="5000"
        p:idleTaskExecutionLimit="10"
        p:idleConsumerLimit="5" />

</beans>

In the example configuration above, the receiveTimeout property is set to five seconds, so each receive operation in the DMLC waits up to five seconds for a message instead of the default one second. Also, the idleTaskExecutionLimit property is set to 10, so a task is allowed 10 idle executions before it is shut down, instead of the default value of 1. Lastly, the idleConsumerLimit property specifies the limit on the number of idle consumers. This property can be used to more aggressively ramp up the number of concurrent consumers.

In addition to tuning these various properties for dynamic consumer scaling, it is also important to understand that the DMLC can provide various levels of caching for JMS resources (i.e., JMS connections, sessions and consumers). By default, the DMLC will cache all JMS resources unless an external transaction manager is configured (because some containers require fresh JMS resources for external transactions). When an external transaction manager is configured, none of the JMS resources are cached by default. The level of caching can be configured using the cacheLevel property. This property allows for tiered caching from connection, to session, to consumer. This allows caching of:
  • The connection
  • The connection and the session
  • The connection, the session and the consumer

Below is an example configuration that uses the cacheLevel property to specify consumer level caching:


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

  <!-- A JMS connection factory for ActiveMQ -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
        p:brokerURL="tcp://foo.example.com:61616" />

  <!-- A POJO that implements the JMS message listener -->
  <bean id="simpleMessageListener" class="com.mycompany.SimpleMessageListener" />

  <!-- A JMS namespace aware Spring configuration for the message listener container -->
  <jms:listener-container
      container-type="default"
      connection-factory="connectionFactory"
      acknowledge="auto"
      concurrency="10-50"
      cache="consumer">
    <jms:listener destination="TEST.FOO" ref="simpleMessageListener" method="onMessage" />
  </jms:listener-container>
</beans>

Caching at the consumer level means that the connection, the session and the consumer are all cached. Notice that, unlike the tuning properties discussed earlier, the cache level can be set in the Spring JMS namespace style of XML configuration, via the cache attribute.
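
For comparison, the same cache level in a pure Spring bean definition might look something like the sketch below. It is a fragment rather than a complete file: it assumes the connectionFactory, destination and simpleMessageListener beans from the earlier pure Spring example, and it sets the level by name through the cacheLevelName property of the DMLC.

```xml
<!-- Fragment: assumes the connectionFactory, destination and
     simpleMessageListener beans from the earlier pure Spring example. -->
<bean id="msgListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer"
      p:connectionFactory-ref="connectionFactory"
      p:destination-ref="destination"
      p:messageListener-ref="simpleMessageListener"
      p:concurrentConsumers="10"
      p:maxConcurrentConsumers="50"
      p:cacheLevelName="CACHE_CONSUMER" />
```

The other level names that should be accepted here are CACHE_NONE, CACHE_CONNECTION, CACHE_SESSION and CACHE_AUTO.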

The session is cached based on ack-mode and the consumer is cached based on the session, the selector and the destination. It's necessary to know this info to better understand where/when the JMS resources can and cannot be reused. For example, caching consumers that use different selectors and consume from different destinations is only going to be relevant if you partition these items appropriately for reuse. That is, you may need to use a separate connection and listener-container configuration if the cache keys are different and if you want to cache sessions or consumers for reuse.
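
One way to picture this partitioning is the sketch below, which gives each kind of consumer its own connection factory and its own listener container, so that each container caches only resources with matching ack-mode, selector and destination. Everything named here is hypothetical and exists only for illustration: the ordersConnectionFactory and auditConnectionFactory beans, the ORDERS.IN and AUDIT.IN queues, the two listener classes, and the region selector. Treat it as a starting point to experiment with, not a recommended layout.

```xml
<!-- Hypothetical beans: one connection factory and listener per partition -->
<bean id="ordersConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
      p:brokerURL="tcp://foo.example.com:61616" />
<bean id="auditConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
      p:brokerURL="tcp://foo.example.com:61616" />

<bean id="ordersListener" class="com.mycompany.OrdersListener" />
<bean id="auditListener" class="com.mycompany.AuditListener" />

<!-- Partition 1: transacted sessions, one destination, one selector -->
<jms:listener-container
    container-type="default"
    connection-factory="ordersConnectionFactory"
    acknowledge="transacted"
    concurrency="10-50"
    cache="consumer">
  <jms:listener destination="ORDERS.IN" ref="ordersListener" method="onMessage"
                selector="region = 'EU'" />
</jms:listener-container>

<!-- Partition 2: auto-acknowledge sessions on a different destination -->
<jms:listener-container
    container-type="default"
    connection-factory="auditConnectionFactory"
    acknowledge="auto"
    concurrency="1-5"
    cache="consumer">
  <jms:listener destination="AUDIT.IN" ref="auditListener" method="onMessage" />
</jms:listener-container>
```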

The overall point of the caching is that it can help to reduce the potential recurring thrash involved in creation and destruction of JMS resources. Reducing the thrash by using caching and employing the appropriate partitioning of these resources so as to allow for reuse can definitely improve the overall performance of the application.

Hopefully this post helps you understand how to tune JMS message consumption in Spring. As you employ Spring JMS in your applications and experiment further, you will discover how much the DMLC is actually doing for you and how many more features it has beyond what you can easily build yourself.