Functional Event Sourcing meets The Elm Architecture

This post is part of the F# Advent Calendar 2016 series.

One of the highlights of the year for me was the farewell to FRP post by Evan Czaplicki. For a long time, I've been looking for a simple functional alternative to the MVC UI models.

There are a number of FRP alternatives but they all have limitations. They use signals heavily and many have inherent memory leak issues.

Czaplicki removed signals, simplifying the model dramatically. It resulted in something truly beautiful. A simple and composable way of building UIs.

Event Sourcing is also a compelling pattern I have found very useful. In some domains like accounting it is a perfect fit.

This post explores how Functional Event Sourcing fits with the Elm Architecture covered in a previous post. A combined festive WPF application is developed to streamline Santa's workload. The application code can be found here.

Functional Event Sourcing

Event

type EventID = EventID of time:DateTime * user:UserID

An event is something that happens at a specific point. In physics an event is a change that happens at a point in spacetime. In event sourcing the point is a unique identifier of the event.

In most systems this can just be the time and user who created the event. This EventID can also include additional data required to make it unique.

The example application uses Stopwatch to increase the precision of DateTime. The application also ensures each EventID time is unique. NTP servers could also be used to calibrate the application if a comparison of time between different machines is required.

As well as being a unique identifier of the event the EventID also satisfies all the data requirement for audit.

Aggregate

type 'Aggregate ID = Created of EventID

type 'Aggregate Events = (EventID * 'Aggregate list1) list1

An aggregate is a collection of events that are bound together by a root entity. External entities can only hold a reference to the root entity identifier. An aggregate is a unit of consistency that has atomicity and autonomy.

In the example application we have the following domain model. Each case represents a possible change to the aggregate.

type Work = uint16
type Age = byte
type Behaviour = Good | Mixed | Bad

type Toy =
    | Name of string
    | AgeRange of lo:Age * hi:Age
    | WorkRequired of Work

type Elf =
    | Name of string
    | WorkRate of Work
    | Making of Toy ID option

type Kid =
    | Name of string
    | Age of Age
    | Behaviour of Behaviour
    | WishList of Toy ID SetEvent

Most of the events are simple field changes but events such as Recalled for Toy are possible.

The rules for domain model schema migration and data serialization are

FsPickler can be configured to comply with these rules, making it easy to serialize events.

Store

type 'Aggregate MemoryStore =
    {
        Latest: Map<'Aggregate ID, 'Aggregate Events>
        Observers: IObserver<Map<'Aggregate ID, 'Aggregate Events>> list
    }

Stores are the databases of event sourcing. They can be in memory, remote or disconnected for example.

Many different concurrency models are possible. In the example application we have linear event sourcing with optimistic concurrency which is the simplest and corresponds to most relational database applications.

More fine grained concurrency is possible and Making on Elf would be a good candidate as only the Santa process changes this. Advanced concurrency models are also possible with event sourcing where events are designed to commute such as CRDTs. These enable disconnected systems. Git is an example of a successful disconnected system.

Benefits of functional event sourcing

Example Application

The application has two background processes running continuously. The first is the kids process that randomly adds and removes toys to the kids' Christmas wishlists. The second is the Santa process that assigns free elfs to make toys in the priority order of kid behaviour and request time.

All the screens update in realtime and any of the entities in the domain can be edited. All the entity edit screens have validation at both the field and aggregate level. A field editor Elm app was reused across all these fields.

The previous F# Elm implementation was extended to include subscriptions and commands. Minimal UI styling functionality was also added.

Santa's Summary

Conclusion

This turns out to be quite a complicated problem we have solved. It would be interesting to see a more traditional solution in OO and a relational data model. I can only imagine that both the domain model and code would be much more complicated.

One caveat with event sourcing would be that cross aggregate transactions are not possible. This may take a little thinking to become comfortable with. It is possible to express two phase commits explicitly in the domain model. Being explicit about these may also tease out the correct business requirements and lead to a better solution.

Functional Event Sourcing fits naturally with the subscription and command model in Elm. Time travel debug and easy regression are features of both patterns and work well together. The patterns result in a highly type safe and testable system.

I would recommend functional event sourcing in any application where strong audit or schema evolution are a requirement. Linear event sourcing, optimistic concurrency and persisting each type to a single database table would be a natural starting point.

Hopefully F# will get Santa's follow up present delivery project. Happy holidays!

namespace System
type 'a list1 = | List1 of 'a list


 A non-empty list
union case list1.List1: 'a list -> 'a list1
type 'T list = List<'T>
type UserID = | User of int
union case UserID.User: int -> UserID
Multiple items
val int : value:'T -> int (requires member op_Explicit)

