ETL the easy way

Moving Data from Hadoop into Solr or Data Warehouses

Wolfgang Hoschek

Cloudera’s Wolfgang Hoschek introduces a new open source library, Morphlines, for creating MapReduce pipelines with minimum fuss.

When bringing Apache Hadoop infrastructure into
your environment, integration with existing systems is a paramount
concern. That integration, of course, most commonly takes the form
of building new data movement and transformation pipelines between
the Hadoop Distributed File System (HDFS) and other data endpoints,
such as relational data warehouses, distributed data stores (such
as Apache HBase), and Apache Solr-based enterprise search
servers.

Building these pipelines has long been a common
task for Java developers (long before Hadoop even existed, in fact)
– so much so that writing custom data movement applications in Java
is often considered a core competency. Now that Hadoop is
penetrating the data center, and developers are building new
pipelines that bring Hadoop into the data movement network (or
integrating Hadoop clusters with existing ones), this function is
reaching a new apex in popularity.

For those developers, however, writing these
custom apps is very often an exercise in reinventing the wheel.
Because data formats, sources, and transformation requirements are
so varied, re-purposing code is rarely an option. Furthermore, when
Hadoop is involved as an endpoint, it’s necessary to know your way
around the MapReduce API and be familiar with MapReduce
programming.

In this article, I will describe a new open source
library, Cloudera Morphlines, that reduces the time and skills
necessary to integrate, build, and change Hadoop processing
applications that extract, transform, and load data into HDFS,
Solr, HBase, enterprise data warehouses, or analytic applications.
If you want to integrate, build, or facilitate transformation
pipelines without a lot of Java programming and without
substantial MapReduce skills, and get the job done with a minimum
amount of fuss and support costs, Morphlines is for you.

What is the Morphlines Use Case?

Earlier this year, Cloudera engineers developed Morphlines in
support of the new Cloudera Search offering (in beta at the time
of writing), which unites Solr with Hadoop to enable
natural-language search across data in HDFS or HBase. Since the
launch of Cloudera Search, Morphlines development has graduated
into the Cloudera Development Kit (CDK) in order to make the
technology accessible to a wider range of users, contributors,
integrators, and products beyond Search. The CDK is a set of
libraries, tools, examples, and documentation focused on making it
easier to build systems on top of the Hadoop ecosystem (and hence a
perfect home for Morphlines).

Morphlines powers a variety of ETL data flows from Apache Flume
and MapReduce into Solr, data warehouses, and other endpoints.
Flume covers the real-time case, whereas MapReduce covers the
batch-processing case.

What is a Morphline?

A “morphline” is a rich configuration file that
makes it easy to define a transformation chain that consumes any
kind of data from any kind of data source, processes the data, and
loads the results into a Hadoop component. It replaces Java
programming with simple configuration steps, and correspondingly
reduces the cost and integration effort associated with developing,
maintaining, or integrating custom ETL projects.

Morphlines is a library, embeddable in any Java codebase. An
individual morphline is an in-memory container of transformation
commands. Commands are plugins to a morphline that perform tasks
such as loading, parsing, transforming, or otherwise processing a
single record. A record is an in-memory data structure of
name-value pairs with optional blob attachments or POJO
attachments. The framework is extensible and integrates existing
functionality and third-party systems in a simple and
straightforward manner.

Currently, Morphlines works with multiple indexing workloads,
but could easily be embedded into Apache Crunch, Apache HBase,
Cloudera Impala, Apache Pig, Apache Hive, or Apache
Sqoop. Your feedback is welcome and in fact crucial, so please
let us know where you see more opportunities for integration going
forward! 

Next, let’s take a look at the Morphlines
processing and data models.

Processing Model

Morphlines can be seen as an evolution of Unix
pipelines where the data model is generalized to work with streams
of generic records, including arbitrary binary payloads. A
morphline is an efficient way to consume records (e.g. Flume
events, HDFS files, RDBMS tables, or Apache Avro objects),
turn them into a stream of records, and pipe the stream of records
through a set of easily configurable transformations on the way to
a target application such as Solr.

The Morphlines framework ships with a set of
frequently used high-level transformation and I/O commands that can
be combined in application-specific ways. The plugin system allows
the adding of new transformations and I/O commands and integrates
existing functionality and third-party systems in a straightforward
manner.

This integration enables rapid Hadoop ETL
application prototyping, complex stream and event processing in
real time, flexible log file analysis, integration of multiple
heterogeneous input schemas and file formats, as well as reuse of
ETL logic building blocks across Hadoop ETL
applications.

The CDK ships an efficient runtime that compiles
a morphline on the fly. The runtime executes all commands of a
given morphline in the same thread. Piping a record from one
command to another implies just a cheap Java method call. In
particular, there are no queues, no handoffs among threads, no
context switches, and no serialization between commands, which
minimizes performance overhead.

Data Model

Morphlines manipulate continuous or arbitrarily
large streams of records. A command transforms a record into zero
or more records. The data model can be described as follows: a
record is a set of named fields where each field has an ordered
list of one or more values. A value can be any Java Object. That
is, a record is essentially a hash table where each hash table
entry contains a String key and a list of Java Objects as values.
Note that a field can have multiple values and any two records need
not use common field names. This flexible data model corresponds
exactly to the characteristics of the Solr/Lucene data
model.

Not only structured data, but also binary data, can be passed into
and processed by a morphline. By convention, a record can contain
an optional field named _attachment_body, which can be a Java
java.io.InputStream or byte[]. Optionally, such binary input data
can be characterized in more detail by setting the fields named
_attachment_mimetype (such as “application/pdf”),
_attachment_charset (such as “UTF-8”), and _attachment_name (such
as “cars.pdf”), which assists in detecting and parsing the data
type. (This is similar to the way email works.)
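
To make the data model concrete, here is a minimal Java sketch of
building such a record, assuming the Record and Fields classes of
the CDK morphlines core module (the package names shown are
assumptions and may vary between CDK releases):

    import java.nio.charset.StandardCharsets;

    import com.cloudera.cdk.morphline.api.Record;   // assumed CDK package
    import com.cloudera.cdk.morphline.base.Fields;  // field-name constants

    public class RecordExample {
      public static void main(String[] args) {
        Record record = new Record();

        // Ordinary name-value fields; a field may hold multiple values.
        record.put("id", "event-123");
        record.put("tags", "hadoop");
        record.put("tags", "search");

        // Binary payload plus metadata describing it, mirroring the
        // _attachment_* convention described above.
        record.put(Fields.ATTACHMENT_BODY,
            "hello world".getBytes(StandardCharsets.UTF_8));
        record.put(Fields.ATTACHMENT_MIME_TYPE, "text/plain");
        record.put(Fields.ATTACHMENT_CHARSET, "UTF-8");

        System.out.println(record);
      }
    }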

This generic data model is useful to support a wide range of
applications. For example, the Apache Flume Morphline Solr Sink
embeds the morphline library and executes a morphline to convert
Flume events into morphline records and load them into Solr. This
sink fills the body of the Flume event into the _attachment_body
field of the morphline record, as well as copies the headers of the
Flume event into record fields of the same name.
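
For illustration, a Flume agent might wire up this sink roughly as
follows; this is a hedged sketch, and the sink name, channel name,
morphline file path, and morphline id are placeholders to adapt:

    # Flume agent properties (sketch): route events to the Morphline Solr Sink
    agent.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    agent.sinks.solrSink.channel = memoryChannel
    # Morphline config file to compile, and the id of the morphline inside it
    agent.sinks.solrSink.morphlineFile = /etc/flume-ng/conf/morphline.conf
    agent.sinks.solrSink.morphlineId = morphline1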

Commands

Commands can access all record fields. For
example, commands can parse fields, add fields, remove fields,
rename fields, find and replace values of existing fields, split a
field into multiple fields, split a field into multiple values, or
drop records. Often, regular expression-based pattern matching is
used as part of the process of acting on fields. The output records
of a command are passed to the next command in the chain. A command
has a Boolean return code, indicating success or
failure.

For example, consider the case of a multi-line input record: a
command could take this multi-line input record and divide the
single record into multiple output records, one for each line. Each
of those single-line records could then be further divided, using
regular-expression commands, into multiple fields in
application-specific ways.
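
A morphline fragment expressing such a chain might look roughly
like this; readLine, split, and logDebug are commands that ship
with the CDK morphlines modules, while the field names and
separator below are purely illustrative:

    commands : [
      # Emit one output record per line of the attachment body
      { readLine { charset : UTF-8 } }

      # Split each line into several fields (field names are made up)
      {
        split {
          inputField : message
          outputFields : [name, color, price]
          separator : ","
          trim : true
        }
      }

      # Print every resulting record for debugging
      { logDebug { format : "output record: {}", args : ["@{}"] } }
    ]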

A command can extract, clean, transform, join,
integrate, enrich and decorate records in many other ways. For
example, a command might join records with external data sources
such as relational databases, key-value stores, local files or IP
Geo lookup tables. It might also perform tasks such as DNS
resolution, expand shortened URLs, fetch linked metadata from
social networks, perform sentiment analysis and annotate the record
accordingly, continuously maintain statistics for analytics over
sliding windows, or compute exact or approximate distinct values
and quantiles.

A command can also consume records and pass them to external
systems. For example, a command might load records into Apache Solr
or write them to a MapReduce Reducer, or load them into an
enterprise data warehouse or a key-value store such as HBase, or
pass them into an online dashboard, or write them to HDFS.

The CDK includes several Maven modules that contain morphline
commands for flexible log file analysis; single-line and multi-line
records; CSV files; JSON; the commonly used HDFS file formats Avro
and Hadoop Sequence Files; regular expression based pattern
matching and extraction; operations on record fields for assignment
and comparison; operations on record fields with list and set
semantics; if-then-else conditionals; string and timestamp
conversions; scripting support for dynamic Java code; a small rules
engine; logging, metrics, and counters; integration with Solr,
including SolrCloud; integration with and access to the large set
of file formats supported by the Apache Tika parser library;
auto-detection of MIME types from binary data using Tika; and
decompression and unpacking of arbitrarily nested container file
formats, among others. These commands are described in detail in
the Cloudera Morphlines Reference Guide.

Embedding into a Host System

A morphline has no notion of persistence,
durability, distributed computing, or node failover — it’s
basically just a chain of in-memory transformations in the current
thread. There is no need for a morphline to manage multiple
processes, nodes, or threads because this is already addressed by
host systems such as MapReduce, Flume, or Storm. However, a
morphline does support passing notifications on the control plane
to command subtrees. Such notifications include BEGIN_TRANSACTION,
COMMIT_TRANSACTION, ROLLBACK_TRANSACTION, and SHUTDOWN.
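
For orientation, a minimal embedding sketch in Java might look like
the following, assuming the Compiler, MorphlineContext, Record, and
Notifications classes of the CDK morphlines core module (the
package names, and whether the final child command may be null, are
assumptions to verify against your CDK version):

    import java.io.File;
    import java.nio.charset.StandardCharsets;

    import com.cloudera.cdk.morphline.api.Command;           // assumed CDK packages
    import com.cloudera.cdk.morphline.api.MorphlineContext;
    import com.cloudera.cdk.morphline.api.Record;
    import com.cloudera.cdk.morphline.base.Compiler;
    import com.cloudera.cdk.morphline.base.Fields;
    import com.cloudera.cdk.morphline.base.Notifications;

    public class MorphlineEmbeddingExample {
      public static void main(String[] args) throws Exception {
        // Compile the morphline config file on the fly.
        MorphlineContext context = new MorphlineContext.Builder().build();
        Command morphline = new Compiler().compile(
            new File("morphline.conf"),  // placeholder path
            "morphline1",                // id of the morphline within the file
            context,
            null);                       // no custom final child command here

        // Drive the control plane, then push one record through the chain.
        Notifications.notifyBeginTransaction(morphline);
        Record record = new Record();
        record.put(Fields.ATTACHMENT_BODY,
            "hello,blue,10".getBytes(StandardCharsets.UTF_8));
        boolean success = morphline.process(record);
        if (success) {
          Notifications.notifyCommitTransaction(morphline);
        } else {
          Notifications.notifyRollbackTransaction(morphline);
        }
        Notifications.notifyShutdown(morphline);
      }
    }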

Syntax

The morphline configuration file is implemented using the HOCON
format (Human-Optimized Config Object Notation) developed by
typesafe.com. HOCON is basically JSON, slightly adjusted for the
configuration file use case. HOCON syntax is defined at the HOCON
github page.
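
To give a feel for the syntax, here is a small example of a
complete morphline file; readCSV and loadSolr are commands
described in the reference guide, while the morphline id, import
patterns, column names, collection name, and ZooKeeper address are
illustrative placeholders:

    # Solr collection to load into (values are placeholders)
    SOLR_LOCATOR : {
      collection : collection1
      zkHost : "127.0.0.1:2181/solr"
    }

    morphlines : [
      {
        # Name used to select this morphline within the file
        id : morphline1

        # Java packages to scan for command implementations
        importCommands : ["com.cloudera.**", "org.apache.solr.**"]

        commands : [
          # Parse the attachment body as CSV, emitting one record per row
          {
            readCSV {
              separator : ","
              columns : [id, name, price]
              trim : true
              charset : UTF-8
            }
          }

          # Load each record into the Solr collection named by SOLR_LOCATOR
          { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
        ]
      }
    ]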

Conclusion

If you’ve got any questions, please do ask us. The best place to
do that is in the Community Forum for CDK. Alternatively, if you
are trying out Cloudera Search, post your questions here.

Author
Wolfgang Hoschek
Wolfgang Hoschek is a Software Engineer on the Platform team and the lead developer on Morphlines.