
What's New in JMS 2.0, Part Two—New Messaging Features

The second part of Nigel Deakin's series on JMS 2.0 in the impending Java EE 7 looks at some of the new messaging features. Reprinted with permission from the Oracle Technology Network, Oracle Corporation.

This article, the second in a two-part series, covers some of the new messaging features in Java Message Service (JMS) 2.0. It assumes a basic familiarity with JMS 1.1.

In Part One, we looked at the new ease-of-use features introduced in JMS 2.0. Here, we look at important new messaging features.

JMS 2.0, which was released in April 2013, is the first update to the JMS specification since version 1.1 was released in 2002. One might think that an API that has remained unchanged for so long has grown moribund and unused. However, if you judge the success of an API standard by the number of different implementations, JMS is one of the most successful APIs around.

In JMS 2.0, the emphasis has been on catching up with ease-of-use improvements that have been made to other enterprise Java technologies in recent years. The opportunity has been taken to introduce a number of new messaging features as well.

JMS 2.0 is part of the Java EE 7 platform and can be used in Java EE Web or EJB applications. It can also be used standalone in a Java SE environment. As I explain below, some of the features are available only in a standalone environment while others are available only in Java EE Web or EJB applications.

Here we discuss five important new messaging features in JMS 2.0.

Multiple Consumers allowed on the Same Topic Subscription

In JMS 1.1, a subscription on a topic was not permitted to have more than one consumer at a time. This meant that the work of processing messages on a topic subscription could not be shared among multiple threads, connections, or Java Virtual Machines (JVMs), thereby limiting the scalability of the application. This restriction has been removed in JMS 2.0 by the introduction of a new kind of topic subscription called a shared subscription.

Let's review how topic subscriptions worked in JMS 1.1. In Listing 1, the createConsumer method on Session is used to create a nondurable subscription on the specified topic (we'll discuss durable subscriptions in just a moment):

Listing 1

private void createUnsharedConsumer(ConnectionFactory connectionFactory, Topic topic) 
      throws JMSException {
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer messageConsumer = session.createConsumer(topic);
   connection.start();
   Message message = messageConsumer.receive(10000);
   while (message != null) {
      System.out.println("Message received: " + ((TextMessage) message).getText());
      message = messageConsumer.receive(10000);
   }
   connection.close();
}

In Listing 1, the consumer will receive a copy of every message sent to the topic. However, what if the application takes a long time to process each message? How do we make the application more scalable by sharing the work of processing these messages between, say, two JVMs, with one JVM processing some of the messages and the other JVM processing the remaining messages?

In JMS 1.1, there's no way to do this in a normal Java SE application. (In Java EE, you could do it using a pool of message-driven beans [MDBs].) If you use createConsumer to create a second consumer in a separate JVM (or a separate thread on the same JVM), each consumer will use a separate subscription, and so it will receive a copy of every message received by the topic. That's not what we want. If you think of a "subscription" as a logical entity that receives a copy of every message sent to the topic, then we want the two consumers to use the same subscription.

JMS 2.0 provides a solution. You can create a "shared" nondurable subscription using a new method: createSharedConsumer. This method is available both on Session (for applications using the classic API) and on JMSContext (for applications using the simplified API). Since the two JVMs need to be able to identify the subscription that they need to share, they need to supply a name to identify the shared subscription, as shown in Listing 2.

Listing 2

private void createSharedConsumer(ConnectionFactory connectionFactory, Topic topic) throws JMSException {
   Connection connection = connectionFactory.createConnection();
   Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   MessageConsumer messageConsumer = session.createSharedConsumer(topic,"mySubscription");
   connection.start();
   Message message = messageConsumer.receive(10000);
   while (message != null) {
      System.out.println("Message received: " + ((TextMessage) message).getText());
      message = messageConsumer.receive(10000);
   }
   connection.close();
}

If you run the code in Listing 2 in two separate JVMs, each message sent to the topic will be delivered to one or the other of the two consumers. This allows them to share the work of processing messages from the subscription.
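The difference between separate subscriptions and a shared one can be illustrated without a JMS provider. What follows is only a minimal JDK-only model, not the JMS API: ToyTopic, ToySubscription, and ToyConsumer are hypothetical names, and the round-robin distribution is an assumption made for the sketch (JMS does not say how messages are divided among the consumers of a shared subscription).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// JDK-only model of topic subscriptions; this is NOT the JMS API.
// All class names here are hypothetical.
class ToyConsumer {
    final List<String> received = new ArrayList<>();
}

class ToySubscription {
    private final List<ToyConsumer> consumers = new ArrayList<>();
    private int next = 0;

    ToyConsumer addConsumer() {
        ToyConsumer consumer = new ToyConsumer();
        consumers.add(consumer);
        return consumer;
    }

    // Each message goes to exactly one consumer of this subscription.
    // Round-robin is an assumption of this sketch.
    void deliver(String message) {
        if (consumers.isEmpty()) {
            return;
        }
        consumers.get(next).received.add(message);
        next = (next + 1) % consumers.size();
    }
}

class ToyTopic {
    private final Map<String, ToySubscription> subscriptions = new LinkedHashMap<>();

    // Consumers that pass the same name share one subscription.
    ToyConsumer createSharedConsumer(String subscriptionName) {
        return subscriptions
            .computeIfAbsent(subscriptionName, name -> new ToySubscription())
            .addConsumer();
    }

    // Every subscription receives its own copy of each published message.
    void publish(String message) {
        for (ToySubscription subscription : subscriptions.values()) {
            subscription.deliver(message);
        }
    }
}

class SharedSubscriptionDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        ToyConsumer a = topic.createSharedConsumer("mySubscription");
        ToyConsumer b = topic.createSharedConsumer("mySubscription");
        ToyConsumer other = topic.createSharedConsumer("otherSubscription");
        for (int i = 1; i <= 6; i++) {
            topic.publish("m" + i);
        }
        System.out.println("a=" + a.received + " b=" + b.received + " other=" + other.received);
    }
}
```

In this model, consumers a and b divide the six messages between them because they share one subscription, while the consumer on the second subscription receives all six.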

The same feature is available to applications that use a durable subscription. In JMS 1.1, a durable subscription was created using the method createDurableSubscriber on Session:

   MessageConsumer messageConsumer = session.createDurableSubscriber(topic,"myDurableSub");

This creates a durable subscription called myDurableSub on the specified topic. However, as before, there is no way to share the work of processing the messages on this durable subscription between two JVMs or between two threads on the same JVM. Depending on exactly what you try to do, you'll either get a JMSException or two different subscriptions.

Once again, JMS 2.0 provides a solution to this problem. You can now create a "shared" durable subscription using the new method createSharedDurableConsumer. This method is available both on Session (for applications using the classic API) and on JMSContext (for applications using the simplified API).

   MessageConsumer messageConsumer = session.createSharedDurableConsumer(topic,"myDurableSub");

 

In summary, then, whereas JMS 1.1 defined two different types of topic subscription, JMS 2.0 defines four types, all of which can be created using either the classic or simplified APIs:

  • Unshared nondurable subscriptions. These are available in both JMS 1.1 and JMS 2.0 and are created using createConsumer. They can have only a single consumer. Setting the client identifier is optional.
  • Unshared durable subscriptions. These are available in both JMS 1.1 and JMS 2.0 and are created using createDurableSubscriber or (in JMS 2.0 only) createDurableConsumer. They can have only a single consumer. Setting the client identifier is compulsory, and the subscription is identified by the combination of the subscription name and client identifier.
  • Shared nondurable subscriptions. These are available in JMS 2.0 only and are created using createSharedConsumer. They can have any number of consumers. Setting the client identifier is optional. The subscription is identified by the combination of the subscription name and the client identifier, if it is set.
  • Shared durable subscriptions. These are available in JMS 2.0 only and are created using createSharedDurableConsumer. They can have any number of consumers. Setting the client identifier is optional. The subscription is identified by the combination of the subscription name and the client identifier, if it is set.
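The four subscription types listed above can be summarized as a small lookup table. This is only a sketch: SubscriptionKind and its fields are hypothetical names invented for illustration, and only the strings in factoryMethod refer to real methods on Session and JMSContext.

```java
// Plain-Java summary of the four subscription types; NOT part of the JMS API.
// SubscriptionKind and its field names are hypothetical.
enum SubscriptionKind {
    UNSHARED_NONDURABLE(false, false, false, "createConsumer"),
    // JMS 1.1 code uses createDurableSubscriber for this type instead
    UNSHARED_DURABLE(false, true, true, "createDurableConsumer"),
    SHARED_NONDURABLE(true, false, false, "createSharedConsumer"),
    SHARED_DURABLE(true, true, false, "createSharedDurableConsumer");

    final boolean shared;            // true: any number of consumers allowed
    final boolean durable;
    final boolean clientIdRequired;  // false: setting the client identifier is optional
    final String factoryMethod;

    SubscriptionKind(boolean shared, boolean durable,
                     boolean clientIdRequired, String factoryMethod) {
        this.shared = shared;
        this.durable = durable;
        this.clientIdRequired = clientIdRequired;
        this.factoryMethod = factoryMethod;
    }

    // Looks up the type for a given combination of shared and durable.
    static SubscriptionKind of(boolean shared, boolean durable) {
        for (SubscriptionKind kind : values()) {
            if (kind.shared == shared && kind.durable == durable) {
                return kind;
            }
        }
        throw new AssertionError("unreachable: all four combinations are covered");
    }
}

class SubscriptionKindDemo {
    public static void main(String[] args) {
        for (SubscriptionKind kind : SubscriptionKind.values()) {
            System.out.println(kind + ": " + kind.factoryMethod + ", client identifier "
                + (kind.clientIdRequired ? "compulsory" : "optional"));
        }
    }
}
```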

Delivery delay

You can now specify a delivery delay on a message. The JMS provider will not deliver the message until after the specified delivery delay has elapsed.

If you're using the classic API, you need to set the delivery delay (in milliseconds) by calling setDeliveryDelay on the MessageProducer prior to sending the message, as shown in Listing 3.

Listing 3

private void sendWithDeliveryDelayClassic(ConnectionFactory connectionFactory,Queue queue) 
   throws JMSException {

   // send a message with a delivery delay of 20 seconds
   try (Connection connection = connectionFactory.createConnection();){
      Session session = connection.createSession();
      MessageProducer messageProducer = session.createProducer(queue);
      messageProducer.setDeliveryDelay(20000);
      TextMessage textMessage = session.createTextMessage("Hello world");
      messageProducer.send(textMessage);
   }
}

If you're using the simplified API, you need to call setDeliveryDelay on the JMSProducer prior to sending the message. This method returns the JMSProducer object, which allows you to create the JMSProducer, set the delivery delay, and send the message all on the same line, as shown in Listing 4.

Listing 4

private void sendWithDeliveryDelaySimplified(ConnectionFactory connectionFactory,Queue queue)
   throws JMSException {

   // send a message with a delivery delay of 20 seconds
   try (JMSContext context = connectionFactory.createContext();){
      context.createProducer().setDeliveryDelay(20000).send(queue,"Hello world");
   }
}
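To make the effect of a delivery delay concrete, here is a minimal JDK-only model of what a provider does with the delay. It is not the JMS API: ToyDelayBroker is a hypothetical name, and the clock is passed in explicitly so that the behavior is deterministic. It assumes that a message becomes deliverable once the send time plus the delivery delay has been reached.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// JDK-only model of delivery delay; this is NOT the JMS API.
// ToyDelayBroker is a hypothetical name.
class ToyDelayBroker {
    private static final class Pending {
        final String body;
        final long deliveryTime;

        Pending(String body, long deliveryTime) {
            this.body = body;
            this.deliveryTime = deliveryTime;
        }
    }

    // Messages are held in order of their earliest permitted delivery time.
    private final PriorityQueue<Pending> pending =
        new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.deliveryTime));

    // Delivery time = send time + delivery delay (all values in milliseconds).
    long send(String body, long sendTimeMillis, long deliveryDelayMillis) {
        long deliveryTime = sendTimeMillis + deliveryDelayMillis;
        pending.add(new Pending(body, deliveryTime));
        return deliveryTime;
    }

    // Returns the next message whose delivery time has been reached,
    // or null if no message is deliverable yet.
    String receive(long nowMillis) {
        Pending head = pending.peek();
        if (head == null || head.deliveryTime > nowMillis) {
            return null;
        }
        return pending.poll().body;
    }
}

class DeliveryDelayDemo {
    public static void main(String[] args) {
        ToyDelayBroker broker = new ToyDelayBroker();
        broker.send("Hello world", 0L, 20000L);          // 20-second delay
        System.out.println(broker.receive(19999L));      // not yet deliverable
        System.out.println(broker.receive(20000L));      // deliverable now
    }
}
```

The first receive call returns null because the delay has not elapsed; the second returns the message.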

Sending messages asynchronously

Another new feature of JMS 2.0 is the ability to send a message asynchronously.

This feature is available for applications running in Java SE or the Java EE application client container. It is not available to applications that run in the Java EE Web or EJB container.

Normally, when a persistent message is sent, the send method does not return until the JMS client has sent the message to the server and received a reply to notify the client that the message has been safely received and persisted. We call this a synchronous send.

JMS 2.0 introduces the ability to perform an asynchronous send. When a message is sent asynchronously, the send method sends the message to the server and then returns control to the application without waiting for a reply from the server. Instead of being blocked unproductively while the JMS client waits for a reply, the application can do something useful, such as sending a further message or performing some processing.

When a reply is received back from the server to indicate that the message has been received by the server and persisted, the JMS provider notifies the application by invoking the callback method onCompletion on an application-specified CompletionListener object.

There are two main ways in which you might use an asynchronous send in an application:

  • To allow the application to do something else (such as update the display or write to a database) during the interval when it would otherwise be waiting for a reply from the server
  • To allow a large number of messages to be sent in succession without waiting for a reply from the server after each message

Listing 5 is an example of how you might implement the first of these using the classic API:

Listing 5

private void asyncSendClassic(ConnectionFactory connectionFactory,Queue queue)
   throws Exception {

   // send a message asynchronously
   try (Connection connection = connectionFactory.createConnection();){
      Session session = connection.createSession();
      MessageProducer messageProducer = session.createProducer(queue);
      TextMessage textMessage = session.createTextMessage("Hello world");
      CountDownLatch latch = new CountDownLatch(1);
      MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
      messageProducer.send(textMessage,myCompletionListener);
      System.out.println("Message sent, now waiting for reply");

      // at this point we can do something else before waiting for the reply
      // this is not shown here

      // now wait for the reply from the server 
      latch.await();

      if (myCompletionListener.getException()==null){
         System.out.println("Reply received from server");
      } else {
         throw myCompletionListener.getException();
      }
   }
}

The class MyCompletionListener used in Listing 5 is a separate class provided by the application, which implements the javax.jms.CompletionListener interface, as shown in Listing 6:

Listing 6

class MyCompletionListener implements CompletionListener {

   CountDownLatch latch;
   Exception exception;
   
   public MyCompletionListener(CountDownLatch latch) {
      this.latch=latch;
   }

   @Override
   public void onCompletion(Message message) {
      latch.countDown();
   }

   @Override
   public void onException(Message message, Exception exception) {
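      // record the exception before releasing the latch,
      // so that the waiting thread is guaranteed to see it
      this.exception=exception;
      latch.countDown();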
   }

   public Exception getException(){
      return exception;
   }
}

In Listing 5, we use a new method on MessageProducer to send a message without waiting for a reply from the server. This is send(Message message, CompletionListener listener). Using this method to send a message allows the application to do something else while the message is processed in the server. When the application is ready to continue, it uses a java.util.concurrent.CountDownLatch to wait until the reply has been received from the server. When the reply is received, the application can proceed with the same confidence that the message was sent successfully as it would have after a normal synchronous send.

Sending a message asynchronously is slightly simpler if you are using the JMS 2.0 simplified API, as shown in Listing 7:

Listing 7

private void asyncSendSimplified(ConnectionFactory connectionFactory,Queue queue) 
   throws Exception {

   // send a message asynchronously
   try (JMSContext context = connectionFactory.createContext();){
      CountDownLatch latch = new CountDownLatch(1);
      MyCompletionListener myCompletionListener = new MyCompletionListener(latch);
      context.createProducer().setAsync(myCompletionListener).send(queue,"Hello world");
      System.out.println("Message sent, now waiting for reply");

      // at this point we can do something else before waiting for the reply
      // this is not shown here

      latch.await();
      if (myCompletionListener.getException()==null){
         System.out.println("Reply received from server");
      } else {
         throw myCompletionListener.getException();
      }
   }
 }

In this case, the method setAsync(CompletionListener listener) is called on the JMSProducer prior to calling send(Destination destination, String body). And since the JMSProducer supports method chaining, you can do both on the same line.
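Listings 5 and 7 illustrate the first use of an asynchronous send. The second use, sending many messages in succession and waiting only once at the end, can be sketched with a JDK-only model. This is not the JMS API: ToyCompletionListener is a hypothetical stand-in for javax.jms.CompletionListener, and ToyAsyncProducer is a hypothetical producer whose send method returns before the simulated server has replied.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only model of "send many, then wait once"; this is NOT the JMS API.
// All type names here are hypothetical.
interface ToyCompletionListener {
    void onCompletion(String message);
    void onException(String message, Exception exception);
}

class ToyAsyncProducer implements AutoCloseable {
    // A single worker thread plays the role of the server acknowledging sends.
    private final ExecutorService server = Executors.newSingleThreadExecutor();

    // Returns immediately; the listener is invoked later on the worker thread.
    void send(String message, ToyCompletionListener listener) {
        server.execute(() -> {
            if (message.isEmpty()) {
                listener.onException(message, new IllegalArgumentException("empty message"));
            } else {
                listener.onCompletion(message);
            }
        });
    }

    @Override
    public void close() {
        server.shutdown();
    }
}

class BatchAsyncSendDemo {
    // Sends count messages back to back, then waits once for all replies.
    // Returns the number of sends that completed successfully.
    static int sendBatch(int count) {
        CountDownLatch latch = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        ToyCompletionListener listener = new ToyCompletionListener() {
            @Override
            public void onCompletion(String message) {
                succeeded.incrementAndGet();
                latch.countDown();
            }

            @Override
            public void onException(String message, Exception exception) {
                latch.countDown();
            }
        };
        try (ToyAsyncProducer producer = new ToyAsyncProducer()) {
            for (int i = 0; i < count; i++) {
                producer.send("message " + i, listener); // does not block
            }
            if (!latch.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for replies");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for replies", e);
        }
        return succeeded.get();
    }

    public static void main(String[] args) {
        System.out.println("Successful sends: " + sendBatch(100));
    }
}
```

The design choice here is a single latch sized to the batch: each callback, whether success or failure, counts it down once, so the sender blocks only after the whole batch has been handed over.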

JMSXDeliveryCount

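JMS 2.0 allows applications that receive a message to determine how many times delivery of the message has been attempted, and hence how many times it has been redelivered. This information can be obtained from the message property JMSXDeliveryCount, which is 1 the first time a message is delivered and is incremented on each redelivery: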

int deliveryCount = message.getIntProperty("JMSXDeliveryCount");

 

JMSXDeliveryCount is not a new property; it was defined in JMS 1.1. However, in JMS 1.1, it was optional for a JMS provider to actually set it, which meant application code that used it was not portable. In JMS 2.0, it becomes mandatory for JMS providers to set this property, allowing portable applications to make use of it.

So why might an application want to know how many times a message has been redelivered?

If a message is being redelivered, this means that a previous attempt to deliver the message failed for some reason. If a message is being redelivered repeatedly, the cause is likely to be a problem in the receiving application. Perhaps the application is able to receive the message but is unable to process it, and so it throws an exception or rolls back the transaction. If there is a prolonged reason why the message cannot be processed, such as the message being "bad" in some way, the same message will be redelivered over and over again, wasting resources and preventing subsequent "good" messages from being processed.

The JMSXDeliveryCount property allows a consuming application to detect that a message has been redelivered multiple times and is, therefore, "bad" in some way. The application can use this information to take some special action (instead of simply triggering yet another redelivery), such as consuming the message and sending it to a separate queue of "bad" messages for administrator action.

Some JMS providers already offer nonstandard facilities to detect messages that have been redelivered repeatedly and divert them to a dead-message queue. Although JMS 2.0 does not define how such messages should be handled, the JMSXDeliveryCount property allows applications to implement their own "bad" message handling code in a portable way.

Listing 8 shows a MessageListener that throws a RuntimeException to simulate an error in processing a "bad" message. The MessageListener uses the JMSXDeliveryCount property to detect that a message is being delivered for the tenth time and take a different action.

Listing 8

class MyMessageListener implements MessageListener {

   @Override
   public void onMessage(Message message) {
      try {
         int deliveryCount = message.getIntProperty("JMSXDeliveryCount");
         if (deliveryCount<10){
            // now throw a RuntimeException
            // to simulate a problem processing the message
            // the message will then be redelivered
            throw new RuntimeException("Exception thrown to simulate a bad message");
         } else {
            // this is the tenth delivery attempt,
            // let's do something to prevent endless redeliveries
            // such as sending it to dead message queue
            // details omitted
         }
      } catch (JMSException e) {
         throw new RuntimeException(e);
      }
   }
}
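The redelivery cycle that Listing 8 relies on can be tried out without a JMS provider. The following is a minimal JDK-only model, not the JMS API: ToyListener, ToyRedeliveryBroker, and DeadMessageDemo are hypothetical names, the delivery count is passed as a parameter instead of being read from a message property, and a plain list stands in for a dead-message queue.

```java
import java.util.ArrayList;
import java.util.List;

// JDK-only model of redelivery and "bad" message handling; NOT the JMS API.
// All type names here are hypothetical.
interface ToyListener {
    // deliveryCount is 1 on the first delivery, 2 on the first redelivery, and so on.
    void onMessage(String body, int deliveryCount);
}

class ToyRedeliveryBroker {
    // Redelivers the message until the listener returns normally.
    // Returns the number of delivery attempts that were made.
    // Note: this loops forever if the listener never stops throwing.
    static int deliver(String body, ToyListener listener) {
        int deliveryCount = 0;
        while (true) {
            deliveryCount++;
            try {
                listener.onMessage(body, deliveryCount);
                return deliveryCount;
            } catch (RuntimeException e) {
                // the listener failed, so the message is redelivered
            }
        }
    }
}

class DeadMessageDemo {
    static final int MAX_DELIVERIES = 10;

    final List<String> processed = new ArrayList<>();
    final List<String> deadMessages = new ArrayList<>();

    // Gives up on the tenth delivery attempt instead of failing again.
    final ToyListener listener = (body, deliveryCount) -> {
        if (deliveryCount >= MAX_DELIVERIES) {
            deadMessages.add(body); // stands in for a dead-message queue
            return;
        }
        if (body.startsWith("bad")) {
            throw new RuntimeException("cannot process " + body);
        }
        processed.add(body);
    };

    public static void main(String[] args) {
        DeadMessageDemo demo = new DeadMessageDemo();
        int goodAttempts = ToyRedeliveryBroker.deliver("good-1", demo.listener);
        int badAttempts = ToyRedeliveryBroker.deliver("bad-1", demo.listener);
        System.out.println("good-1 attempts: " + goodAttempts);
        System.out.println("bad-1 attempts: " + badAttempts);
        System.out.println("dead messages: " + demo.deadMessages);
    }
}
```

Because the listener returns normally on the tenth attempt, the model stops redelivering the bad message, which mirrors how consuming the message and diverting it prevents endless redeliveries.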

MDB Configuration Properties

A Java EE application that needs to receive messages asynchronously does so using an MDB, which is configured by specifying a number of configuration properties.

Previous versions of Java EE were distinctly vague about how an MDB was configured. In EJB 3.1, the only configuration properties defined were:

  • acknowledgeMode (used only when transactions are bean-managed; can be set to either Auto-acknowledge or Dups-ok-acknowledge)
  • messageSelector
  • destinationType (can be set to either javax.jms.Queue or javax.jms.Topic)
  • subscriptionDurability (used only for topics; can be set to either Durable or NonDurable)

However, EJB 3.1 didn't define how the application should specify which queue or topic the MDB received messages from. It was left to the application server or resource adapter to define a nonstandard way to do this.

EJB 3.1 also didn't define how the subscription name and client identifier should be specified when messages were received from a topic and the subscriptionDurability property was set to Durable. And there was no standard way in EJB 3.1 to specify the connection factory that was used by the MDB to create its connection to the JMS server.

These rather surprising limitations are all addressed in the latest version of Java EE. EJB 3.2 (part of Java EE 7) defines the following additional configuration properties:

  • destinationLookup: The JNDI name of an administratively defined Queue or Topic object that represents the queue or topic from which the MDB will receive messages
  • connectionFactoryLookup: The JNDI name of an administratively defined ConnectionFactory object that the MDB will use to connect to the JMS provider
  • clientId: A client identifier used when the MDB connects to the JMS provider
  • subscriptionName: A durable subscription name used when subscriptionDurability is set to Durable

Most application servers supported clientId and subscriptionName anyway, so defining these as standard is simply formalizing existing practice.

Of course, it was always possible to configure the queue or topic used by a JMS MDB, and many (but not all) application servers provided a way to specify the connection factory. However, the way this was done was nonstandard and varied from one application server to another. Application servers remain free to continue to support these nonstandard mechanisms. However, you can be confident that applications that use destinationLookup and connectionFactoryLookup will work with multiple application servers.

Listing 9 shows a JMS MDB that consumes messages from a durable subscription on a topic and uses the new standard properties:

Listing 9

@MessageDriven(activationConfig = { 
   @ActivationConfigProperty(
      propertyName = "connectionFactoryLookup", propertyValue = "jms/MyConnectionFactory"),
   @ActivationConfigProperty(
      propertyName = "destinationLookup", propertyValue = "jmq/PriceFeed"), 
   @ActivationConfigProperty(
      propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
   @ActivationConfigProperty(
      propertyName = "subscriptionDurability", propertyValue = "Durable"), 
   @ActivationConfigProperty(
      propertyName = "subscriptionName", propertyValue = "MySub"), 
   @ActivationConfigProperty(
      propertyName = "clientId", propertyValue = "MyClientId") }) 
   
public class MyMDB implements MessageListener {
   public void onMessage(Message message) {
      ...
   }
}

Conclusion

All five of the features described above lead to easier messaging for Java developers. Taken together with the ease-of-use features discussed in Part One, they represent a major step forward for JMS 2.0, which should continue to flourish as one of the most successful APIs in the Java landscape.


Author Bio: Nigel Deakin, a Principal Member of Technical Staff at Oracle, was Specification Lead for JSR 343, Java Message Service 2.0. In addition to his responsibilities for leading the next versions of the JMS specification, he is a member of Oracle's JMS development team, working on Open Message Queue and the GlassFish application server. He has spoken recently at JavaOne in San Francisco, US, and at Devoxx in Antwerp, Belgium, and he is based in Cambridge, UK.
