Weaving YARN

Developing Distributed Applications with Apache Twill

Andreas Neumann and Terence Yin

For the past couple of decades, Java
programmers have been building vast numbers of web apps generating
petabytes of log data. Analyzing these web logs can create real
business value – for example, by optimizing the web app for
observed user behavior patterns, or by personalizing the user
experience within the app. But until recently, companies have
discarded their web logs because they were either too hard to
analyze, or too expensive to store with traditional relational
databases.

Apache Hadoop [1] is a free, open source
technology that runs on commodity hardware clusters. Its
distributed file system (HDFS) makes it cheap to store large
amounts of data, and its scalable Map/Reduce analysis engine makes
it possible to extract insights from that data. Map/Reduce [2] is a
flavor of batch-driven data analysis, where the input data is
partitioned into smaller batches that can be processed in parallel
across many machines in the Hadoop cluster. But Map/Reduce, while
powerful enough to express many data analysis algorithms, is not
always the optimal choice of programming paradigm. It’s often
desirable to run other computation paradigms in the Hadoop cluster
– here are some examples:

  • Ad-hoc querying: The SQL language is widely known from
    relational databases, and many users would like to query their big
    data using SQL. Apache Hive [3] can execute a SQL query as a series
    of Map/Reduce jobs, but it has shortcomings in terms of
    performance. Recently, new approaches such as Apache Tajo [4],
    Facebook’s Presto [5] and Cloudera’s Impala [6] have drastically
    improved performance, but they require running services other than
    Map/Reduce in the Hadoop cluster.

  • Real-time Stream Processing: Due to their batch
    nature, Map/Reduce jobs have a high latency before results are
    available, often several hours. Many applications require much
    lower latencies. For example, a fraud detection algorithm should
    deliver its results much faster – if possible in real-time.
    Examples of real-time engines include Apache Storm [7] and Apache
    Samza [8]. Ideally such an engine could run in the same cluster as
    the Map/Reduce jobs that process the same data, to avoid
    duplication of data and infrastructure.

  • Message-Passing (MPI): MPI [9] is another scalable
    approach to big data analysis: a stateful process runs on each
    node of a distributed network, the processes communicate with each
    other by sending messages, and they alter their state based on the
    messages they receive. For example, it has been shown [10] that
    Google’s PageRank algorithm can be approximated in near real-time
    by an MPI algorithm.

  • Distributed Testing: When a cluster is not fully
    utilized, it makes sense to use its spare resources for other
    purposes such as testing. For example, running all tests for the
    Hadoop distribution takes many hours on a single machine. If these
    tests can be parallelized and run on many machines concurrently,
    the test time can be greatly reduced. Another example of a
    distributed test is a load test for a web service that sends a high
    number of requests concurrently from many machines.
  • You name it.

Is it possible to run such non-Map/Reduce jobs
on a Hadoop cluster? For the longest time, the answer was: No! Or
only if one had a PhD in distributed systems engineering and figured
out a way to “disguise” the jobs as Map/Reduce. This was mainly
because, in Hadoop, the cluster resource management and the
Map/Reduce engine were very tightly integrated with each other.

The good news is that the latest version of
Hadoop decouples these two responsibilities. Hadoop 2.0’s new
resource manager, YARN, makes it possible to use the Hadoop
cluster for any computing needs by programming against YARN’s
interfaces and protocols. However, YARN’s programming model is
complex and poses a steep learning curve to developers who do not
have experience with Hadoop.

Fortunately, this all changes with the recently
incubated Apache Twill, which exposes YARN’s power with a Java
thread-like programming model. Twill also implements commonly
needed patterns of distributed applications such as central log
collection, service discovery and life cycle management. Read on to
learn more about the history of Hadoop, YARN and how to develop
distributed applications using Twill.

Hadoop and YARN

In the original Hadoop, cluster management is
tightly coupled with the Map/Reduce programming paradigm. A
Map/Reduce job analyzes data by dividing the work between a number
of specialized tasks called mappers and reducers.

Each task runs on one of the machines of the
cluster, and each machine has a limited number of slots for running
tasks concurrently. Hadoop’s job tracker daemon is responsible for
both managing the cluster’s resources and driving the execution of
the Map/Reduce job: it reserves and schedules slots for all tasks,
configures, runs and monitors each task, and if a task fails, it
allocates a new slot and reattempts the task. After a task
finishes, the job tracker cleans up temporary resources and
releases the task’s slot to make it available for other jobs.

There are two major issues with this approach:
It limits scalability, because the single job tracker daemon
becomes a bottleneck and single point of failure for the many jobs
in a large cluster. Additionally, specializing the execution logic
to Map/Reduce makes it difficult to use the cluster for other
distributed programming paradigms – such jobs have to “disguise”
themselves as mappers and reducers in order to be able to run.

Hadoop 2.0 changes all of this. It has a new
resource manager named YARN [11] that is decoupled from job
execution. In YARN, the compute resources of a cluster are
organized into “containers”, where each machine can host multiple
containers at the same time.

The Resource Manager (RM) maintains the state
of all active containers as well as the available capacity of all
nodes. In addition, every machine runs a Node Manager that is
responsible for launching and monitoring containers on that
machine. Every job is controlled by a separate task, the
Application Master (AM).

The AM itself runs in a container and
communicates with the RM to request, receive and release containers
for the tasks of the application, and with the Node Managers to
launch tasks in those containers. It makes all logical decisions
about the execution of the application, such as the number of tasks
to run, the distribution of work among the tasks, the order of
execution of tasks, etc.

This new architecture delivers better
scalability because there is no single bottleneck in the management
of job execution, and it provides more flexibility as to what types
of applications can run in the cluster. The AM is now part of the
application code, and hence every application can have its own,
custom AM – the Map/Reduce AM is simply provided out of the box as
part of the Hadoop distribution.

Anatomy of a YARN Application

A YARN application consists of three parts: A
YARN client, an application master, and the actual application
tasks. Although the tasks are typically programmed in Java, a task
can be an arbitrary shell command, an executable compiled from C++
code, or a Python script. The AM allocates containers for the
application’s tasks and controls the execution. It must continuously
communicate with the RM to send a heartbeat signal. Via the same
protocol, the AM can request and receive more containers from the
RM, return containers that it does not need any longer, and receive
notification about containers that have terminated.

The YARN client initiates the job, requesting
the RM to start the AM. While this appears to be a simple exercise,
it actually turns out to be quite complex: Before the AM can be
started, the client must make sure that all required resources
(code, libraries, executables, configuration, etc.) are available
in the container that will run the AM.

How does that happen? Hadoop uses a distributed
file system (HDFS) that is shared between all the machines in the
cluster. The YARN client copies all required files to HDFS and
informs the RM of the location. The files are then available to all
machines of the cluster. Whenever a Node Manager starts a task, it
copies all of the task’s resources to a working directory on the
local file system.

Let’s take a closer look at how the YARN client
submits a job: First it copies all of the application’s resources to
a location in HDFS. Then it connects to the RM to request an
application id. Using this application id, it then submits a
request to the RM to start the AM (a code sketch follows the list
below). This request contains:

  • The application id.
  • The HDFS locations of all resources.
  • Environment settings for the AM such as the class path.
  • Resource limits for the AM, for example, how much memory it is
    allowed to use.
  • The shell command to execute the AM. If the AM is a Java
    program, this command would be java. This command is also
    responsible – if desired – for redirecting standard output/error to
    files by using shell syntax.
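
To make this more concrete, here is a minimal sketch of such a submission using the standard YARN client API of Hadoop 2. The local resources, environment, memory settings and the AM main class com.example.MyApplicationMaster are hypothetical placeholders; imports and error handling are omitted.

Sketch 1
// Sketch of a raw YARN client submission (Hadoop 2 client API); placeholders are hypothetical.
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new YarnConfiguration());
yarnClient.start();

// 1. Ask the RM for an application id.
YarnClientApplication app = yarnClient.createApplication();
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();

// 2. Describe the AM container: resources already copied to HDFS, environment,
//    and the shell command that starts the AM (with output redirection).
Map<String, LocalResource> localResources = new HashMap<>(); // name -> LocalResource in HDFS
Map<String, String> env = new HashMap<>();                   // e.g. the Java class path
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
amContainer.setLocalResources(localResources);
amContainer.setEnvironment(env);
amContainer.setCommands(Collections.singletonList(
    "java com.example.MyApplicationMaster 1>stdout 2>stderr"));

// 3. Set resource limits for the AM and submit the request.
appContext.setAMContainerSpec(amContainer);
appContext.setResource(Resource.newInstance(512, 1)); // 512 MB, 1 virtual core
yarnClient.submitApplication(appContext);

// 4. The AM starts asynchronously; the client polls the RM for status by id.
ApplicationReport report = yarnClient.getApplicationReport(appId);
End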

After the request is submitted, the AM is
started asynchronously, and the client can now check the status of
the application using the application id. The RM will allocate a
container on a machine that has enough capacity to run the AM. Then
it requests the Node Manager of that machine to start the AM. The
Node Manager creates a working directory for the AM, copies all the
local resources from HDFS, executes the Application Master using
the provided shell command, and starts monitoring it.

Now the AM is running in a container, but none
of the actual application’s tasks has been started. This is the
task of the AM. It decides how many tasks to run and in what order,
how many resources they are allowed to use, and how to deal with
task failures. For every task it needs to start, the AM first
requests a container from the RM. Then it starts the task in a way
that is very similar to how the YARN client starts the AM, except
that the AM sends its request directly to the Node Manager
responsible for the allocated container. 

The AM must regularly communicate with the RM
to send a heartbeat. This is important because if this heartbeat is
not received for a long time, the RM will assume that the AM has
crashed and kill all the (now orphaned) tasks that belong to the
application.

In the same call used for the heartbeat, the AM
can also request new containers and release containers that it no
longer needs. In its response to the heartbeat, the RM informs the
AM of new containers that have been allocated for it, and it also
informs the AM of all containers that have been terminated since
the last request.

Note that the RM works asynchronously, and a
request for new containers is typically not fulfilled by the
response to that request. The AM must keep sending its heartbeat to
the RM and will eventually receive the containers it requested. It
is therefore important that the AM manages its state carefully,
always remembering what pending requests for containers it has,
what active containers it owns, etc.
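
As a rough illustration of that bookkeeping, the following sketch shows the heartbeat and allocation loop of an AM using YARN’s AMRMClient. It requests a single container; the actual launching of the task via the Node Manager, error handling and the resource numbers are simplifications.

Sketch 2
// Simplified AM heartbeat/allocation loop using YARN's AMRMClient.
AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient();
rmClient.init(new YarnConfiguration());
rmClient.start();
rmClient.registerApplicationMaster("", 0, "");

// Request one container; the RM will typically not grant it in the same call.
Resource capability = Resource.newInstance(512, 1);
rmClient.addContainerRequest(
    new AMRMClient.ContainerRequest(capability, null, null, Priority.newInstance(0)));

int running = 0, completed = 0;
while (completed < 1) {
  // Each allocate() call doubles as the heartbeat to the RM.
  AllocateResponse response = rmClient.allocate(0.0f);
  for (Container container : response.getAllocatedContainers()) {
    running++;
    // launch the task in this container via an NMClient (not shown)
  }
  for (ContainerStatus status : response.getCompletedContainersStatuses()) {
    running--;
    completed++;
  }
  Thread.sleep(1000);
}
rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
End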

Eventually, all the tasks of the application
are finished, and the application master releases all containers
and terminates itself. The RM can now reallocate the capacity of
the released containers to other applications.

Introducing Twill

YARN has been called the “Operating System of
the Data Center”, and it honors this title with very generic and
powerful APIs and protocols. But with that power comes complexity,
and learning YARN is a steep uphill climb.

The sample application that is bundled with
YARN, which does nothing but execute a simple shell command in each
container, adds up to more than a thousand lines of code. And due
to the asynchronous nature of the YARN protocol, the AM must always
carefully keep track of all containers it has requested, received and
started – a complex bookkeeping task that invites coding
mistakes.

Many applications, however, do not require all
the options of YARN. For example, to use the Hadoop cluster for a
distributed load test, all that is needed is a number of agents,
each doing only one thing: sending as many requests as possible to
the service being tested.

If this were implemented in a single JVM,
each agent would run as a Java thread, and Java’s executor services
would make it extremely easy to manage them. Can distributed applications
be made as easy as Java threads (at least for some applications)?
The answer is: Yes, with Apache Twill [12], which was incubated at
Apache in late 2013 with the goal of reducing the complexity of
developing distributed Java applications with YARN.

Analogy to a Standalone Java Application

It is not difficult to see that a YARN
application is very similar to something most Java developers are
familiar with: a standalone Java application.

YARN                    Standalone Java Application
YARN Client             The java command that provides options and arguments to the application.
Application Master      The main() method that prepares the threads for the application.
Container / Task        A Runnable implementation, each running in its own thread.

This close resemblance suggests that it could
be as easy to write a distributed application as it is to write a
threaded Java application. This thought inspired the development of
Twill with the goal of enabling every Java programmer to write
distributed applications.

Hello World

With Twill, all tasks are implemented as a
TwillRunnable, which is essentially a Java Runnable, and the
application is given a specification of how many runnables to
start, with rules for ordering tasks and constraints for the
resources each task may consume. Twill then uses a generic YARN
client and Application Master to allocate the containers, start all
runnables, monitor them and if necessary restart them – the only
API that the developer needs to understand is the Twill interface,
and the above-mentioned distributed shell example shrinks to less
than 50 lines of code.

For example, let’s take a look at the minimal
Twill application – a classic Hello-World [13]. We start by
implementing the runnable that will be executed:

Listing 1
public class HelloWorld {   
  public static Logger LOG = LoggerFactory.getLogger(HelloWorld.class);
 
  public static class HelloWorldRunnable extends AbstractTwillRunnable {
    @Override
    public void run() {
      LOG.info("Hello World. My first distributed application.");
    }
  }
 
End

To submit this application with Twill, all we need is to create
and start a runner service and submit the runnable. We also make
use of Twill’s central log collection and attach a log handler that
prints all log messages to standard out (more about that later).
The result of starting the app is a controller object that can be
used to manage the application from now on. In this example, all we
do is wait for completion.

Listing 2
  public static void main(String[] args) throws Exception {
    TwillRunnerService twillRunner =
       new YarnTwillRunnerService(
        new YarnConfiguration(), "localhost:2181");
    twillRunner.startAndWait();
 
    TwillController controller =
      twillRunner.prepare(new HelloWorldRunnable())
                 .addLogHandler(
                   new PrinterLogHandler(
                     new PrintWriter(System.out, true)))
                 .start();
 
    Services.getCompletionFuture(controller).get();
  }       
}
End

Distributed Application
Patterns

Twill also addresses a number of common
patterns of distributed applications:

  • Persistence of State: Twill saves the state and
    configuration of every application in Apache ZooKeeper [14], a
    highly reliable distributed coordination and synchronization
    service. That means that even if the client terminates or crashes,
    a new client can always reconnect to a running Twill application.
  • Real-time Log Collection: The tasks of an application
    will typically emit log messages. For ease of operations, it is
    desirable to make all logs available for inspection and search in a
    central place at the time they happen. YARN does not provide any
    assistance with this kind of log collection and leaves it to the
    application developer to implement. Twill addresses that by
    embedding Apache Kafka [15], a scalable, distributed
    publish/subscribe messaging system, in each application: it can
    start a Kafka instance along with the AM, and it then injects a
    log appender into each runnable that publishes the log messages to
    Kafka. From there, all logs can be retrieved using the Twill
    client.

  • Application Life Cycle: Once an application is
    running, YARN has only two options: either wait for it to finish,
    or end it by killing all its containers. But often it is desirable
    to shut the application down gracefully, for example allowing it to
    persist its state before quitting, and sometimes it is necessary
    for an application to pause in order to resume at a later time. It
    can also be useful to reconfigure a running application by
    signaling all of its tasks to re-read their configuration. For
    example, an application that uses a machine-learned model for
    classification may need to re-read the model every time the model
    is rebuilt (by an independent process). Twill allows these kinds of
    actions with a simple, ZooKeeper-based message protocol between the
    Twill client and the runnables (see the sketch after this list).
  • Service Discovery: Running a service in YARN means
    that the service may start on any of the machines of the cluster.
    Clients of the service need to find out what IP addresses or host
    names to connect to, hence the service must announce its actual
    location to the clients after it starts up. This is tricky to
    implement, because the location must also be withdrawn after an
    instance of the service terminates or crashes. Using ZooKeeper,
    Twill implements a simple service discovery protocol that allows
    clients to find the service using a Twill client.
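
As an illustration of that message protocol, the sketch below reconnects to a running application by name and sends a command to its runnables, which receive it in their handleCommand method. The application name "MyApplication" matches the example further down, and the "reload-model" command name and the classifier runnable are hypothetical.

Sketch 3
// Client side: reconnect to a running application and signal its runnables.
// The "reload-model" command and the application name are examples.
TwillRunner twillRunner = ...
for (TwillController controller : twillRunner.lookup("MyApplication")) {
  controller.sendCommand(Command.Builder.of("reload-model").build());
}

// Runnable side: react to the command, for example by re-reading the model.
public class ClassifierTwillRunnable extends AbstractTwillRunnable {
  @Override
  public void handleCommand(Command command) throws Exception {
    if ("reload-model".equals(command.getCommand())) {
      // re-read the machine-learned model here
    }
  }

  @Override
  public void run() {
    // serve classification requests (not shown)
  }
}
End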

Programming with Twill: Resource specification

There are situations where you will need more
than one instance of a runnable – for example, when using Twill to
run a cluster of Jetty web servers. Moreover, different applications
have different requirements for system resources, such as CPU and
memory. By default, Twill starts one container per TwillRunnable
with 1 virtual core and 512 MB of memory. You can, however,
customize this when starting your application through the
TwillRunner. For example, you can specify 5 instances, each with 2
virtual cores and 1 GB of memory, like this:

Listing 3
TwillRunner twillRunner = ... 
twillRunner.prepare(new JettyServerTwillRunnable(),
                     ResourceSpecification.Builder.with()
                                          .setVirtualCores(2)
                                          .setMemory(1, SizeUnit.GIGA)
                                          .setInstances(5)
                                          .build())
            .start();
End

Notice that this specifies virtual cores and not actual CPU
cores. The mapping is defined in the YARN configuration – see [16]
for more details.

Multiple runnables

Just like you can have multiple threads doing
different things, you can have multiple TwillRunnables in your
application. All you need to do is implement the TwillApplication
interface and specify the runnables that constitute your
application. Say your application contains a Jetty server and a
log-processing daemon; then your TwillApplication will look
something like this:

Listing 4
public class MyTwillApplication implements TwillApplication {
  @Override
  public TwillSpecification configure() {
    return TwillSpecification.Builder.with()
       .setName("MyApplication")
       .withRunnable()
         .add("jetty", new JettyServerTwillRunnable()).noLocalFiles()
         .add("logdaemon", new LogProcessorTwillRunnable()).noLocalFiles()
       .anyOrder()
       .build();
  }
}
End

Note that the call to anyOrder() specifies that
the runnables of this application can be started in any order. If
there are dependencies between runnables, you can specify the
ordering like this:

Listing 5
// To have the log processing daemon start before the Jetty server
  .withRunnable()
    .add("jetty", new JettyServerTwillRunnable()).noLocalFiles()
    .add("logdaemon", new LogProcessorTwillRunnable()).noLocalFiles()
  .withOrder()
    .begin("logdaemon")
    .nextWhenStarted("jetty")
End

File localization

One nice feature in YARN is that it can copy
HDFS files to a container’s working directory on local disk, which
is an efficient way to distribute files needed by containers across
the cluster. Here is an example of how to do so in Twill:

Listing 6
  .withRunnable()
    .add("jetty", new JettyServerTwillRunnable())
      .withLocalFiles()
        // Distribute local file "index.html" to the Jetty server
        .add("index.html", new File("index.html"))
        // Distribute and expand contents in local archive "images.tgz"
        // to the container’s "images" directory
        .add("images", new File("images.tgz"), true)
        // Distribute HDFS file "site-script.js" to a file named "script.js".
        // "fs" is the Hadoop FileSystem object
        .add("script.js", fs.resolvePath(new Path("site-script.js")).toUri())      .apply()
End

In Twill, a file that needs to be localized
does not need to be on HDFS: it can come from the local file system,
or even an external URL. Twill also supports archive auto-expansion
and file renaming. If no files need to be localized, simply call
noLocalFiles() when adding the TwillRunnable.

Arguments

Just as with a standalone application, you may
want to pass arguments to alter the behavior of your application.
In Twill, you can pass arguments to an individual TwillRunnable as
well as to the whole TwillApplication. Arguments are passed when
launching the application through the TwillRunner:

Listing 7
TwillRunner twillRunner = ...
twillRunner.prepare(new MyTwillApplication())
           // Application arguments will be visible to all runnables
           .withApplicationArguments("--debug")
           // Arguments only visible to instances of a given runnable
           .withArguments("jetty", "--threads", "100")
           .withArguments("logdaemon", "--retain-logs", "5")
           .start();
End

The arguments can be accessed using the
TwillContext object in TwillRunnable. Application arguments are
retrieved by calling TwillContext.getApplicationArguments(), while
runnable arguments are available through the
TwillContext.getArguments() call.
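
For example, inside a runnable the arguments could be read like this. This is a sketch only; the runnable and the flags are the ones used above, and the parsing is up to the application.

Sketch 4
public class LogProcessorTwillRunnable extends AbstractTwillRunnable {
  @Override
  public void run() {
    // Arguments passed with withApplicationArguments(), visible to all runnables
    String[] appArgs = getContext().getApplicationArguments();
    boolean debug = Arrays.asList(appArgs).contains("--debug");

    // Arguments passed with withArguments("logdaemon", ...), visible only to this runnable
    String[] args = getContext().getArguments(); // e.g. {"--retain-logs", "5"}

    // ... use the arguments to configure the log processing
  }
}
End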

Service discovery

When launching your application in YARN, you don’t know where
your containers will be running, and the hosts can change over time
due to container or machine failure. Twill has built-in service
discovery support – you can announce a named service from runnables
and later on discover their locations. For example, you can start
the Jetty server instances on a random port and announce the
address and port of the service.

Listing 8
class JettyServerTwillRunnable extends AbstractTwillRunnable {
  @Override
  public void initialize(TwillContext context) {
    // Start Jetty on a random port (startJetty() is application code, not shown)
    int port = startJetty();
    // Announce the service under the name "jetty" so that clients can discover it
    context.announce("jetty", port);
  }

  @Override
  public void run() {
    // Keep the runnable alive while Jetty serves requests (not shown)
  }
}
End

You can then build a router layer that routes
client requests to the discovered Jetty instances. The router will
look something like this:

Listing 9
TwillController controller = ...
ServiceDiscovered jettyService = controller.discoverService("jetty");
// The ServiceDiscovered maintains a live list of service endpoints.
// Every time .iterator() is invoked, it gives the latest list of endpoints.
Iterator<Discoverable> itor = jettyService.iterator();
// Pick an endpoint from the list of endpoints.
// ...
End

Controlling live applications

As mentioned in the Hello-World example, you
can control a running application using TwillController. You can
change the number of instances of a runnable by simply doing
this:

Listing 10
TwillController controller = ... 
ListenableFuture<Integer> changeComplete = 
    controller.changeInstances("jetty", 10);
End

You can then either block until the change is
completed or observe the completion asynchronously by listening on
the future.
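
For example, with the Guava futures API that Twill builds on, the two variants might look like this (a sketch):

Sketch 5
// Block until the instance change has completed:
int instances = changeComplete.get();

// Or register a callback to observe completion asynchronously:
Futures.addCallback(changeComplete, new FutureCallback<Integer>() {
  @Override
  public void onSuccess(Integer result) {
    System.out.println("Instance change for 'jetty' completed");
  }

  @Override
  public void onFailure(Throwable t) {
    System.err.println("Instance change failed: " + t.getMessage());
  }
});
End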

Future

Twill is a powerful distributed application
platform. Yet it was only incubated in Apache recently, and it is
still in its early stages. It is clear that many more features have
to follow, such as:

  • Non-Java applications: At this time Twill is a pure
    Java framework. But there are many systems engineers who believe
    that other programming languages ultimately deliver better
    performance. Allowing the runnables to be written in different
    languages would open up Twill to those developers, while still
    performing the (higher-level) tasks of management and control in
    Java. This will also allow running existing distributed engines
    that were not written in Java, such as Impala, or not written for
    YARN, such as HBase or Cassandra (see [18] for an alternative
    approach).
  • Suspending applications: Some applications are not
    time-critical and can be run when the load on the cluster is low
    (for example, distributed testing). This requires the ability to
    suspend the application when the cluster load goes up, to free its
    resources until the load is low enough to resume. While this
    appears simple, it implies the ability of the application to save
    its state when suspended and to restore that state upon restart. It
    also requires restarting the application in containers that are
    co-located with the original containers. For example, HBase’s
    region servers store their files in HDFS, and it is crucial for
    performance that a region server is resumed on the same machine
    (which ensures a local replica of the HDFS files).
  • Central metrics collection: Metrics are crucial in
    understanding the behavior and performance of distributed systems.
    Twill can use the same Kafka instance that it already uses for log
    collection to also collect metrics from all the containers and make
    them available through the controller.

An open source project can only thrive with an
active community. Many new features and requirements will come from
Twill’s users, and that journey has just begun at Apache.

Conclusion

With YARN as its “operating system”, a Hadoop
cluster turns into a virtual compute server. YARN makes it possible
to develop massively parallel applications and run them on the
Hadoop cluster. Twill adds simplicity to the power of YARN and
transforms every Java programmer into a distributed data
application developer. It’s a revolution – and it has only just
begun!

References

[1] http://hadoop.apache.org/
[2] http://research.google.com/archive/mapreduce.html
[3] http://hive.apache.org/
[4] http://tajo.incubator.apache.org/
[5] http://prestodb.io/
[6] http://www.cloudera.com/content/cloudera/en/products-and-services/cdh/impala.html
[7] http://wiki.apache.org/incubator/StormProposal
[8] http://samza.incubator.apache.org/
[9] http://en.wikipedia.org/wiki/Message_Passing_Interface
[10] http://www2003.org/cdrom/papers/refereed/p007/p7-abiteboul.html
[11] http://hortonworks.com/hadoop/yarn/
[12] http://twill.incubator.apache.org/
[13] http://en.wikipedia.org/wiki/Hello_world_program
[14] http://zookeeper.apache.org/
[15] http://kafka.apache.org/
[16] http://riccomini.name/posts/hadoop/2013-06-14-yarn-with-cgroups/ (YARN virtual core support)
[17] http://incubator.apache.org/
[18] http://wiki.apache.org/incubator/HoyaProposal
