TUTORIAL: Centralise your Big Data

Distributed log aggregation with RabbitMQ Federation

AlvaroVidela
alvaro-teaser1

A practical introduction to the Federation plugin by Alvaro Videla, author of ‘RabbitMQ in Action’.

In
our current interconnected world, it’s becoming quite common to see
application architectures that span several data centers across the
globe. With these kind of architectures new challenges arise, like
the problem of collecting data from each of the various locations
into a central server.

Let’s say we have an online shop that has a localized version
for different countries across many continents, and we want to be
able to collect metrics about our users’ behaviour in order to
improve sales. How could we ship those metrics in a near real time
fashion to our data warehouse so we can process them there?

One way to aggregate logs coming from several sources on the
same data center is to use a messaging solution, like RabbitMQ. All
of our frontend servers could log metrics to a local RabbitMQ
instance, and then background worker processes could pick up the
data and analyze it. The problem with this approach is that our
data will be confined to the local datacenter of that particular
region.

How could we forward the data across the world to our central
data warehouse? We could have the workers sending the data over the
wire to a remote RabbitMQ server, but that would involve a lot of
coding on our side. Also, with the introduction of a distributed
system a new array of problems arise that we probably don’t want to
deal with: what happens if the remote servers are unreachable? Do
we queue messages locally or we drop them? If we queue them
locally, do we have an unbounded queue or we need to drop messages
after a certain threshold? And the list of problems just grows.
Fortunately there’s a solution for this: the RabbitMQ Federation
Plugin.

The RabbitMQ Federation Plugin (federation for short), allows us
to define a set of RabbitMQ servers as upstreams in
relation to a particular server in our distributed setup. That
particular server will be the downstream server, i.e.
where the data will be collected. Messages that are published to
the upstream servers will be forwarded to the downstream server as
if they were originally published to it (See the image below).
Consumers on queues on the downstream server will process messages
as normal, not knowing those messages might come from a remote
server. Federation also allows for more complex setups, where there
are many downstream servers, or where downstream servers are used
as upstream for other servers down in the federation chain.

How does it work? Usually what we do in RabbitMQ is publish
messages to an exchange, which are then routed to queues. If we use
federation, we can get RabbitMQ to forward those messages
automatically for us across our servers. That means messages
arriving to those federated exchanges will also be
forwarded to the downstream servers.

To use federation we need to understand the concept of upstream
and downstream servers. Upstream servers are where the messages are
originally published. Downstream servers are where the messages get
forwarded to. Before getting into how to define upstream and
downstream servers, let’s see how to enable the federation plugin
with RabbitMQ.

Setting up Federation Plugin

Since the federation plugin comes by default with RabbitMQ, we
just need to enable it by running the following command:

$ rabbitmq-plugins enable rabbitmq_federation

And if we want to be able to manage the federation plugin from
the Web UI we could also enable the management bits:

$ rabbitmq-plugins enable rabbitmq_federation_management

Then we just need to restart the broker (if it was running) and
the federation plugin will be ready to use.

Now, let’s assume that we have an application architecture
similar to the one from the map below, where we have a central
server in the US and branches in Europe, Asia and South America
(assuming an EC2 based architecture). We have a RabbitMQ server set
up there and now we want to federate them – that is, ship messages
from the overseas locations to the US based server.

To do that we have to login into the US server and declare our
upstream servers, that is, the servers that will forward their
messages to it. The command to define and upstream is as
follows:

$ rabbitmqctl set_parameter federation-upstream sa-upstream 
  '{"uri":"amqp://ec2-XX-XXX-XX-XXX.sa-east-1.compute.amazonaws.com:5672"}'

With that command we are defining an upstream server called
sa-upstream on which we specify the server as an AMQP
URI (you’ll have to replace the Xs there for the actual server IP
that EC2 assigned you). On the RabbitMQ site you can read more
about the AMQP URI
specification
and how you could
use SSL connections with it
. Note that to run the previous
command your RabbitMQ server must be running.

You can repeat your previous command for each of the upstream
servers, and then once you have them all set, you can run the
following command to actually get federation started.

$ rabbitmqctl set_policy --apply-to exchanges federate-me "^metrics." 
'{"federation-upstream-set":"all"}'

With that command we are telling RabbitMQ that we want it to
federate all exchanges whose names start with
metrics.. The JSON object tells the federation plugin
how to connect to the upstream servers. The federation plugin
maintains a set of upstream servers called all on
which all the upstream serves are added. The federation plugin will
connect to the servers on that sets and get the messages from them.
We could define our own upstream set and add our upstreams there,
but for this particular example we won’t need that.

Now that we have our federation setup ready, it’s time to start
our consumer in our US datacenter and then forward messages to it
from any of our upstream servers. We are going to create our
consumers using the official RabbitMQ Java Client which you can
obtain here: http://www.rabbitmq.com/java-client.html.
We assume that you unzipped the Java client in the appropriate
location and that you have set up the class path accordingly. Read
more about how to setup the Java client here: http://www.rabbitmq.com/tutorials/tutorial-one-java.html.

Here’s our RabbitMQ consumer, a file called
Consumer.java:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

    private final static String EXCHANGE_NAME = "metrics.user_login";
    private final static String QUEUE_NAME = "user_login";

    public static void main(String[] argv) throws Exception {
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
      System.out.println("Waiting for messages. To exit press CTRL+C");

      QueueingConsumer consumer = new QueueingConsumer(channel);
      channel.basicConsume(QUEUE_NAME, false, consumer);

      while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println("Received '" + message + "'");
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
      }
  }
}

