TUTORIAL: Centralise your Big Data

Distributed log aggregation with RabbitMQ Federation

In our current interconnected world, it's becoming quite common to see application architectures that span several data centers across the globe. With these kinds of architectures, new challenges arise, such as collecting data from each of the various locations into a central server.

Let's say we have an online shop with a localized version for different countries across many continents, and we want to collect metrics about our users' behaviour in order to improve sales. How could we ship those metrics in near real time to our data warehouse so we can process them there?

One way to aggregate logs coming from several sources within the same data center is to use a messaging solution, like RabbitMQ. All of our frontend servers could log metrics to a local RabbitMQ instance, and then background worker processes could pick up the data and analyze it. The problem with this approach is that our data will be confined to the local data center of that particular region.

How could we forward the data across the world to our central data warehouse? We could have the workers send the data over the wire to a remote RabbitMQ server, but that would involve a lot of coding on our side. Also, with the introduction of a distributed system, a new array of problems arises that we probably don't want to deal with: what happens if the remote servers are unreachable? Do we queue messages locally, or do we drop them? If we queue them locally, do we use an unbounded queue, or do we drop messages after a certain threshold? And the list of problems just grows. Fortunately, there's a solution for this: the RabbitMQ Federation Plugin.

The RabbitMQ Federation Plugin (federation for short) allows us to define a set of RabbitMQ servers as upstreams in relation to a particular server in our distributed setup. That particular server will be the downstream server, i.e. where the data will be collected. Messages published to the upstream servers will be forwarded to the downstream server as if they were originally published to it (see the image below). Consumers on queues on the downstream server will process messages as normal, without knowing that those messages might have come from a remote server. Federation also allows for more complex setups, where there are many downstream servers, or where downstream servers are used as upstreams for other servers further down the federation chain.

How does it work? Usually in RabbitMQ we publish messages to an exchange, which routes them to queues. With federation, we can mark certain exchanges as federated, and RabbitMQ will automatically forward messages arriving at those exchanges to the downstream servers.

To use federation we need to understand the concept of upstream and downstream servers. Upstream servers are where the messages are originally published. Downstream servers are where the messages get forwarded to. Before getting into how to define upstream and downstream servers, let's see how to enable the federation plugin with RabbitMQ.

Setting up Federation Plugin

Since the federation plugin comes by default with RabbitMQ, we just need to enable it by running the following command:

$ rabbitmq-plugins enable rabbitmq_federation

And if we want to be able to manage the federation plugin from the Web UI we could also enable the management bits:

$ rabbitmq-plugins enable rabbitmq_federation_management

Then we just need to restart the broker (if it was running) and the federation plugin will be ready to use.

Now, let's assume that we have an application architecture similar to the one from the map below, where we have a central server in the US and branches in Europe, Asia and South America (assuming an EC2 based architecture). We have a RabbitMQ server set up in each location, and now we want to federate them - that is, ship messages from the overseas locations to the US based server.

To do that we have to log in to the US server and declare our upstream servers, that is, the servers that will forward their messages to it. The command to define an upstream is as follows:

$ rabbitmqctl set_parameter federation-upstream sa-upstream \
  '{"uri":"amqp://ec2-XX-XXX-XX-XXX.sa-east-1.compute.amazonaws.com:5672"}'

With that command we define an upstream server called sa-upstream, specifying the server as an AMQP URI (you'll have to replace the Xs with the actual hostname that EC2 assigned you). On the RabbitMQ site you can read more about the AMQP URI specification and how to use SSL connections with it. Note that your RabbitMQ server must be running for the previous command to work.
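The upstream URI is a regular URI, so we can pick it apart with standard tooling to see what the federation plugin will use. A minimal sketch (the hostname below is a placeholder, not a real EC2 address):

```java
import java.net.URI;

public class UriCheck {
    public static void main(String[] args) {
        // Hypothetical upstream URI; replace the host with your own EC2 hostname.
        URI uri = URI.create("amqp://ec2-host.sa-east-1.compute.amazonaws.com:5672");

        System.out.println("scheme: " + uri.getScheme()); // amqp
        System.out.println("host:   " + uri.getHost());
        System.out.println("port:   " + uri.getPort());   // 5672, the default AMQP port
    }
}
```

If the port is omitted from the URI, the AMQP default (5672) is used; an amqps URI would imply an SSL connection instead.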

You can repeat the previous command for each of the upstream servers. Once you have them all set, run the following command to actually get federation started:

$ rabbitmqctl set_policy --apply-to exchanges federate-me "^metrics\." \
  '{"federation-upstream-set":"all"}'

With that command we tell RabbitMQ to federate all exchanges whose names start with "metrics.". The JSON object tells the federation plugin which upstreams to connect to. The plugin maintains a predefined upstream set called all, to which every upstream we define is added automatically. The federation plugin will connect to the servers in that set and get messages from them. We could define our own upstream set and add our upstreams there, but for this particular example we won't need that.
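The pattern argument to set_policy is a regular expression matched against exchange names. A quick sketch of which names the "^metrics\." pattern from the policy above would match:

```java
import java.util.regex.Pattern;

public class PolicyMatch {
    public static void main(String[] args) {
        // Same pattern we passed to set_policy; the dot is escaped,
        // so it matches a literal "metrics." prefix.
        Pattern federateMe = Pattern.compile("^metrics\\.");

        System.out.println(federateMe.matcher("metrics.user_login").find()); // true
        System.out.println(federateMe.matcher("metrics.page_view").find());  // true
        System.out.println(federateMe.matcher("logs.user_login").find());    // false
        System.out.println(federateMe.matcher("metrics_user").find());       // false
    }
}
```

This is why the exchange we declare later, metrics.user_login, picks up the federate-me policy while an exchange named, say, logs.user_login would not.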

Now that we have our federation setup ready, it's time to start our consumer in our US datacenter and then forward messages to it from any of our upstream servers. We are going to create our consumers using the official RabbitMQ Java Client which you can obtain here: http://www.rabbitmq.com/java-client.html. We assume that you unzipped the Java client in the appropriate location and that you have set up the class path accordingly. Read more about how to set up the Java client here: http://www.rabbitmq.com/tutorials/tutorial-one-java.html.

Here's our RabbitMQ consumer, a file called Consumer.java:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

public class Consumer {

    private final static String EXCHANGE_NAME = "metrics.user_login";
    private final static String QUEUE_NAME = "user_login";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Durable queue and exchange, so they survive broker restarts
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer); // manual acks

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

Above is a very basic RabbitMQ consumer. After we obtain a connection to RabbitMQ, we declare an exchange named metrics.user_login, so that it matches the policy we defined before. We then declare a queue and bind it to the exchange. Finally, we subscribe to the queue with a QueueingConsumer and loop over the incoming deliveries, acking each message after printing it. Note that this queue will receive all the messages published locally and also on the upstream servers.

Save this code as Consumer.java, then compile and run it:

$ javac -cp rabbitmq-client.jar Consumer.java
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Consumer

Now let's quickly create a producer to send data to our consumer. Save the code below to the file Producer.java.

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class Producer {

  private final static String EXCHANGE_NAME = "metrics.user_login";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

    String message = "Hello Federation!";
    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

    channel.close();
    connection.close();
  }
}

The code in our producer is nearly the same as in our consumer. The difference is that instead of consuming from a queue, we call basicPublish to send a message to our exchange. Let's try running this producer locally by opening a new terminal window and issuing the following commands:

us-host$ javac -cp rabbitmq-client.jar Producer.java
us-host$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer

If everything went well we should see the message Hello Federation! appearing on our consumer screen. The next step would be to run our producer from our upstream machine and see if RabbitMQ forwards the message automatically for us.

Let's log in to our remote machine in Brazil, install the Java RabbitMQ client there, copy the Producer.java file over, and then run it from the command line:

sa-host$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Producer

Once again, if everything went well, we should see the message Hello Federation! appear on our consumer screen. And that's all, folks! Federation is that easy to use. Note that we don't have anything federation-specific in our code, nor did we have to restart our consumer in order to get the federated messages. We just had to define an upstream as a parameter and then apply a policy to our exchanges. Now let's look at some of the options available when defining an upstream.

Federation Options

To define an upstream we had to set a RabbitMQ runtime parameter like this:

$ rabbitmqctl set_parameter federation-upstream sa-upstream \
  '{"uri":"amqp://ec2-XX-XXX-XX-XXX.sa-east-1.compute.amazonaws.com:5672"}'

We can see that a parameter is basically a JSON object. We don't need to limit ourselves to just setting the URI for the upstream; we can specify more options that control how federation behaves:

{
    "uri": "amqp://server-name/",
    "prefetch-count": 1000,
    "reconnect-delay": 1,
    "ack-mode": "on-confirm",
    "expires": 3600000,
    "message-ttl": 1800000
}

In that JSON object we set some options besides the usual uri we had before. For example, prefetch-count tells the federation plugin how many messages can be in flight before they are acked. Behind the scenes, federation sets up queues and consumers that are transparent to the user, and uses them to ship messages from broker to broker. This parameter sets the basic.qos prefetch count for federation's own internal consumer.
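To get a feel for what the prefetch count bounds, here is a broker-free sketch (our own illustration, not plugin code) in which the sender stops handing out messages whenever the unacked window is full:

```java
public class PrefetchSketch {
    public static void main(String[] args) {
        int prefetch = 3;    // like the prefetch-count upstream option
        int unacked = 0;     // delivered but not yet acked
        int delivered = 0;
        int acked = 0;
        int maxInFlight = 0;

        // Deliver 10 messages; ack one whenever the window is full.
        while (acked < 10) {
            if (unacked < prefetch && delivered < 10) {
                delivered++;        // broker pushes another message
                unacked++;
                maxInFlight = Math.max(maxInFlight, unacked);
            } else {
                unacked--;          // consumer acks, freeing a slot
                acked++;
            }
        }

        // No more than `prefetch` messages were ever in flight.
        System.out.println("max in flight: " + maxInFlight);
    }
}
```

A higher prefetch-count lets more messages travel concurrently over the link (better throughput), at the cost of more messages being buffered in memory on the downstream side.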

Then we have reconnect-delay, which controls how long, in seconds, the federation plugin will wait before attempting to reconnect to the remote node if the connection is lost.

With ack-mode we can configure the trade-off between speed and reliability: a very fast federation that risks message loss, or a somewhat slower setup that ensures no messages are lost. To understand the options, we first need to understand a bit of how federation works behind the scenes. Federation creates an internal queue for its messages and consumes from it, and that internal consumer takes care of publishing the messages to the downstream server. To ensure there's no message loss, the federation plugin needs to decide when to ack those messages in its internal queue.

If we set ack-mode to no-ack, we could experience message loss, because RabbitMQ forgets about a message as soon as federation's internal consumer receives it. That's the fastest option, but the most unsafe one. The second option is on-publish: as soon as the message has been published to the downstream server, the federation plugin acks it so the upstream server can forget about it. Finally, with on-confirm, the federation plugin only acks the message once the downstream broker confirms that it has properly handled it (read more about publisher confirms at rabbitmq.com/confirms.html). In any case, choose the option that best fits your use case.

expires and message-ttl both help prevent an unbounded buffer of messages on the upstream server. Imagine a broken network link to the downstream server: messages would start to pile up upstream. Without a way to prevent that, at some point we would run out of memory, disk space or both, and to prevent a system failure RabbitMQ would have to block producers. With expires we provide a time in milliseconds after which RabbitMQ deletes the upstream federation queue if it is unused. message-ttl tells RabbitMQ to expire messages that were supposed to be federated but have grown older than the configured time. You can read more about all the options federation takes at rabbitmq.com/federation-reference.html.
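Both values are given in milliseconds, which makes the big numbers in the example parameter easy to misread. The values above can be derived with TimeUnit: 3600000 ms is one hour, and 1800000 ms is thirty minutes:

```java
import java.util.concurrent.TimeUnit;

public class TtlValues {
    public static void main(String[] args) {
        long expires = TimeUnit.HOURS.toMillis(1);        // 3600000: delete the unused
                                                          // federation queue after 1 hour
        long messageTtl = TimeUnit.MINUTES.toMillis(30);  // 1800000: drop messages older
                                                          // than 30 minutes

        System.out.println("expires:     " + expires);
        System.out.println("message-ttl: " + messageTtl);
    }
}
```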

Conclusion

In this article we have seen that although the concept of a distributed RabbitMQ setup seems daunting at first, it is not hard to set up in practice. We actually need just two commands to get a federation upstream link up and running. We've also seen that federation is transparent to producers and consumers - they don't need to know anything about it. From a flexibility point of view, we saw that RabbitMQ Federation offers quite a few options that can help tailor the plugin to our needs.


Alvaro Videla works as Developer Advocate for RabbitMQ/Pivotal. Before moving to Europe he used to work in Shanghai where he helped building one of Germany's biggest dating websites. He co-authored the book "RabbitMQ in Action" for Manning Publishing. Some of his open source projects can be found here: http://github.com/videlalvaro. Apart from code related stuff he likes traveling with his wife, listening/playing music and reading books. You can find him on Twitter as @old_sound.

JAX Magazine - 2014 - 06