What’s New in JMS 2.0, Part Two—New Messaging Features

Nigel Deakin

The second part of Nigel Deakin’s series on JMS 2.0 in the impending Java EE 7 looks at some of the new messaging features. Reprinted with permission from the Oracle Technology Network, Oracle Corporation.

This article, the second in a two-part series, covers some of
the new messaging features introduced in Java Message Service
(JMS) 2.0. It assumes a basic familiarity with JMS 1.1.

In Part One, we looked at the new ease-of-use features introduced in
JMS 2.0. Here, we look at important new messaging features.

JMS 2.0, which was released in April 2013, is the first update
to the JMS specification since version 1.1 was released in 2002.
One might think that an API that has remained unchanged for so long
has grown moribund and unused. However, if you judge the success of
an API standard by the number of different implementations, JMS is
one of the most successful APIs around.

In JMS 2.0, the emphasis has been on catching up with
ease-of-use improvements that have been made to other enterprise
Java technologies in recent years. The opportunity has been taken
to introduce a number of new messaging features as well.

JMS 2.0 is part of the Java EE 7 platform and can be used in
Java EE Web or EJB applications. It can also be used standalone in
a Java SE environment. As I explain below, some of the features are
available only in a standalone environment while others are
available only in Java EE Web or EJB applications.

Here we discuss five important new messaging features in JMS
2.0.

Multiple Consumers allowed on the Same Topic Subscription

In JMS 1.1, a subscription on a topic was not permitted to have
more than one consumer at a time. This meant that the work of
processing messages on a topic subscription could not be shared
among multiple threads, connections, or Java Virtual Machines
(JVMs), thereby limiting the scalability of the application. This
restriction has been removed in JMS 2.0 by the introduction of a
new kind of topic subscription called a shared subscription.

Let’s review how topic subscriptions worked in JMS 1.1. In
Listing 1, the createConsumer method
on Session is used to create a nondurable subscription on the
specified topic (we’ll discuss durable subscriptions in just a
moment):

Listing 1

private void createUnsharedConsumer(ConnectionFactory connectionFactory, Topic topic) 
      throws JMSException {
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer messageConsumer = session.createConsumer(topic);
   connection.start();
   Message message = messageConsumer.receive(10000);
   while (message != null) {
      System.out.println("Message received: " + ((TextMessage) message).getText());
      message = messageConsumer.receive(10000);
   }
   connection.close();
}

In Listing 1, the consumer will receive a copy
of every message sent to the topic. However, what if the
application takes a long time to process each message? How do we
make the application more scalable by sharing the work of
processing these messages between, say, two JVMs, with one JVM
processing some of the messages and the other JVM processing the
remaining messages?

In JMS 1.1, there’s no way to do this in a normal Java SE
application. (In Java EE, you could do it using a pool of
message-driven beans [MDBs]). If you
use createConsumer to create a second
consumer in a separate JVM (or a separate thread on the same JVM),
each consumer will use a separate subscription, and so it will
receive a copy of every message sent to the topic. That’s not
what we want. If you think of a “subscription” as a logical entity
that receives a copy of every message sent to the topic, then we
want the two consumers to use the same subscription.

JMS 2.0 provides a solution. You can create a “shared”
nondurable subscription using a new
method: createSharedConsumer. This method is
available both on Session (for applications
using the classic API) and
on JMSContext (for applications using the
simplified API). Since the two JVMs need to be able to identify the
subscription that they need to share, they need to supply a name to
identify the shared subscription, as shown in Listing 2.

Listing 2

private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer messageConsumer = session.createSharedConsumer(topic,"mySubscription");
   connection.start();
   Message message = messageConsumer.receive(10000);
   while (message != null) {
      System.out.println("Message received: " + ((TextMessage) message).getText());
      message = messageConsumer.receive(10000);
   }
   connection.close();
}

If you run the code in Listing 2 in two separate JVMs, each
message sent to the topic will be delivered to one or the other of
the two consumers. This allows them to share the work of processing
messages from the subscription.
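To make the difference between the two kinds of subscription concrete, here is a small JDK-only model of the delivery semantics described above. It is a sketch and not the JMS API: the class ToyTopic, its methods, and the subscription name otherSubscription are hypothetical, and the round-robin distribution inside a subscription is an assumption of the sketch (a real provider is free to distribute messages differently).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of topic subscriptions (hypothetical, not javax.jms). */
final class ToyTopic {

    /** A subscription gets a copy of every message and hands it to ONE of its consumers. */
    private static final class Subscription {
        final List<List<String>> consumerInboxes = new ArrayList<>();
        int next = 0; // round-robin cursor: an assumption of this sketch
    }

    private final Map<String, Subscription> subscriptions = new LinkedHashMap<>();

    /** Adds a consumer to the named subscription (creating it if needed) and returns its inbox. */
    List<String> addConsumer(String subscriptionName) {
        List<String> inbox = new ArrayList<>();
        subscriptions.computeIfAbsent(subscriptionName, n -> new Subscription())
                     .consumerInboxes.add(inbox);
        return inbox;
    }

    /** Every subscription receives the message; within a subscription only one consumer does. */
    void publish(String message) {
        for (Subscription s : subscriptions.values()) {
            List<String> target = s.consumerInboxes.get(s.next % s.consumerInboxes.size());
            s.next++;
            target.add(message);
        }
    }

    /** Two consumers share "mySubscription"; a third has a subscription of its own. */
    static int[] demo() {
        ToyTopic topic = new ToyTopic();
        List<String> a = topic.addConsumer("mySubscription");
        List<String> b = topic.addConsumer("mySubscription");
        List<String> c = topic.addConsumer("otherSubscription");
        for (int i = 1; i <= 4; i++) {
            topic.publish("m" + i);
        }
        return new int[] { a.size(), b.size(), c.size() };
    }
}
```

In this model the two consumers on mySubscription split the four messages between them, while the consumer on its own subscription sees all four, which is the behavior the shared subscription is meant to provide.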

The same feature is available to applications that use a durable
subscription. In JMS 1.1, a durable subscription was created using
the method createDurableSubscriber on Session:

   MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"myDurableSub");

 

This creates a durable subscription
called myDurableSub on the specified topic.
However, as before, there is no way to share the work of processing
the messages on this durable subscription between two JVMs or
between two threads on the same JVM. Depending on exactly what you
try to do, you’ll either get a JMSException or two different
subscriptions.

Once again, JMS 2.0 provides a solution to this problem. You can
now create a “shared” durable subscription using the new
method createSharedDurableConsumer. This method is
available both on Session (for applications
using the classic API) and on JMSContext (for
applications using the simplified API).

   MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic,"myDurableSub");

 

In summary, then, whereas JMS 1.1 defined two different types of
topic subscription, JMS 2.0 defines four types, all of which can be
created using either the classic or simplified APIs:

  • Unshared nondurable subscriptions. These are
    available in both JMS 1.1 and JMS 2.0 and are created
    using createConsumer. They can have only a single
    consumer. Setting the client identifier is optional.
  • Unshared durable subscriptions. These are
    available in both JMS 1.1 and JMS 2.0 and are created
    using createDurableSubscriber or (in JMS 2.0
    only) createDurableConsumer. They can have only a
    single consumer. Setting the client identifier is compulsory, and
    the subscription is identified by the combination of the
    subscription name and client identifier.
  • Shared nondurable subscriptions. These are
    available in JMS 2.0 only and are created
    using createSharedConsumer. They can have any
    number of consumers. Setting the client identifier is optional. The
    subscription is identified by the combination of the subscription
    name and the client identifier, if it is set.
  • Shared durable subscriptions. These are
    available in JMS 2.0 only and are created
    using createSharedDurableConsumer. They can have
    any number of consumers. Setting the client identifier is optional.
    The subscription is identified by the combination of the
    subscription name and the client identifier, if it is set.
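The four cases above can be written down as a small lookup. This is only a restatement of the list as a sketch: the class SubscriptionTypes and its methods are hypothetical helpers, and the strings are simply the names of the JMS 2.0 methods mentioned in the text (for unshared durable subscriptions the sketch returns createDurableConsumer, the JMS 2.0 alternative to createDurableSubscriber).

```java
/** Hypothetical helper that restates the four subscription types as a lookup. */
final class SubscriptionTypes {

    /** Name of the JMS 2.0 method that creates a consumer on this kind of subscription. */
    static String factoryMethod(boolean shared, boolean durable) {
        if (shared) {
            return durable ? "createSharedDurableConsumer" : "createSharedConsumer";
        }
        return durable ? "createDurableConsumer" : "createConsumer";
    }

    /** Only unshared durable subscriptions make the client identifier compulsory. */
    static boolean clientIdRequired(boolean shared, boolean durable) {
        return !shared && durable;
    }

    /** Unshared subscriptions allow a single consumer; shared ones allow any number. */
    static boolean allowsMultipleConsumers(boolean shared) {
        return shared;
    }
}
```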

Delivery delay

You can now specify a delivery delay on a message. The JMS
provider will not deliver the message until after the specified
delivery delay has elapsed.

If you’re using the classic API, you need to set the delivery
delay (in milliseconds) by
calling setDeliveryDelay on
the MessageProducer prior to sending the message,
as shown in Listing 3.

Listing 3

private void sendWithDeliveryDelayClassic(ConnectionFactory connectionFactory,Queue queue) 
   throws JMSException {

   // send a message with a delivery delay of 20 seconds
   try (Connection connection = connectionFactory.createConnection();){
      Session session = connection.createSession();
      MessageProducer messageProducer = session.createProducer(queue);
      messageProducer.setDeliveryDelay(20000);
      TextMessage textMessage = session.createTextMessage("Hello world");
      messageProducer.send(textMessage);
   }
}

If you’re using the simplified API, you need to
call setDeliveryDelay on
the JMSProducer prior to sending the
message. This method returns
the JMSProducer object, which allows you to
create the JMSProducer, set the delivery delay, and send the
message all on the same line, as shown in Listing 4.

Listing 4

private void sendWithDeliveryDelaySimplified(ConnectionFactory connectionFactory,Queue queue)
   throws JMSException {

   // send a message with a delivery delay of 20 seconds
   try (JMSContext context = connectionFactory.createContext();){
      context.createProducer().setDeliveryDelay(20000).send(queue,"Hello world");
   }
}
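To illustrate what the delay means for a consumer, here is a JDK-only model with an explicit clock. It is a sketch under stated assumptions and not the JMS API: the class DelayedDeliveryModel is hypothetical, times are plain millisecond values passed in by the caller, and the model treats a message as deliverable once the send time plus the delivery delay has been reached.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a queue that honors a per-message delivery delay (hypothetical, not javax.jms). */
final class DelayedDeliveryModel {

    private static final class Pending {
        final String body;
        final long deliveryTimeMillis; // send time + delivery delay

        Pending(String body, long deliveryTimeMillis) {
            this.body = body;
            this.deliveryTimeMillis = deliveryTimeMillis;
        }
    }

    private final List<Pending> pending = new ArrayList<>();

    /** Records a message that must not be delivered before nowMillis + deliveryDelayMillis. */
    void send(String body, long nowMillis, long deliveryDelayMillis) {
        pending.add(new Pending(body, nowMillis + deliveryDelayMillis));
    }

    /** Returns the first sent message whose delivery time has been reached, or null if none has. */
    String receive(long nowMillis) {
        for (Iterator<Pending> it = pending.iterator(); it.hasNext();) {
            Pending p = it.next();
            if (p.deliveryTimeMillis <= nowMillis) {
                it.remove();
                return p.body;
            }
        }
        return null;
    }

    /** Sends with a 20-second delay at time 0, then tries to receive just before and at 20 seconds. */
    static String demo() {
        DelayedDeliveryModel queue = new DelayedDeliveryModel();
        queue.send("Hello world", 0, 20000);
        String tooEarly = queue.receive(19999);
        String onTime = queue.receive(20000);
        return tooEarly + "/" + onTime;
    }
}
```

The attempt just before the 20 seconds have elapsed finds nothing, and the attempt at 20 seconds receives the message.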

Sending messages asynchronously

Another new feature of JMS 2.0 is the ability to send a message
asynchronously.

This feature is available for applications running in Java SE or
the Java EE application client container. It is not available to
applications that run in the Java EE Web or EJB container.

Normally, when a persistent message is sent, the send method
does not return until the JMS client has sent the message to the
server and received a reply to notify the client that the message
has been safely received and persisted. We call this a
synchronous send.

JMS 2.0 introduces the ability to perform
an asynchronous send. When a message is sent
asynchronously, the send method sends the message to the server and
then returns control to the application without waiting for a reply
from the server. Instead of being blocked unproductively while the
JMS client waits for a reply, the application can do something
useful, such as sending a further message or performing some
processing.

When a reply is received back from the server to indicate that
the message has been received by the server and persisted, the JMS
provider notifies the application by invoking the callback
method onCompletion on an
application-specified CompletionListener object.

There are two main ways in which you might use an asynchronous
send in an application:

  • To allow the application to do something else (such as update
    the display or write to a database) during the interval when it
    would otherwise be waiting for a reply from the server
  • To allow a large number of messages to be sent in succession
    without waiting for a reply from the server after each message

Listing 5 is an example of how you might
implement the first of these using the classic API:

Listing 5

private void asyncSendClassic(ConnectionFactory connectionFactory,Queue queue)
   throws Exception {

   // send a message asynchronously
   try (Connection connection = connectionFactory.createConnection();){
      Session session = connection.createSession();
      MessageProducer messageProducer = session.createProducer(queue);
      TextMessage textMessage = session.createTextMessage("Hello world");
      CountDownLatch latch = new CountDownLatch(1);
      MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
      messageProducer.send(textMessage,myCompletionListener);
      System.out.println("Message sent, now waiting for reply");

      // at this point we can do something else before waiting for the reply
      // this is not shown here

      // now wait for the reply from the server 
      latch.await();

      if (myCompletionListener.getException()==null){
         System.out.println("Reply received from server");
      } else {
         throw myCompletionListener.getException();
      }
   }
}

The class MyCompletionListener used in
Listing 5 is a separate class provided by the application, which
implements
the javax.jms.CompletionListener interface, as
shown in Listing 6:

Listing 6

class MyCompletionListener implements CompletionListener {

   CountDownLatch latch;
   Exception exception;
   
   public MyCompletionListener(CountDownLatch latch) {
      this.latch=latch;
   }

   @Override
   public void onCompletion(Message message) {
      latch.countDown();
   }

   @Override
   public void onException(Message message, Exception exception) {
      // record the exception before releasing the latch so the waiting thread sees it
      this.exception=exception;
      latch.countDown();
   }

   public Exception getException(){
      return exception;
   }
}

In Listing 5, we use a new method
on MessageProducer to send a message without
waiting for a reply from the server. This is send(Message
message, CompletionListener listener). Using this method to
send a message allows the application to do something else while
the message is processed in the server. When the application is
ready to continue, it uses
a java.util.concurrent.CountDownLatch to wait
until the reply has been received from the server. When the reply
is received, the application can then proceed with the same degree
of confidence that the message has been successfully sent that it
would have had after a normal synchronous send.
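The second use mentioned earlier, sending many messages in succession without waiting after each one, can follow the same pattern with a latch sized to the number of messages. The following JDK-only sketch shows the idea. It does not use the JMS API: the Listener interface and ToyProducer class are hypothetical stand-ins for CompletionListener and a JMS provider, and the rule that an empty body fails is invented purely to exercise the error callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of sending a batch asynchronously and waiting once at the end (not javax.jms). */
final class BatchAsyncSendSketch {

    /** Hypothetical stand-in for CompletionListener, using String instead of Message. */
    interface Listener {
        void onCompletion(String body);
        void onException(String body, Exception exception);
    }

    /** Hypothetical stand-in for a provider that acknowledges each send on another thread. */
    static final class ToyProducer implements AutoCloseable {
        private final ExecutorService server = Executors.newSingleThreadExecutor();

        void send(String body, Listener listener) {
            server.execute(() -> {
                if (body.isEmpty()) { // invented failure rule, for illustration only
                    listener.onException(body, new IllegalArgumentException("empty body"));
                } else {
                    listener.onCompletion(body);
                }
            });
        }

        @Override
        public void close() {
            server.shutdown();
        }
    }

    /** Sends every body without waiting in between, waits once, returns the number acknowledged. */
    static int sendAll(String... bodies) {
        CountDownLatch latch = new CountDownLatch(bodies.length);
        AtomicInteger acknowledged = new AtomicInteger();
        Listener listener = new Listener() {
            @Override
            public void onCompletion(String body) {
                acknowledged.incrementAndGet();
                latch.countDown();
            }

            @Override
            public void onException(String body, Exception exception) {
                latch.countDown(); // counted as finished, but not as acknowledged
            }
        };
        try (ToyProducer producer = new ToyProducer()) {
            for (String body : bodies) {
                producer.send(body, listener); // returns immediately
            }
            latch.await(); // one wait for the whole batch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for acknowledgements", e);
        }
        return acknowledged.get();
    }
}
```

Calling sendAll("a", "b", "") in this model returns 2, because the third send is reported through onException rather than onCompletion.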

Sending a message asynchronously is slightly simpler if you are
using the JMS 2.0 simplified API, as shown in Listing 7:

Listing 7

private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue) 
   throws Exception {

   // send a message asynchronously
   try (JMSContext context = connectionFactory.createContext();){
      CountDownLatch latch = new CountDownLatch(1);
      MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
      context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world");
      System.out.println("Message sent, now waiting for reply");

      // at this point we can do something else before waiting for the reply
      // this is not shown here

      latch.await();
      if (myCompletionListener.getException()==null){
         System.out.println("Reply received from server");
      } else {
         throw myCompletionListener.getException();
      }
   }
 }

In this case, the method setAsync(CompletionListener listener)
is called on the JMSProducer prior to
calling send(Destination destination, String body). And since
the JMSProducer supports method chaining,
you can do both on the same line.

JMSXDeliveryCount

JMS 2.0 allows applications that receive a message to determine
how many times the message has been redelivered. This information
can be obtained from the message
property JMSXDeliveryCount:

int deliveryCount = message.getIntProperty("JMSXDeliveryCount");

 

JMSXDeliveryCount is not a new property; it
was defined in JMS 1.1. However, in JMS 1.1, it was optional for a
JMS provider to actually set it, which meant application code that
used it was not portable. In JMS 2.0, it becomes mandatory for JMS
providers to set this property, allowing portable applications to
make use of it.

So why might an application want to know how many times a
message has been redelivered?

If a message is being redelivered, this means that a previous
attempt to deliver the message failed for some reason. If a message
is being redelivered repeatedly, the cause is likely to be a
problem in the receiving application. Perhaps the application is
able to receive the message but is unable to process it, and so it
throws an exception or rolls back the transaction. If there is a
prolonged reason why the message cannot be processed, such as the
message being “bad” in some way, the same message will be
redelivered over and over again, wasting resources and preventing
subsequent “good” messages from being processed.

The JMSXDeliveryCount property allows a
consuming application to detect that a message has been redelivered
multiple times and is, therefore, “bad” in some way. The
application can use this information to take some special action
(instead of simply triggering yet another redelivery), such as
consuming the message and sending it to a separate queue of “bad”
messages for administrator action.

Some JMS providers already offer nonstandard facilities to
detect messages that have been redelivered repeatedly and divert
them to a dead-message queue. While JMS 2.0 does not define how such
messages should be handled,
the JMSXDeliveryCount property allows
applications to implement their own “bad” message handling code in
a portable way.

Listing 8 shows a MessageListener that
throws a RuntimeException to simulate an
error in processing a “bad” message.
The MessageListener uses
the JMSXDeliveryCount property to detect
that a message has been redelivered ten times and take a different
action.

Listing 8

class MyMessageListener implements MessageListener {

   @Override
   public void onMessage(Message message) {
      try {
         int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
         if (deliveryCount<10){
            // now throw a RuntimeException 
            // to simulate a problem processing the message
            // the message will then be redelivered
            throw new RuntimeException("Exception thrown to simulate a bad message");
         } else {
            // message has been redelivered ten times, 
            // let's do something to prevent endless redeliveries
            // such as sending it to dead message queue
            // details omitted
         }
      } catch (JMSException e) {
         throw new RuntimeException(e);
      }
   }
}
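The decision made inside the listener can be isolated and exercised without a JMS provider. The sketch below is JDK-only and hypothetical: the class RedeliveryPolicySketch, the Action enum, and the simulated redelivery loop are all illustrative, and the loop assumes that the first delivery of a message carries a JMSXDeliveryCount of 1 and that each redelivery increments it by one.

```java
/** Sketch of the "give up after N deliveries" decision from the listener (not javax.jms). */
final class RedeliveryPolicySketch {

    enum Action { PROCESS, DIVERT }

    /** Mirrors the listener's test: keep processing while the count is below the limit. */
    static Action decide(int deliveryCount, int maxDeliveries) {
        return deliveryCount < maxDeliveries ? Action.PROCESS : Action.DIVERT;
    }

    /**
     * Simulates a message whose processing always fails and returns the
     * delivery count at which it is finally diverted instead of retried.
     */
    static int deliveryCountWhenDiverted(int maxDeliveries) {
        int deliveryCount = 1; // assumed value on first delivery
        while (decide(deliveryCount, maxDeliveries) == Action.PROCESS) {
            // processing fails here, so the provider redelivers with a higher count
            deliveryCount++;
        }
        return deliveryCount;
    }
}
```

With a limit of 10, as in the listener above, the simulated message is diverted on the delivery whose count is 10.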

MDB Configuration Properties

A Java EE application that needs to receive messages
asynchronously does so using an MDB, which is configured by
specifying a number of configuration properties.

Previous versions of Java EE were distinctly vague about how an
MDB was configured. In EJB 3.1, the only configuration properties
defined were:

  • acknowledgeMode (used only when transactions
    are bean-managed; can be set to
    either Auto-acknowledge or Dups-ok-acknowledge)
  • messageSelector
  • destinationType (can be set to
    either javax.jms.Queue or javax.jms.Topic)
  • subscriptionDurability (used only for topics;
    can be set to
    either Durable or NonDurable)

However, EJB 3.1 didn’t define how the application should
specify which queue or topic the MDB received messages from. It was
left to the application server or resource adapter to define a
nonstandard way to do this.

EJB 3.1 also didn’t define how the subscription name and client
identifier should be specified when messages were received from a
topic and the subscriptionDurability property was set
to Durable. And there was no standard way in
EJB 3.1 to specify the connection factory that was used by the MDB
to create its connection to the JMS server.

These rather surprising limitations are all addressed in the
latest version of Java EE. EJB 3.2 (part of Java EE 7) defines the
following additional configuration properties:

  • destinationLookup: The JNDI name of an
    administratively defined Queue or Topic object that represents the
    queue or topic from which the MDB will receive messages
  • connectionFactoryLookup: The JNDI name of an
    administratively
    defined ConnectionFactory object that the
    MDB will use to connect to the JMS provider
  • clientId: A client identifier used when the MDB
    connects to the JMS provider
  • subscriptionName: A durable subscription name used
    when subscriptionDurability is set
    to Durable

Most application servers
supported clientId and subscriptionName anyway,
so defining these as standard is simply formalizing existing
practice.

Of course, it was always possible to configure the queue or
topic used by a JMS MDB, and many (but not all) application servers
provided a way to specify the connection factory. However, the way
this was done was nonstandard and varied from one application
server to another. Application servers remain free to continue to
support these nonstandard mechanisms. However, you can be confident
that applications that
use destinationLookup and connectionFactoryLookup will
work with multiple application servers.

Listing 9 shows a JMS MDB that consumes
messages from a durable subscription on a topic and uses the new
standard properties:

Listing 9

@MessageDriven(activationConfig = { 
   @ActivationConfigProperty(
      propertyName = "connectionFactoryLookup", propertyValue = "jms/MyConnectionFactory"),
   @ActivationConfigProperty(
      propertyName = "destinationLookup", propertyValue = "jmq/PriceFeed"), 
   @ActivationConfigProperty(
      propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
   @ActivationConfigProperty(
      propertyName = "subscriptionDurability", propertyValue = "Durable"), 
   @ActivationConfigProperty(
      propertyName = "subscriptionName", propertyValue = "MySub"), 
   @ActivationConfigProperty(
      propertyName = "clientId", propertyValue = "MyClientId") }) 
   
public class MyMDB implements MessageListener {
   public void onMessage(Message message) {
      ...
   }
}

Conclusion

All five of the features described above lead to easier
messaging for Java developers. Taken together with the ease-of-use
features discussed in Part One, they represent a major step forward
for JMS 2.0, which should continue to flourish as one of the most
successful APIs in the Java landscape.

Author Bio: Nigel Deakin, a Principal Member of Technical
Staff at Oracle, was Specification Lead for JSR 343, Java Message
Service 2.0. In addition to his responsibilities for leading the
next versions of the JMS specification, he is a member of Oracle’s
JMS development team, working on Open Message Queue and the
GlassFish application server. He has spoken recently at JavaOne in
San Francisco, US, and at Devoxx in Antwerp, Belgium, and he is
based in Cambridge, UK.

Image courtesy of andrewrennie