--------------------
type int = int32

--------------------
type int<'Measure> = int
type 'a SetEvent =
  | SetAdd of 'a
  | SetRemove of 'a
union case SetEvent.SetAdd: 'a -> 'a SetEvent
union case SetEvent.SetRemove: 'a -> 'a SetEvent
Multiple items
union case EventID.EventID: time: DateTime * user: UserID -> EventID

--------------------
type EventID = | EventID of time: DateTime * user: UserID
Multiple items
type DateTime =
  struct
    new : ticks:int64 -> DateTime + 10 overloads
    member Add : value:TimeSpan -> DateTime
    member AddDays : value:float -> DateTime
    member AddHours : value:float -> DateTime
    member AddMilliseconds : value:float -> DateTime
    member AddMinutes : value:float -> DateTime
    member AddMonths : months:int -> DateTime
    member AddSeconds : value:float -> DateTime
    member AddTicks : value:int64 -> DateTime
    member AddYears : value:int -> DateTime
    ...
  end

--------------------
DateTime ()
   (+0 other overloads)
DateTime(ticks: int64) : DateTime
   (+0 other overloads)
DateTime(ticks: int64, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, calendar: Globalization.Calendar) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, kind: DateTimeKind) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, calendar: Globalization.Calendar) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int) : DateTime
   (+0 other overloads)
DateTime(year: int, month: int, day: int, hour: int, minute: int, second: int, millisecond: int, kind: DateTimeKind) : DateTime
   (+0 other overloads)
type 'Aggregate ID = | Created of EventID
union case ID.Created: EventID -> 'Aggregate ID
type 'Aggregate Events = (EventID * 'Aggregate list1) list1
type Work = uint16
Multiple items
val uint16 : value:'T -> uint16 (requires member op_Explicit)

--------------------
type uint16 = UInt16
type Age = byte
Multiple items
val byte : value:'T -> byte (requires member op_Explicit)

--------------------
type byte = Byte
type Behaviour =
  | Good
  | Mixed
  | Bad
union case Behaviour.Good: Behaviour
union case Behaviour.Mixed: Behaviour
union case Behaviour.Bad: Behaviour
type Toy =
  | Name of string
  | AgeRange of lo: Age * hi: Age
  | WorkRequired of Work
union case Toy.Name: string -> Toy
Multiple items
val string : value:'T -> string

--------------------
type string = String
union case Toy.AgeRange: lo: Age * hi: Age -> Toy
union case Toy.WorkRequired: Work -> Toy
type Elf =
  | Name of string
  | WorkRate of Work
  | Making of Toy ID option
union case Elf.Name: string -> Elf
union case Elf.WorkRate: Work -> Elf
union case Elf.Making: Toy ID option -> Elf
type 'T option = Option<'T>
type Kid =
  | Name of string
  | Age of Age
  | Behaviour of Behaviour
  | WishList of Toy ID SetEvent
union case Kid.Name: string -> Kid
Multiple items
union case Kid.Age: Age -> Kid

--------------------
type Age = byte
Multiple items
union case Kid.Behaviour: Behaviour -> Kid

--------------------
type Behaviour =
  | Good
  | Mixed
  | Bad
union case Kid.WishList: Toy ID SetEvent -> Kid
type 'Aggregate MemoryStore =
  {Latest: Map<'Aggregate ID,'Aggregate Events>;
   Observers: IObserver<Map<'Aggregate ID,'Aggregate Events>> list;}
MemoryStore.Latest: Map<'Aggregate ID,'Aggregate Events>
Multiple items
module Map

from Microsoft.FSharp.Collections

--------------------
type Map<'Key,'Value (requires comparison)> =
  interface IReadOnlyDictionary<'Key,'Value>
  interface IReadOnlyCollection<KeyValuePair<'Key,'Value>>
  interface IEnumerable
  interface IComparable
  interface IEnumerable<KeyValuePair<'Key,'Value>>
  interface ICollection<KeyValuePair<'Key,'Value>>
  interface IDictionary<'Key,'Value>
  new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
  member Add : key:'Key * value:'Value -> Map<'Key,'Value>
  member ContainsKey : key:'Key -> bool
  ...

--------------------
new : elements:seq<'Key * 'Value> -> Map<'Key,'Value>
MemoryStore.Observers: IObserver<Map<'Aggregate ID,'Aggregate Events>> list
type IObserver<'T> =
  member OnCompleted : unit -> unit
  member OnError : error:Exception -> unit
  member OnNext : value:'T -> unit