The extra pattern

Tutorial: Asynchronous Programming With Akka Actors

From November's JAX Magazine, Typesafe's Jamie Allen explains how useful anonymous actors are to capture context while processing asynchronous tasks in Akka.

One of the most difficult tasks in asynchronous programming is trying to capture context so that the state of the world at the time the task was started can be accurately represented at the time the task finishes.

Creating anonymous instances of Akka actors, however, is a simple and lightweight way to capture the context at the time a message was handled, so that it can be used when the tasks complete.

The Problem

A great example is an actor that handles the messages in its mailbox sequentially but performs the tasks based on those messages off-thread with Futures. This is a great way to design your actors: they will not block waiting for responses, which lets them handle more messages concurrently and increases your application's performance. However, the state of the actor will likely change with every message.

Let's take the example of an actor that acts as a proxy to retrieve a customer's account information for a financial services firm from multiple data sources. Further, let's assume that each of the subsystem proxies for savings, checking and money market accounts will optionally return a list of that customer's accounts of that kind and their balances. Let's write some basic Akka actor code to perform this task:

 

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

case class GetCustomerAccountBalances(id: Long)
case class AccountBalances(
 val checking: Option[List[(Long, BigDecimal)]],
 val savings: Option[List[(Long, BigDecimal)]],
 val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
 val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
 val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
 val balances: Option[List[(Long, BigDecimal)]])

class SavingsAccountProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
 }
}
class CheckingAccountProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! CheckingAccountBalances(Some(List((3, 15000))))
 }
}
class MoneyMarketAccountsProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! MoneyMarketAccountBalances(None)
 }
}

class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
 implicit val timeout: Timeout = 100.milliseconds
 implicit val ec: ExecutionContext = context.dispatcher
 def receive = {
  case GetCustomerAccountBalances(id) =>
   val futSavings = savingsAccounts ? GetCustomerAccountBalances(id)
   val futChecking = checkingAccounts ? GetCustomerAccountBalances(id)
   val futMM = moneyMarketAccounts ? GetCustomerAccountBalances(id)
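   val futBalances = for {
    // Each proxy replies with its own wrapper message, so map to that type
    savings <- futSavings.mapTo[SavingsAccountBalances]
    checking <- futChecking.mapTo[CheckingAccountBalances]
    mm <- futMM.mapTo[MoneyMarketAccountBalances]
    // Argument order follows AccountBalances(checking, savings, moneyMarket)
   } yield AccountBalances(checking.balances, savings.balances, mm.balances)
   futBalances map (sender ! _)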
 }
}

 

This code is fairly concise. The AccountBalanceRetriever actor receives a message to get account balances for a customer, and then it fires off three futures in parallel. The first gets the customer's savings account balances, the second the checking account balances and the third the money market balances. Doing these tasks in parallel avoids the cost of performing the retrievals sequentially. Also, note that each reply carries an Option of account balances by account ID. A None does not short-circuit the for comprehension, because the comprehension is over the futures rather than the Options: if futSavings yields None, the comprehension still continues with the other two.
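The point about None can be checked without any actors. The following is only a minimal sketch using plain scala.concurrent futures; the object name ForComprehensionSketch, the Balances alias and the sample values are assumptions made for illustration and are not part of the article's code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForComprehensionSketch {
  // Hypothetical alias for the balance type used throughout the article
  type Balances = Option[List[(Long, BigDecimal)]]

  def combine(): (Balances, Balances, Balances) = {
    // Start all three futures first so they can run concurrently
    val futSavings: Future[Balances] = Future(None)
    val futChecking: Future[Balances] = Future(Some(List((3L, BigDecimal(15000)))))
    val futMM: Future[Balances] = Future(None)

    // flatMap and map here belong to Future, so a None result is just a value
    val combined = for {
      savings <- futSavings
      checking <- futChecking
      mm <- futMM
    } yield (savings, checking, mm)

    // Blocking is acceptable in a demonstration, not inside an actor
    Await.result(combined, 1.second)
  }
}
```

Calling ForComprehensionSketch.combine() yields a tuple in which the savings and money market entries are None and the checking entry is still present, which shows that the empty results did not cut the comprehension short.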

However, a couple of things about the AccountBalanceRetriever are not ideal. First of all, it uses futures to ask other actors for responses, which creates a new PromiseActorRef for every message sent and is a waste of resources. It would be better to have our AccountBalanceRetriever actor send messages out in a "fire and forget" fashion and collect the results asynchronously into *one* actor.

Furthermore, there is a glaring race condition in this code. Can you see it? We're referencing "sender" in our map operation on the result from futBalances. By the time the future completes, "sender" may no longer be the same ActorRef, because the AccountBalanceRetriever may be handling another message from a different sender at that point!
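The same kind of race can be reproduced with nothing but the standard library. In the sketch below, the mutable field currentSender is a hypothetical stand-in for an actor's "sender" (it is not Akka's API), and a Promise plays the role of the slow reply. One closure reads the mutable field when the future completes; the other reads a snapshot taken while the first message was being handled.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SenderRaceSketch {
  // Hypothetical stand-in for an actor's mutable notion of "sender"
  @volatile private var currentSender: String = "nobody"

  def run(): (String, String) = {
    val slowReply = Promise[Int]()

    currentSender = "clientA"      // the actor starts handling clientA's message
    val captured = currentSender   // snapshot taken while that message is handled

    // Reads the mutable field only when the future completes
    val racy = slowReply.future.map(v => s"$currentSender gets $v")
    // Reads the immutable snapshot
    val safe = slowReply.future.map(v => s"$captured gets $v")

    currentSender = "clientB"      // the actor moves on to the next message
    slowReply.success(42)          // the reply for clientA finally arrives

    (Await.result(racy, 1.second), Await.result(safe, 1.second))
  }
}
```

Here run() returns ("clientB gets 42", "clientA gets 42"): the closure that read the mutable field addressed the wrong client, while the one that used the snapshot did not.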

Avoiding Ask

Let's focus on eliminating the need to ask for responses in our actor first. We can send the messages with "!" (tell) and collect the responses as optional Lists of balances by account number. But how would we go about doing that?

 

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

case class GetCustomerAccountBalances(id: Long)
case class AccountBalances(
  val checking: Option[List[(Long, BigDecimal)]],
  val savings: Option[List[(Long, BigDecimal)]],
  val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}
class CheckingAccountProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountsProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! MoneyMarketAccountBalances(None)
  }
}

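class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
  // Instance-level state: shared by every request this actor handles
  var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
  var originalSender: Option[ActorRef] = None
  def receive = {
    case GetCustomerAccountBalances(id) =>
      originalSender = Some(sender)
      savingsAccounts ! GetCustomerAccountBalances(id)
      checkingAccounts ! GetCustomerAccountBalances(id)
      moneyMarketAccounts ! GetCustomerAccountBalances(id)
    // Each proxy replies with its own message type; record it and check for completion
    case CheckingAccountBalances(balances) =>
      checkingBalances = balances
      isDone
    case SavingsAccountBalances(balances) =>
      savingsBalances = balances
      isDone
    case MoneyMarketAccountBalances(balances) =>
      mmBalances = balances
      isDone
  }

  // Reply only once all three kinds of balances have a value
  def isDone = (checkingBalances, savingsBalances, mmBalances) match {
    case (Some(c), Some(s), Some(m)) =>
      originalSender.get ! AccountBalances(checkingBalances, savingsBalances, mmBalances)
    case _ =>
  }
}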

 

This is better, but still leaves a lot to be desired. First of all, the balances we receive are collected at the instance level, which means that responses belonging to different requests all land in the same fields and we cannot tell which request a given response is for. Worse still, we can't time out a request back to our original requestor. Finally, while we've captured the original sender as an instance variable that may or may not have a value (since there is no originalSender when the AccountBalanceRetriever starts up), we have no way of being sure that the originalSender is who we want it to be when we want to send data back!
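To make the first of these problems concrete, here is a deliberately simplified sketch with no actors involved. The Aggregator class and its fields are hypothetical stand-ins for the retriever's instance-level variables, and the amounts are made up; the only point is that replies belonging to two different requests end up in one shared set of fields.

```scala
object SharedStateSketch {
  // Hypothetical, simplified stand-in for the retriever's instance-level fields
  final class Aggregator {
    var checking: Option[BigDecimal] = None
    var savings: Option[BigDecimal] = None

    // Looks complete as soon as both fields hold a value
    def result: Option[(BigDecimal, BigDecimal)] =
      for (c <- checking; s <- savings) yield (c, s)
  }

  def interleave(): Option[(BigDecimal, BigDecimal)] = {
    val agg = new Aggregator
    agg.checking = Some(BigDecimal(100)) // checking reply for customer 1
    agg.savings = Some(BigDecimal(999))  // savings reply for customer 2 arrives next
    agg.result                           // one "complete" answer built from two customers
  }
}
```

SharedStateSketch.interleave() returns Some((100, 999)): a result that looks complete but combines data from two different customers.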

Capturing Context

The problem is that we're attempting to take the result of the off-thread operations of retrieving data from multiple sources and return it to whoever sent us the message in the first place. However, the actor will likely have moved on to handling additional messages in its mailbox by the time these operations complete, and the state represented in the AccountBalanceRetriever actor for "sender" at that time could be a completely different actor instance. So how do we get around this?

The trick is to create an anonymous inner actor for each GetCustomerAccountBalances message that is being handled. In doing so, you can capture the state you need to have available when the responses arrive. Let's see how:

 

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

case class GetCustomerAccountBalances(id: Long)
case class AccountBalances(
  val checking: Option[List[(Long, BigDecimal)]],
  val savings: Option[List[(Long, BigDecimal)]],
  val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}
class CheckingAccountProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountsProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! MoneyMarketAccountBalances(None)
  }
}

class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id) => {
      // One short-lived anonymous actor per request holds that request's state
      context.actorOf(Props(new Actor() {
        var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
        val originalSender = sender
        def receive = {
          case CheckingAccountBalances(balances) =>
            checkingBalances = balances
            isDone
          case SavingsAccountBalances(balances) =>
            savingsBalances = balances
            isDone
          case MoneyMarketAccountBalances(balances) =>
            mmBalances = balances
            isDone
        }

        // Reply and stop once all three kinds of balances have a value
        def isDone =
          (checkingBalances, savingsBalances, mmBalances) match {
            case (Some(c), Some(s), Some(m)) =>
              originalSender ! AccountBalances(checkingBalances, savingsBalances, mmBalances)
              context.system.stop(self)
            case _ =>
          }

        // Sent from the anonymous actor, so the replies come back to it
        savingsAccounts ! GetCustomerAccountBalances(id)
        checkingAccounts ! GetCustomerAccountBalances(id)
        moneyMarketAccounts ! GetCustomerAccountBalances(id)
      }))
    }
  }
}

 

This is much better. We've captured the state of each receive and only send it back to the originalSender when all three have values. But there are still two issues here.

First, we haven't defined how we can time out on the original request for all of the balances back to whoever requested them. Second, our originalSender is still getting a wrong value: the "sender" from which it is assigned is actually the sender value of the anonymous inner actor, not the one that sent the original GetCustomerAccountBalances message!

Using a Promise

We can use a Promise to handle our need to time out the original request, by allowing another task to compete for the right to complete the operation with a timeout (see listing below). A Promise in Scala holds its eventual result as a Try: a Failure wrapping a Throwable if something goes wrong, or a Success wrapping a value of whatever type you expected. In this case, we want an AccountBalances instance, so our Promise will be typed as Promise[AccountBalances].

 

import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import akka.actor._
import scala.math.BigDecimal.int2bigDecimal

case class GetCustomerAccountBalances(id: Long)
case class AccountBalances(
  val checking: Option[List[(Long, BigDecimal)]],
  val savings: Option[List[(Long, BigDecimal)]],
  val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])

class SavingsAccountProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
  }
}
class CheckingAccountProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountsProxy extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id: Long) =>
      sender ! MoneyMarketAccountBalances(None)
  }
}

class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
  def receive = {
    case GetCustomerAccountBalances(id) => {
      // Captured in the outer actor, while the original message is being handled
      val originalSender = sender
      implicit val ec: ExecutionContext = context.dispatcher

      context.actorOf(Props(new Actor() {
        val promisedResult = Promise[AccountBalances]()
        var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
        def receive = {
          case CheckingAccountBalances(balances) =>
            checkingBalances = balances
            collectBalances
          case SavingsAccountBalances(balances) =>
            savingsBalances = balances
            collectBalances
          case MoneyMarketAccountBalances(balances) =>
            mmBalances = balances
            collectBalances
        }

        // Try to complete the promise once all three kinds of balances have a value
        def collectBalances = (checkingBalances, savingsBalances, mmBalances) match {
          case (Some(c), Some(s), Some(m)) =>
            if (promisedResult.trySuccess(AccountBalances(checkingBalances, savingsBalances, mmBalances)))
              sendResults
          case _ =>
        }

        // Only called by whoever completed the promise, so its value is already available
        def sendResults = {
          promisedResult.future.value.foreach {
            case Success(balances) => originalSender ! balances
            case Failure(t) => originalSender ! t
          }
          context.system.stop(self)
        }

        savingsAccounts ! GetCustomerAccountBalances(id)
        checkingAccounts ! GetCustomerAccountBalances(id)
        moneyMarketAccounts ! GetCustomerAccountBalances(id)
        // The timeout competes with the responses for the right to complete the promise
        context.system.scheduler.scheduleOnce(250.milliseconds) {
          if (promisedResult.tryFailure(new TimeoutException))
            sendResults
        }
      }))
    }
  }
}

 

Now we can collect our results and check to see whether we successfully completed the promise. Whichever of the two competitors completes it first, the final balance response or the scheduled timeout, gets to send the reply: either the AccountBalances value or the TimeoutException instance. Finally, we must remember to stop our anonymous inner actor so that we do not leak memory for every GetCustomerAccountBalances message we receive!
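The competition for the right to complete the promise can be seen in isolation with the standard library alone. This is a minimal sketch: the object name PromiseRaceSketch, the String payload and the resultArrivesFirst flag are assumptions for illustration, and the two attempts run one after the other on a single thread rather than from an actor and a scheduler.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Promise
import scala.util.Try

object PromiseRaceSketch {
  // Returns (did the result win?, did the timeout win?, final value of the promise)
  def race(resultArrivesFirst: Boolean): (Boolean, Boolean, Option[Try[String]]) = {
    val promise = Promise[String]()
    if (resultArrivesFirst) {
      val resultWon = promise.trySuccess("balances")
      // Too late: the promise is already completed, so this returns false
      val timeoutWon = promise.tryFailure(new TimeoutException("timed out"))
      (resultWon, timeoutWon, promise.future.value)
    } else {
      val timeoutWon = promise.tryFailure(new TimeoutException("timed out"))
      // Too late: the timeout already completed the promise
      val resultWon = promise.trySuccess("balances")
      (resultWon, timeoutWon, promise.future.value)
    }
  }
}
```

Because trySuccess and tryFailure return true only for the call that actually completes the promise, exactly one of the two competitors sees true, and only that one should go on to send the reply.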

Conclusion

Asynchronous programming is not easy, even with powerful tools at our disposal. We must always think about the state we need and the context from which we get it. I hope this article has shown you some nice ideas about how to use Actors, Futures and Promises to perform complex tasks, as well as patterns for using them that will help you in your everyday coding. All source code can be found in my GitHub repository.

Author Bio: Jamie Allen has over 18 years of experience delivering enterprise solutions across myriad industries, platforms, environments and languages. He has been developing enterprise applications with Scala since 2009, primarily using actors for fault tolerance and managing concurrency at scale. Jamie currently works for Typesafe, where he helps users develop actor-based systems using the Akka framework and Scala.

This tutorial originally appeared in JAX Magazine: JavaFX Revitalised.

 
