‘coz Concurrency is Groovy!

Tutorial – GPars, making parallel systems Groovy and Java-friendly

Václav Pech
GPars

Václav Pech introduces us to his project GPars, an open source library designed for high-level concurrency in Groovy and Java

GPars is an open-source library for high-level concurrency in Groovy (and Java). If you’ve ever heard terms like actors, dataflow, or parallel collections and wanted to try these concepts in a Java-friendly language, now you have a chance. In this article, I plan to give you a quick overview of the abstractions available in GPars. We’ll then look in more detail at flow-based concurrent data processing, parallelizing collection operations as well as running composed functions asynchronously.

As multi-core chips are becoming the norm in mainstream computers, tablets and phones, concurrent programming is gaining importance. Unfortunately, the widespread thread-based concurrency model that we know from Java doesn’t match well with how the human brain works. Threads and locks introduce too much non-determinism into the code, which frequently results in subtle bugs that are hard to track down and fix. Such code can’t be reliably tested or analysed. Inevitably, for concurrent programming to be effective, we need to use more natural mental models.

Concurrency can be Groovy

Bringing intuitive concurrency models to the mainstream is the challenge that GPars has ambitions to help with. We took the well-known concurrency concepts, such as actors, CSP, dataflow and others, and implemented them in Java with a tasty Groovy DSL topping, giving the library a fluent and easy-to-use flavour. Although targeting Groovy primarily, some of the GPars abstractions can be used from Java directly. Thanks to the Groovy community's interest in concurrency and their support for the project, GPars is currently a standard part of Groovy distributions. No additional setup is required to get up and running with concurrency in Groovy.

A loop considered deadly

I’d like you to stop for a moment and think about the trivial exercises that computer science students are typically assigned when learning to program. For example, one such task is to find the maximum of a collection. Potentially some complex metrics may be applied to make the solution more computationally heavy. What algorithm comes to your mind first?

Chances are high you’d propose an iteration that would go through the collection and remember the biggest element found so far. Once we hit the end of the collection, the element we remember as the so-far biggest must be the global maximum of the whole collection. Clear, simple and – well, wrong! Keep on reading if you want to find out why.
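That textbook solution takes only a few lines. Since we will contrast it with parallel variants later, here it is as a plain Java sketch (the class and method names are mine, purely for illustration):

```java
import java.util.List;

public class SequentialMax {
    // Classic single-threaded scan: remember the biggest element seen so far.
    static int max(List<Integer> numbers) {
        int best = Integer.MIN_VALUE;
        for (int n : numbers) {
            if (n > best) best = n;   // update the running maximum
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(max(List.of(3, 41, 7, 19)));  // prints 41
    }
}
```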

Make your choice

There doesn’t seem to be a single one-size-fits-all solution in the concurrency space. Multiple paradigms have gradually emerged and despite some overlap, they are each suitable to different types of problems. GPars itself started in 2008 as a small Groovy library for parallel collection processing. It added support for the actor model shortly after. Over time other concurrency abstractions have been integrated. Here’s a quick list of what is currently available in GPars version 0.12:

  • Parallel collections provide intuitive ways to parallelize processing of Java/Groovy collections, maps and all geometrically decomposable problems in general

  • Asynchronous functions enable Groovy closures to be run asynchronously and at the same time to orchestrate their mutual communication with little effort

  • Fork/Join empowers you to process recursive divide-and-conquer algorithms concurrently

  • Dataflow variables (aka Promises) offer a lightweight mechanism for inter-thread communication

  • Dataflow channels and operators let you organize active data-transforming elements into highly concurrent data-processing pipelines and networks

  • CSP (Communicating Sequential Processes) is a well-known concurrency model with a formal mathematical foundation, which uses an abstraction of independent concurrently-run processes communicating through synchronous channels

  • Actors/Active objects give you an abstraction of low-ceremony event-driven active components that asynchronously exchange data

  • Agents, as the name suggests, protect data that multiple threads need to access concurrently

You may check out the details of each of the models in the GPars user guide and see them compared side-by-side with their typical areas of applicability. Also, the upcoming book “Groovy in Action”, 2nd edition, by Dierk König covers GPars in great detail.

For this article, I chose three of the abstractions that are most likely to show you the benefits of intuitive concurrency – parallel collections, asynchronous functions, and dataflow operators. Let’s dive right in!

Geometrical decomposition

Now, this is a good place to explain why the sequential algorithm for finding the maximum that we described earlier is a bad choice. It is not that the solution is incorrect – it does give valid answers. Where it fails is efficiency: it cannot scale as the number of available workers grows, completely ignoring the possibility that the system could put several processors to work on the problem.

Supermarkets solved this challenge decades ago. When a queue at the cash desk gets too long, they call in an additional cashier to serve the customers and so the work-load gets distributed and throughput increases.

Back to our problem of finding the maximum: Leveraging the Groovy functional collection API, GPars adds parallel versions of each of the popular iterative methods such as eachParallel(), collectParallel(), findAllParallel(), maxParallel() and others. These methods hide the actual implementation from the user. Behind the scenes, the collection gets divided into smaller chunks, possibly organized hierarchically, and each of these chunks will be processed by a different thread (Figure 1).
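The chunking idea behind these *Parallel() methods can be sketched with plain JDK tools: each chunk's maximum is computed by a separate task, and the partial results are then combined. This is an illustrative sketch of the principle, not the actual GPars implementation (which uses Fork/Join and hierarchical decomposition):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedMax {
    // Split the list into chunks, find each chunk's maximum on its own thread,
    // then reduce the partial maxima to the global one.
    static int parallelMax(List<Integer> numbers, int chunkSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> partials = new ArrayList<>();
            for (int i = 0; i < numbers.size(); i += chunkSize) {
                List<Integer> chunk =
                    numbers.subList(i, Math.min(i + chunkSize, numbers.size()));
                partials.add(pool.submit(
                    () -> chunk.stream().max(Integer::compare).orElseThrow()));
            }
            int best = Integer.MIN_VALUE;
            for (Future<Integer> f : partials) best = Math.max(best, f.get());
            return best;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parallelMax(List.of(8, 1, 42, 3, 17, 5), 2)); // prints 42
    }
}
```

Note that the combination step relies on max being associative; the same scheme works for sums, counts and other reductions.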

The actual work is performed by threads from a thread pool that the user must provide. GPars comes with two types of thread pools:

  • GParsExecutorsPool uses straight Java 5 Executors

  • GParsPool uses a Fork/Join thread pool

 

Figure 1: Geometric Decomposition

 

Parallel collections in use:
GParsPool.withPool 16, {
    def myFavorite = programmingLanguages.collectParallel {it.grabSpec()}
                        .maxParallel {it.spec.count(/parallel/)}
}

 

Within the withPool code block, the parallel collection methods automatically distribute work among the threads of the surrounding thread pool. The more threads you add to the pool, the more parallelism you get. Without an explicit pool size requirement, the pool would create a thread for each available processor detected at runtime, giving you the maximum computing power. That way there will be no artificial upper boundaries limiting parallelism inside your algorithm. The code will run at full speed no matter whether it is being run on an old single-processor machine or on a future hundred-core chip.
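The JDK's parallel streams follow the same philosophy: by default they use a pool sized to the number of available processors, and, much like withPool 16 above, you can run the work inside an explicitly sized pool of your own. A plain-Java analogue (not GPars API – the method name is mine):

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class SizedPool {
    // A parallel stream submitted to an explicitly sized ForkJoinPool runs its
    // tasks on that pool's threads rather than on the default common pool.
    static int longestName(List<String> languages, int poolSize) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(poolSize);
        try {
            return pool.submit(() ->
                languages.parallelStream().mapToInt(String::length).max().orElse(0)
            ).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(longestName(List.of("groovy", "java", "scala"), 16)); // prints 6
    }
}
```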

The GPars parallel collection API offers solutions to what are frequently called Loop Parallelism problems or, more generally, Geometrical Decomposition problems. There are also other types of challenges that require more creative approaches to concurrency. We’re going to discuss two of them in the next part of the article.

            

Asynchronous functions

Having seen collections being processed concurrently, we’ll now focus on functions. Groovy has pretty good support for functional programming, so being able to parallelize method and function invocations will surely come in handy. To stay close to the domain where Groovy is at home, I chose the problem of software project build orchestration for our next journey.

Note: Build processes are typically more I/O-bound than CPU-bound, so by parallelizing them we mostly increase the utilization of the disk and the network bandwidth rather than of the processor. Concurrent code can improve the utilization not only of the CPU but of other resources as well. The demonstrated principles could, of course, be applied in exactly the same way to CPU-bound problems.

Let’s assume for the demonstration that we have a set of functions, perhaps implemented as shell scripts, gradle tasks or GAnt methods, which can perform different parts of a build. The traditional build script could then look something like the one shown below:

Listing 1: A sequential version of the build script

println "Starting the build process."
def projectRoot = checkout('git@github.com:vaclav/GPars.git')
def classes = compileSources(projectRoot)
def api = generateAPIDoc(projectRoot)
def guide = generateUserDocumentation(projectRoot)
def result = deploy(packageProject(classes, api, guide))

Making it parallel

The task for us now is to safely parallelize the build as much as possible without excessive effort. You can probably see that compileSources(), generateAPIDoc(), and generateUserDocumentation() can safely run in parallel, since they have no mutual dependencies. They only need to wait for checkout() to finish before they start their work. The script, however, runs them serially.

I’m sure you can imagine much more complex build scenarios than this one. However, without a good abstraction, if our task is to run build tasks concurrently, even with such an artificially simplified build script we are in for quite some work (listing 2). Well, what do you think about this?

Listing 2: A concurrent version of the build script using asynchronous functions

withPool {
    /* We need asynchronous variants of all the individual build steps */
    def aCheckout = checkout.asyncFun()
    def aCompileSources = compileSources.asyncFun()
    def aGenerateAPIDoc = generateAPIDoc.asyncFun()
    def aGenerateUserDocumentation = generateUserDocumentation.asyncFun()
    def aPackageProject = packageProject.asyncFun()
    def aDeploy = deploy.asyncFun()

    /* Here's the composition of asynchronous build steps to form a process */
    Promise projectRoot = aCheckout('git@github.com:vaclav/GPars.git')
    Promise classes = aCompileSources(projectRoot)
    Promise api = aGenerateAPIDoc(projectRoot)
    Promise guide = aGenerateUserDocumentation(projectRoot)
    Promise result = aDeploy(aPackageProject(classes, api, guide))

    /* Now we're set up and can wait for the build to finish */
    println "Starting the build process. This line is quite likely to be printed first ..."
    println result.get()
}

 

The wiring stayed totally the same. We only turned our original functions into asynchronous ones through the asyncFun() method. Also, the whole block of code is now wrapped inside a GParsPool.withPool() block, so that the functions have some threads to use for their hard work.

Obviously, there’s quite some magic happening in the asyncFun() method to allow the functions to run asynchronously and yet cooperate when they need data from one another.

The beauty of details

Essentially, asyncFun() wraps the original function inside a new one. The signature of the new function is slightly different from the signature of the original function. For example, while the original compileSources() function takes a String as a parameter and returns a String as a result:

 

String compileSources = {String projectRoot -> ...}

the newly constructed aCompileSources() function returns a Promise of a String, not the String itself. Also, both String and Promise<String> are accepted as arguments:

 

Promise<String> aCompileSources = {String | Promise<String> projectRoot -> ...}
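The same wrapping trick can be imitated with the JDK's CompletableFuture, which plays the role of GPars' Promise in this analogy (this is not the asyncFun() implementation, just an illustration of the signature change): a plain String -> String function becomes one that accepts and returns futures, so calls can be chained before any result exists.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class AsyncWrap {
    // Turn a synchronous String -> String function into one that consumes and
    // produces CompletableFuture<String>, so invocations compose asynchronously.
    static Function<CompletableFuture<String>, CompletableFuture<String>> asyncFun(
            Function<String, String> f) {
        return input -> input.thenApplyAsync(f);
    }

    public static void main(String[] args) {
        // A toy stand-in for compileSources()
        Function<String, String> compileSources = root -> root + "/classes";
        var aCompileSources = asyncFun(compileSources);

        CompletableFuture<String> projectRoot =
            CompletableFuture.supplyAsync(() -> "/work/gpars");
        System.out.println(aCompileSources.apply(projectRoot).join());
        // prints /work/gpars/classes
    }
}
```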

Promises to Keep

The Promise interface is a fundamental cornerstone of several GPars APIs. It is somewhat similar to java.util.concurrent.Future in that it represents an on-going asynchronous activity and the result can be waited for and obtained through its blocking get() method. The most important distinction between the two is that Promises, unlike Java’s Futures, allow non-blocking reads.

 

promise.then {println "Now we have a result: $it"}

This allows our asynchronous functions to only consume system threads when all the values they need for their calculation are readily available. Thus, for example, packaging will only start after the classes, api and guide local variables are all bound to a result value. Until then the aPackageProject() function is silently waiting in the back room, unscheduled and inactive (Figure 2).

 

Figure 2: Functions
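The equivalent non-blocking read with the JDK's CompletableFuture is thenApply()/thenAccept() – again a plain-Java analogue of the Promise API shown above, with a hypothetical helper method of my own:

```java
import java.util.concurrent.CompletableFuture;

public class NonBlockingRead {
    // Attach a non-blocking continuation: no thread is parked while waiting,
    // the callback simply fires once the value arrives.
    static CompletableFuture<String> announce(CompletableFuture<Integer> promise) {
        return promise.thenApply(result -> "Now we have a result: " + result);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> promise = CompletableFuture.supplyAsync(() -> 6 * 7);
        System.out.println(announce(promise).join());
        // prints Now we have a result: 42
    }
}
```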

Cui bono?

Now you should be able to see how nicely the building blocks fit together. Since asynchronous functions return and accept promises, they can be composed in the same way the synchronous original functions were. And the second and perhaps more prominent benefit of functional composition is that we don’t have to explicitly specify which tasks can run in parallel. I’m sure if we continued adding tasks to our build process we’d soon lose the global view of what activity can safely run in parallel with what other activities. Fortunately, our asynchronous functions will discover parallelism by themselves at run-time. At any moment, all tasks that have their parameters ready will grab a thread from the assigned thread pool and start running. By limiting the number of threads in the pool you can set an upper limit on the number of tasks run in parallel.
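As a cross-check on how composition discovers the parallelism for us, here is the same dependency graph expressed with JDK CompletableFuture composition: compile, apiDoc and guide all fan out from the checkout result and run in parallel, and packaging fires only once all three are bound. This is an illustrative JDK translation of Listing 2, with toy stand-ins for the real build steps:

```java
import java.util.concurrent.CompletableFuture;

public class AsyncBuild {
    // Toy build steps that just record what happened to their input.
    static String checkout(String url)   { return "root(" + url + ")"; }
    static String compile(String root)   { return "classes(" + root + ")"; }
    static String apiDoc(String root)    { return "api(" + root + ")"; }
    static String userGuide(String root) { return "guide(" + root + ")"; }
    static String pack(String c, String a, String g) {
        return "pkg(" + c + "," + a + "," + g + ")";
    }
    static String deploy(String pkg)     { return "deployed " + pkg; }

    static CompletableFuture<String> build(String url) {
        CompletableFuture<String> root = CompletableFuture.supplyAsync(() -> checkout(url));
        // The three independent steps fan out from the checkout result...
        CompletableFuture<String> classes = root.thenApplyAsync(AsyncBuild::compile);
        CompletableFuture<String> api     = root.thenApplyAsync(AsyncBuild::apiDoc);
        CompletableFuture<String> guide   = root.thenApplyAsync(AsyncBuild::userGuide);
        // ...and packaging waits until all three promises are bound.
        return classes.thenCombine(api, (c, a) -> new String[]{c, a})
                      .thenCombine(guide, (ca, g) -> pack(ca[0], ca[1], g))
                      .thenApply(AsyncBuild::deploy);
    }

    public static void main(String[] args) {
        System.out.println(build("repo.git").join());
    }
}
```

Nowhere do we state which steps may overlap; the wiring of futures alone determines it, just as with asyncFun().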

Where the data flows

Now we are ready for the most interesting abstraction in today’s set – dataflow concurrency. Building on our previous example, we’ll now go continuous. We’ll be running the build script repeatedly in order to build multiple projects. Think of it as an initial stage of a future build server, if you like. Build requests for various projects will be coming in through a pipe and our build server will be processing them in turn, as the system resources permit.

You may be tempted to try the simplest solution – run the asynchronous function-based code of the previous exercise for each incoming request. However, chances are high that this would be largely suboptimal. With multiple requests stacked in the request queue, we have an opportunity for much greater parallelism. Not only the independent parts of the same project can be built in parallel, but perhaps different parts of different projects can also be processed at the same time. Simply put, processing of different projects can overlap in time (Figure 3).

 

 

Figure 3: Overlap

Go with the flow

The model that is a natural fit for problems like these is called Dataflow networks. It is commonly used for concurrent data processing such as encryption and compression, data mining, image processing, and others. Essentially, a dataflow network consists of active data-transforming elements, called operators, connected by asynchronous channels. Each operator consumes data from its input channels and publishes results through several output channels. It also has an associated transformation function, which transforms data received through the input channels into the data to send down the output channels. Under the covers operators share a thread pool, so an inactive operator that has no data to process doesn’t consume a system thread. For our simple build server, the network could look like Figure 4.

 

Figure 4: Operator Network Option 1

We have an operator for each step. The channels represent dependencies between the build tasks and each operator will only demand a system thread and initiate the calculation after all its input channels have a value to read.

Listing 3: Concurrent build server using dataflow operators

/* We need channels to wire active elements together */
def urls = new DataflowQueue()
def checkedOutProjects = new DataflowBroadcast()
def compiledProjects = new DataflowQueue()
def apiDocs = new DataflowQueue()
def userDocs = new DataflowQueue()
def packages = new DataflowQueue()
def done = new DataflowQueue()

/* Here's the composition of individual build steps into a process */
operator(inputs: [urls], outputs: [checkedOutProjects], maxForks: 3) {url ->
    bindAllOutputs checkout(url)
}
operator([checkedOutProjects.createReadChannel()],
         [compiledProjects]) {projectRoot ->
    bindOutput compileSources(projectRoot)
}
operator([checkedOutProjects.createReadChannel()], [apiDocs]) {projectRoot ->
    bindOutput generateAPIDoc(projectRoot)
}
operator([checkedOutProjects.createReadChannel()], [userDocs]) {projectRoot ->
    bindOutput generateUserDocumentation(projectRoot)
}
operator([compiledProjects, apiDocs, userDocs],
         [packages]) {classes, api, guide ->
    bindOutput packageProject(classes, api, guide)
}
def deployer = operator([packages], [done]) {packagedProject ->
    bindOutput(deploy(packagedProject) == 'success')
}

/* Now we're set up and can wait for the build to finish */
println "Starting the build process. This line is quite likely to be printed first ..."
deployer.join()  //Wait for the last operator in the network to finish

The beauty of this model applied to our problem is that, for example, when the first operator performs project checkout and finishes fetching sources of a project, it can immediately grab the next request in the queue and start fetching its sources long before the former request is compiled, packaged and deployed.
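Stripped down to bare JDK primitives, an operator is conceptually little more than a loop that takes from its input queues, transforms, and puts to its output queues; two such stages wired by a BlockingQueue already form a tiny pipeline. A conceptual sketch only – real GPars operators share a thread pool instead of each owning a thread, and support multiple inputs:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.UnaryOperator;

public class MiniPipeline {
    // Run a single-input, single-output "operator": take, transform, put.
    static Thread operator(BlockingQueue<String> in, BlockingQueue<String> out,
                           UnaryOperator<String> transform) {
        Thread t = new Thread(() -> {
            try {
                while (true) out.put(transform.apply(in.take()));
            } catch (InterruptedException e) { /* shut down */ }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> urls = new LinkedBlockingQueue<>();
        BlockingQueue<String> checkedOut = new LinkedBlockingQueue<>();
        BlockingQueue<String> compiled = new LinkedBlockingQueue<>();

        operator(urls, checkedOut, u -> "src of " + u);            // checkout stage
        operator(checkedOut, compiled, s -> "classes from " + s);  // compile stage

        urls.put("projA.git");
        System.out.println(compiled.take()); // prints classes from src of projA.git
    }
}
```

While "projA.git" is being compiled by the second stage, the first stage is already free to take the next URL – exactly the overlap described above.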

By changing the number of operators assigned to a particular task in the network you can tune the system. If we realize, for example, that fetching the sources is the bottleneck, and provided the hardware (network bandwidth) still isn’t fully utilized, we might increase the throughput of the server by increasing the number of source-fetching operators (Figure 5).

Figure 5: Operator Network Option 2
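In the queue-based sketch, the analogue of raising maxForks is simply attaching more worker threads to the same input queue: a BlockingQueue hands each element to exactly one taker, so several requests can be fetched concurrently without any being processed twice (again an illustrative sketch with names of my own):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ForkedStage {
    // N workers drain one shared input queue; each request is taken exactly once.
    static void forkStage(int forks, BlockingQueue<String> in, BlockingQueue<String> out) {
        for (int i = 0; i < forks; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) out.put("fetched " + in.take());
                } catch (InterruptedException e) { /* shut down */ }
            });
            t.setDaemon(true);
            t.start();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> requests = new LinkedBlockingQueue<>();
        BlockingQueue<String> sources = new LinkedBlockingQueue<>();
        forkStage(3, requests, sources);   // three concurrent "checkout" workers

        requests.put("a.git");
        requests.put("b.git");
        // Both results arrive, though their order is nondeterministic.
        System.out.println(sources.take());
        System.out.println(sources.take());
    }
}
```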

And a lot more

Obviously, your tuning options go much further than forking heavily used operators. To name just a few of the other possibilities: you can consider load-balancing schemes among duplicated parts of your network, implement production throttling – either through synchronous communication channels or through a kanban-like Work-In-Progress throttling scheme – and, for some problems, employ data caching and speculative calculations.

Summary

Having dipped your toe into Groovy concurrency, you may be in a good position to take a deeper look. For a quick start, consider following one of the GPars fast tracks, and I definitely recommend you have a look at the user guide to investigate further details.

I’ll be happy if you take GPars for a ride. Go and enjoy concurrency, ‘coz concurrency is Groovy!

Author Bio: Václav is a programming enthusiast who’s constantly seeking ways to make development more effective and enjoyable. He’s particularly interested in server-side Java technologies, concurrency, modern programming languages and DSLs. He works at JetBrains on the MPS project as a senior developer and a technology evangelist. On the side, he’s leading the GPars project, an open-source library for easy concurrent programming in Groovy and Java.

This article originally appeared in Java Tech Journal – The Groovy Universe.
