What’s New in JMS 2.0, Part Two: New Messaging Features

The second part of Nigel Deakin’s series on JMS 2.0 in the impending Java EE 7 looks at some of the new messaging features. Reprinted with permission from the Oracle Technology Network, Oracle Corporation.
This article, the second in a two-part series, describes some of the new messaging features introduced in Java Message Service (JMS) 2.0. It assumes a basic familiarity with JMS 1.1.
In Part One, we looked at the new ease-of-use features introduced in JMS 2.0. Here, we look at important new messaging features.
JMS 2.0, which was released in April 2013, is the first update to the JMS specification since version 1.1 was released in 2002. One might think that an API that has remained unchanged for so long has grown moribund and unused. However, if you judge the success of an API standard by the number of different implementations, JMS is one of the most successful APIs around.
In JMS 2.0, the emphasis has been on catching up with ease-of-use improvements that have been made to other enterprise Java technologies in recent years. The opportunity has been taken to introduce a number of new messaging features as well.
JMS 2.0 is part of the Java EE 7 platform and can be used in Java EE Web or EJB applications. It can also be used standalone in a Java SE environment. As I explain below, some of the features are available only in a standalone environment while others are available only in Java EE Web or EJB applications.
Here we discuss five important new messaging features in JMS 2.0.
Multiple Consumers Allowed on the Same Topic Subscription
In JMS 1.1, a subscription on a topic was not permitted to have more than one consumer at a time. This meant that the work of processing messages on a topic subscription could not be shared among multiple threads, connections, or Java Virtual Machines (JVMs), thereby limiting the scalability of the application. This restriction has been removed in JMS 2.0 by the introduction of a new kind of topic subscription called a shared subscription.
Let’s review how topic subscriptions worked in JMS 1.1. In Listing 1, the createConsumer method on Session is used to create a nondurable subscription on the specified topic (we’ll discuss durable subscriptions in just a moment):
Listing 1
private void createUnsharedConsumer(ConnectionFactory connectionFactory, Topic topic)
        throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer messageConsumer = session.createConsumer(topic);
    connection.start();
    Message message = messageConsumer.receive(10000);
    while (message != null) {
        System.out.println("Message received: " + ((TextMessage) message).getText());
        message = messageConsumer.receive(10000);
    }
    connection.close();
}
In Listing 1, the consumer will receive a copy of every message sent to the topic. However, what if the application takes a long time to process each message? How do we make the application more scalable by sharing the work of processing these messages between, say, two JVMs, with one JVM processing some of the messages and the other JVM processing the remaining messages?
In JMS 1.1, there’s no way to do this in a normal Java SE application. (In Java EE, you could do it using a pool of message-driven beans [MDBs].) If you use createConsumer to create a second consumer in a separate JVM (or a separate thread on the same JVM), each consumer will use a separate subscription, and so it will receive a copy of every message received by the topic. That’s not what we want. If you think of a “subscription” as a logical entity that receives a copy of every message sent to the topic, then we want the two consumers to use the same subscription.
JMS 2.0 provides a solution. You can create a “shared” nondurable subscription using a new method: createSharedConsumer. This method is available both on Session (for applications using the classic API) and on JMSContext (for applications using the simplified API). Since the two JVMs need to be able to identify the subscription that they need to share, they need to supply a name to identify the shared subscription, as shown in Listing 2.
Listing 2
private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic)
        throws JMSException {
    Connection connection = connectionFactory.createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer messageConsumer = session.createSharedConsumer(topic, "mySubscription");
    connection.start();
    Message message = messageConsumer.receive(10000);
    while (message != null) {
        System.out.println("Message received: " + ((TextMessage) message).getText());
        message = messageConsumer.receive(10000);
    }
    connection.close();
}
If you run the code in Listing 2 in two separate JVMs, each message sent to the topic will be delivered to one or the other of the two consumers. This allows them to share the work of processing messages from the subscription.
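To make the difference between the two kinds of subscription concrete, here is a minimal in-memory sketch. It does not use the JMS API: ToyTopic, addConsumer, and publish are hypothetical names, and the round-robin distribution is an assumption made purely for illustration (a real provider may divide messages among the consumers differently).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of topic subscriptions. This is NOT the JMS API;
// all names here are hypothetical and exist only to illustrate the semantics.
class ToyTopic {
    // One entry per subscription; each subscription holds the inboxes of its consumers.
    private final Map<String, List<List<String>>> subscriptions = new LinkedHashMap<>();
    // Round-robin position per subscription (an assumption, not a JMS rule).
    private final Map<String, Integer> nextConsumer = new LinkedHashMap<>();

    // Adds a consumer to the named subscription, creating the subscription if needed,
    // and returns the list that collects the messages delivered to that consumer.
    List<String> addConsumer(String subscriptionName) {
        List<String> inbox = new ArrayList<>();
        subscriptions.computeIfAbsent(subscriptionName, k -> new ArrayList<>()).add(inbox);
        nextConsumer.putIfAbsent(subscriptionName, 0);
        return inbox;
    }

    // Every subscription gets one copy of the message; within a subscription,
    // exactly one consumer receives that copy.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : subscriptions.entrySet()) {
            List<List<String>> consumers = entry.getValue();
            int index = nextConsumer.get(entry.getKey()) % consumers.size();
            consumers.get(index).add(message);
            nextConsumer.put(entry.getKey(), index + 1);
        }
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        // Two consumers share one subscription; a third has a subscription of its own.
        List<String> a = topic.addConsumer("mySubscription");
        List<String> b = topic.addConsumer("mySubscription");
        List<String> c = topic.addConsumer("otherSubscription");
        for (int i = 1; i <= 4; i++) {
            topic.publish("msg" + i);
        }
        // prints: a=[msg1, msg3] b=[msg2, msg4] c=[msg1, msg2, msg3, msg4]
        System.out.println("a=" + a + " b=" + b + " c=" + c);
    }
}
```

The two consumers on mySubscription split the four messages between them, while the consumer on the separate subscription still receives a copy of every message.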
The same feature is available to applications that use a durable subscription. In JMS 1.1, a durable subscription was created using the method createDurableSubscriber on Session:
MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"myDurableSub");
This creates a durable subscription called myDurableSub on the specified topic. However, as before, there is no way to share the work of processing the messages on this durable subscription between two JVMs or between two threads on the same JVM. Depending on exactly what you try to do, you’ll either get a JMSException or two different subscriptions.
Once again, JMS 2.0 provides a solution to this problem. You can now create a “shared” durable subscription using the new method createSharedDurableConsumer. This method is available both on Session (for applications using the classic API) and on JMSContext (for applications using the simplified API).
MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic,"myDurableSub");
In summary, then, whereas JMS 1.1 defined two different types of topic subscription, JMS 2.0 defines four types, all of which can be created using either the classic or simplified APIs:
- Unshared nondurable subscriptions. These are available in both JMS 1.1 and JMS 2.0 and are created using createConsumer. They can have only a single consumer. Setting the client identifier is optional.
- Unshared durable subscriptions. These are available in both JMS 1.1 and JMS 2.0 and are created using createDurableSubscriber or (in JMS 2.0 only) createDurableConsumer. They can have only a single consumer. Setting the client identifier is compulsory, and the subscription is identified by the combination of the subscription name and client identifier.
- Shared nondurable subscriptions. These are available in JMS 2.0 only and are created using createSharedConsumer. They can have any number of consumers. Setting the client identifier is optional. The subscription is identified by the combination of the subscription name and the client identifier, if it is set.
- Shared durable subscriptions. These are available in JMS 2.0 only and are created using createSharedDurableConsumer. They can have any number of consumers. Setting the client identifier is optional. The subscription is identified by the combination of the subscription name and the client identifier, if it is set.
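The rules for the four subscription types can be restated compactly in code. The following is only a summary sketch: SubscriptionTypesSketch and SubscriptionType are hypothetical names, not part of javax.jms, and the strings simply record the creation methods named above (for unshared durable subscriptions, the JMS 2.0 method createDurableConsumer is used).

```java
// Hypothetical summary of the four subscription types; not part of javax.jms.
class SubscriptionTypesSketch {
    enum SubscriptionType {
        UNSHARED_NONDURABLE("createConsumer", false, false),
        UNSHARED_DURABLE("createDurableConsumer", false, true),
        SHARED_NONDURABLE("createSharedConsumer", true, false),
        SHARED_DURABLE("createSharedDurableConsumer", true, true);

        final String factoryMethod; // creation method named in the article
        final boolean shared;
        final boolean durable;

        SubscriptionType(String factoryMethod, boolean shared, boolean durable) {
            this.factoryMethod = factoryMethod;
            this.shared = shared;
            this.durable = durable;
        }

        // Only shared subscriptions may have more than one consumer.
        boolean allowsMultipleConsumers() {
            return shared;
        }

        // The client identifier is compulsory only for unshared durable subscriptions.
        boolean clientIdRequired() {
            return durable && !shared;
        }

        // Unshared subscriptions already existed in JMS 1.1.
        boolean availableInJms11() {
            return !shared;
        }
    }

    public static void main(String[] args) {
        for (SubscriptionType type : SubscriptionType.values()) {
            System.out.println(type + ": " + type.factoryMethod
                    + ", multipleConsumers=" + type.allowsMultipleConsumers()
                    + ", clientIdRequired=" + type.clientIdRequired()
                    + ", jms11=" + type.availableInJms11());
        }
    }
}
```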
Delivery delay
You can now specify a delivery delay on a message. The JMS provider will not deliver the message until after the specified delivery delay has elapsed.
If you’re using the classic API, you need to set the delivery delay (in milliseconds) by calling setDeliveryDelay on the MessageProducer prior to sending the message, as shown in Listing 3.
Listing 3
private void sendWithDeliveryDelayClassic(ConnectionFactory connectionFactory, Queue queue)
        throws JMSException {
    // send a message with a delivery delay of 20 seconds
    try (Connection connection = connectionFactory.createConnection()) {
        Session session = connection.createSession();
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.setDeliveryDelay(20000);
        TextMessage textMessage = session.createTextMessage("Hello world");
        messageProducer.send(textMessage);
    }
}
If you’re using the simplified API, you need to call setDeliveryDelay on the JMSProducer prior to sending the message. This method returns the JMSProducer object, which allows you to create the JMSProducer, set the delivery delay, and send the message all on the same line, as shown in Listing 4.
Listing 4
private void sendWithDeliveryDelaySimplified(ConnectionFactory connectionFactory, Queue queue)
        throws JMSException {
    // send a message with a delivery delay of 20 seconds
    try (JMSContext context = connectionFactory.createContext()) {
        context.createProducer().setDeliveryDelay(20000).send(queue, "Hello world");
    }
}
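The effect of a delivery delay on the consuming side can be sketched without a JMS provider. The model below is an illustration only: DelayedQueueModel and its methods are hypothetical names, and time is passed in explicitly so the behavior is deterministic. It assumes the earliest delivery time is the send time plus the delivery delay, which is how JMS 2.0 describes the JMSDeliveryTime header.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a queue that honors a delivery delay. Not the JMS API;
// all names are hypothetical and times are plain millisecond values.
class DelayedQueueModel {
    // A message body plus the earliest time at which it may be delivered.
    private static final class Pending {
        final String body;
        final long deliveryTimeMillis;

        Pending(String body, long deliveryTimeMillis) {
            this.body = body;
            this.deliveryTimeMillis = deliveryTimeMillis;
        }
    }

    private final List<Pending> pending = new ArrayList<>();

    // Records the message with delivery time = send time + delivery delay.
    void send(String body, long sendTimeMillis, long deliveryDelayMillis) {
        pending.add(new Pending(body, sendTimeMillis + deliveryDelayMillis));
    }

    // Returns and removes the first message whose delivery time has been reached,
    // or null if no message is deliverable yet.
    String receiveNoWait(long nowMillis) {
        for (int i = 0; i < pending.size(); i++) {
            if (pending.get(i).deliveryTimeMillis <= nowMillis) {
                return pending.remove(i).body;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        DelayedQueueModel queue = new DelayedQueueModel();
        queue.send("Hello world", 0, 20000);
        System.out.println(queue.receiveNoWait(19999)); // prints: null
        System.out.println(queue.receiveNoWait(20000)); // prints: Hello world
    }
}
```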
Sending messages asynchronously
Another new feature of JMS 2.0 is the ability to send a message asynchronously.
This feature is available for applications running in Java SE or the Java EE application client container. It is not available to applications that run in the Java EE Web or EJB container.
Normally, when a persistent message is sent, the send method does not return until the JMS client has sent the message to the server and received a reply to notify the client that the message has been safely received and persisted. We call this a synchronous send.
JMS 2.0 introduces the ability to perform an asynchronous send. When a message is sent asynchronously, the send method sends the message to the server and then returns control to the application without waiting for a reply from the server. Instead of being blocked unproductively while the JMS client waits for a reply, the application can do something useful, such as sending a further message or performing some processing.
When a reply is received back from the server to indicate that the message has been received by the server and persisted, the JMS provider notifies the application by invoking the callback method onCompletion on an application-specified CompletionListener object.
There are two main ways in which you might use an asynchronous send in an application:
- To allow the application to do something else (such as update the display or write to a database) during the interval when it would otherwise be waiting for a reply from the server
- To allow a large number of messages to be sent in succession without waiting for a reply from the server after each message
Listing 5 is an example of how you might implement the first of these using the classic API:
Listing 5
private void asyncSendClassic(ConnectionFactory connectionFactory, Queue queue)
        throws Exception {
    // send a message asynchronously
    try (Connection connection = connectionFactory.createConnection()) {
        Session session = connection.createSession();
        MessageProducer messageProducer = session.createProducer(queue);
        TextMessage textMessage = session.createTextMessage("Hello world");
        CountDownLatch latch = new CountDownLatch(1);
        MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
        // pass the same listener instance that is checked for an exception below
        messageProducer.send(textMessage, myCompletionListener);
        System.out.println("Message sent, now waiting for reply");
        // at this point we can do something else before waiting for the reply
        // this is not shown here
        // now wait for the reply from the server
        latch.await();
        if (myCompletionListener.getException() == null) {
            System.out.println("Reply received from server");
        } else {
            throw myCompletionListener.getException();
        }
    }
}
The class MyCompletionListener used in Listing 5 is a separate class provided by the application, which implements the javax.jms.CompletionListener interface, as shown in Listing 6:
Listing 6
class MyCompletionListener implements CompletionListener {
    CountDownLatch latch;
    Exception exception;

    public MyCompletionListener(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void onCompletion(Message message) {
        latch.countDown();
    }

    @Override
    public void onException(Message message, Exception exception) {
        // record the exception before releasing the latch,
        // so the waiting thread is guaranteed to see it
        this.exception = exception;
        latch.countDown();
    }

    public Exception getException() {
        return exception;
    }
}
In Listing 5, we use a new method on MessageProducer to send a message without waiting for a reply from the server. This is send(Message message, CompletionListener listener). Using this method to send a message allows the application to do something else while the message is processed in the server. When the application is ready to continue, it uses a java.util.concurrent.CountDownLatch to wait until the reply has been received from the server. When the reply is received, the application can then proceed with the same degree of confidence that the message has been successfully sent that it would have had after a normal synchronous send.
Sending a message asynchronously is slightly simpler if you are using the JMS 2.0 simplified API, as shown in Listing 7:
Listing 7
private void asyncSendSimplified(ConnectionFactory connectionFactory, Queue queue)
        throws Exception {
    // send a message asynchronously
    try (JMSContext context = connectionFactory.createContext()) {
        CountDownLatch latch = new CountDownLatch(1);
        MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
        context.createProducer().setAsync(myCompletionListener).send(queue, "Hello world");
        System.out.println("Message sent, now waiting for reply");
        // at this point we can do something else before waiting for the reply
        // this is not shown here
        latch.await();
        if (myCompletionListener.getException() == null) {
            System.out.println("Reply received from server");
        } else {
            throw myCompletionListener.getException();
        }
    }
}
In this case, the method setAsync(CompletionListener listener) is called on the JMSProducer prior to calling send(Destination destination, String body). And since the JMSProducer supports method chaining, you can do both on the same line.
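The listings above illustrate the first use of an asynchronous send. The second use, sending a large number of messages in succession and waiting only once at the end, follows the same pattern with a latch sized to the number of messages. The sketch below runs without a JMS provider: Listener is a hypothetical stand-in for javax.jms.CompletionListener, and FakeProducer is a hypothetical stand-in for a producer whose “server reply” arrives later on another thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncBatchSendSketch {
    // Hypothetical stand-in for javax.jms.CompletionListener, simplified to String messages.
    interface Listener {
        void onCompletion(String message);
        void onException(String message, Exception exception);
    }

    // Hypothetical stand-in for a producer: send() returns immediately and the
    // simulated server acknowledgement is delivered later on another thread.
    static final class FakeProducer implements AutoCloseable {
        private final ExecutorService server = Executors.newSingleThreadExecutor();

        void send(String message, Listener listener) {
            server.submit(() -> listener.onCompletion(message));
        }

        @Override
        public void close() {
            server.shutdown();
        }
    }

    // Sends `count` messages back to back, then waits once for all acknowledgements.
    // Returns the number of messages that were acknowledged successfully.
    static int sendBatch(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger acknowledged = new AtomicInteger();
        Listener listener = new Listener() {
            @Override
            public void onCompletion(String message) {
                acknowledged.incrementAndGet();
                latch.countDown();
            }

            @Override
            public void onException(String message, Exception exception) {
                latch.countDown(); // a failed send still counts as "reply received"
            }
        };
        try (FakeProducer producer = new FakeProducer()) {
            for (int i = 0; i < count; i++) {
                producer.send("message " + i, listener); // does not block
            }
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for acknowledgements");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return acknowledged.get();
    }

    public static void main(String[] args) {
        System.out.println("acknowledged: " + sendBatch(100)); // prints: acknowledged: 100
    }
}
```

With a real provider, the same shape would apply: create one CompletionListener, pass it to every send, and wait on the latch after the last message has been handed over.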
JMSXDeliveryCount
JMS 2.0 allows applications that receive a message to determine how many times the message has been redelivered. This information can be obtained from the message property JMSXDeliveryCount:
int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
JMSXDeliveryCount is not a new property; it was defined in JMS 1.1. However, in JMS 1.1, it was optional for a JMS provider to actually set it, which meant application code that used it was not portable. In JMS 2.0, it becomes mandatory for JMS providers to set this property, allowing portable applications to make use of it.
So why might an application want to know how many times a message has been redelivered?
If a message is being redelivered, this means that a previous attempt to deliver the message failed for some reason. If a message is being redelivered repeatedly, the cause is likely to be a problem in the receiving application. Perhaps the application is able to receive the message but is unable to process it, and so it throws an exception or rolls back the transaction. If there is a prolonged reason why the message cannot be processed, such as the message being “bad” in some way, the same message will be redelivered over and over again, wasting resources and preventing subsequent “good” messages from being processed.
The JMSXDeliveryCount property allows a consuming application to detect that a message has been redelivered multiple times and is, therefore, “bad” in some way. The application can use this information to take some special action (instead of simply triggering yet another redelivery), such as consuming the message and sending it to a separate queue of “bad” messages for administrator action.
Some JMS providers already offer nonstandard facilities to detect messages that have been redelivered repeatedly and divert them to a dead-message queue. Although JMS 2.0 does not define how such messages should be handled, the JMSXDeliveryCount property allows applications to implement their own “bad” message handling code in a portable way.
Listing 8 shows a MessageListener that throws a RuntimeException to simulate an error in processing a “bad” message. The MessageListener uses the JMSXDeliveryCount property to detect that a message has been redelivered ten times and take a different action.
Listing 8
class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
            if (deliveryCount < 10) {
                // now throw a RuntimeException
                // to simulate a problem processing the message
                // the message will then be redelivered
                throw new RuntimeException("Exception thrown to simulate a bad message");
            } else {
                // message has been redelivered ten times,
                // let's do something to prevent endless redeliveries
                // such as sending it to dead message queue
                // details omitted
            }
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}
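The interaction between redelivery and the delivery count can be traced with a small model that needs no JMS provider. Everything here is hypothetical (RedeliverySketch, deliver, and the rule that bodies starting with "bad" cannot be processed); it assumes the count is 1 on the first delivery attempt and increases by one on each redelivery, as the JMS 2.0 specification describes.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of redelivery driven by a delivery count. Not the JMS API;
// all names and the "bad" message rule are made up for illustration.
class RedeliverySketch {
    static final int MAX_DELIVERIES = 10;

    final List<String> processed = new ArrayList<>();
    final List<String> deadMessages = new ArrayList<>();

    // Plays the role of onMessage: throwing requests a redelivery,
    // returning normally consumes the message.
    void onMessage(String body, int deliveryCount) {
        if (deliveryCount >= MAX_DELIVERIES) {
            deadMessages.add(body); // give up and divert the message
            return;
        }
        if (body.startsWith("bad")) {
            throw new RuntimeException("cannot process " + body);
        }
        processed.add(body);
    }

    // Plays the role of the provider: redelivers until the listener returns normally,
    // incrementing the delivery count each time. Returns the number of attempts made.
    int deliver(String body) {
        int deliveryCount = 0;
        while (true) {
            deliveryCount++;
            try {
                onMessage(body, deliveryCount);
                return deliveryCount;
            } catch (RuntimeException e) {
                // processing failed; loop round and redeliver
            }
        }
    }

    public static void main(String[] args) {
        RedeliverySketch sketch = new RedeliverySketch();
        System.out.println("good: " + sketch.deliver("good-1") + " attempt(s)"); // prints: good: 1 attempt(s)
        System.out.println("bad: " + sketch.deliver("bad-1") + " attempt(s)");   // prints: bad: 10 attempt(s)
        System.out.println("dead messages: " + sketch.deadMessages);             // prints: dead messages: [bad-1]
    }
}
```

Without the threshold check, the loop in deliver would never terminate for a “bad” message, which is the endless-redelivery problem described above.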
MDB Configuration Properties
A Java EE application that needs to receive messages asynchronously does so using an MDB, which is configured by specifying a number of configuration properties.
Previous versions of Java EE were distinctly vague about how an MDB was configured. In EJB 3.1, the only configuration properties defined were:
- acknowledgeMode (used only when transactions are bean-managed; can be set to either Auto-acknowledge or Dups-ok-acknowledge)
- messageSelector
- destinationType (can be set to either javax.jms.Queue or javax.jms.Topic)
- subscriptionDurability (used only for topics; can be set to either Durable or NonDurable)
However, EJB 3.1 didn’t define how the application should specify which queue or topic the MDB received messages from. It was left to the application server or resource adapter to define a nonstandard way to do this.
EJB 3.1 also didn’t define how the subscription name and client identifier should be specified when messages were received from a topic and the subscriptionDurability property was set to Durable. And there was no standard way in EJB 3.1 to specify the connection factory that was used by the MDB to create its connection to the JMS server.
These rather surprising limitations are all addressed in the latest version of Java EE. EJB 3.2 (part of Java EE 7) defines the following additional configuration properties:
- destinationLookup: The JNDI name of an administratively defined Queue or Topic object that represents the queue or topic from which the MDB will receive messages
- connectionFactoryLookup: The JNDI name of an administratively defined ConnectionFactory object that the MDB will use to connect to the JMS provider
- clientId: A client identifier used when the MDB connects to the JMS provider
- subscriptionName: A durable subscription name used when subscriptionDurability is set to Durable
Most application servers supported clientId and subscriptionName anyway, so defining these as standard is simply formalizing existing practice.
Of course, it was always possible to configure the queue or topic used by a JMS MDB, and many (but not all) application servers provided a way to specify the connection factory. However, the way this was done was nonstandard and varied from one application server to another. Application servers remain free to continue to support these nonstandard mechanisms. However, you can be confident that applications that use destinationLookup and connectionFactoryLookup will work with multiple application servers.
Listing 9 shows a JMS MDB that consumes messages from a durable subscription on a topic and uses the new standard properties:
Listing 9
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(
        propertyName = "connectionFactoryLookup", propertyValue = "jms/MyConnectionFactory"),
    @ActivationConfigProperty(
        propertyName = "destinationLookup", propertyValue = "jmq/PriceFeed"),
    @ActivationConfigProperty(
        propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
    @ActivationConfigProperty(
        propertyName = "subscriptionDurability", propertyValue = "Durable"),
    @ActivationConfigProperty(
        propertyName = "subscriptionName", propertyValue = "MySub"),
    @ActivationConfigProperty(
        propertyName = "clientId", propertyValue = "MyClientId")
})
public class MyMDB implements MessageListener {
    public void onMessage(Message message) {
        ...
Conclusion
All five features described above make messaging easier for Java developers. Taken together with the ease-of-use features discussed in Part One, they make JMS 2.0 a major step forward, and JMS should continue to flourish as one of the most successful APIs in the Java landscape.
Author Bio: Nigel Deakin, a Principal Member of Technical Staff at Oracle, was Specification Lead for JSR 343, Java Message Service 2.0. In addition to his responsibilities for leading the next versions of the JMS specification, he is a member of Oracle’s JMS development team, working on Open Message Queue and the GlassFish application server. He has spoken recently at JavaOne in San Francisco, US, and at Devoxx in Antwerp, Belgium, and he is based in Cambridge, UK.
Image courtesy of andrewrennie