days
-3
-6
hours
-2
-2
minutes
-4
-9
seconds
-5
-4
search
Akka Typed is back for round 4!

Tour of Akka Typed: Event Sourcing

Manuel Bernhardt
akka
© Shutterstock / Who is Danny

Manuel Bernhardt continues his series about Akka Typed, the new Akka Actor API that brings significant advantages over the classic one. In his fourth entry, he takes a closer look at one of the most popular use-cases for Akka: event sourcing.

Welcome to the fourth part of the Akka Typed series. In the first part we had a look at the the raison d’être of Akka Typed and what advantages it has over the classic, untyped API. In the second part we introduced the concepts of message adapters, the ask pattern and actor discovery. In the third part, we covered one of the core concepts of Actor systems: supervision and failure recovery. In this part of the series we’ll explore one of the most popular use-cases for Akka: event sourcing. We’ll assume that you are already familiar with the concept of event sourcing – if not, first read about it to get some context.

In Akka Classic, event sourcing are implemented using persistent actors, which work as follows:

akka

Persistent Actor in Akka Classic – Source: Reactive Web Applications

The flow is quite straightforward:

  • the actor receives a number of Commands which may get validated, then accepted or rejected
  • the Commands are turned into Events
  • the Events are persisted to a journal
  • once the journal has acknowledged the event, an event handler runs
  • as a result of running the event handler, one or more side-effects can occur, such as replying to the actor that sent the initial event

Whilst the concepts of Commands and Events were already present in Akka Classic (for example, the command handler is called receiveCommand), the protocol was not enforcing this distinction per-se.

With the Akka Typed API, this has now changed: many of the best practices and patterns used in combination with persistent actors are now offered directly through the API, which makes it easier to implement event-sourced actors. Let’s take a closer look!

EventSourcedBehavior

Since in Akka Typed, everything is a behavior, it makes sense that persistence too is represented via a special type of behavior.

In the previous article of the series, we used a dummy storage for credit cards (that was crashing all the time) to demonstrate supervision. Let’s now implement a real version using the new EventSourcedBehavior:

object CreditCardStorage {
 
  sealed trait Command
  sealed trait Event
 
  final case class State(cards: Map[CreditCardId, StoredCreditCard] = Map.empty)
  case class StoredCreditCard(id: CreditCardId)
 
  def apply(): Behavior[Command] = EventSourcedBehavior[Command, Event, State](
    persistenceId = PersistenceId("cc"),
    emptyState = State(),
    commandHandler = (state, command) => Effect.none,
    eventHandler = (state, event) => state
  )
 
}

As we can see from the code above, the concepts of Command and Event are now formalized. They of course need not be called this way, but the API will expect two types to point to these concepts in your code. Additionally, the type of the state (and an “empty” state from which to begin with) needs to be provided too – as we will see, event handlers need to return a new version of the in-memory state that represents the modified state after an event has been applied to it.

Next to formalizing the protocol of persistent actors, the Akka Typed API also formalizes the command and event handlers in order to specify what they can, or rather should, do.

Commands producing effects

A Command handler takes in the current state and a new Command, and returns an Effect:

def handleCommand(state: State, cmd: Command): Effect[Event, State] = ...

An Effect can be any of the following:

  • Effect.persist will persist one or several events to the journal
  • Effect.none will do nothing
  • Effect.unhandled signals that this Command is not supported (it may not be supported at all, or not supported in the current state)
  • Effect.stop stops the actor

Say for example that we are making it possible to add credit cards to our storage using a new AddCreditCard command and the corresponding CreditCardAdded:

// the protocol for adding credit cards
final case class AddCreditCard(userId: UserId, last4Digits: String) extends Command
final case class CreditCardAdded(id: CreditCardId, userId: UserId, last4Digits: String) extends Event
 
def handleCommand(state: State, cmd: Command): Effect[Event, State] = cmd match {
  case AddCreditCard(userId, last4Digits, replyTo) =>
    val cardAlreadyExists = state.cards.values.exists(cc => cc.userId == userId && cc.last4Digits == last4Digits)
    if (cardAlreadyExists) {
      Effect.unhandled
    } else {
      val event = CreditCardAdded(CreditCardId(UUID.randomUUID().toString), userId, last4Digits)
      Effect.persist(event)
  }
}

In the event of a credit card being added, the corresponding event handler is rather straight-forward:

def handleEvent(state: State, event: Event): State = event match {
  case CreditCardAdded(id, userId, last4Digits) =>
    state.copy(cards = state.cards.updated(id, StoredCreditCard(id, userId, last4Digits)))
}