Above is a very basic RabbitMQ consumer. After we obtain a
connection to RabbitMQ, we declare an exchange using the name
'metrics.user_login' so it matches the policy we
defined before. We then declare a queue and bind it to the
exchange. Finally, after we’ve defined our callback we subscribe to
the queue to start receiving messages. Note that this queue will
receive all the messages published locally and also on the upstream
server.

Let’s store this file on the Consumer.java file and
run it by calling:

 $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Consumer

Now let’s quickly create a producer to send data to our
consumer. Save the code below to the file
Producer.java.

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {

  private final static String EXCHANGE_NAME = "metrics.user_login";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

    String message = "Hello Federation!";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

    channel.close();
    connection.close();
  }
}

The code in our producer is nearly the same as the one for our
consumer. The difference here is that instead of setting up a
callback we call basicPublish to send messages to our
exchange. Let’s try running this producer locally by opening a new
terminal window and then issuing the following command:

us-host$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer

If everything went well we should see the message Hello
Federation!
appearing on our consumer screen. The next step
would be to run our producer from our upstream machine and see if
RabbitMQ forwards the message automatically for us.

Let’s login on our remote machine in Brazil, install the Java
RabbitMQ client there, copy the Producer.java script
and then run it form the command line:

sa-host$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer

Once again if everything went well, we should see the message
Hello Federation! appearing in our consumer screen.
And that’s all folks!. Federation is that easy to use.
Note that we don’t have anything federation specific in our code,
nor we had to restart our consumer in order to get the federated
messages. We just had to define an upstream as parameter and then
apply a policy to our queue. Now let’s see what are some of the
options available to use when defining an upstream.

Federation Options

To define an upstream we had to set a RabbitMQ runtime parameter
like this:

$ rabbitmqctl set_parameter federation-upstream sa-upstream 
  '{"uri":"amqp://ec2-XX-XXX-XX-XXX.sa-east-1.compute.amazonaws.com:5672"}'

We can see there that a parameter is basically a JSON object. We
don’t need to limit ourselves to just setting up the URI for the
upstream. We can specify more options that will control how
federation behaves:

{
    'uri': 'amqp://server-name/',
    'prefetch-count': 1000,
    'reconnect-delay': 1,
    'ack-mode': 'on-confirm',
    'expires': 3600000,
    'message-ttl': 1800000
}

In that JSON object we set up some options besides the usual
uri we had before. For example,
prefetch-count will tell the federation plugin how
many messages can be in-flight before them being ack’ed. Behind the
scenes, federation will set up queues and consumers that are
transparent for the user. Federation will use those queues and
consumers to ship messages from broker to broker. What this
parameter does is to setup the basic.qos prefetch
count for federation’s own internal consumer.

Then we have reconnect-delay. This parameter
affects how long the federation plugin will wait before attempting
to reconnect to the remote node if the connection is lost. The
value is specified in seconds.

With ack-mode we have the option to configure the
trade off between speed and reliability. We can have a very fast
federation but with the risk of message loss or we can have a bit
slower setup and ensure that we don’t lose any messages. To
understand the different options we can use here, first we need to
understand a bit how federation works behind the scenes. Federation
will create an internal queue for its messages, it will consume
from them, and that consumer will take care of publishing them in
the downstream server. To ensure that there’s no message loss, the
federation plugin needs to decide when it should ack those messages
in its internal queue.

If we set the ack-mode option to
no-ack, then we could experience message loss, because
as soon as the federation internal consumer receives the message,
RabbitMQ will forget about it. That’s the fastest option, but the
most unsafe one. The second option will be on-publish,
which as soon as the downstream server takes the published message,
the federation plugin will ack the message so the upstream server
can forget about it. Finally with on-confirm the
federation app will only ack the message once it receives the
confirmation from the downstream server that the broker has
properly handled the message. (Read more about publisher confirms
on rabbitmq.com/confirms.html).
In any case choose the option that better fits your use case.

Expires and Message TTL both help with preventing having an
unbounded buffer for the upstream messages server. Imagine there’s
a broken network link to the downstream server and then messages
start to pile up there. If there wasn’t a way to prevent that at
some point we would run out of memory, disk space or both and to
prevent a system failure RabbitMQ would have to block producers.
With expires we can provide a time in milliseconds that tells
RabbitMQ when to delete the upstream federation queue in case it is
unused. Message TTL will tell RabbitMQ to expire messages that were
supposed to be federated but got older than the time specified in
the configuration. You can read more about all the options
federation takes at rabbitmq.com/federation-reference.html.

Conclusion

In this article we have seen that although the concept of a
distributed RabbitMQ setup seems daunting at first, it is not hard
to set up in practice. We actually need just two commands to get a
federation upstream link up and running. We’ve also seen that
federation is transparent to producers and consumers – they don’t
need to know anything about it. From a flexibility point of view,
we saw that RabbitMQ Federation offers quite a few options that can
help tailor the plugin to our needs.

Related reading

Alvaro Videla works as Developer Advocate for
RabbitMQ/Pivotal. Before moving to Europe he used to work in
Shanghai where he helped building one of Germany’s biggest dating
websites. He co-authored the book “RabbitMQ in Action” for Manning
Publishing. Some of his open source projects can be found here:
http://github.com/videlalvaro.
Apart from code related stuff he likes traveling with his wife,
listening/playing music and reading books. You can find him on
Twitter as @old_sound.

Author
Comments
comments powered by Disqus