‘coz Concurrency is Groovy!

Tutorial - GPars, making parallel systems Groovy and Java-friendly - Part 2


Asynchronous functions

Having seen collections being processed concurrently, we’ll now focus on functions. Groovy has pretty good support for functional programming; after all, being able to parallelize method and function invocations will surely come in handy. To stay close to the domain where Groovy is at home, I chose the problem of software project build orchestration for our next journey.

Note: When parallelizing build processes, which are typically more I/O bound than CPU bound, we’re obviously increasing the utilization of the disk and the network bandwidth and not so much the processor. It is not only the CPU, but also other resources the utilization of which can be improved with concurrent code. Obviously, the demonstrated principles could be applied exactly the same way to CPU-bound problems. 

Let’s assume for the demonstration that we have a set of functions, perhaps implemented as shell scripts, gradle tasks or GAnt methods, which can perform different parts of a build. The traditional build script could then look something like the one shown below:

Listing 1: A sequential version of the build script

println "Starting the build process."
def projectRoot = checkout('git@github.com:vaclav/GPars.git')
def classes = compileSources(projectRoot)
def api = generateAPIDoc(projectRoot)
def guide = generateUserDocumentation(projectRoot)
def result = deploy(packageProject(classes, api, guide))

Making it parallel

The task for us now is to safely parallelize the build as much as possible without excessive effort. You can probably see that compileSources(), generateAPIDoc(), and generateUserGuide() can safely run in parallel, since they have no mutual dependencies. They only need to wait for checkout() to finish before they start their work. The script, however, runs them serially.

I’m sure you can imagine much more complex build scenarios than this one. However, without a good abstraction, if our task is to run build tasks concurrently, even with such an artificially simplified build script we are in for quite some work (listing 2). Well, what do you think about this?

Listing 2: A concurrent version of the build script using asynchronous functions