“But wait”, I hear you object, “how can I now reply to the sender to tell them that the credit card was successfully added, or log a warning if it was already there?”. That’s now also supported by the API as SideEffect-s.

Commands producing side-effects

Next to persisting events (or not), command handlers have the ability to specify one or more actions to be executed if the initial effect is executed successfully. Taking our example above, we could decide to log a warning if an already existing card was asked to be added again:

if (cardAlreadyExists) {
  Effect.unhandled.thenRun { _ => 
    context.log.warn("Tried adding already existing card")
  }
} else {
  // ...
}

It is possible to chain side-effects if need be:

if (cardAlreadyExists) {
  Effect.unhandled.thenRun { _ => 
    context.log.warn("Tried adding already existing card")
  }.thenRun { _ =>
    replyTo ! CardAlreadyExists(userId, last4Digits) 
  }
} else {
  // ...
}

These chained side-effects will then be executed in sequence.

Arguably, the most common side-effect in persistent actors is to reply to the actor that initiated the command so as to enable at-least-once delivery and exactly-once effect semantics. The Akka Team thought of this and promoted the concept of a reply to a top-level concern in the Akka Typed API.

Commands producing enforced replies

In the spirit of Akka Typed leveraging the compiler in order to prevent programming errors, there’s now a way to ensure that actors with persistent behavior reply to their clients after an event has been persisted (or in other cases, such as validation errors).

In order to be able to use this feature, we’ll need to extend our top-level protocol definition so as to include the concept of replies into it:

sealed trait Command[Reply <: CommandReply] {
  def replyTo: ActorRef[Reply]
}
sealed trait Event
sealed trait CommandReply

Command-s must now specify a reply type. Typically there will be a handful of replies for each operation a persistent actor supports, such as in our case:

final case class AddCreditCard(userId: UserId, last4Digits: String, replyTo: ActorRef[AddCreditCardResult]) extends Command[AddCreditCardResult]
final case class CreditCardAdded(id: CreditCardId, userId: UserId, last4Digits: String) extends Event
sealed trait AddCreditCardResult extends CommandReply
case object Added extends AddCreditCardResult
case object Duplicate extends AddCreditCardResult

At first sight, the protocol may now seem to get a little bit verbose, but then again chances are that you’ll require this amount of messages anyway in a real-world system – so it’s even the better if they are organised and checked by the compiler.

In order to use the feature we’ll need to use the withEnforcedReplies factory of EventSourcedBehavior:

def apply(): Behavior[Command[_]] = Behaviors.setup { context =>
  EventSourcedBehavior.withEnforcedReplies[Command[_], Event, State](
    persistenceId = PersistenceId("cc"),
    emptyState = State(),
    commandHandler = handleCommand(context),
    eventHandler = handleEvent
  )
}

So for example, if we now omit to reply to the sender that the credit card has been successfully added, we’ll get the following compile-time error:

[error] /home/manu/workspace/typed-payment-processor/src/main/scala/io/bernhardt/typedpayment/CreditCardStorage.scala:48:23: polymorphic expression cannot be instantiated to expected type;
[error]  found   : [State]akka.persistence.typed.scaladsl.EffectBuilder[io.bernhardt.typedpayment.CreditCardStorage.CreditCardAdded,State]
[error]  required: akka.persistence.typed.scaladsl.ReplyEffect[io.bernhardt.typedpayment.CreditCardStorage.Event,io.bernhardt.typedpayment.CreditCardStorage.State]
[error]         Effect.persist(event)

(granted, the compilation error may be a little bit cryptic – but the mention of ReplyEffect might be a good hint as to what is going on)

Replies (or the omission thereof) can be produced via the Effect.replyEffect.noReplyEffect.thenReply and Effect.thenNoReply methods:

val event = CreditCardAdded(CreditCardId(UUID.randomUUID().toString), userId, last4Digits)
Effect
  .persist(event)
  .thenReply(replyTo)(_ => Added)

Writing large persistent behaviors

Our example so far is pretty simple: the state is “flat” in the sense that there is only really one state of the credit card storage with a more or less filled map of known cards. Adding / removing or querying this state is not all too complicated, and for the time being the amount of supported operations on the storage is rather limited.

