‘coz Concurrency is Groovy!

Tutorial – GPars, making parallel systems Groovy and Java-friendly

Václav Pech

Václav Pech introduces us to his project GPars, an open source library designed for high-level concurrency in Groovy and Java

GPars is an open-source library
for high-level concurrency in Groovy (and Java). If you’ve ever
heard terms like actors, dataflow, or parallel collections and
wanted to try these concepts in a Java-friendly language, now you
have a chance. In this article, I plan to give you a quick overview
of the abstractions available in GPars. We'll then look in more
detail at flow-based concurrent data processing, parallelizing
collection operations, and running composed functions
asynchronously.

As multi-core chips are becoming
the norm in mainstream computers, tablets and phones, concurrent
programming is gaining importance. Unfortunately, the widespread
thread-based concurrency model that we know from Java doesn’t match
well with how the human brain works. Threads and locks introduce
too much non-determinism into the code, which frequently results in
subtle bugs that are hard to track down and fix. Such code can’t be
reliably tested or analysed. Inevitably, for concurrent programming
to be effective, we need to use more natural mental models.

Concurrency can be Groovy

Bringing intuitive concurrency
models to the mainstream is the challenge that GPars has ambitions
to help with. We took well-known concurrency concepts, such as
actors, CSP and dataflow, and implemented them in Java with
a tasty Groovy DSL topping, giving the library a fluent and
easy-to-use flavour. Although targeting Groovy primarily, some of
the GPars abstractions can be used from Java directly. Thanks to
the Groovy community's interest in concurrency and their
support of the project, GPars is currently a standard part of
Groovy distributions. No additional setup is required to get up and
running with concurrency in Groovy.

A loop considered deadly

I’d like you to stop for a moment
and think about the trivial exercises that computer science
students are typically assigned when learning to program. For
example, one such task is to find the maximum of a collection.
Potentially some complex metrics may be applied to make the
solution more computationally heavy. What algorithm comes to your
mind first?

Chances are high you’d propose an
iteration that would go through the collection and remember the
biggest element found so far. Once we hit the end of the
collection, the element we remember as the so-far biggest must be
the global maximum of the whole collection. Clear, simple and –
well, wrong! Keep on reading if you want to find out why.

Make your choice

There doesn't seem to be a single
one-size-fits-all solution in the concurrency space. Multiple
paradigms have gradually emerged and, despite some overlap, each is
suited to a different type of problem. GPars itself started
in 2008 as a small Groovy library for parallel collection
processing and added support for the actor model shortly after.
Over time, other concurrency abstractions have been integrated.
Here's a quick list of what is currently available in GPars version
0.12:

  • Parallel collections provide intuitive ways to parallelize
    processing of Java/Groovy collections, maps and all geometrically
    decomposable problems in general

  • Asynchronous functions enable Groovy closures to be run
    asynchronously and at the same time to orchestrate their mutual
    communication with little effort

  • Fork/Join empowers you to process recursive divide-and-conquer
    algorithms concurrently

  • Dataflow variables (aka Promises) offer a lightweight mechanism
    for inter-thread communication (a short taste follows this list)

  • Dataflow channels and operators let you organize active
    data-transforming elements into highly concurrent data-processing
    pipelines and networks

  • CSP (Communicating Sequential Processes) is a well-known
    concurrency model grounded in theoretical mathematics, which uses
    an abstraction of independent, concurrently-run processes
    communicating through synchronous channels

  • Actors/Active objects give you an abstraction of low-ceremony
    event-driven active components that asynchronously exchange
    data

  • Agents, as the name suggests, protect data that multiple threads
    need to access concurrently
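
To give you an early taste before we focus on three of these abstractions, here is a small, self-contained sketch of a dataflow variable and an agent in action. It is an illustration only; all the names are made up:

import groovyx.gpars.agent.Agent
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

// A dataflow variable (Promise) is bound once and safely read by any thread
def greeting = new DataflowVariable<String>()
task { greeting << 'Hello from another thread!' }  // bind the value asynchronously
println greeting.val                               // blocks until the value is bound

// An agent guards a piece of state; all updates are serialized for us
def counter = new Agent<Integer>(0)
10.times { counter.send { updateValue it + 1 } }   // each closure sees the current state
println counter.val                                // reads the protected value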

You may check out the details of each of these models in the
GPars user guide and see them compared side by side along with
their typical areas of applicability. Also, the upcoming book
“Groovy in Action”, 2nd edition, by Dierk König covers GPars in
great detail.

For this article, I chose three of the abstractions that are
most likely to show you the benefits of intuitive concurrency –
parallel collections, asynchronous functions, and dataflow
operators. Let’s dive right in!

Geometrical decomposition

Now, this is a good place to explain why the sequential
algorithm for finding the maximum that we described earlier is a
bad choice. It is not that the solution is incorrect; obviously, it
gives you valid answers. Where it fails is efficiency: it prohibits
scaling up with an increasing number of workers and totally ignores
the possibility that the system may be able to put several
processors on the problem.

Supermarkets solved this challenge decades ago. When a queue at
the cash desk gets too long, they call in an additional cashier to
serve the customers and so the work-load gets distributed and
throughput increases.

Back to our problem of finding the maximum: leveraging the Groovy
functional collection API, GPars adds parallel versions of each of
the popular iterative methods, such as eachParallel(),
collectParallel(), findAllParallel(), maxParallel() and others.
These methods hide the actual implementation from the user. Behind
the scenes, the collection gets divided into smaller chunks,
possibly organized hierarchically, and each of these chunks is
processed by a different thread (Figure 1).

The actual work is performed by threads from a thread pool that
the user must provide. GPars comes with two types of thread pools,
both shown in a short sketch after the list:

  • GParsExecutorsPool uses straight Java 5 Executors

  • GParsPool uses a Fork/Join thread pool
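
Here's a minimal sketch of both pool flavours in use. The documents list is a made-up stand-in; both blocks expose the same xxxParallel methods and differ only in the pool implementation working underneath:

import groovyx.gpars.GParsExecutorsPool
import groovyx.gpars.GParsPool

def documents = ['alpha', 'beta gamma', 'delta epsilon zeta']

// Fork/Join based pool – the usual choice for decomposable workloads
GParsPool.withPool(8) {
    println documents.collectParallel { it.size() }
}

// Plain Java 5 executor service based pool
GParsExecutorsPool.withPool(8) {
    documents.eachParallel { println "${it.size()} characters" }
}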

 

Figure 1: Geometric Decomposition


Parallel collections in use:

GParsPool.withPool(16) {
    //grab the spec for each language in parallel, then find the one
    //whose spec mentions the word 'parallel' most often
    def myFavorite = programmingLanguages.collectParallel { it.grabSpec() }
                                         .maxParallel { it.spec.count(/parallel/) }
}

 

Within the withPool code block, the parallel collection methods
automatically distribute work among the threads of the surrounding
thread pool. The more threads you add to the pool, the more
parallelism you get. Without an explicit pool size requirement, the
pool creates a thread for each processor detected at runtime,
giving you the maximum computing power. That way there are no
artificial upper boundaries limiting the parallelism inside your
algorithm, and the code will run at full speed whether it is run on
an old single-processor machine or on a future hundred-core chip.
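
This is also where the student exercise from the beginning finds its scalable answer. Below is a sketch with a made-up complexMetric() standing in for a computationally heavy scoring function; note there's no pool size argument, so the pool is sized to the processor count:

import groovyx.gpars.GParsPool

def complexMetric = { int n -> Thread.sleep(5); n * n }  // stand-in for heavy scoring

GParsPool.withPool {  // pool sized to the available processors
    def winner = (1..500).toList().maxParallel { complexMetric(it) }
    println "Maximum by metric: $winner"
}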

The GPars parallel collection API offers solutions to what is
frequently called the Loop Parallelism problem or, more generally,
Geometrical Decomposition problems. There are also other types of
challenges that require more creative approaches to concurrency.
We're going to discuss two of them in the next part of the
article.

      
     

Asynchronous functions

Having seen collections being processed concurrently, we'll now
focus on functions. Groovy has pretty good support for functional
programming, so being able to parallelize method and function
invocations will surely come in handy. To stay close to the domain
where Groovy is at home, I chose the problem of software project
build orchestration for our next journey.

Note: build processes are typically more I/O-bound than CPU-bound,
so by parallelizing them we're mostly increasing the utilization of
the disk and the network bandwidth rather than the processor. It is
not only the CPU whose utilization can be improved with concurrent
code, but other resources as well. The demonstrated principles,
however, apply exactly the same way to CPU-bound problems.

Let's assume for the demonstration that we have a set of
functions, perhaps implemented as shell scripts, Gradle tasks or
GAnt methods, which can perform different parts of a build. The
traditional build script could then look something like the one
shown below:

Listing 1: A sequential version of the build script

println "Starting the build process."
def projectRoot = checkout('git@github.com:vaclav/GPars.git')
def classes = compileSources(projectRoot)
def api = generateAPIDoc(projectRoot)
def guide = generateUserDocumentation(projectRoot)
def result = deploy(packageProject(classes, api, guide))

Making it parallel

The task for us now is to safely parallelize the build as much as
possible without excessive effort. You can probably see that
compileSources(), generateAPIDoc(), and generateUserDocumentation()
can safely run in parallel, since they have no mutual dependencies.
They only need to wait for checkout() to finish before they start
their work. The script, however, runs them serially.

I'm sure you can imagine much more complex build scenarios than
this one. However, without a good abstraction, running the build
tasks concurrently would mean quite some work, even with such an
artificially simplified build script. Listing 2 shows the GPars
way – well, what do you think about this?

Listing 2: A concurrent version of the build script using asynchronous functions

withPool {
    /* We need asynchronous variants of all the individual build steps */
    def aCheckout = checkout.asyncFun()
    def aCompileSources = compileSources.asyncFun()
    def aGenerateAPIDoc = generateAPIDoc.asyncFun()
    def aGenerateUserDocumentation = generateUserDocumentation.asyncFun()
    def aPackageProject = packageProject.asyncFun()
    def aDeploy = deploy.asyncFun()

    /* Here's the composition of asynchronous build steps to form a process */
    Promise projectRoot = aCheckout('git@github.com:vaclav/GPars.git')
    Promise classes = aCompileSources(projectRoot)
    Promise api = aGenerateAPIDoc(projectRoot)
    Promise guide = aGenerateUserDocumentation(projectRoot)
    Promise result = aDeploy(aPackageProject(classes, api, guide))

    /* Now we're all set up and can wait for the build to finish */
    println "Starting the build process. This line is quite likely to be printed first ..."
    println result.get()
}

 

The wiring stayed totally the same. We only turned our original
functions into asynchronous ones through the asyncFun() method.
Also, the whole block of code is now wrapped inside a
GParsPool.withPool() block, so that the functions have some threads
to use for their hard work.

Obviously, there's quite some magic happening in the asyncFun()
method to allow the functions to run asynchronously and yet
cooperate when they need data from one another.

The beauty of details

Essentially, asyncFun() wraps the original function inside a new
one. The signature of the new function is slightly different from
that of the original. For example, while the original
compileSources() function takes a String as a parameter and returns
a String as a result (both signatures below are illustrative
pseudo-notation, not compilable Groovy):

 

String compileSources = {String projectRoot -> ...}

the newly constructed aCompileSources() function returns a Promise
of a String rather than the String itself. Also, both a String and
a Promise<String> are accepted as arguments:

 

Promise<String> aCompileSources = {String | Promise<String> projectRoot -> ...}
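
Here's a small self-contained sketch of that String-or-Promise duality, with a toy shout function standing in for compileSources():

import static groovyx.gpars.GParsPool.withPool
import groovyx.gpars.dataflow.Promise

def shout = { String s -> s.toUpperCase() }

withPool {
    def aShout = shout.asyncFun()
    Promise p1 = aShout('gpars')  // a plain String argument works
    Promise p2 = aShout(p1)       // ...and so does a Promise; GPars waits for it
    println p2.get()              // prints GPARS
}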

Promises to Keep

The Promise interface is a fundamental cornerstone of several
GPars APIs. It is somewhat similar to java.util.concurrent.Future
in that it represents an ongoing asynchronous activity whose result
can be waited for and obtained through its blocking get() method.
The most important distinction between the two is that Promises,
unlike Java's Futures, allow non-blocking reads:

 

promise.then {println "Now we have a result: $it"}
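
And a self-contained sketch contrasting the two reading styles, using Dataflow.task() to produce the Promise:

import static groovyx.gpars.dataflow.Dataflow.task

def promise = task { Thread.sleep(500); 6 * 7 }   // a Promise bound when the task completes
promise.then { println "Non-blocking read: $it" } // registers a callback, parks no thread
println "Blocking read: ${promise.get()}"         // parks the caller until the value arrives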

This allows our asynchronous functions to consume system threads
only when all the values they need for their calculation are
readily available. Thus, for example, packaging will only start
after the classes, api and guide local variables are all bound to a
result value. Until then, the aPackageProject() function silently
waits in the back room, unscheduled and inactive (Figure 2).

 

Figure 2: Functions

Cui bono?

Now you should be able to see how nicely the building blocks fit
together. Since asynchronous functions return and accept promises,
they can be composed in the same way the synchronous original
functions were. And the second and perhaps more prominent benefit
of functional composition is that we don’t have to explicitly
specify which tasks can run in parallel. I’m sure if we continued
adding tasks to our build process we’d soon lose the global view of
what activity can safely run in parallel with what other
activities. Fortunately, our asynchronous functions will discover
parallelism by themselves at run-time. At any moment, all tasks
that have their parameters ready will grab a thread from the
assigned thread pool and start running. By limiting the number of
threads in the pool you can set an upper limit on the number of
tasks run in parallel.
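
Here's a quick sketch of that last point with a deliberately slowed-down toy function: the pool below holds two threads, so at most two of the six asynchronous invocations run at any moment:

import static groovyx.gpars.GParsPool.withPool

def square = { int x -> Thread.sleep(500); x * x }  // deliberately slow

withPool(2) {                       // cap: at most two tasks run in parallel
    def aSquare = square.asyncFun()
    def promises = (1..6).collect { aSquare(it) }
    println promises*.get()         // waits for all six results
}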

Where the data flows

Now we are ready for the most interesting abstraction in today's
set – dataflow concurrency. Building on our previous example, we'll
now go continuous: we'll be running the build script repeatedly in
order to build multiple projects. Think of it as an initial stage
of a future build server, if you like. Build requests for various
projects will come in through a pipe and our build server will
process them in turn, as the system resources permit.

You may be tempted to try the simplest solution – run the
asynchronous function-based code of the previous exercise for each
incoming request. However, chances are high that this would be
largely suboptimal. With multiple requests stacked in the request
queue, we have an opportunity for much greater parallelism. Not
only can the independent parts of the same project be built in
parallel, but different parts of different projects can perhaps
also be processed at the same time. Simply put, the processing of
different projects can overlap in time (Figure 3).

 

 

Figure 3: Overlap

Go with the flow

The model that is a natural fit for problems like these is called
Dataflow networks. It is commonly used for concurrent data
processing such as encryption and compression, data mining, image
processing, and others. Essentially, a dataflow network consists of
active data-transforming elements, called operators, connected by
asynchronous channels. Each operator consumes data from its input
channels and publishes results through several output channels. It
also has an associated transformation function, which transforms
data received through the input channels into the data to send down
the output channels. Under the covers operators share a thread
pool, so an inactive operator that has no data to process doesn't
consume a system thread. For our simple build server, the network
could look like Figure 4.

 

Figure 4: Operator Network Option 1

We have an operator for each step. The channels represent
dependencies between the build tasks and each operator will only
demand a system thread and initiate the calculation after all its
input channels have a value to read.

Listing 3: Concurrent build server using dataflow operators

/* We need channels to wire the active elements together */
def urls = new DataflowQueue()
def checkedOutProjects = new DataflowBroadcast()
def compiledProjects = new DataflowQueue()
def apiDocs = new DataflowQueue()
def userDocs = new DataflowQueue()
def packages = new DataflowQueue()
def done = new DataflowQueue()

/* Here's the composition of individual build steps into a process */
operator(inputs: [urls], outputs: [checkedOutProjects], maxForks: 3) {url ->
    bindAllOutputs checkout(url)
}
operator(inputs: [checkedOutProjects.createReadChannel()],
         outputs: [compiledProjects]) {projectRoot ->
    bindOutput compileSources(projectRoot)
}
operator(inputs: [checkedOutProjects.createReadChannel()],
         outputs: [apiDocs]) {projectRoot ->
    bindOutput generateAPIDoc(projectRoot)
}
operator(inputs: [checkedOutProjects.createReadChannel()],
         outputs: [userDocs]) {projectRoot ->
    bindOutput generateUserDocumentation(projectRoot)
}
operator(inputs: [compiledProjects, apiDocs, userDocs],
         outputs: [packages]) {classes, api, guide ->
    bindOutput packageProject(classes, api, guide)
}
def deployer = operator(inputs: [packages], outputs: [done]) {packagedProject ->
    bindOutput(deploy(packagedProject) == 'success')
}

/* Now we're all set up and can wait for the build to finish */
println "Starting the build process. This line is quite likely to be printed first ..."
deployer.join()  //Wait for the last operator in the network to finish

The beauty of this model applied to our problem is that, for
example, when the first operator performs project checkout and
finishes fetching sources of a project, it can immediately grab the
next request in the queue and start fetching its sources long
before the former request is compiled, packaged and deployed.

By changing the number of operators assigned to a particular task
in the network you can tune the system. If we realize, for example,
that fetching the sources is the bottleneck, and provided the
hardware (network bandwidth) still isn't fully utilized, we might
increase the throughput of the server by increasing the number of
source-fetching operators (Figure 5).

Figure 5: Operator Network Option 2
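
In code, such a tuning step is a one-parameter change to the checkout operator from Listing 3 (shown as an excerpt, not a standalone script): raising maxForks runs more concurrent copies of the operator's body.

// six concurrent source-fetching workers instead of three;
// the rest of the network stays exactly as it was
operator(inputs: [urls], outputs: [checkedOutProjects], maxForks: 6) {url ->
    bindAllOutputs checkout(url)
}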

And a lot more

Obviously, your tuning options go much further than forking
heavily used operators. To briefly name some of the other
possibilities: you can consider load-balancing schemes among
duplicated parts of your network, or implement production
throttling – either through synchronous communication channels or
through a kanban-like Work-In-Progress throttling scheme. For some
problems, data caching and speculative calculations may also be
worth considering.
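
To illustrate just one of these options – production throttling through a synchronous channel – here is a sketch using SyncDataflowQueue. Note that this class appeared in GPars versions newer than the 0.12 discussed here, so treat it as a forward-looking assumption rather than 0.12 API:

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.SyncDataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

def handoff = new SyncDataflowQueue()  // writers block until a reader is ready
def results = new DataflowQueue()

operator(inputs: [handoff], outputs: [results]) {request ->
    Thread.sleep(1000)                 // simulate a slow build step
    bindOutput "processed $request"
}

(1..3).each { handoff << "request-$it" }  // the producer is throttled by the consumer
3.times { println results.val }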

Summary

Having dipped your toe in Groovy concurrency, you may be in a
good position to take a deeper look. For a quick start, consider
following one of the GPars fast tracks, and I definitely recommend
you have a look at the user guide to investigate further
details.

I’ll be happy if you take GPars for a ride. Go and enjoy
concurrency, ‘coz concurrency is Groovy!

Author Bio: Václav is
a programming enthusiast who’s constantly seeking ways to make
development more effective and enjoyable. He’s particularly
interested in server-side Java technologies, concurrency, modern
programming languages and DSLs. He works
at JetBrains on the MPS project as a
senior developer and a technology evangelist. On the side,
he’s leading the GPars project, an open-source library
for easy concurrent programming in Groovy and Java.

This article originally appeared in Java Tech Journal –
The Groovy Universe.
