Talking asynchronous apps

Tutorial: Atmosphere 1.0, WebSocket Portability on the JVM

JeanfrancoisArcand


In this JAX Magazine tutorial, Jeanfrançois Arcand introduces us to
the Atmosphere Framework, a Java/JavaScript framework which allows
the creation of portable asynchronous applications using Groovy,
Scala and Java.


The Atmosphere Framework
ships with a JavaScript component supporting all modern browsers
and several server components supporting all major Java-based
WebServers. The aim of the framework is to allow a developer to
write an application and let the framework discover the best
communication channel between the client and the server,
transparently.

For example, a developer can write an
application that will use the WebSocket protocol when used with a
browser or server that supports the protocol, and transparently
fall back to HTTP in case the WebSocket protocol isn’t supported.
Such an Atmosphere application will work fine with Internet
Explorer 6, 7, 8, and 9 using HTTP, and will use the WebSocket
protocol when used with Internet Explorer 10.
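
The fallback idea can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the Transport enum and the TransportNegotiator class are hypothetical names, not Atmosphere APIs, and the real negotiation happens inside atmosphere.js and the server runtime.

```java
import java.util.List;
import java.util.Set;

/** Transports named in this article, listed here purely for illustration. */
enum Transport { WEBSOCKET, SSE, STREAMING, LONG_POLLING }

/** Hypothetical helper; not part of the Atmosphere API. */
final class TransportNegotiator {

    /**
     * Returns the first transport in the preferred order that both sides
     * support, or LONG_POLLING as the last-resort plain HTTP fallback.
     */
    static Transport negotiate(List<Transport> preferred,
                               Set<Transport> browser,
                               Set<Transport> server) {
        for (Transport candidate : preferred) {
            if (browser.contains(candidate) && server.contains(candidate)) {
                return candidate;
            }
        }
        return Transport.LONG_POLLING;
    }
}
```

With a preference order of WEBSOCKET, SSE, LONG_POLLING, a browser without WebSocket or SSE support ends up on long-polling, while a modern browser talking to a WebSocket-capable server gets WEBSOCKET.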

To see the power of Atmosphere, let’s build a
simple chat application. Let’s assume our chat application will
only support a single chat room to make the logic simpler.

First, let’s write the server side component.
Atmosphere supports several components, three of which are:

  • atmosphere-runtime: the core module of Atmosphere. All other
    modules build on top of this one. This module exposes two simple
    APIs for building applications: AtmosphereHandler and Meteor.
    AtmosphereHandler is a simple interface to implement, whereas
    Meteor is a class that can be retrieved or injected in
    Servlet-based applications.

  • atmosphere-jersey: an extension to the Jersey REST framework.
    This module exposes a new set of annotations that give access to
    the Atmosphere runtime’s functionality.

  • atmosphere-gwt: an extension to the GWT framework.

The Server Side

For this article I will use
atmosphere-runtime to demonstrate how easy it is
to write a simple asynchronous application. Let’s start with the
server component using an AtmosphereHandler. The
AtmosphereHandler interface is defined as shown in Listing 1
below.

Listing 1: AtmosphereHandler

 

public interface AtmosphereHandler {

    void onRequest(AtmosphereResource resource) throws IOException;

    void onStateChange(AtmosphereResourceEvent event) throws IOException;

    void destroy();
}

 

The onRequest method is called every
time a request gets mapped to the path associated with the
AtmosphereHandler. The path is defined by annotating an
implementation of AtmosphereHandler.

 

@AtmosphereHandlerService(path = "/<path>")

 

In Atmosphere, an AtmosphereResource
represents a physical connection. An AtmosphereResource
can be used to retrieve information about the request, execute
actions on the response and, more importantly, suspend the
connection during the onRequest execution. The
WebServer must know when a connection needs to stay open for future
actions, whether that means upgrading it to the WebSocket protocol
or holding an HTTP connection open (streaming, long-polling, JSONP
or Server-Sent Events).

Figure 1: onStateChange Method

 

The onStateChange method (Figure 1) will be
invoked by Atmosphere when:

  • A broadcast operation occurs and an action needs to be taken. A
    Broadcaster always initiates a broadcast operation. It can be seen
    as a channel of communication. An application can create many
    communication channels and retrieve them using the
    BroadcasterFactory class. An AtmosphereResource
    is always associated with one or several Broadcasters. We can also
    see a Broadcaster as an event queue, where you can listen and get
    notified every time a new event is broadcast. A broadcast can
    happen from onRequest, onStateChange or
    anywhere on the server side.

  • The connection has been closed or timed out (no activity occurred
    on it).

Finally, the destroy method will be
called when Atmosphere is undeployed or stopped.
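
To make the Broadcaster idea concrete, here is a minimal sketch of a channel that delivers each message to every registered listener. SimpleBroadcaster is a hypothetical stand-in written for this illustration; it is not the Atmosphere Broadcaster class, and each listener merely plays the role of one suspended connection.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Simplified stand-in for a Broadcaster; not the Atmosphere class. */
final class SimpleBroadcaster {
    private final String id;
    // Each listener plays the role of one suspended connection.
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    SimpleBroadcaster(String id) {
        this.id = id;
    }

    String id() {
        return id;
    }

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void removeListener(Consumer<String> listener) {
        listeners.remove(listener);
    }

    /** Delivers the message once per listener and returns the number of deliveries. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
            delivered++;
        }
        return delivered;
    }
}
```

Two listeners registered on the same SimpleBroadcaster both receive a message passed to broadcast, which mirrors how every AtmosphereResource associated with a Broadcaster is notified.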

Complicated? Fortunately for us, the framework ships with
AtmosphereHandlers that can be used in almost all
scenarios. They handle the connection life cycle, which lets the
developer focus on the application logic. Let’s use
the OnMessage<T> AtmosphereHandler to
write our application (Listing 2).

Listing 2: OnMessage<T> AtmosphereHandler

 

  @AtmosphereHandlerService(
          path = "/chat",
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                          BroadcastOnPostAtmosphereInterceptor.class})
  public class ChatRoom extends OnMessage<String> {

      private final ObjectMapper mapper = new ObjectMapper();

      @Override
      public void onMessage(AtmosphereResponse response, String message) throws IOException {
          response.getWriter()
                  .write(mapper.writeValueAsString(mapper.readValue(message, Data.class)));
      }
  }

 

The main idea here is to delegate as much of the connection
life cycle as possible to Atmosphere’s ready-to-use components.
First, we annotate the ChatRoom class with the
@AtmosphereHandlerService annotation and define the path
and the interceptors. AtmosphereInterceptors can be seen
as Filters that always get invoked before and after
AtmosphereHandler#onRequest. An
AtmosphereInterceptor is useful for manipulating the
request/response, handling the life cycle, and so on; for example,
the suspend and broadcast operations (Figure 2).

Figure 2: Suspend and Broadcast

As described above, two interceptors are used:
the first suspends the request
(AtmosphereResourceLifecycleInterceptor), and the second
broadcasts the data received on every POST
(BroadcastOnPostAtmosphereInterceptor). Great, we can
focus only on the application’s logic.
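
The "filter around the handler" behaviour can be illustrated with a small, self-contained sketch. The Interceptor interface, LoggingInterceptor and InterceptorChain below are hypothetical and far simpler than Atmosphere’s AtmosphereInterceptor; running the after hooks in reverse order is a design choice of this sketch, not a statement about Atmosphere’s internals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified interceptor contract (not Atmosphere's). */
interface Interceptor {
    void before(List<String> log);
    void after(List<String> log);
}

/** Records when it runs, so the call order becomes visible. */
final class LoggingInterceptor implements Interceptor {
    private final String name;

    LoggingInterceptor(String name) {
        this.name = name;
    }

    @Override
    public void before(List<String> log) {
        log.add(name + ":before");
    }

    @Override
    public void after(List<String> log) {
        log.add(name + ":after");
    }
}

final class InterceptorChain {
    private final List<Interceptor> interceptors = new ArrayList<>();

    InterceptorChain add(Interceptor interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    /** Runs every before hook, then the handler, then the after hooks in reverse order. */
    void invoke(Runnable handler, List<String> log) {
        for (Interceptor interceptor : interceptors) {
            interceptor.before(log);
        }
        handler.run();
        for (int i = interceptors.size() - 1; i >= 0; i--) {
            interceptors.get(i).after(log);
        }
    }
}
```

Adding a "suspend" and a "broadcast" interceptor and invoking the chain with a handler that logs "onRequest" shows the handler call sandwiched between the before and after hooks.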

Now instead of writing our own complete
AtmosphereHandler, we can extend the
OnMessage<T> handler, which delegates the broadcast
operation to the onMessage method (Line 10). For our chat
application, it just means we write what we receive (Line 11). If
we have 50 connected users, onMessage will
be called 50 times so that all 50 users get the message.

We are using JSON between the client and the
server. The client sends:

 {"message":"Hello World","author":"John Doe"}

 

and the server sends back to the connected
browsers:

 

{"message":"Hello World","author":"John Doe","time":1348578675087}

 

On Line 11 we use the Jackson library to
read the message and write it back, augmented with the time the
message was received. The Data class is just a simple POJO
(Listing 3).

Listing 3: Data Class

 

   public final static class Data {

        private String message;
        private String author;
        private long time;

        public Data() {
            this("","");
        }

        public Data(String author, String message) {
            this.author = author;
            this.message = message;
            this.time = new Date().getTime();
        }

        public String getMessage() {
            return message;
        }

        public String getAuthor() {
            return author;
        }

        public void setAuthor(String author) {
            this.author = author;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public long getTime() {
            return time;
        }

        public void setTime(long time) {
            this.time = time;
        }

    }

      
    

The Client Side – atmosphere.js

That’s it for the server side. Now let’s use
atmosphere.js to write the client side. First,
let’s look at the code (Listing 4).

Listing 4: atmosphere.js client code

 

  $(function () {
      "use strict";
  
      var header = $('#header');
      var content = $('#content');
      var input = $('#input');
      var status = $('#status');
      var myName = false;
      var author = null;
     var logged = false;
     var socket = $.atmosphere;
     var subSocket;
     var transport = 'websocket';
 
     // We are now ready to cut the request
     var request = { url: document.location.toString() + 'chat',
         contentType : "application/json",
         trackMessageLength: true,
         shared : true,
         transport : transport ,
         fallbackTransport: 'long-polling'};
 
     request.onOpen = function(response) {
         content.html($('<p>', { text: 'Atmosphere connected using ' + response.transport }));
         input.removeAttr('disabled').focus();
         status.text('Choose name:');
         transport = response.transport;
         
         if (response.transport == "local") {
             subSocket.pushLocal("Name?");
         }   
     };  
     
     request.onTransportFailure = function(errorMsg, request) {
         jQuery.atmosphere.info(errorMsg);
         if (window.EventSource) {
             request.fallbackTransport = "sse";
             transport = "sse";
         }
         header.html($('<h3>', { text: 'Atmosphere Chat. Default transport is WebSocket, fallback is ' + request.fallbackTransport }));
     };
     
     request.onMessage = function (response) {
     
         // We need to be logged first.
         if (!myName) return;
         
         var message = response.responseBody;
         try {
             var json = jQuery.parseJSON(message);
         } catch (e) {
             console.log("This doesn't look like valid JSON: ", message);
             return;
         }
        
         if (!logged) {
             logged = true;
            status.text(myName + ': ').css('color', 'blue');
             input.removeAttr('disabled').focus();
            subSocket.pushLocal(myName);
         } else {
             input.removeAttr('disabled');
             
             var me = json.author == author;
             var date = typeof(json.time) == 'string' ? parseInt(json.time) : json.time;
             addMessage(json.author, json.message, me ? 'blue' : 'black', new Date(date));
         }   
     };  
     
     request.onClose = function(response) {
         logged = false;
     };
     
     subSocket = socket.subscribe(request);
     
     input.keydown(function(e) {
         if (e.keyCode === 13) {
             var msg = $(this).val();
            if (author == null) {
                 author = msg;
             }
                 
             subSocket.push(jQuery.stringifyJSON({ author: author, message: msg }));
             $(this).val('');
             
             input.attr('disabled', 'disabled');
             if (myName === false) {
                 myName = msg;
             }
         }       
     });     
          
     function addMessage(author, message, color, datetime) {
         content.append('<p><span style="color:' + color + '">' + author + '</span> @ '
             + (datetime.getHours() < 10 ? '0' + datetime.getHours() : datetime.getHours()) + ':'
             + (datetime.getMinutes() < 10 ? '0' + datetime.getMinutes() : datetime.getMinutes())
             + ': ' + message + '</p>');
     }
  });  

 

There is a lot of extra code in Listing 4, so let’s only
describe the important atmosphere.js
parts. First, we initialize a connection (called socket in
the code):

 

 var socket = $.atmosphere;

 

The next step is to define some callback functions. For
this article, let’s define only a subset. First, we define an
onOpen function that gets invoked when the underlying
transport is connected to the server (line 24).
There we just display the transport that was used for connecting to
the server. The transport is specified on the request object, which
is defined as:

 

 var request = { url: document.location.toString() + 'chat',
          contentType : "application/json",
          transport : transport ,
          fallbackTransport: 'long-polling'};

 

Here we want to use the WebSocket transport by
default, and fall back to long-polling in case WebSocket is not
supported by either the browser or the server. In our
onOpen function we just display which transport was
used.

Note: You can also change the transport when
WebSocket is failing by adding an onTransportFailure
function:

request.onTransportFailure = function(errorMsg, request) {
        if (window.EventSource) {
            request.fallbackTransport = "sse";
            transport = "sse";
        }
};

Here, for demonstration purposes, we look for the
EventSource object (HTML5 Server-Sent Events) and, if
available, switch the transport to use it. The beauty here is: you
don’t need to use a special API. All transports are handled the
same way using atmosphere.js.

Next we define the onMessage function, which will be
invoked every time we receive data from the server:

 

   request.onMessage = function (response) {
      …..
   }

 

Here we just display the received message. To connect and send
data to the server, all we need to do is to invoke:

 

subSocket = socket.subscribe(request);

 

Once subscribed, we are ready to receive and send data. To send
data, we use the subSocket object returned from the
subscribe operation. If the WebSocket transport is in use, the
subSocket will reference the WebSocket connection (because
the protocol is bi-directional), whereas for all other transports, a
new connection will be opened every time the push operation is
called:

 

subSocket.push(jQuery.stringifyJSON({ author: author, message: msg }));

 

Next, let’s add support for a really nice Atmosphere feature,
which is the ability to share a connection amongst open
windows/tabs. All you need to do in Atmosphere is to set the shared
variable to “true” when doing a request:

 

var request = { url: document.location.toString() + 'chat',
          contentType : "application/json",
          transport : transport ,
          shared : true,
          fallbackTransport: 'long-polling'};

 

Now every time a new window or tab opens the same page,
the connection will be shared. To get notified by the
“master” tab/window (the one that was opened first), just
implement:

 

    request.onLocalMessage = function(message) {
      ….
    }

 

Tabs/Windows can also communicate directly by
using the following function.

 

 subSocket.pushLocal(…)

 

     
    

 

Fully functional – not just yet!

That’s it, we now have a fully functional chat
application. But there are two problems with the current
application. The first one is related to proxies and firewalls.
Occasionally a proxy or firewall won’t allow a connection to stay
inactive for a long period of time, and usually the connection
gets closed automatically by the proxy. For a suspended connection,
it means the client will have to reconnect every time the connection
gets closed. One possible solution is to keep the suspended
connection active by sending some bytes between the client and the
server. Fortunately for us, all we need to do is add the
HeartbeatInterceptor, which will keep the connection
active for us, transparently (Listing 5).

Listing 5: HeartbeatInterceptor

 

  @AtmosphereHandlerService(
          path = "/chat",
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                  BroadcastOnPostAtmosphereInterceptor.class,
                                   HeartbeatInterceptor.class})
  public class ChatRoom extends OnMessage<String> {

 

Now the HeartbeatInterceptor will
periodically write bytes (whitespace) to the connection to keep it
active. Unfortunately, there are still Proxies that could close the
connection after a period of time (active or not) or a network
issue could arise and the browser will have to reconnect.
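
The heartbeat idea can be sketched without any framework code. The HeartbeatSketch class below is hypothetical and is not how HeartbeatInterceptor is implemented; it assumes the caller supplies the current time, so the behaviour is easy to follow and to test. In a real application a scheduler would call tick periodically.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;

/** Sketch of a heartbeat that pads an idle connection with whitespace. */
final class HeartbeatSketch {
    private final Writer connection;
    private final long maxIdleMillis;
    private long lastWriteMillis;

    HeartbeatSketch(Writer connection, long maxIdleMillis, long nowMillis) {
        this.connection = connection;
        this.maxIdleMillis = maxIdleMillis;
        this.lastWriteMillis = nowMillis;
    }

    /** Writes application data and remembers when the connection was last active. */
    void write(String data, long nowMillis) {
        try {
            connection.write(data);
            connection.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        lastWriteMillis = nowMillis;
    }

    /** Sends one whitespace byte if the connection has been idle too long; returns true if it did. */
    boolean tick(long nowMillis) {
        if (nowMillis - lastWriteMillis < maxIdleMillis) {
            return false;
        }
        write(" ", nowMillis);
        return true;
    }
}
```

Because real traffic also resets the idle timer, whitespace is only written when nothing else has been sent for maxIdleMillis.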

While the browser is reconnecting, a broadcast
operation can still happen, and the browser may never get the
broadcast because its connection is not yet established.
In that scenario the browser has missed (or lost) a message.
For some applications this may not be a problem, but
for others a missing message is a major issue.

Fortunately Atmosphere supports the concept of
BroadcasterCache. Installing a BroadcasterCache
will allow a Browser to never miss/lose messages. When the Browser
reconnects, Atmosphere will always look in the cache and make sure
all messages that were broadcast during the reconnection time are sent
back to the browser. The BroadcasterCache API is pluggable
and Atmosphere ships with ready-to-use implementations. Hence, for
our Chat application, all we need to do is:

 

 @AtmosphereHandlerService(
          path = "/chat",
          broadcasterCache = HeaderBroadcasterCache.class,
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                   BroadcastOnPostAtmosphereInterceptor.class,
                                   HeartbeatInterceptor.class})
  public class ChatRoom extends OnMessage<String> {
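
Conceptually, such a cache stores what was broadcast and replays whatever a reconnecting client has not seen yet. The ReplayCache class below is a hypothetical sketch of that idea only; it assumes the client reports the timestamp of the last message it received, which is an assumption of this example and not a description of how HeaderBroadcasterCache tracks clients.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a replay cache; far simpler than a real BroadcasterCache. */
final class ReplayCache {

    private static final class Entry {
        final long timestampMillis;
        final String message;

        Entry(long timestampMillis, String message) {
            this.timestampMillis = timestampMillis;
            this.message = message;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    /** Stores a broadcast message together with the time it was sent. */
    synchronized void add(String message, long timestampMillis) {
        entries.add(new Entry(timestampMillis, message));
    }

    /** Returns the messages sent strictly after the client's last-seen time, oldest first. */
    synchronized List<String> missedSince(long lastSeenMillis) {
        List<String> missed = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.timestampMillis > lastSeenMillis) {
                missed.add(entry.message);
            }
        }
        return missed;
    }
}
```

A client that last saw the message sent at time 100 and then reconnects would be handed everything added after that time, in order. A production cache would also need to evict old entries, which this sketch leaves out.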

 

Our application is now guaranteed to never miss or lose a
message. The second issue we need to address is intermixed
messages. Depending on the WebServer used, the Browser may receive
two messages in one chunk, one and a half messages, etc. This is
problematic: if we use JSON for encoding our messages,
the Browser will fail to decode messages that take the form
of:

{"message":"Hello World","author":"John Doe","time":1348578675087}{"message":"Cool Man","author":"Foo Bar","time":1348578675087}

or

{"message":"Hello World","author":"John Doe

or

{"message":"Hello World","author":"John Doe","time":1348578675087}{"message":"Cool Man","author"

When the Browser receives such messages, it
will fail to decode them:

var json = jQuery.parseJSON(message);

To solve the issue, we need to install the
TrackMessageSizeInterceptor, which will add some hints to
the message, and the Browser will be able to use those hints to
make sure the atmosphere.js onMessage function is always
invoked with a valid message (Listing 6).

 

Listing 6: TrackMessageSizeInterceptor

 @AtmosphereHandlerService(
          path = "/chat",
         broadcasterCache = HeaderBroadcasterCache.class,
          interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                   BroadcastOnPostAtmosphereInterceptor.class,
                                   TrackMessageSizeInterceptor.class,
                                   HeartbeatInterceptor.class})
  public class ChatRoom extends OnMessage<String> {

On the client side, all we need to do is set
trackMessageLength to true on the request object.
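
The kind of hint involved can be illustrated with length-prefixed framing. The sketch below is hypothetical: the LengthFramer class and the "<length>|<message>" wire format are assumptions made for this example, not a specification of what TrackMessageSizeInterceptor writes. The point is that a length prefix lets the receiver split concatenated chunks and wait for incomplete ones.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of length-prefixed framing; the wire format is an assumption. */
final class LengthFramer {
    private static final String DELIMITER = "|";
    private final StringBuilder buffer = new StringBuilder();

    /** Sender side: prefixes the message with its length in characters. */
    static String frame(String message) {
        return message.length() + DELIMITER + message;
    }

    /** Receiver side: accepts a raw chunk and returns every message that is now complete. */
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> complete = new ArrayList<>();
        while (true) {
            int separator = buffer.indexOf(DELIMITER);
            if (separator < 0) {
                break; // the length prefix itself has not fully arrived
            }
            int length = Integer.parseInt(buffer.substring(0, separator));
            int end = separator + 1 + length;
            if (buffer.length() < end) {
                break; // the message body has not fully arrived
            }
            complete.add(buffer.substring(separator + 1, end));
            buffer.delete(0, end);
        }
        return complete;
    }
}
```

Feeding the receiver one and a half framed messages yields only the first message; the second one is returned once the remaining characters arrive, so a JSON parser never sees a partial or doubled payload.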

 

To The Cloud!

We are now ready to deploy our application into
the cloud…well, not yet. The next feature we need to add is how
messages are getting distributed amongst servers when deployed in a
Cloud. The problem we need to solve can be seen in Figure 3.

Figure 3: Servers in the Cloud

 

Under that scenario, when a broadcast action occurs on Tomcat
Server 1, Tomcat Server 2 will never get the messages. For our
application, that means some users won’t see other users’ messages,
which is clearly a major issue. We need to solve that issue not
only for a chat, but for any application deployed into the
Cloud.

Fortunately for us, Atmosphere supports “Cloud-Enabled” or
“Cluster-enabled” Broadcasters that can be used to propagate messages
between server instances. Atmosphere currently natively supports
well-known technologies like Redis PubSub, Hazelcast, JGroups, JMS and
XMPP (for example using Gmail servers). For this article, let’s use
Redis PubSub (Figure 4).

Figure 4: Redis PubSub

Redis PubSub allows us to connect to a Redis
instance and subscribe to some topics. For our application, all we
need to do is create a ‘chat’ topic and subscribe all our
servers to it. Next we just need to tell our application to use the
RedisBroadcaster instead of the normal Broadcaster. It is as
simple as Listing 7.

 

Listing 7: RedisBroadcaster

@AtmosphereHandlerService(
        path = "/chat",
        broadcasterCache = HeaderBroadcasterCache.class,
        broadcaster = RedisBroadcaster.class,
        interceptors = {AtmosphereResourceLifecycleInterceptor.class,
                                 BroadcastOnPostAtmosphereInterceptor.class,
                                 TrackMessageSizeInterceptor.class,
                                 HeartbeatInterceptor.class})
public class ChatRoom extends OnMessage<String> {

By just adding the RedisBroadcaster we
enabled message sharing between servers, making our chat
application “Cloud-aware” with a single line. On the client side we
don’t have to change anything. We now have a fully functional
application:

  • Transparently supporting all existing WebServers

  • Transparently supporting all existing Browsers

  • Cloud/Cluster enabled
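
The clustering idea behind a pub/sub based Broadcaster can be sketched with an in-memory stand-in. InMemoryPubSub and ClusterNode are hypothetical classes written for this illustration; they do not talk to Redis and are not how RedisBroadcaster is implemented. Each node publishes to a shared topic instead of writing to its own clients directly, and delivers whatever arrives on that topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory stand-in for a pub/sub service such as Redis. */
final class InMemoryPubSub {
    private final Map<String, List<Consumer<String>>> topics = new ConcurrentHashMap<>();

    void subscribe(String topic, Consumer<String> subscriber) {
        topics.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    void publish(String topic, String message) {
        for (Consumer<String> subscriber : topics.getOrDefault(topic, List.of())) {
            subscriber.accept(message);
        }
    }
}

/** One server instance in the cluster. */
final class ClusterNode {
    private final InMemoryPubSub bus;
    private final String topic;
    // Stands in for the messages written to this node's connected browsers.
    private final List<String> deliveredToLocalClients = new ArrayList<>();

    ClusterNode(InMemoryPubSub bus, String topic) {
        this.bus = bus;
        this.topic = topic;
        bus.subscribe(topic, deliveredToLocalClients::add);
    }

    /** Publishes to the shared topic, so every subscribed node (including this one) delivers it. */
    void broadcast(String message) {
        bus.publish(topic, message);
    }

    List<String> delivered() {
        return deliveredToLocalClients;
    }
}
```

With two nodes subscribed to the same ‘chat’ topic, a broadcast issued on the first node is also delivered by the second, which is exactly the gap described for the two Tomcat servers.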

Our application will first negotiate the best transport to use
between the client and the server. For example, assuming we deploy
using Jetty 8, the following transports will be used:

  • Chrome 21 : WebSockets

  • Internet Explorer 9 : Long-Polling

  • Firefox 15: Server-Sent Events

  • Safari/iOS 6: WebSockets

  • Internet Explorer 10: WebSockets

  • Android 2.3: Long-Polling

  • Firefox 3.5: Long-Polling

All of this happens transparently, allowing a developer to
focus on the application instead of transport/portability
issues.

Conclusions and Considerations

WebSockets and Server-Sent Events are technologies on the rise
and their adoption within the enterprise is accelerating. Some
things to think about before jumping in:

  • Is the API portable, e.g. will it work on all well-known
    WebServers?
  • Is the framework already offering a transport fallback
    mechanism? For example, Internet Explorer 7/8/9 support neither
    WebSockets nor Server-Sent Events, and unfortunately for us, those
    browsers are still widely used.
  • Is the framework cloud enabled, and more importantly, will it
    scale?
  • Is it easy to write an application, and is the framework well
    established?

Clearly, the Atmosphere Framework is the answer to those four
really important questions. Still having doubts? Well, go to the
Wall Street Journal, open a few pages and
look for Wordnik’s logo. More than 60 million requests per day,
and all of this powered by the Atmosphere Framework! Start today by
going to our website!

Author Bio: 

Jeanfrançois has been working in software engineering for
the last 18 years. He studied pure mathematics and worked for a
Canadian research centre, doing mathematical modeling in C++ until
someone introduced him to a new language called Java. He never
stopped using it. Jeanfrançois worked for Sun Microsystems for
almost 10 years, writing one of the first NIO frameworks, Grizzly.
He also developed the Grizzly Comet Framework, which was an
early way to implement asynchronous web applications. He then
started the Atmosphere Framework, which brings
portability across Servlet containers and allows the creation of
WebSocket and Comet applications. He can be followed on Twitter
at http://twitter.com/jfarcand

This article appears in JAX Magazine: Atmosphere 1.0 from
October.

Flickr Image courtesy of ell brown