In real life, however, persistent entities tend to have many subtleties and to grow in terms of supported operations. It is therefore a good idea to stick to a few simple rules that make the code more readable. The Akka Typed documentation describes this with an accordingly involved example. I’d definitely recommend you checking it out in order to get a grasp of how to deal with a more complex domain. Whilst the recommendations are a matter of taste, it turns out that organizing the code as proposed has its advantages insofar as they follow the teachings of domain driven design, where the state represents the core domain object.

We’ll be exploring the two most impactful recommendations by applying them to our example, which we’ll enrich with the capability of looking up credit cards by id – as said, make sure to check out the style guide to get an idea of what a more advanced example looks like.

Defining event handlers in the state

Event handlers are not side-effecting and therefore the only thing they do is to alter the in-memory state based on an event. As such, keeping the handlers in the state is something that has a quite natural feel to it:

// state definition
final case class Storage(cards: Map[CreditCardId, StoredCreditCard] = Map.empty) {
  def applyEvent(event: Event): Storage = event match {
    case CreditCardAdded(id, userId, last4Digits) =>
      copy(cards = cards.updated(id, StoredCreditCard(id, userId, last4Digits)))
  }
}

Defining command handlers in the state

Taking this idea one step further, it is also possible to push the command handlers inside of the state:

// the protocol for looking up credit cards by credit card id
final case class FindById(id: CreditCardId, replyTo: ActorRef[FindCreditCardResult]) extends Command[FindCreditCardResult]
sealed trait FindCreditCardResult extends CommandReply
case class CreditCardFound(card: StoredCreditCard) extends FindCreditCardResult
case object CreditCardNotFound extends FindCreditCardResult
 
// state definition
final case class Storage(cards: Map[CreditCardId, StoredCreditCard] = Map.empty) {
 
  def applyEvent(event: Event): Storage = // ...
 
  def applyCommand(context: ActorContext[Command[_]], cmd: Command[_]): ReplyEffect[Event, Storage] = cmd match {
    case AddCreditCard(userId, last4Digits, replyTo) =>
      val cardAlreadyExists = cards.values.exists(cc => cc.userId == userId && cc.last4Digits == last4Digits)
      if (cardAlreadyExists) {
        Effect.unhandled.thenRun { _: Storage =>
          context.log.warn("Tried adding already existing card")
        }.thenReply(replyTo)(_ => Duplicate)
      } else {
        val event = CreditCardAdded(CreditCardId(UUID.randomUUID().toString), userId, last4Digits)
        Effect
          .persist(event)
          .thenReply(replyTo)(_ => Added)
      }
    case FindById(id, replyTo) if cards.contains(id) =>
      Effect.reply(replyTo)(CreditCardFound(cards(id)))
    case FindById(id, replyTo) if !cards.contains(id) =>
      Effect.reply(replyTo)(CreditCardNotFound)
  }
}

The result of this style is that the technical behavior definition is now quite straight-forward:

def apply(): Behavior[Command[_]] = Behaviors.setup { context =>
  EventSourcedBehavior.withEnforcedReplies[Command[_], Event, Storage](
    persistenceId = PersistenceId("cc"),
    emptyState = Storage(),
    commandHandler = (state, cmd) => state.applyCommand(context, cmd),
    eventHandler = (state, evt) => state.applyEvent(evt)
  )
}

All the business logic is contained in the Storage state object, which, in combination with the actor protocol, is really the core of what a persistent entity should all be about.

This is it for this article of the series. Note that we haven’t touched all aspects of persistent behaviors as that would be too much to cover. If you are just getting started with Akka, make sure to think twice about serialization and schema evolution as these concerns are in my experience some of the more important aspects of developing production applications with Akka Persistence.

As usual, here’s the concept comparison table between Akka Classic and Akka Typed (see also the official migration guide):

akka

The code for this article can be found on GitHub.

This post was originally published on Manuel Bernhardt’s blog.

Author
Manuel Bernhardt
Manuel Bernhardt is a passionate engineer, author, speaker and consultant who has a keen interest in the science of building and operating networked applications that run smoothly despite their distributed nature. Since 2008, he has guided and trained enterprise teams on the transformation to distributed computing. In recent years he is focusing primarily on production systems that embrace the reactive application architecture, using Scala, Play Framework and Akka to this end. Manuel likes to travel and is a frequent speaker at international conferences. He lives in Vienna where he is a co-organizer of the Scala Vienna User Group. Next to thinking, talking about and fiddling with computers he likes to spend time with his family, run, scuba-dive and read. You can find out more about Manuel's recent work at http://manuel.bernhardt.io.

Leave a Reply

Be the First to Comment!

avatar
400
  Subscribe  
Notify of