Weaving YARN

Developing Distributed Applications with Apache Twill

For the past couple of decades, Java programmers have been building vast numbers of web apps generating petabytes of log data. Analyzing these web logs can create real business value -  for example, by optimizing the web app for observed user behavior patterns, or by personalizing the user experience within the app. But until recently, companies have discarded their web logs because they were either too hard to analyze, or too expensive to store with traditional relational databases.

Apache Hadoop [1] is a free, open source technology that runs on commodity hardware clusters. Its distributed file system (HDFS) makes it cheap to store large amounts of data, and its scalable Map/Reduce analysis engine makes it possible to extract insights from that data. Map/Reduce [2] is a flavor of batch-driven data analysis, where the input data is partitioned into smaller batches that can be processed in parallel across many machines in the Hadoop cluster. But Map/Reduce, while powerful enough to express many data analysis algorithms, is not always the optimal choice of programming paradigm. It's often desirable to run other computation paradigms in the Hadoop cluster – here are some examples:

  • Ad-hoc querying: The SQL language is widely known from relational databases, and many users would like to query their big data using SQL. Apache Hive [3] can execute a SQL query as a series of Map/Reduce jobs, but it has shortcomings in terms of performance. Recently, new approaches such as Apache Tajo [4], Facebook's Presto [5] and Cloudera's Impala [6] have drastically improved performance, but they require running services other than Map/Reduce in the Hadoop cluster.

  • Real-time Stream Processing: Due to their batch nature, Map/Reduce jobs have a high latency before results are available, often several hours. Many applications require much lower latencies. For example, a fraud detection algorithm should deliver its results much faster - if possible in real-time. Examples of real-time engines include Apache Storm [7] and Apache Samza [8]. Ideally such an engine could run in the same cluster as the Map/Reduce jobs that process the same data, to avoid duplication of data and infrastructure.

  • Message-Passing (MPI): MPI [9] is another scalable approach to big data analysis: stateful processes run on the nodes of a distributed network, communicate with each other by sending messages, and alter their state based on the messages they receive. For example, it has been shown [10] that Google's PageRank algorithm can be approximated in near real-time by an MPI algorithm.

  • Distributed Testing: When a cluster is not fully utilized, it makes sense to use its spare resources for other purposes such as testing. For example, running all tests for the Hadoop distribution takes many hours on a single machine. If these tests can be parallelized and run on many machines concurrently, the test time can be greatly reduced. Another example of a distributed test is a load test for a web service that sends a high number of requests concurrently from many machines.
  • You name it.

Is it possible to run such non-Map/Reduce jobs on a Hadoop cluster? For the longest time, the answer to this question was: No! Or, only if one had a PhD in distributed systems engineering and figured out a way to "disguise" the jobs as Map/Reduce. This was mainly because in Hadoop, the cluster resource management and the Map/Reduce engine were very tightly integrated with each other.

The good news is that the latest version of Hadoop decouples these two responsibilities. Hadoop 2.0's new resource manager YARN makes it possible to use the Hadoop cluster for any computing needs, by programming against YARN's interfaces and protocols. However, YARN's programming model is complex and poses a steep learning curve to developers who do not have experience with Hadoop.

Fortunately, this all changes with the recently incubated Apache Twill, which exposes YARN's power with a Java thread-like programming model. Twill also implements commonly needed patterns of distributed applications such as central log collection, service discovery and life cycle management. Read on to learn more about the history of Hadoop, YARN and how to develop distributed applications using Twill.

Hadoop and YARN

In the original Hadoop, cluster management is tightly coupled with the Map/Reduce programming paradigm. A Map/Reduce job analyzes data by dividing the work between a number of specialized tasks called mappers and reducers.

Each task runs on one of the machines of the cluster, and each machine has a limited number of slots for running tasks concurrently. Hadoop's job tracker daemon is responsible for both managing the cluster's resources and driving the execution of the Map/Reduce job: it reserves and schedules slots for all tasks, configures, runs and monitors each task, and if a task fails, it allocates a new slot and reattempts the task. After a task finishes, the job tracker cleans up temporary resources and releases the task's slot to make it available for other jobs.

There are two major issues with this approach: It limits scalability, because the single job tracker daemon becomes a bottleneck and a single point of failure for the many jobs in a large cluster. Additionally, specializing the execution logic to Map/Reduce makes it difficult to use the cluster for other distributed programming paradigms - such jobs have to "disguise" themselves as mappers and reducers in order to be able to run.

Hadoop 2.0 changes all of this. It has a new resource manager named YARN [11] that is decoupled from job execution. In YARN, the compute resources of a cluster are organized into "containers", where each machine can host multiple containers at the same time.

The Resource Manager (RM) maintains the state of all active containers as well as the available capacity of all nodes. In addition, every machine runs a Node Manager that is responsible for launching and monitoring containers on that machine. Every job is controlled by a separate task, the Application Master (AM).

The AM itself runs in a container and communicates with the RM to request, receive and release containers for the tasks of the application, and with the Node Managers to launch tasks in those containers. It makes all logical decisions about the execution of the application, such as the number of tasks to run, the distribution of work among the tasks, the order of execution of tasks, etc.

This new architecture delivers better scalability because there is no single bottleneck in the management of job execution, and it provides more flexibility as to what types of applications can run in the cluster. The AM is now part of the application code, and hence every application can have its own, custom AM - the Map/Reduce AM is simply provided out of the box as part of the Hadoop distribution.

Anatomy of a YARN application

A YARN application consists of three parts: a YARN client, an application master, and the actual application tasks. Although the tasks are typically programmed in Java, a task can be an arbitrary shell command, an executable compiled from C++ code, or a Python script. The AM allocates containers for the application's tasks and controls the execution. It must continuously communicate with the RM to send a heartbeat signal. Via the same protocol, the AM can request and receive more containers from the RM, return containers that it does not need any longer, and receive notification about containers that have terminated.

The YARN client initiates the job, requesting the RM to start the AM. While this appears to be a simple exercise, it actually turns out to be quite complex: Before the AM can be started, the client must make sure that all required resources (code, libraries, executables, configuration, etc.) are available in the container that will run the AM.

How does that happen? Hadoop uses a distributed file system (HDFS) that is shared between all the machines in the cluster. The YARN client copies all required files to HDFS and informs the RM of the location. The files are then available to all machines of the cluster. Whenever a Node Manager starts a task, it copies all of the task's resources to a working directory on the local file system.

Let's take a closer look at how the YARN client submits a job: First it copies all of the application's resources to a location in HDFS. Then it connects to the RM to request an application id. Using this application id, it then submits a request to the RM to start the AM. This request contains:

  • The application id.
  • The HDFS locations of all resources.
  • Environment settings for the AM such as the class path.
  • Resource limits for the AM, for example, how much memory it is allowed to use.
  • The shell command to execute the AM. If the AM is a Java program, this command would be java. This command is also responsible - if desired - for redirecting standard output/error to files by using shell syntax.
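
To give an impression of what this involves, here is a rough sketch of such a submission against Hadoop 2.x's YARN client API (org.apache.hadoop.yarn.client.api). The local resources and environment are only hinted at in comments, and the AM class name (MyApplicationMaster) and the memory limit are made-up placeholders:

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class MyYarnClient {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    // Ask the RM for an application id.
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    ApplicationId appId = appContext.getApplicationId();

    // Describe how to launch the AM: the HDFS locations of its resources,
    // its environment (e.g. the class path), and the shell command to run it.
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    // amContainer.setLocalResources(...);  // HDFS locations of all resources
    // amContainer.setEnvironment(...);     // e.g. the class path
    amContainer.setCommands(Collections.singletonList(
        "java MyApplicationMaster 1>stdout.log 2>stderr.log"));
    appContext.setAMContainerSpec(amContainer);

    // Resource limits for the AM, e.g. how much memory it may use.
    appContext.setResource(Resource.newInstance(512, 1));

    // Submission is asynchronous; the client can poll the status using appId.
    yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
  }
}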

After the request is submitted, the AM is started asynchronously, and the client can now check the status of the application using the application id. The RM will allocate a container on a machine that has enough capacity to run the AM. Then it requests the Node Manager of that machine to start the AM. The Node Manager creates a working directory for the AM, copies all the local resources from HDFS, executes the Application Master using the provided shell command, and starts monitoring it.

Now the AM is running in a container, but none of the actual application's tasks has been started yet. Starting them is the AM's job. It decides how many tasks to run and in what order, how many resources they are allowed to use, and how to deal with task failures. For every task it needs to start, the AM first requests a container from the RM. Then it starts the task in a way that is very similar to how the YARN client starts the AM, except that the AM sends its request directly to the Node Manager responsible for the allocated container.

The AM must regularly communicate with the RM to send a heartbeat. This is important because if this heartbeat is not received for a long time, the RM will assume that the AM has crashed and kill all the (now orphaned) tasks that belong to the application.

In the same call used for the heartbeat, the AM can also request new containers and release containers that it no longer needs. In its response to the heartbeat, the RM informs the AM of new containers that have been allocated for it, and it also informs the AM of all containers that have been terminated since the last request.

Note that the RM works asynchronously, and a request for new containers is typically not fulfilled by the response to that request. The AM must keep sending its heartbeat to the RM and will eventually receive the containers it requested. It is therefore important that the AM manages its state carefully, always remembering what pending requests for containers it has, what active containers it owns, etc.
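
To give a flavor of this protocol, here is a rough sketch of an AM that requests a single container and runs a trivial shell command in it, using Hadoop 2.x's AMRMClient and NMClient helper libraries. Registration details, resource localization and error handling are simplified, and the class name continues the hypothetical example from the client sketch above:

import java.util.Collections;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class MyApplicationMaster {
  public static void main(String[] args) throws Exception {
    YarnConfiguration conf = new YarnConfiguration();

    // Register with the RM.
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();
    rmClient.registerApplicationMaster("", 0, "");

    // Client for talking to Node Managers.
    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(conf);
    nmClient.start();

    // Request one container; the RM fulfills this asynchronously.
    rmClient.addContainerRequest(new ContainerRequest(
        Resource.newInstance(256, 1), null, null, Priority.newInstance(0)));

    int completed = 0;
    while (completed < 1) {
      // Every allocate() call doubles as the heartbeat.
      AllocateResponse response = rmClient.allocate(0.0f);
      for (Container container : response.getAllocatedContainers()) {
        // Launch a task via the Node Manager of the allocated container.
        ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
        ctx.setCommands(Collections.singletonList("echo Hello from a task"));
        nmClient.startContainer(container, ctx);
      }
      // Keep track of containers that have terminated since the last heartbeat.
      completed += response.getCompletedContainersStatuses().size();
      Thread.sleep(1000);
    }
    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
  }
}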

Eventually, all the tasks of the application are finished, and the application master releases all containers and terminates itself. The RM can now reallocate the capacity of the released containers to other applications.

Introducing Twill

YARN has been called the "Operating System of the Data Center", and it honors this title with very generic and powerful APIs and protocols. But with that power comes complexity, and learning YARN is a steep uphill climb.

The sample application that is bundled with YARN, which does nothing but execute a simple shell command in each container, adds up to more than a thousand lines of code. And due to the asynchronous nature of the YARN protocol, the AM must always carefully keep track of all containers it has requested, received and started - a complex task that leaves plenty of room for coding mistakes.

Many applications, however, do not require all the options of YARN. For example, to use the Hadoop cluster for a distributed load test, all that is needed is a number of agents, each doing only one thing: sending as many requests as possible to the service being tested.

If this were implemented in a single JVM, each agent would run as a Java thread, and Java executor services make it extremely easy to manage that. Can distributed applications be made as easy as Java threads (at least for some applications)? The answer is: Yes, with Apache Twill [12], which was incubated at Apache in late 2013 with the goal of reducing the complexity of developing distributed Java applications with YARN.

Analogy to Standalone Java Application

It is not difficult to see that a YARN application is very similar to something that many Java developers are familiar with: a standalone Java application.

YARN                     Standalone Java App
YARN Client              Java command that provides options and arguments to the application.
Application Master       main() method preparing the threads for the application.
Container Task           Runnable implementation, where each runs in its own thread.

This close resemblance suggests that it could be as easy to write a distributed application as it is to write a threaded Java application. This thought inspired the development of Twill with the goal of enabling every Java programmer to write distributed applications.

Hello World

With Twill, all tasks are implemented as a TwillRunnable, which is essentially a Java Runnable, and the application is given a specification of how many runnables to start, with rules for ordering tasks and constraints for the resources each task may consume. Twill then uses a generic YARN client and Application Master to allocate the containers, start all runnables, monitor them, and restart them if necessary - the only API that the developer needs to understand is the Twill interface, and the above-mentioned distributed shell example shrinks to less than 50 lines of code.

For example, let's take a look at the minimal Twill application – a classic Hello-World [13]. We start with implementing the runnable that will be executed:

Listing 1
public class HelloWorld {   
  public static Logger LOG = LoggerFactory.getLogger(HelloWorld.class);
 
  public static class HelloWorldRunnable extends AbstractTwillRunnable {
    @Override
    public void run() {
      LOG.info("Hello World. My first distributed application.");
    }
  }
 
End

To submit this application with Twill, all we need to do is create and start a runner service and submit the runnable. We also make use of Twill's central log collection and attach a log handler that prints all log messages to standard out (more about that later). The result of starting the app is a controller object that can be used to manage the application from now on. In this example, all we do is wait for completion.

Listing 2
  public static void main(String[] args) throws Exception {
    TwillRunnerService twillRunner =
       new YarnTwillRunnerService(
        new YarnConfiguration(), "localhost:2181");
    twillRunner.startAndWait();
 
    TwillController controller =
      twillRunner.prepare(new HelloWorldRunnable())
                 .addLogHandler(
                   new PrinterLogHandler(
                     new PrintWriter(System.out, true)))
                 .start();
 
    Services.getCompletionFuture(controller).get();
  }       
}
End

Distributed Application Patterns

Twill also addresses a number of common patterns of distributed applications:

  • Persistence of State: Twill saves the state and configuration of every application in Apache ZooKeeper [14], a highly reliable distributed coordination and synchronization service. That means that even if the client terminates or crashes, a new client can always reconnect to a running Twill application.
  • Real-time Log Collection: The tasks of an application will typically emit log messages. For ease of operations, it is desirable to make all logs available for inspection and search in a central place at the time they happen. YARN leaves it to the developer to implement this kind of log collection. Twill addresses that by embedding Apache Kafka [15], a scalable, distributed publish/subscribe messaging system, in each application: it has an option to start a Kafka instance along with the AM, and it injects a log appender into each runnable that publishes the messages to Kafka. From there, all logs can be retrieved using the Twill client.

  • Application Life Cycle: Once an application is running, YARN has only two options: either wait for it to finish, or end it by killing all its containers. But often it is desirable to shut the application down gracefully, for example allowing it to persist its state before quitting. Or sometimes it is necessary for an application to pause in order to resume at a later time. It can also be useful to reconfigure a running application by signaling all of its tasks to re-read their configuration. For example, an application that uses a machine-learned model for classification may need to re-read the model every time the model is rebuilt (by an independent process). Twill allows these kinds of actions with a simple, ZooKeeper-based message protocol between the Twill client and the runnables (a sketch follows this list).
  • Service Discovery: Running a service in YARN means that the service may start on any of the machines of the cluster. Clients of the service need to find out what IP addresses or host names to connect to, hence the service must announce its actual location to the clients after it starts up. This is tricky to implement, because the location must also be withdrawn after an instance of the service terminates or crashes. Using ZooKeeper, Twill implements a simple service discovery protocol that allows clients to find the service using a Twill client.
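
As an illustration of the message protocol, the following sketch shows how the classification example above could be reconfigured. It uses Twill's sendCommand() and handleCommand() APIs; the command name "reload-model", the runnable name "classifier" and the reloadModel() helper are made up for this example:

// Client side: send a command to all instances of the "classifier" runnable.
TwillController controller = ...
controller.sendCommand("classifier", Command.Builder.of("reload-model").build());

// Runnable side: react to the command.
public class ClassifierTwillRunnable extends AbstractTwillRunnable {
  @Override
  public void handleCommand(Command command) throws Exception {
    if ("reload-model".equals(command.getCommand())) {
      reloadModel();  // application-specific: re-read the machine-learned model
    }
  }
  // run(), initialize(), etc. omitted
}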

Programming with Twill: Resource specification

There are situations where you will need more than one instance of your application - for example, when using Twill to run a cluster of Jetty web servers. Moreover, different applications have different requirements on system resources such as CPU and memory. By default, Twill starts one container per TwillRunnable, with 1 virtual core and 512 MB of memory. You can, however, customize this when starting your application through the TwillRunner. For example, you can specify 5 instances, each with 2 virtual cores and 1 GB of memory, like this:

Listing 3
TwillRunner twillRunner = ... 
twillRunner.prepare(new JettyServerTwillRunnable(),
                     ResourceSpecification.Builder.with()
                                          .setVirtualCores(2)
                                          .setMemory(1, SizeUnit.GIGA)
                                          .setInstances(5)
                                          .build())
            .start();
End

Notice that this specifies virtual cores and not actual CPU cores. The mapping is defined in the YARN configuration - see [16] for more details.

Multiple runnables

Just like you can have multiple threads doing different things, you can have multiple TwillRunnables in your application. All you need to do is implement the TwillApplication interface and specify the runnables that constitute your application. Say your application contains a Jetty server and a log-processing daemon; then your TwillApplication will look something like this:

Listing 4
public class MyTwillApplication implements TwillApplication {
  @Override
  public TwillSpecification configure() {
    return TwillSpecification.Builder.with()
       .setName("MyApplication")
       .withRunnable()
         .add("jetty", new JettyServerTwillRunnable()).noLocalFiles()
         .add("logdaemon", new LogProcessorTwillRunnable()).noLocalFiles()
       .anyOrder()
       .build();
  }
}
End

Note that the call to anyOrder() specifies that the TwillRunnables in this application can be started in any order. If there are dependencies between runnables, you can specify the ordering like this:

Listing 5
// To have the log processing daemon start before the Jetty server
  .withRunnable()
    .add("jetty", new JettyServerTwillRunnable()).noLocalFiles()
    .add("logdaemon", new LogProcessorTwillRunnable()).noLocalFiles()
  .withOrder()
    .begin("logdaemon")
    .nextWhenStarted("jetty")
End

File localization

One nice feature in YARN is that it can copy HDFS files to a container’s working directory on local disk, which is an efficient way to distribute files needed by containers across the cluster. Here is an example of how to do so in Twill:

Listing 6
  .withRunnable()
    .add("jetty", new JettyServerTwillRunnable())
      .withLocalFiles()
        // Distribute local file "index.html" to the Jetty server
        .add("index.html", new File("index.html"))
        // Distribute and expand contents in local archive "images.tgz"
        // to the container’s "images" directory
        .add("images", new File("images.tgz"), true)
        // Distribute HDFS file "site-script.js" to a file named "script.js".
        // "fs" is the Hadoop FileSystem object
        .add("script.js", fs.resolvePath(new Path("site-script.js")).toUri())      .apply()
End

In Twill, the file that needs to be localized doesn't need to be on HDFS. It can come from a local file, or even an external URL. Twill also supports archive auto-expansion and file renaming. If no file needs to be localized, simply call noLocalFiles() when adding the TwillRunnable.

Arguments

Just like a standalone application, you may want to pass arguments to alter the behavior of your application. In Twill, you can pass arguments to the individual TwillRunnable as well as to the whole TwillApplication. Arguments are passed when launching the application through TwillRunner:

Listing 7
TwillRunner twillRunner = ...
twillRunner.prepare(new MyTwillApplication())
           // Application arguments will be visible to all runnables
           .withApplicationArguments("--debug")
           // Arguments only visible to instances of a given runnable
           .withArguments("jetty", "--threads", "100")
           .withArguments("logdaemon", "--retain-logs", "5")
           .start();
End

The arguments can be accessed using the TwillContext object in TwillRunnable. Application arguments are retrieved by calling TwillContext.getApplicationArguments(), while runnable arguments are available through the TwillContext.getArguments() call.
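
Inside a runnable, this could look roughly like the following sketch (the flag values are the ones passed in Listing 7; the enclosing runnable class is omitted):

  @Override
  public void initialize(TwillContext context) {
    super.initialize(context);
    // "--debug", passed via withApplicationArguments(), visible to every runnable
    String[] appArgs = context.getApplicationArguments();
    // "--threads", "100", passed via withArguments("jetty", ...),
    // visible only to instances of the "jetty" runnable
    String[] args = context.getArguments();
  }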

Service discovery

When launching your application in YARN, you don’t know where your containers will be running, and the hosts can change over time due to container or machine failure. Twill has built-in service discovery support - you can announce a named service from runnables and later on discover their locations. For example, you can start the Jetty server instances on a random port and announce the address and port of the service.

Listing 8
class JettyServerTwillRunnable extends AbstractTwillRunnable {
  @Override
  public void initialize(TwillContext context) {
    // Start Jetty on a random port and announce the service location
    int port = startJetty();
    context.announce("jetty", port);
  }

  @Override
  public void run() {
    // Serve requests until the runnable is stopped
  }
}
End

You can then build a router layer that discovers the Jetty instances and routes client requests to them. The router will look something like this:

Listing 9
TwillController controller = ...
ServiceDiscovered jettyService = controller.discoverService("jetty");
// The ServiceDiscovered maintains a live list of service endpoints.
// Every time .iterator() is invoked, it gives the latest list of endpoints.
Iterator<Discoverable> itor = jettyService.iterator();
// Pick an endpoint from the list of endpoints.
// ...
End
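
A naive way to pick one of those endpoints could look like the sketch below; a real router would add caching and load balancing on top of this:

// Continuing from Listing 9: take the first endpoint from the live list.
if (itor.hasNext()) {
  // A Discoverable carries the announced service name and socket address.
  InetSocketAddress endpoint = itor.next().getSocketAddress();
  // ... forward the request to endpoint.getHostName() and endpoint.getPort()
}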

Controlling live applications

As mentioned in the Hello-World example, you can control a running application using TwillController. You can change the number of instances of a runnable by simply doing this:

Listing 10
TwillController controller = ... 
ListenableFuture<Integer> changeComplete = 
    controller.changeInstances("jetty", 10);
End

You can then either block until the change is completed or observe the completion asynchronously by listening on the future.
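
For instance, using Guava's Futures utility (which Twill itself depends on) - the callback body here is purely illustrative:

// Block until the change has been applied:
Integer result = changeComplete.get();

// Or register a callback to be notified asynchronously:
Futures.addCallback(changeComplete, new FutureCallback<Integer>() {
  @Override
  public void onSuccess(Integer result) {
    System.out.println("Instance change completed: " + result);
  }
  @Override
  public void onFailure(Throwable t) {
    t.printStackTrace();
  }
});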

Future

Twill is a powerful distributed application platform. Yet it was only incubated in Apache recently, and it is still in its early stages. It is clear that many more features have to follow, such as:

  • Non-Java applications - At this time Twill is a pure Java framework. But there are many systems engineers who believe that other programming languages ultimately deliver better performance. Allowing the runnables to be written in different languages would open up Twill to those developers, while still performing the (higher-level) tasks of management and control in Java. This will also allow running existing distributed engines that were not written in Java, such as Impala, or not written for YARN, such as HBase or Cassandra (see [18] for an alternative approach).
  • Suspending applications - Some applications are not time-critical and can be run when the load in the cluster is low (for example, distributed testing). This requires the ability to suspend the application when the cluster load goes up, to free the resources until the load is low enough to resume. While this appears simple, it implies the ability of the application to save its state when suspended and to restore that state upon restart. It also requires restarting the application in containers with locality to the original containers. For example, HBase's region servers store their files in HDFS, and it is crucial for performance that a region server is resumed on the same machine (that ensures a local replica of the HDFS files).
  • Central metrics collection: Metrics are crucial in understanding the behavior and performance of distributed systems. Twill can use the same Kafka instance that it already uses for log collection to also collect metrics from all the containers and make them available through the controller.

An open source project can only thrive with an active community. Many new features and requirements will come from Twill's users, and that journey has just begun at Apache.

Conclusion

With YARN as its "operating system", a Hadoop cluster turns into a virtual compute server. YARN makes it possible to develop massively parallel applications and run them on the Hadoop cluster. Twill adds simplicity to the power of YARN and transforms every Java programmer into a distributed data application developer. It's a revolution - and it has only just begun!

References

[1] http://hadoop.apache.org/

[2] http://research.google.com/archive/mapreduce.html

[3] http://hive.apache.org/

[4] http://tajo.incubator.apache.org/

[5] http://prestodb.io/

[6] http://www.cloudera.com/content/cloudera/en/products-and-services/cdh/impala.html

[7] http://wiki.apache.org/incubator/StormProposal

[8] http://samza.incubator.apache.org/

[9] http://en.wikipedia.org/wiki/Message_Passing_Interface

[10] http://www2003.org/cdrom/papers/refereed/p007/p7-abiteboul.html

[11] http://hortonworks.com/hadoop/yarn/

[12] http://twill.incubator.apache.org/

[13] http://en.wikipedia.org/wiki/Hello_world_program

[14] http://zookeeper.apache.org/

[15] http://kafka.apache.org/

[16] YARN virtual core support: http://riccomini.name/posts/hadoop/2013-06-14-yarn-with-cgroups/

[17] http://incubator.apache.org/

[18] http://wiki.apache.org/incubator/HoyaProposal

Andreas Neumann & Terence Yim
