Here's all you need to know

Reactive programming, HTTP and Angular 2

Thierry Templier

Reactive Programming and observables are really powerful and go beyond what promises offer. They provide key features like laziness and the ability to cancel them. This allows you to add robustness into Angular 2 applications especially at the level of HTTP to finely control what is executed.

As you probably know, Angular 1 leverages promises to handle asynchronous processing. Promises are a powerful feature to handle asynchronous processing within JavaScript. They have even been part of ECMAScript itself since version 6 (ES2015).

In version 2, Angular introduces Reactive Programming based on observables for asynchronous processing. Of course, it’s still possible to use promises if desired, but we will explain why you should use observables and why they are a killer feature and an important design choice.

After having introduced observables in relation to promises, we will see how Angular 2 leverages them. We will then describe in detail different use cases where observables shine (mainly in relation to HTTP).

Let’s now introduce the concepts and differences between promises and observables.

Getting started with observables

The central concept of Reactive Programming is the observable element which corresponds to the entity that can be observed. Some people use the term “stream” instead because it’s more self-explanatory.

At first sight, promises and observables seem to be very similar. They have the same semantics and seem to address the same issues. They allow you to execute asynchronous processing, register callbacks for both successful and error responses and notify these callbacks when the result is there.

Let’s describe how to define and use raw observables based on the RxJS library, the one used by Angular 2.

Defining observables

To create an observable, we can leverage the create method of the Observable object. A function must be provided as parameter with the code to initialize the observable processing. A function can be returned by this function to cancel the observable. We will discuss this aspect in detail later.

Imagine we want a raw observable that fires an event after an amount of time. This can be done using this code:

var observable = Observable.create((observer) => {
  setTimeout(() => {
    observer.next('some event');
  }, 500);
});

Contrary to promises, observables can produce several notifications using the different methods from the observer:

  • next – Emit an event. This can be called several times.

  • error – Throw an error. This can be called once and will break the stream. This means that the error callback will be immediately called and no more events or completion can be received.

  • complete – Mark the observable as completed. After this, no more events or errors will be handled and provided to corresponding callbacks.

Let’s now see how to consume observables.

Consuming observables

Observables allow you to register callbacks for previously described notifications. The subscribe method tackles this issue. It accepts three callbacks as parameters:

  • The onNext callback that will be called when an event is triggered.

  • The onError callback that will be called when an error is thrown.

  • The onCompleted callback that will be called when the observable completes.

Here is the way to register callbacks on an observable:

var observable = (...)
observable.subscribe(
  (event) => {
    // handle events
  },
  (error) => {
    // handle error
  },
  () => {
    // handle completion
  }
);

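An error callback can also be attached with the catch operator. Note that catch returns a new observable, so it must be chained before the call to subscribe, and its callback must itself return an observable.

var observable = (...)
observable.catch((error) => {
  // handle error, then return a replacement observable
  return Observable.of('fallback event');
}).subscribe(
  (event) => {
    // handle events
  });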

Since observables and promises have similar semantics, RxJS provides a bridge between them in its API.

Linking with promises

The observable class provides a fromPromise method to create an observable from a promise. This allows you to make a promise part of an asynchronous data stream. This way when the promise is resolved, an event is triggered in the corresponding observable. The same occurs for rejections in the case of errors.

Here is how to use the fromPromise method:

var observable = Observable.fromPromise(somePromise);

The opposite is also possible using the toPromise method, as described below:

var observable = (...)
var promise = observable.toPromise();

Most operators also support promises: where a callback is expected to return an observable (in flatMap, for example), it can return a promise instead.

As stated before, observables have specifics that make them quite different from promises. Let’s dive into them in more detail.

Observable specificities

Promises and observables have similar semantics, but observables introduce some specificities. We will describe them all in this section and show their impact in the context of HTTP calls.

Lazy

An observable is only enabled when a first observer subscribes. This is a significant difference compared to promises. As a matter of fact, processing provided to initialize a promise is always executed even if no listener is registered. This means that promises don’t wait for subscribers to be ready to receive and handle the response. When creating the promise, the initialization processing is always immediately called.

Observables are lazy so we have to subscribe a callback to let them execute their initialization callback. In the following code, if no subscribe method is called on the observable, the setTimeout function will never be called and the event will never be triggered.

var observable = Observable.create((observer) => {
  setTimeout(() => {
    observer.next('some event');
  }, 500);
});

For example, for an HTTP-based observable, the associated request is only executed when subscribe is called. If the subscribe method is never called, the request won’t be sent. Here is a sample with Angular 2; as written, it doesn’t send anything yet.

var observable = this.http.get('http://localhost:3000/test');
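The difference in timing with promises can be illustrated without RxJS at all. The following is only a minimal sketch under stated assumptions: TinyObservable is a hypothetical stand-in written for this example, not the RxJS class. It shows that the initialization code of a promise runs as soon as the promise is created, whereas the observable-like object waits for subscribe.

```typescript
// Hypothetical stand-in for an observable (NOT the RxJS class): it stores
// the initialization function and only runs it when subscribe() is called.
class TinyObservable<T> {
  constructor(private init: (next: (value: T) => void) => void) {}

  subscribe(next: (value: T) => void): void {
    this.init(next);
  }
}

const lazyLog: string[] = [];

// The executor of a promise runs immediately, even if then() is never called.
new Promise<string>((resolve) => {
  lazyLog.push('promise executor called');
  resolve('done');
});

// Nothing happens here yet: the initializer is only stored.
const lazyObservable = new TinyObservable<string>((next) => {
  lazyLog.push('observable initializer called');
  next('some event');
});

// Snapshot of the log before anyone subscribes.
const beforeSubscribe = lazyLog.slice();

// Only now does the initializer run and emit its event.
lazyObservable.subscribe((event) => {
  lazyLog.push('received: ' + event);
});
```

In this sketch, beforeSubscribe only holds the entry written by the promise, while the two observable entries are added to lazyLog once subscribe has been called.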

Execute several times

Another particularity of observables is that they can trigger several times unlike promises which can’t be used after they were resolved or rejected.

In the following sample the observable sends events every 500 milliseconds and completes after 2 seconds.

var observable = Observable.create((observer) => {
  var id = setInterval(() => {
    observer.next('some event');
  }, 500);

  setTimeout(() => {
    clearInterval(id);
    observer.complete();
  }, 2000);
});

observable.subscribe(
  (data) => {
    console.log('data received');
  },
  (error) => {
  },
  () => {
    console.log('completed');
  });

This feature is particularly suitable for event-driven technologies like Websockets and Firebase. We will discuss this later in a dedicated section.

Canceling observables

Another characteristic of observables is that they can be cancelled. For this, we can simply return a function within the initialization call of the Observable.create function. We can refactor our initial code to make it possible to cancel the timeout function.

var observable = Observable.create((observer) => {
  var id = setTimeout(() => {
    observer.next('some event');
  }, 500);

  return () => {
    clearTimeout(id);
  };
});

If the unsubscribe method is called before 500 milliseconds, the event will never be triggered, as described below:

var subscription = observable.subscribe();

setTimeout(() => {
  subscription.unsubscribe();
}, 300);
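To make the mechanism more tangible, here is a minimal sketch of the contract between the initialization function and unsubscribe. CancellableObservable is a hypothetical miniature written for this example and is much simpler than what RxJS does internally; it only shows that the function returned by the initializer is what runs on cancellation.

```typescript
// Hypothetical miniature of the create/unsubscribe contract (NOT RxJS):
// the initializer returns a teardown function, and unsubscribe() calls it.
type Teardown = () => void;

class CancellableObservable<T> {
  constructor(private init: (next: (value: T) => void) => Teardown) {}

  subscribe(next: (value: T) => void): { unsubscribe: () => void } {
    let active = true;
    const teardown = this.init((value) => {
      // Ignore anything emitted after cancellation.
      if (active) {
        next(value);
      }
    });
    return {
      unsubscribe: () => {
        if (active) {
          active = false;
          teardown();
        }
      }
    };
  }
}

const cancelLog: string[] = [];

const delayed = new CancellableObservable<string>((next) => {
  const id = setTimeout(() => next('some event'), 500);
  // Teardown: called when the subscriber unsubscribes.
  return () => {
    clearTimeout(id);
    cancelLog.push('timer cleared');
  };
});

const delayedSubscription = delayed.subscribe((event) => {
  cancelLog.push('received: ' + event);
});

// Cancel right away, well before the 500 ms timer fires.
delayedSubscription.unsubscribe();
```

Because the timer is cleared by the teardown function, the event is never delivered and cancelLog only records the cancellation.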

Cool, but what’s so revolutionary? I mean, what are the impacts and the possibilities of such features within real-world applications? Let’s bring this into an Angular 2 application.

In fact, the low-level API (XHR) used to execute HTTP requests in the browser supports cancellation through its abort method. It would be interesting to bring this possibility into observables to allow us to cancel HTTP requests when they are out of date, even before they return.

The implementation of HTTP calls in Angular 2 leverages this feature and allows cancellation for such use cases. To cancel an HTTP request, simply call the unsubscribe method on the subscription returned when subscribing to the observable of the HTTP call.

var observable = this.http.get('http://localhost:3000/test');

var subscription = observable.subscribe(res => {
  console.log('The response is received.');
});

setTimeout(() => {
  subscription.unsubscribe();
}, 500);

If we have a look at the Network tab in the browser dev tools, we will see that the HTTP request is actually cancelled if the result isn’t received before 500 milliseconds.

pic1

This feature is actually a killer one! We will see how powerful it can be when used with operators.

Leverage operators

When using Reactive Programming, we will leverage asynchronous data streams. This means that we will connect several parts of the application on these streams. So we need to work on them to transform and adapt them.

For that reason, observables come with a set of operators to transform and adapt one stream into another one.

The following list describes the most common operators:

  • map – Projects each element of an observable sequence into a new form. The callback also receives the element’s index.

  • flatMap – Projects each element of an observable sequence to another observable sequence and merges the results into a single stream.

  • flatMapLatest – Same as flatMap but cancels previous observables if they are in progress. In RxJS 5, the version used by Angular 2, this operator is named switchMap.

  • filter – Filters the elements of an observable sequence based on a predicate.

Here is a simple sample that describes how to create a data stream that queries the geonames HTTP service according to the values a user types into an input:

var searchInput = document.querySelector('#searchInput');
var searchStream = Observable.fromEvent(searchInput, 'keyup');

var observable = searchStream.map((event) => {
  var text = event.target.value;
  // backticks are required for ${text} to be interpolated
  return `http://api.geonames.org/postalCodeSearchJSON?placename_startsWith=${text}&username=XXX`;
}).flatMap((url) => {
  return Observable.fromPromise(jQuery.getJSON(url));
});

We won’t go further here. Instead, let’s see these operators in action with HTTP.

Build asynchronous processing chain

In the first section, we described the foundations of Reactive Programming and observables and how to leverage their operators to build data streams. It’s now time to bring this into a real world Angular 2 application.

Basic use with the Http class

The basic usage consists of executing an HTTP request. If we reuse the geonames HTTP service, simply use the Http class and its get method with parameters defined within the URLSearchParams class. We add the map operator to extract the JSON payload from the whole response.

import {Injectable} from 'angular2/core';
import {Http,URLSearchParams} from 'angular2/http';
import 'rxjs/add/operator/map';

@Injectable()
export class GeonamesService {
  constructor(private http:Http) {
  }

  searchPlaces(searchText:string) {
    var params = new URLSearchParams();
    params.set('placename_startsWith', searchText);
    params.set('username', 'XXX');

    return this.http.get('http://api.geonames.org/postalCodeSearchJSON',
                                  { search: params }).map(res => res.json().postalCodes);
  }
}

Note that the corresponding providers must be specified to be able to inject an instance of the Http class. This can be done when calling the bootstrap function with the HTTP_PROVIDERS object. Moreover, as observables are lazy, we need to call the subscribe method on the observable returned by the searchPlaces method. For more details, you can refer to the previous article, “Implementing an Angular 2 frontend over an APISpark-hosted Web API”.

Linking with user events

We can go further and react to user input in order to display, in real time, the results corresponding to a search text. For this, we can leverage a form control to be notified when the value of an input is updated. This control provides an observable through its valueChanges property, which will be the input of our data stream. If you’re not comfortable with form controls, you can have a look at the previous article, Implementing Angular2 forms – Beyond basics.

To adapt our previous sample using jQuery to Angular 2 and use our searchPlaces method, we will have the following code:

@Component({
  selector: 'my-app',
  template: `
    <form>
      <input [ngFormControl]="ctrl"/>
      Result:
      <ul>
        <li *ngFor="#place of places | async">{{place.placeName}}</li>
      </ul>
    </form>
  `
})
export class AppComponent {
  ctrl: Control;
  places: Observable<any[]>;

  constructor(private service:GeonamesService) {
    this.ctrl = new Control();

    this.places = this.ctrl.valueChanges.flatMap(
      val => {
        return this.service.searchPlaces(val);
      });
  }
}

We see now how observables can fit into Angular 2 applications. That said, the previous code suffers from several drawbacks:

  • An HTTP request is executed for each input of the user. For example, if the user wants to search for the pattern “nant”, four requests will be sent but the first three won’t be relevant.

  • Another problem is the execution time of HTTP requests. The second request could take longer than the fourth one, meaning that its result would be received last and displayed instead of the result of the latest request.

Here is what we see in the Network tab of the browser developer tools.

pic 1

Let’s start by fixing the second issue. RxJS provides the switchMap operator that cancels previous in-progress observables when providing new ones.

this.places = this.ctrl.valueChanges.switchMap(
  val => {
    return this.service.searchPlaces(val);
});

With this operator, we can now see that previous requests are canceled. This is possible because observables can be canceled.
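The ordering problem that switchMap protects against can be reproduced without any HTTP call. The sketch below is an illustration under stated assumptions: latestOnly and fakeSearch are hypothetical helpers written for this example, and the helper only ignores stale responses, whereas switchMap combined with the Http class also cancels the underlying requests.

```typescript
type ResultCallback = (result: string) => void;
type Search = (query: string, done: ResultCallback) => void;

// Hypothetical helper: wraps a search function so that only the response
// to the most recent query reaches onResult. Older responses are ignored.
function latestOnly(search: Search, onResult: ResultCallback): (query: string) => void {
  let latestId = 0;
  return (query: string) => {
    const id = ++latestId;
    search(query, (result) => {
      // A response is only relevant if no newer query was issued meanwhile.
      if (id === latestId) {
        onResult(result);
      }
    });
  };
}

// Fake backend: keeps the callbacks so responses can be delivered in any order.
const pendingSearches: { [query: string]: ResultCallback } = {};
const fakeSearch: Search = (query, done) => {
  pendingSearches[query] = done;
};

const displayed: string[] = [];
const typeQuery = latestOnly(fakeSearch, (result) => {
  displayed.push(result);
});

// The user types "nan", then "nant".
typeQuery('nan');
typeQuery('nant');

// The response to the latest query arrives first, the stale one last.
pendingSearches['nant']('results for nant');
pendingSearches['nan']('results for nan');
```

Even though the response for “nan” arrives last, it is dropped, so only the results for “nant” end up in displayed.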

pic 2

We can even go a step further to only execute the last request by buffering the events and handle only the last after some time of inactivity. The debounceTime operator suits our needs here.

this.places = this.ctrl.valueChanges.debounceTime(500).flatMap(
  val => {
    return this.service.searchPlaces(val);
});

We now have only one HTTP request sent.

pic 3

Defining execution flow of several HTTP requests

We saw how powerful observables and operators can be for controlling an asynchronous data flow. We will now see another common use case in Web applications: the ability to control the execution of HTTP requests. Imagine we want to execute a request using the result of a previous one or to execute two requests in parallel and execute some processing when both responses are received.

Executing a request with the result of a previous one

In this first case, the flatMap operator is a good fit. Imagine that we want to get the country information for an element we got from our previous search. We need to execute another request on the geonames service using its country code. In the following snippet, we get the country corresponding to the first result of the first request.

this.country = this.ctrl.valueChanges.debounceTime(500).flatMap(
  val => {
    return this.service.searchPlaces(val);
  }).flatMap(data => {
    return this.service.getCountryFromCode(data[0].countryCode);
  });

Executing two requests in parallel

Here the Observable.forkJoin method can be used. For developers who have already written Angular 1 applications, it does the same thing as $q.all. We will aggregate data regarding a country, i.e. the country hints (getCountry method), the population of its capital (getCapitalPopulation method) and its neighbours (getNeighbours method).

The getCapitalPopulation and getNeighbours methods require the getCountry method to be executed since they rely on its result. For this, we will leverage the forkJoin method inside a flatMap operator.

    this.country = this.service.getCountry('FR').flatMap(country => {
      return Observable.forkJoin([
          Observable.of(country),
          this.service.getCapitalPopulation(country),
          this.service.getNeighbours(country)
      ]);
    }).map(data => {
      var country = data[0];
      country.capitalPopulation = data[1];
      country.neighbours = data[2];
      return country;
    });

We saw how to implement asynchronous processing chains using operators. To make processing more robust we need to handle errors within processing chains.

Error handling

Handling errors efficiently is an important part of processing since this allows you to improve your application robustness. This can be done by leveraging several observable operators.

Intercept error in the stream

With observables, we can leverage the catch operator. This way, errors no longer have to be handled only at the end, when subscribing to the whole stream.

This operator can be used within services to extract the actual error from an HTTP response payload or call a method of the service to handle errors globally. To propagate a new error in the stream, use the Observable.throw method. The following snippet describes this approach.

searchPlaces(searchText:string) {
  var params = new URLSearchParams();
  params.set('placename_startsWith', searchText);
  params.set('username', 'XXX');

  return this.http.get('http://api.geonames.org/postalCodeSearchJSON',
                                  { search: params })
                  .map(res => res.json().postalCodes)
                  .catch(res => Observable.throw(res.json()));
}

Angular 2 allows you to extend the Http class itself to plug in your own processing. This could be, for example, processing that handles errors globally.

Extending the Http class

For this, we can create a class that extends the Http one and redefines the methods we want to intercept. In these methods, just call the corresponding methods from the superclass and leverage the catch operator to attach our generic error processing.

The following code describes how to implement this approach.

@Injectable()
export class CustomHttp extends Http {
  constructor(backend: ConnectionBackend, defaultOptions: RequestOptions) {
    super(backend, defaultOptions);
  }

  request(url: string | Request, options?: RequestOptionsArgs): Observable<Response> {
    console.log('request...');
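    return super.request(url, options).catch(res => {
      // do something, then propagate the error (catch must return an observable)
      return Observable.throw(res);
    });
  }

  get(url: string, options?: RequestOptionsArgs): Observable<Response> {
    console.log('get...');
    return super.get(url, options).catch(res => {
      // do something, then propagate the error (catch must return an observable)
      return Observable.throw(res);
    });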
  }

  (...)
}

Our CustomHttp class can be configured by overriding the Http default provider when calling the bootstrap function. This can be done with a factory.

bootstrap(AppComponent, [HTTP_PROVIDERS,
    new Provider(Http, {
      useFactory: (backend: XHRBackend, defaultOptions: RequestOptions) => new CustomHttp(backend, defaultOptions),
      deps: [XHRBackend, RequestOptions]
  })
]);

For some use cases, we want to enable a retry feature on HTTP request failures.

Retrying requests

Another operator, the retry one, is particularly handy. It allows you to repeat the source observable sequence the specified number of times or until it successfully terminates. In the context of HTTP requests, this allows you to transparently re-execute requests that failed.

The following snippet describes how to use the retry operator:

searchPlaces(searchText:string) {
  var params = new URLSearchParams();
  params.set('placename_startsWith', searchText);
  params.set('username', 'XXX');

  return this.http.get('http://api.geonames.org/postalCodeSearchJSON',
                                  { search: params })
                  .retry(4)
                  .map(res => res.json().postalCodes)
                  .catch(res => Observable.throw(res.json()));
}

With this configuration, we will see the following requests in the Network tab.

pic 4
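The principle behind retrying can be sketched in a few lines without RxJS. The code below is an illustration only: withRetry and flakyRequest are hypothetical names written for this example, and in this sketch the count is taken to mean the number of additional attempts after the first failure, which is an assumption of the example rather than a statement about the operator.

```typescript
type Next<T> = (value: T) => void;
type Fail = (error: Error) => void;
// A source is any function that starts some work and reports through callbacks.
type Source<T> = (next: Next<T>, fail: Fail) => void;

// Hypothetical helper (not the RxJS operator itself): re-runs the source
// after a failure, up to maxRetries additional times, then gives up.
function withRetry<T>(source: Source<T>, maxRetries: number): Source<T> {
  return (next, fail) => {
    let retriesLeft = maxRetries;
    const attempt = (): void => {
      source(next, (error) => {
        if (retriesLeft > 0) {
          retriesLeft--;
          attempt();
        } else {
          fail(error);
        }
      });
    };
    attempt();
  };
}

const retryLog: string[] = [];
const counter = { attempts: 0 };

// Fake request that fails twice, then succeeds on the third attempt.
const flakyRequest: Source<string> = (next, fail) => {
  counter.attempts++;
  retryLog.push('attempt ' + counter.attempts);
  if (counter.attempts < 3) {
    fail(new Error('network error'));
  } else {
    next('response');
  }
};

withRetry(flakyRequest, 4)(
  (value) => { retryLog.push('received: ' + value); },
  (error) => { retryLog.push('failed: ' + error.message); }
);
```

The caller never sees the two intermediate failures: the request is transparently re-executed until it succeeds on the third attempt.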

This approach is however a bit basic since we didn’t specify the interval to use between requests. A better approach is to combine the retryWhen, delay and timeout operators. This way, each retry happens after a delay, and the request fails for good if the retries haven’t succeeded within a global amount of time.

In the following code, in the case of failure, we will try for 2 seconds. The delay between each try will be 500 milliseconds.

searchPlaces(searchText:string) {
  var params = new URLSearchParams();
  params.set('placename_startsWith', searchText);
  params.set('username', 'XXX');

  return this.http.get('http://api.geonames.org/postalCodeSearchJSON',
              { search: params })
        .retryWhen(error => error.delay(500))
        .timeout(2000, new Error('delay exceeded'))
        .map(res => res.json().postalCodes);
}

Sharing observables

As stated, observables allow you to register several listeners. This of course applies to observables returned from HTTP calls. Using this approach allows you to notify several parts of an Angular 2 application and trigger some processing when the response of an HTTP call is received.

For simplicity, we gather the two subscriptions to the same HTTP observable in a single snippet. In real-world applications, they can be spread across several parts of the application.

let observable = this.service.searchPlaces('nant');

// Within one component
observable.subscribe((data) => {
  // Some processing
});

// Within another component
observable.subscribe((data) => {
  // Some other processing
});

If we execute this piece of code, we will see that the corresponding HTTP request is executed twice.

pic 5

Why such behavior? In fact, this is linked to the state of observables. By default, HTTP observables are cold. This means that they are passive and only start publishing on request, i.e. the complete stream is executed again for each subscription.

What we need here is a single execution of the observable shared by all subscriptions. An observable that publishes in this way, regardless of individual subscriptions, is called a hot observable.

The only thing we need to do is to call the share operator on the observable corresponding to our HTTP request.

let observable = this.service.searchPlaces('nant').share();

Now the HTTP request will be called once.
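The idea of sharing a single execution between subscribers can be sketched without RxJS. This is a simplified illustration, not the operator’s implementation: ColdSource and shareSource are hypothetical names written for this example, and unlike the real share operator the sketch does not track unsubscriptions.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical stand-in for a cold observable (NOT RxJS): the initializer
// runs again for every subscriber.
class ColdSource<T> {
  constructor(private init: (next: Listener<T>) => void) {}

  subscribe(next: Listener<T>): void {
    this.init(next);
  }
}

// Simplified version of the idea behind share(): subscribe to the source
// once and forward each value to all registered listeners.
function shareSource<T>(source: ColdSource<T>): ColdSource<T> {
  const listeners: Listener<T>[] = [];
  let connected = false;
  return new ColdSource<T>((next) => {
    listeners.push(next);
    if (!connected) {
      connected = true;
      source.subscribe((value) => {
        listeners.forEach((listener) => listener(value));
      });
    }
  });
}

// Fake HTTP call: records each "request" and lets us answer it manually.
const sentRequests: Listener<string>[] = [];
const httpCall = new ColdSource<string>((respond) => {
  sentRequests.push(respond);
});

const received: string[] = [];
const sharedCall = shareSource(httpCall);
sharedCall.subscribe((data) => { received.push('first: ' + data); });
sharedCall.subscribe((data) => { received.push('second: ' + data); });

// Only one request was sent; answering it notifies both subscribers.
sentRequests.forEach((respond) => respond('places'));
```

Subscribing twice to httpCall directly would have recorded two requests; through sharedCall, a single request serves both subscribers.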

Polling

In some cases polling an HTTP resource can be useful especially when we can’t use real-time middlewares leveraging Web Sockets. Observables make the implementation of such a feature simple using operators.

Initializing polling

Implementing polling is very simple with observables using the interval operator. This operator repeatedly triggers an event each time the specified amount of time has elapsed.

We can check for new messages every minute with the following code:

initializePolling() {
  return Observable
         .interval(60000)
         .flatMap(() => this.getNewMessages());
}

This code is executed indefinitely. With observables, we can extend this code to make it end based on some conditions.

Canceling polling

To stop polling, we can call unsubscribe on the subscription. A more elegant solution consists of defining an external completion signal with the takeUntil operator.

The following code adapts the code of the previous section to use an observable that notifies the polling observable when to finish. This code can be located in a service.

initializePolling(stopPolling) {
  return Observable
         .interval(60000)
         .flatMap(() => this.getNewMessages())
         .takeUntil(stopPolling);
}

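A component can leverage the initializePolling method of the previous service to initialize the polling and provide a subject (a hot observable) to finish this polling. Since observables are lazy, the component must also subscribe for the polling to start.

constructor() {
  // named stopSignal so that it doesn't clash with the stopPolling method
  this.stopSignal = new Subject();
  this.service.initializePolling(this.stopSignal).subscribe((messages) => {
    // handle new messages
  });
}

stopPolling() {
  this.stopSignal.next(true);
}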

Transparent security handling

If you used Angular 1, you probably leveraged the $httpProvider service to create HTTP interceptors. This feature is really powerful since it allows you to define global processing for HTTP requests. There is no built-in feature for this in Angular 2 but it’s easy to plug in such processing.

We can rely on the approaches described for error handling in a service or by extending the Http class.

Nowadays authentication relies on tokens. They can expire and refreshing them is possible by calling a dedicated HTTP resource with a refresh token. When first authenticating, the current token and the refresh token can be stored in the Angular 2 application.

When receiving 401 errors in HTTP responses, we can try to automatically refresh the token. The following snippet describes how to leverage the refreshAuthenticationToken method in the data flow.

return this.http.request(url, request)
    .catch(initialError => {
      if (initialError.status === 401) {
        return this.refreshAuthenticationToken().flatMap((newToken) => {
          // retry with new token
          this.setAuthorizationHeader(request.headers, newToken);
          return this.http.request(url, request);
        });
      } else {
        return Observable.throw(initialError);
      }
    });

We couldn’t finish this article without describing how to use event-based technologies with observables.

Event-based support

Another key feature of observables is that they support events. I mean you can subscribe a listener once for an event type and then be notified each time this event is triggered. This is extremely different from promises. A promise only receives a single event and can’t be used any longer afterwards. If you want to receive the next event, you need to create a new promise. This adds complexity to applications.

Sure, this makes sense within Angular 2 components, which can trigger custom events based on the EventEmitter class. For such events (and native DOM ones), it’s possible to subscribe listeners to execute processing when they occur. There is even support within component templates to handle them declaratively and to involve them in two-way bindings.

We won’t go any further here on such features but we’ll describe how useful observables can be for real-time servers that leverage web sockets.

Receiving events

A WebSocket JavaScript object is available within the browser to use web sockets. It allows you to initialize a web socket by specifying the target address and registering callbacks for different supported events.

Web sockets come with a set of events out of the box. To bring observables at this level, we need to create a raw observable around a WebSocket instance, as described below. According to events triggered by the web sockets, we can call the associated observer:

  • onclose event – If the web socket was closed cleanly, we complete the observable; if not, we trigger an error.

  • onerror event – We trigger an error on the observable.

  • onmessage event – We trigger an event on the observable.

We return a function that closes the underlying websocket when every subscriber unsubscribes. Don’t forget to use the share operator to make the observable “hot” since it can be shared by several parts of the application.

export class WebSocketService {
  initializeWebSocket(url) {
    this.wsObservable = Observable.create((observer) => {
      this.ws = new WebSocket(url);

      this.ws.onopen = (e) => {
        (...)
      };

      this.ws.onclose = (e) => {
        if (e.wasClean) {
          observer.complete();
        } else {
          observer.error(e);
        }
      };

      this.ws.onerror = (e) => {
        observer.error(e);
      }

      this.ws.onmessage = (e) => {
        observer.next(JSON.parse(e.data));
      }

      return () => {
        this.ws.close();
      };
    }).share();
  }
}

To send a message, we can simply provide a method that leverages the send method of the websocket instance:

sendData(message) {
  this.ws.send(JSON.stringify(message));
}

We need to improve our implementation to make it a bit more robust to filter received messages and implement retries.

Making support more robust

To do this, we need to wrap our initial websocket observable into another one. This way we can support retries when the connection is lost and integrate filtering on criteria like the client identifier.

return Observable.create((observer) => {
  let subscription = this.wsObservable
          .filter((data) => data.sender!==clientId)
          .subscribe(observer);

  return () => {
    subscription.unsubscribe();
  };
}).retryWhen((errors) => {
  // wait 3 seconds after each error before resubscribing
  return errors.delay(3000);
});

Conclusion

As you can see, Reactive Programming and observables are really powerful and go beyond what promises offer. They provide key features like laziness and the ability to cancel them. This allows you to add robustness into Angular 2 applications especially at the level of HTTP to finely control what is executed.

Observables also allow you to build asynchronous data streams to link user interactions with HTTP calls and HTTP results with user interfaces. Such streams can be built using observable operators. They provide a wide range of features and allow you to build complex data processing chains in a few lines of code.

We hope that you now better understand why Reactive Programming was brought into the Angular 2 framework and what it provides. Understanding how observables work allows you to better design your Angular 2 applications and leverage all the power of this framework.

Author

Thierry Templier

Thierry Templier, also known as API Iron Cat, is a software architect at Restlet. In addition to server-side architecture, he’s also been a JavaScript and CSS addict for a very long time. He co-authored a book on the subject in 2009, “JavaScript for Web 2.0”. In addition to working with patterns, libraries and frameworks to build rich Internet applications, he also implements JavaScript server-side applications with Node. He has worked with Angular since 2013, including contributing to the migration of Restlet’s API platform frontend from JavaScript / jQuery to Angular. He likes to make applications that interact efficiently with RESTful services and web APIs. Thierry also teaches classes around JavaScript, Angular and Node, and contributes to Angular 2 on the HTTP module and on StackOverflow, where he is one of the top five answerers.

