The extra pattern

Tutorial: Asynchronous Programming With Akka Actors

Jamie Allen
akka


From November’s JAX Magazine, Typesafe’s Jamie Allen explains how
useful anonymous actors are for capturing context while processing
asynchronous tasks in Akka.

One of the most difficult tasks in asynchronous
programming is trying to capture context so that the state of the
world at the time the task was started can be accurately
represented at the time the task finishes.

However, creating anonymous instances of
Akka actors is a very simple and
lightweight solution for capturing the context at the time the
message was handled, to be used when the tasks are successfully
completed.

The Problem

A great example is an actor which is
sequentially handling messages in its mailbox but performing the
tasks based on those messages off-thread with Futures. This is a
great way to design your actors in that they will not block waiting
for responses, allowing them to handle more messages concurrently
and increase your application’s performance. However, the state of
the actor will likely change with every message.

Let’s take an example of an actor which will act
as a proxy to get a customer’s account information for a financial
services firm from multiple data sources. Further, let’s assume
that each of the subsystem proxies for savings, checking and money
market account balances will optionally return a list of accounts
of that kind, with their balances, for this customer. Let’s write
some basic Akka actor code to perform this task:

 

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout

case class GetCustomerAccountBalances(id: Long)
case class AccountBalances(
 val checking: Option[List[(Long, BigDecimal)]],
 val savings: Option[List[(Long, BigDecimal)]],
 val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
 val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
 val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
 val balances: Option[List[(Long, BigDecimal)]])

class SavingsAccountProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
 }
}
class CheckingAccountProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! CheckingAccountBalances(Some(List((3, 15000))))
 }
}
class MoneyMarketAccountsProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! MoneyMarketAccountBalances(None)
 }
}

class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
 implicit val timeout: Timeout = 100 milliseconds
 implicit val ec: ExecutionContext = context.dispatcher
 def receive = {
  case GetCustomerAccountBalances(id) =>
   val futSavings = savingsAccounts ? GetCustomerAccountBalances(id)
   val futChecking = checkingAccounts ? GetCustomerAccountBalances(id)
   val futMM = moneyMarketAccounts ? GetCustomerAccountBalances(id)
   // Each proxy replies with its own wrapper message, so map to that type and unwrap .balances
   val futBalances = for {
    savings <- futSavings.mapTo[SavingsAccountBalances]
    checking <- futChecking.mapTo[CheckingAccountBalances]
    mm <- futMM.mapTo[MoneyMarketAccountBalances]
   } yield AccountBalances(checking.balances, savings.balances, mm.balances)
   futBalances map (sender ! _)
 }
}

 

This code is fairly concise. The
AccountBalanceRetriever actor receives a message to get account
balances for a customer, and then it fires off three futures in
parallel. The first will get the customer’s savings account
balances, the second will get the checking account balances and the
third will get the money market balances. Doing these tasks in
parallel allows us to avoid the expensive cost of performing the
retrievals sequentially. Also, note that the replies carry Options
of account balances by account ID. If one of them carries None, it
will not short-circuit the for comprehension, because the
comprehension is over the Futures rather than the Options inside
them. If None comes back from futSavings, the for comprehension
will still continue.
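
To see both points in isolation, here is a minimal sketch that uses only the Scala standard library, with no Akka involved. The object name and the three lookup methods are hypothetical stand-ins for the subsystem proxies. It assumes the futures are created before the for comprehension, which is what lets them run in parallel, and it shows that a None payload is just another successful value.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelFuturesSketch {
  type Balances = Option[List[(Long, BigDecimal)]]

  // Hypothetical stand-ins for the three subsystem lookups.
  def savings(): Future[Balances] =
    Future(Some(List((1L, BigDecimal(150000)), (2L, BigDecimal(29000)))))
  def checking(): Future[Balances] =
    Future(Some(List((3L, BigDecimal(15000)))))
  def moneyMarket(): Future[Balances] =
    Future(None) // this customer has no accounts of this kind

  // Returns (checking, savings, moneyMarket).
  def fetchAll(): (Balances, Balances, Balances) = {
    // Start all three futures first so they can run in parallel.
    val futSavings = savings()
    val futChecking = checking()
    val futMM = moneyMarket()

    // The comprehension is over Future, so a None payload is an ordinary
    // value and does not stop the remaining steps.
    val combined = for {
      s <- futSavings
      c <- futChecking
      m <- futMM
    } yield (c, s, m)

    Await.result(combined, 2.seconds)
  }

  def main(args: Array[String]): Unit = println(fetchAll())
}
```

Blocking with Await is only there to keep the sketch self-contained; inside an actor you would not block.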

However, there are
a couple of things about it that are not ideal. First of all, it is
using futures to ask other actors for responses, which creates a
new PromiseActorRef for every message sent and is a waste of
resources. It would be better to have our AccountBalanceRetriever
actor send messages out in a “fire and forget” fashion and collect
results asynchronously into one actor.

Furthermore, there is a glaring race condition
in this code. Can you see it? We’re referencing the “sender” in
our map operation on the result from futBalances, but by the time
the future completes it may no longer be the same ActorRef, because
the AccountBalanceRetriever actor may now be handling another
message from a different sender at that point!
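
The race can be reproduced without Akka. The sketch below is a simplified model, not actor code: FakeActor and its currentSender field are hypothetical stand-ins for an actor and its “sender”, and the Promise is only there to control when the slow work finishes. It contrasts reading the field when the future completes with copying it into a local val first.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SenderRaceSketch {
  // Hypothetical stand-in for an actor: currentSender plays the role of
  // "sender", which refers to whoever sent the message being handled now.
  class FakeActor {
    @volatile var currentSender: String = "nobody"

    // Racy: reads currentSender only when the future completes.
    def handleRacy(from: String, work: Future[Int]): Future[String] = {
      currentSender = from
      work.map(_ => currentSender)
    }

    // Safe: copies the sender into a local val that the closure captures.
    def handleSafe(from: String, work: Future[Int]): Future[String] = {
      currentSender = from
      val originalSender = currentSender
      work.map(_ => originalSender)
    }
  }

  // Returns (who the racy version replies to, who the safe version replies to).
  def run(): (String, String) = {
    val actor = new FakeActor
    val slowWork = Promise[Int]()

    val racyReplyTo = actor.handleRacy("A", slowWork.future)
    val safeReplyTo = actor.handleSafe("A", slowWork.future)
    actor.currentSender = "B" // a second message arrives, this time from B
    slowWork.success(42)      // only now does the work started for A finish

    (Await.result(racyReplyTo, 2.seconds), Await.result(safeReplyTo, 2.seconds))
  }

  def main(args: Array[String]): Unit = println(run()) // prints (B,A)
}
```

The racy version would reply to B with A’s result, while the version that captured a local val still replies to A.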

Avoiding Ask

Let’s focus on eliminating the need to ask for
responses in our actor first. We can send the messages with the “!”
and collect responses into a List of an optional List of balances
by account number. But how would we go about doing that?

 

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import akka.actor._

case class GetCustomerAccountBalances(id: Long)
case class AccountBalances(
  val checking: Option[List[(Long, BigDecimal)]],
  val savings: Option[List[(Long, BigDecimal)]],
  val moneyMarket: Option[List[(Long, BigDecimal)]])
case class CheckingAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class SavingsAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])
case class MoneyMarketAccountBalances(
  val balances: Option[List[(Long, BigDecimal)]])

class SavingsAccountProxy extends Actor {
  def receive = {
   case GetCustomerAccountBalances(id: Long) =>
    sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
 }
}
class CheckingAccountProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   sender ! CheckingAccountBalances(Some(List((3, 15000))))
  }
}
class MoneyMarketAccountsProxy extends Actor {
 def receive = {
  case GetCustomerAccountBalances(id: Long) =>
   // Some(Nil) means "answered, no accounts"; the retriever treats None as "no answer yet"
   sender ! MoneyMarketAccountBalances(Some(Nil))
 }
}

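class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
  // Instance-level state: shared by every request this actor handles
  var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
  var originalSender: Option[ActorRef] = None
  def receive = {
    case GetCustomerAccountBalances(id) =>
      originalSender = Some(sender)
      savingsAccounts ! GetCustomerAccountBalances(id)
      checkingAccounts ! GetCustomerAccountBalances(id)
      moneyMarketAccounts ! GetCustomerAccountBalances(id)
    // Each proxy replies with its own message type, so collect them one by one
    case CheckingAccountBalances(balances) =>
      checkingBalances = balances
      replyIfDone()
    case SavingsAccountBalances(balances) =>
      savingsBalances = balances
      replyIfDone()
    case MoneyMarketAccountBalances(balances) =>
      mmBalances = balances
      replyIfDone()
  }

  // Reply only once all three subsystems have answered
  def replyIfDone(): Unit =
    (checkingBalances, savingsBalances, mmBalances) match {
      case (Some(c), Some(s), Some(m)) =>
        originalSender.get ! AccountBalances(checkingBalances, savingsBalances, mmBalances)
      case _ =>
    }
}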

 

This is better, but still leaves a lot to be desired. First of
all, we’ve stored the balances we’ve received at the instance
level, which means we can’t tell which responses belong to which
request to get account balances. Worse still, we can’t time out a
request back to our original requestor. Finally, while we’ve
captured the original sender as an instance variable that may or
may not have a value (since there is no originalSender when the
AccountBalanceRetriever starts up), we have no way of being sure
that the originalSender is who we want it to be when we want to
send data back!
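
The following single-threaded sketch models that failure without Akka. The Retriever class, the string sender names and the delivered log are all hypothetical; they only mimic the shape of an actor that keeps per-request data in instance fields. A second request arrives while the first is still waiting for its last reply, and the first customer’s data ends up going to the second requester.

```scala
object SharedStateSketch {
  type Balances = Option[List[(Long, BigDecimal)]]

  // Hypothetical model of an actor whose per-request data lives in
  // instance fields, so every request shares the same variables.
  class Retriever {
    var checking, savings, moneyMarket: Balances = None
    var originalSender: Option[String] = None
    // Log of (recipient, number of accounts sent), newest first.
    var delivered: List[(String, Int)] = Nil

    def request(from: String): Unit = originalSender = Some(from)

    def checkingReply(b: Balances): Unit = { checking = b; replyIfDone() }
    def savingsReply(b: Balances): Unit = { savings = b; replyIfDone() }
    def moneyMarketReply(b: Balances): Unit = { moneyMarket = b; replyIfDone() }

    private def replyIfDone(): Unit =
      (checking, savings, moneyMarket) match {
        case (Some(c), Some(s), Some(m)) =>
          delivered = (originalSender.get, c.size + s.size + m.size) :: delivered
        case _ => // still waiting for at least one subsystem
      }
  }

  def run(): List[(String, Int)] = {
    val r = new Retriever
    r.request("alice")
    r.checkingReply(Some(List((3L, BigDecimal(15000)))))
    r.savingsReply(Some(List((1L, BigDecimal(150000)))))
    r.request("bob")              // a second request arrives mid-flight
    r.moneyMarketReply(Some(Nil)) // the last reply for alice's request
    r.delivered
  }

  def main(args: Array[String]): Unit = println(run()) // prints List((bob,2))
}
```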

Capturing Context

The problem is that we’re attempting to take the result of the
off-thread operations of retrieving data from multiple sources and
return it to whoever sent us the message in the first place.
However, the actor will likely have moved on to handling additional
messages in its mailbox by the time these futures complete, and the
state represented in the AccountBalanceRetriever actor for “sender”
at that time could be a completely different actor instance. So how
do we get around this?

The trick is to create an anonymous inner actor for each
GetCustomerAccountBalances message that is being handled. In doing
so, you can capture the state you need to have available when the
responses arrive. Let’s see how:

 

import scala.concurrent.ExecutionContext
        import scala.concurrent.duration._
        import akka.actor._

        case class GetCustomerAccountBalances(id: Long)
        case class AccountBalances(
          val checking: Option[List[(Long, BigDecimal)]],
          val savings: Option[List[(Long, BigDecimal)]],
          val moneyMarket: Option[List[(Long, BigDecimal)]])
        case class CheckingAccountBalances(
          val balances: Option[List[(Long, BigDecimal)]])
        case class SavingsAccountBalances(
          val balances: Option[List[(Long, BigDecimal)]])
        case class MoneyMarketAccountBalances(
          val balances: Option[List[(Long, BigDecimal)]])

        class SavingsAccountProxy extends Actor {
          def receive = {
           case GetCustomerAccountBalances(id: Long) =>
            sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
          }
        }
        class CheckingAccountProxy extends Actor {
          def receive = {
           case GetCustomerAccountBalances(id: Long) =>
            sender ! CheckingAccountBalances(Some(List((3, 15000))))
          }
        }
        class MoneyMarketAccountsProxy extends Actor {
          def receive = {
           case GetCustomerAccountBalances(id: Long) =>
            // Some(Nil) means "answered, no accounts"; the retriever treats None as "no answer yet"
            sender ! MoneyMarketAccountBalances(Some(Nil))
          }
        }

        class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
          def receive = {
            case GetCustomerAccountBalances(id) => {
              // One short-lived anonymous actor per request holds that request's state
              context.actorOf(Props(new Actor() {
                var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
                val originalSender = sender
                def receive = {
                  case CheckingAccountBalances(balances) =>
                    checkingBalances = balances
                    isDone
                  case SavingsAccountBalances(balances) =>
                    savingsBalances = balances
                    isDone
                  case MoneyMarketAccountBalances(balances) =>
                    mmBalances = balances
                    isDone
                }

                // Reply and stop once all three subsystems have answered
                def isDone =
                  (checkingBalances, savingsBalances, mmBalances) match {
                    case (Some(c), Some(s), Some(m)) =>
                      originalSender ! AccountBalances(checkingBalances, savingsBalances, mmBalances)
                      context.system.stop(self)
                    case _ =>
                  }

                // Sent from the anonymous actor's constructor, so replies come back to it
                savingsAccounts ! GetCustomerAccountBalances(id)
                checkingAccounts ! GetCustomerAccountBalances(id)
                moneyMarketAccounts ! GetCustomerAccountBalances(id)
              }))
            }
          }
        }

 

This is much better. We’ve captured the state of each request
and only send it back to the originalSender when all three balances
have values. But there are still two issues here.

First, we haven’t defined how we can time out on the original
request for all of the balances back to whoever requested them.
Secondly, our originalSender is still getting
the wrong value: the “sender” from which it is assigned is actually
the sender value of the anonymous inner actor, not
the one that sent the original GetCustomerAccountBalances
message!

Using a Promise

We can use a Promise to handle our need to time out the original
request, by allowing another task to compete for the right to
complete the operation with a timeout (see listing below). A Promise
in Scala holds its eventual result as a Try: a Failure wrapping a
Throwable if something goes wrong, or a Success wrapping a value of
whatever type you expected. In this case, we want an
AccountBalances instance, so our Promise will be typed as
Promise[AccountBalances].

 

import java.util.concurrent.TimeoutException
        import scala.concurrent.{ExecutionContext, Promise}
        import scala.concurrent.duration._
        import akka.actor._
        import scala.math.BigDecimal.int2bigDecimal

        case class GetCustomerAccountBalances(id: Long)
        case class AccountBalances(
          val checking: Option[List[(Long, BigDecimal)]],
          val savings: Option[List[(Long, BigDecimal)]],
          val moneyMarket: Option[List[(Long, BigDecimal)]])
        case class CheckingAccountBalances(
          val balances: Option[List[(Long, BigDecimal)]])
        case class SavingsAccountBalances(
          val balances: Option[List[(Long, BigDecimal)]])
        case class MoneyMarketAccountBalances(
          val balances: Option[List[(Long, BigDecimal)]])

        class SavingsAccountProxy extends Actor {
          def receive = {
            case GetCustomerAccountBalances(id: Long) =>
              sender ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
          }
        }
        class CheckingAccountProxy extends Actor {
          def receive = {
            case GetCustomerAccountBalances(id: Long) =>
              sender ! CheckingAccountBalances(Some(List((3, 15000))))
          }
        }
        class MoneyMarketAccountsProxy extends Actor {
          def receive = {
            case GetCustomerAccountBalances(id: Long) =>
              // Some(Nil) means "answered, no accounts"; the retriever treats None as "no answer yet"
              sender ! MoneyMarketAccountBalances(Some(Nil))
          }
        }

        class AccountBalanceRetriever(savingsAccounts: ActorRef, checkingAccounts: ActorRef, moneyMarketAccounts: ActorRef) extends Actor {
          def receive = {
            case GetCustomerAccountBalances(id) => {
              // Capture the sender here, in the outer actor, before creating the inner one
              val originalSender = sender
              implicit val ec: ExecutionContext = context.dispatcher

              context.actorOf(Props(new Actor() {
                val promisedResult = Promise[AccountBalances]()
                var checkingBalances, savingsBalances, mmBalances: Option[List[(Long, BigDecimal)]] = None
                def receive = {
                  case CheckingAccountBalances(balances) =>
                    checkingBalances = balances
                    collectBalances
                  case SavingsAccountBalances(balances) =>
                    savingsBalances = balances
                    collectBalances
                  case MoneyMarketAccountBalances(balances) =>
                    mmBalances = balances
                    collectBalances
                }

                // Try to complete the promise once all three replies are in
                def collectBalances = (checkingBalances, savingsBalances, mmBalances) match {
                  case (Some(c), Some(s), Some(m)) =>
                    if (promisedResult.trySuccess(AccountBalances(checkingBalances, savingsBalances, mmBalances)))
                      sendResults
                  case _ =>
                }

                def sendResults = {
                  // The promise is already completed here, so send its value
                  // (the balances or the TimeoutException), not the Future itself
                  promisedResult.future.value.foreach {
                    case scala.util.Success(balances) => originalSender ! balances
                    case scala.util.Failure(t) => originalSender ! t
                  }
                  context.system.stop(self)
                }

                savingsAccounts ! GetCustomerAccountBalances(id)
                checkingAccounts ! GetCustomerAccountBalances(id)
                moneyMarketAccounts ! GetCustomerAccountBalances(id)
                // The timeout competes with collectBalances; only the first to complete the promise sends
                context.system.scheduler.scheduleOnce(250 milliseconds) {
                  if (promisedResult.tryFailure(new TimeoutException))
                    sendResults
                }
              }))
            }
          }
        }

 

Now we can collect our results and check to see if we
successfully completed the promise. If so, we can return the
appropriate value or the TimeoutException instance. Finally, we
must remember to stop our anonymous inner actor so that we do not
leak memory for every GetCustomerAccountBalances message we
receive!
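
The “compete to complete” idea can be seen in isolation with a bare Promise from the Scala standard library. In this sketch the race method and the String payload are hypothetical simplifications of the listing above; it only demonstrates that trySuccess and tryFailure each return true for the first caller and false for everyone after.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Promise
import scala.util.Try

object PromiseRaceSketch {
  // Returns (did the result win?, did the timeout win?, the promise's final value).
  def race(resultArrivesFirst: Boolean): (Boolean, Boolean, Option[Try[String]]) = {
    val promised = Promise[String]()

    val (resultWon, timeoutWon) =
      if (resultArrivesFirst) {
        val r = promised.trySuccess("balances")
        val t = promised.tryFailure(new TimeoutException("too slow"))
        (r, t)
      } else {
        val t = promised.tryFailure(new TimeoutException("too slow"))
        val r = promised.trySuccess("balances")
        (r, t)
      }

    // A completed promise exposes its outcome as Some(Success(...)) or Some(Failure(...)).
    (resultWon, timeoutWon, promised.future.value)
  }

  def main(args: Array[String]): Unit = {
    println(race(resultArrivesFirst = true))
    println(race(resultArrivesFirst = false))
  }
}
```

Because only one of the two calls can return true, only one code path ever gets to send a reply, which is the property the listing relies on.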

Conclusion

Asynchronous programming is simply not easy, even with powerful
tools at our disposal. We always must think about the state we need
and the context from which we get it. I hope this article has shown
you some nice ideas about how to use Actors, Futures and Promises
to perform complex tasks, as well as patterns for using them that
will help you in your everyday coding. All source code can be found
in my GitHub repository.

Author Bio: Jamie Allen has
over 18 years of experience delivering enterprise solutions across
myriad industries, platforms, environments and languages. He has
been developing enterprise applications with Scala since 2009,
primarily using actors for fault tolerance and managing concurrency
at scale. Jamie currently works for Typesafe, where he helps users
develop actor-based systems using the Akka framework and
Scala.

This tutorial originally appeared in JAX
Magazine: JavaFX Revitalised.

 