withPool {
    /* We need asynchronous variants of all the individual build steps */
    def aCheckout = checkout.asyncFun()
    def aCompileSources = compileSources.asyncFun()
    def aGenerateAPIDoc = generateAPIDoc.asyncFun()
    def aGenerateUserDocumentation = generateUserDocumentation.asyncFun()
    def aPackageProject = packageProject.asyncFun()
    def aDeploy = deploy.asyncFun()

    /* Here's the composition of asynchronous build steps to form a process */
    Promise projectRoot = aCheckout('git@github.com:vaclav/GPars.git')
    Promise classes = aCompileSources(projectRoot)
    Promise api = aGenerateAPIDoc(projectRoot)
    Promise guide = aGenerateUserDocumentation(projectRoot)
    Promise result = aDeploy(aPackageProject(classes, api, guide))

    /* Now we're setup and can wait for the build to finish */
    println "Starting the build process. This line is quite likely to be printed first ..."
    println result.get()


The wiring stayed totally the same. We only turned our original functions into asynchronous ones through the asyncFun() method. Also, the whole block of code is now wrapped inside a GParsPool.withPool() block, so that the functions have some threads to use for their hard work.

Obviously, there’s quite some magic happening in the asyncFun() method to allow the functions to run asynchronously and yet cooperate when they need data from one another.

The beauty of details

Essentially, asyncFun() wraps the original function inside a new one. The signature of the new function is slightly different from the signature of the original function. For example, while the original compileSources() function takes a String as a parameter and returns a String as a result:


String compileSources = {String projectRoot -> ...}

the newly constructed aCompileSources() function returns a Promise of a String, not the String itself. Also, both String and Promise<String> are accepted as arguments:


Promise<String> aCompileSources = {String | Promise<String> projectRoot -> ...}

Promises to Keep

The Promise interface is a fundamental cornerstone of several GPars APIs. It is somewhat similar to java.util.concurrent.Future in that it represents an on-going asynchronous activity and the result can be waited for and obtained through its blocking get() method. The most important distinction between the two is that Promises, unlike Java’s Futures, allow non-blocking reads.


promise.then {println "Now we have a result: $it"}

This allows our asynchronous functions to only consume system threads when all the values they need for their calculation are readily available. Thus, for example, packaging will only start after the classes, api and guide local variables are all bound to a result value. Until then the aPackage() function is silently waiting in the back room unscheduled and inactive (Figure 2).


Figure 2: Functions

Cui bono?

Now you should be able to see how nicely the building blocks fit together. Since asynchronous functions return and accept promises, they can be composed in the same way the synchronous original functions were. And the second and perhaps more prominent benefit of functional composition is that we don’t have to explicitly specify which tasks can run in parallel. I’m sure if we continued adding tasks to our build process we’d soon lose the global view of what activity can safely run in parallel with what other activities. Fortunately, our asynchronous functions will discover parallelism by themselves at run-time. At any moment, all tasks that have their parameters ready will grab a thread from the assigned thread pool and start running. By limiting the number of threads in the pool you can set an upper limit on the number of tasks run in parallel.

Where the data flows

Now we are ready for the most interesting abstraction in our today’s set – dataflow concurrency. Building on our previous example, we’ll now go continuous. We’ll be running the build script repeatedly in order to build multiple projects. Think of it as an initial stage of a future build server, if you like. Build requests for various projects will be coming in through a pipe and our build server will be processing them in turn, as the system resources permit.

You may be tempted to try the simplest solution – run the asynchronous function-based code of the previous exercise for each incoming request. However, chances are high that this would be largely suboptimal. With multiple requests stacked in the request queue, we have an opportunity for much greater parallelism. Not only the independent parts of the same project can be built in parallel, but perhaps different parts of different projects can also be processed at the same time. Simply put, processing of different projects can overlap in time (Figure 3).



Figure 3: Overlap

Go with the flow

The model that is a natural fit for problems like these is called Dataflow networks. It is commonly used for concurrent data processing such as encryption and compression, data mining, image processing, and others. Essentially, a dataflow network consists of active data-transforming elements, called operators, connected by asynchronous channels. Each operator consumes data from its input channels and publishes results through several output channels. It also has an associated transformation function, which transforms data received through the input channels into the data to send down the output channels. Under the covers operators share a thread pool, so an inactive operator that has no data to process doesn’t consume a system thread. For our simple build server, the network could look like Figure 4.


Figure 4: Operator Network Option 1

We have an operator for each step. The channels represent dependencies between the build tasks and each operator will only demand a system thread and initiate the calculation after all its input channels have a value to read.

Listing 3: Concurrent build server using dataflow operators

/* We need channels to wire active elements together */
def urls = new DataflowQueue()
def checkedOutProjects = new DataflowBroadcast()
def compiledProjects = new DataflowQueue()
def apiDocs = new DataflowQueue()
def userDocs = new DataflowQueue()
def packages = new DataflowQueue()
def done = new DataflowQueue()

/* Here's the composition of individual build steps into a process */
operator(inputs: [urls], outputs: [checkedOutProjects], maxForks: 3) {url ->
    bindAllOutputs checkout(url)
         [compiledProjects]) {projectRoot ->
    bindOutput compileSources(projectRoot)
operator(checkedOutProjects.createReadChannel(), apiDocs) {projectRoot ->
    bindOutput generateAPIDoc(projectRoot)
operator(checkedOutProjects.createReadChannel(), userDocs) {projectRoot ->
    bindOutput generateUserDocumentation(projectRoot)
operator([compiledProjects, apiDocs, userDocs], 
        [packages]) {classes, api, guide ->
    bindOutput packageProject(classes, api, guide)
def deployer = operator(packages, done) {packagedProject ->
    if (deploy(packagedProject) == 'success') bindOutput true
    else bindOutput false

/* Now we're setup and can wait for the build to finish */
println "Starting the build process. This line is quite likely to be printed first ..."
deployer.join()  //Wait for the last operator in the network to finish

The beauty of this model applied to our problem is that, for example, when the first operator performs project checkout and finishes fetching sources of a project, it can immediately grab the next request in the queue and start fetching its sources long before the former request is compiled, packaged and deployed.

By changing the number of operators assigned to a particular task in the network you can tune the system. If we realize, for example, that fetching the sources is the bottle-neck and provided the hardware (network bandwidth) still isn’t fully utilized, we might increase throughput of the server by increasing the number of source-fetching operators (Figure 5).

Figure 5: Operator Network Option 2

And a lot more

Obviously, your tuning options go much further than forking heavily used operators. To briefly name some of the other possibilities, you can consider load-balancing schemes among duplicated parts of your network, implement production throttling – either through synchronous communication channels or using the kanban-like Work In Progress throttling scheme. For some problems, data caching and speculative calculations may be considered.


Having dipped your toe in Groovy concurrency, you may be in a good position to take a deeper look. For a quick start, consider following one of the GPars fast tracks, and I definitely recommend you had a look at the user guide to investigate further details.

I’ll be happy if you take GPars for a ride. Go and enjoy concurrency, ‘coz concurrency is Groovy!

Author Bio: Václav is a programming enthusiast who’s constantly seeking ways to make development more effective and enjoyable. He’s particularly interested in server-side Java technologies, concurrency, modern programming languages and DSLs. He works at JetBrains on the MPS project as a senior developer and a technology evangelist. On the side, he’s leading the GPars project, an open-source library for easy concurrent programming in Groovy and Java.

This article originally appeared in Java Tech Journal - The Groovy Universe. Read more of that here.


Václav Pech
Václav Pech

What do you think?

JAX Magazine - 2014 - 03 Exclucively for iPad users JAX Magazine on Android


Latest opinions