Martin Fowler talked about Event Sourcing. Greg Young talked about using Event Sourcing in algorithmic trading applications. Very recently Jonas Bonér hacked up Martin's example using asynchronous Scala actors. That's a heck of a precedent from which to conclude that there is enough meat in this lunch.
Let me summarize my thoughts on Event Sourcing using Scala actors, and how it can be used to perform retroactive monitoring and diagnostics in an environment that supports code hot-swapping. These thoughts come from the perspective of a real-world application with real-world requirements. And yes, this is also a trading application: it processes security trades that pass through various state transitions in their lifecycle before finally being persisted in the database. In a trading application, exception tracking is one of the main components, and exception monitors play an important role in how the trading desk explores and visualizes the processing pipeline. The whole idea is to avoid manual intervention. Trades that fall off the straight-through-processing (STP) pipeline because of exceptions need to be put back on the rails, which sometimes means digging into the past, patching up services in real time, and exploring what-if and what-would-have-happened scenarios. In short, there is ample reason to do retroactive mining of the application through replays and hotswaps.
Without further ado, let us hack up something similar that accumulates events as they occur in the lifecycle of a trade .. but first, some trivial abstractions (elided, of course, for demonstration purposes) to set the stage for the domain model ..
```scala
sealed trait InstrumentType
case object EQ extends InstrumentType
case object FI extends InstrumentType

case class Instrument(isin: String, name: String, insType: InstrumentType)
case class Trade(id: Int, ref: String, ins: Instrument,
                 qty: Int, unitPrice: Int, taxFee: Int, net: Int)
```
The domain objects have been simplified beyond imagination, but they are still enough to serve the purpose of this post. The only point of note is that all of them are modeled as immutable abstractions.
As I mentioned above, a trade goes through many state transitions in its lifecycle. We model each transition as being triggered by an event, which we would like to capture in an event queue.
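```scala
import java.util.Date

// Every event records the time at which it was created
sealed trait Event {
  val recorded = new Date
}

// Plain abstract base: case-to-case inheritance is not allowed in later Scala versions
sealed abstract class TradeProcessingEvent(val trade: Trade) extends Event
case class EnrichTrade(override val trade: Trade) extends TradeProcessingEvent(trade)
case class ValueTrade(override val trade: Trade) extends TradeProcessingEvent(trade)

// Control message: print the event log
case object Show
```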
Once again, following best practice, the events are modeled as immutable objects. We also keep the events independent of the processing logic, and keep all processing in the layer that seems most natural for it: the domain service layer. Here, we model the service as an actor that receives all events for the trade and does the appropriate processing ..
```scala
import scala.actors.Actor._

// Version 1 of the service: stamps fixed placeholder values for taxFee and net
val tradingService_1 = actor {
  loop {
    react {
      case e @ EnrichTrade(t) =>
        println("processing event " + e + " out trade: " + t.copy(taxFee = 100))
      case v @ ValueTrade(t) =>
        println("processing event " + v + " out trade: " + t.copy(net = 1000))
    }
  }
}
```
And finally, the `EventProcessor`, which sources event processing for all trades and acts as the facade to the layer above. `EventProcessor` is also modeled as an actor, for the obvious reasons of scalability, eventual consistency and all the other benefits that asynchronous models offer.
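```scala
import scala.actors.Actor

class EventProcessor extends Actor {
  // Start with version 1 of the service and an empty event log
  def act(): Unit = loop(tradingService_1, Nil)

  // ts: the service in effect; events: the log, newest event first
  def loop(ts: Actor, events: List[TradeProcessingEvent]): Unit = {
    react {
      case Replay =>
        // resend every logged event to the service, oldest first
        events.reverse.foreach(ts ! _)
        loop(ts, events)
      case event: TradeProcessingEvent =>
        // forward the event and prepend it to the log
        ts ! event
        loop(ts, event :: events)
      case Show =>
        events.reverse.foreach(println)
        loop(ts, events)
    }
  }
}

case object Replay extends Event
```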
Note that the loop in the actor above takes the list of events as its state and threads it through each recursive call. Each new event is prepended to the list (hence the `reverse` before replaying or showing), which is an idiomatic functional way of managing state without mutable variables or side effects. The service is also part of the state, and is used to process every event that comes its way. This will play a very important role as we move along ..
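The recursive `loop` is essentially a left fold over the incoming messages: each call receives the current event list and hands an updated list to the next call. The sketch below makes that explicit without the actor runtime. The names `Msg`, `Enrich`, `Value`, `State` and `step` are hypothetical stand-ins for the post's events and processor, chosen only for illustration.

```scala
// Simplified messages standing in for the post's events (hypothetical names)
sealed trait Msg
case class Enrich(tradeId: Int) extends Msg
case class Value(tradeId: Int) extends Msg
case object Replay extends Msg

// The state the recursive loop threads through each call:
// the event log (newest first) and everything sent to the service so far
case class State(events: List[Msg], sentToService: Vector[Msg])

// One "iteration" of the loop: consume a message, return the next state
def step(s: State, m: Msg): State = m match {
  case Replay =>
    // resend the logged events oldest-first; the log itself is unchanged
    s.copy(sentToService = s.sentToService ++ s.events.reverse)
  case event =>
    // forward the event and prepend it to the log (constant-time cons)
    State(event :: s.events, s.sentToService :+ event)
}

// Feeding messages through step is exactly a left fold
val finalState =
  List[Msg](Enrich(1), Value(1), Replay).foldLeft(State(Nil, Vector.empty))(step)
```

After the fold, `finalState.events` holds the log newest-first, and the service has seen the two events followed by their replay in arrival order.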
REPLing it out
Ok, now with enough meat on our plates, let us hack the hell out of it. Let us fire up the Scala REPL and see some of the above in action ..
```scala
scala> import org.dg.event.EventSourcing._
import org.dg.event.EventSourcing._
scala> i1
res0: org.dg.event.EventSourcing.Instrument = Instrument(ISIN_1,IBM,EQ)
scala> t1
res1: org.dg.event.EventSourcing.Trade = Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)
scala> val e = new EventProcessor
e: org.dg.event.EventSourcing.EventProcessor = org.dg.event.EventSourcing$EventProcessor@6bb93c
scala> e.start
res2: scala.actors.Actor = org.dg.event.EventSourcing$EventProcessor@6bb93c
```
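The session refers to `i1` and `t1`, whose definitions live in the elided `EventSourcing` object and are not shown. Judging only from the values the REPL echoes back, they would look roughly like this; the domain model is repeated so the snippet compiles on its own, and the two values are a reconstruction, not the original code.

```scala
// Domain model as given in the post
sealed trait InstrumentType
case object EQ extends InstrumentType
case object FI extends InstrumentType

case class Instrument(isin: String, name: String, insType: InstrumentType)
case class Trade(id: Int, ref: String, ins: Instrument,
                 qty: Int, unitPrice: Int, taxFee: Int, net: Int)

// Reconstructed from the REPL echo; the original definitions are elided
val i1 = Instrument("ISIN_1", "IBM", EQ)
val t1 = Trade(1, "ref-1", i1, 100, 20, 0, 0)
```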
What we have done so far is some boilerplate: defining an `Instrument` and a `Trade`, and starting up our `EventProcessor`. As the above session indicates, all is cool and healthy, as it should be .. ok .. next step .. let us fire some events and check the event queue ..
```scala
scala> e ! EnrichTrade(t1)
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
scala> e ! ValueTrade(t1)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)
scala> e ! Show
EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0))
ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0))
```
Two events have been fired: a trade has been enriched and valued, and the resulting trades have been printed in the REPL. For simplicity, these independent events operating on the same trade are not chained, so the effect of one does not change the source trade of the other. In real life, such chaining may well be necessary .. So the two events above process the trades correctly, and as the `Show` message indicates, there are two events in the queue, sequenced properly.
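In the session, each event starts from the original `t1`. If the events were chained, each would operate on the output of the previous one, which is a left fold over the events. The sketch below contrasts the two; `Step`, `Enrich`, `Valuate` and `applyStep` are hypothetical stand-ins for the events and the service logic, reusing the placeholder values of `tradingService_1`.

```scala
sealed trait InstrumentType
case object EQ extends InstrumentType
case class Instrument(isin: String, name: String, insType: InstrumentType)
case class Trade(id: Int, ref: String, ins: Instrument,
                 qty: Int, unitPrice: Int, taxFee: Int, net: Int)

// Hypothetical: events viewed as transformations of a trade
sealed trait Step
case object Enrich extends Step
case object Valuate extends Step

// Same placeholder logic as tradingService_1, as a pure function
def applyStep(t: Trade, s: Step): Trade = s match {
  case Enrich  => t.copy(taxFee = 100)
  case Valuate => t.copy(net = 1000)
}

val t1 = Trade(1, "ref-1", Instrument("ISIN_1", "IBM", EQ), 100, 20, 0, 0)

// Chained: each step sees the output of the previous one
val chained = List[Step](Enrich, Valuate).foldLeft(t1)(applyStep)
// Independent, as in the session: each step starts from the source trade
val independent = List[Step](Enrich, Valuate).map(applyStep(t1, _))
```

Keeping the business logic in a plain function like `applyStep` also makes each version of the service testable without starting an actor.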
Can we replay the events?
Sure, we can. That is one of the major benefits of the entire architecture. In fact, the event queue is a live snapshot of the sequence of activities that have happened on the trades flowing through the pipeline. It is right there to be used by multiple consumers in various ways for application visualization and monitoring.
In case of exceptions, the trading desk may need to rerun the sequence, often selectively. The `Replay` message does precisely that. The current implementation performs an unconditional replay, while in reality we can have variations like ..

- `Replay(date: Date)`
- `Replay(trade: Trade)`
- `Replay(ins: Instrument)`
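Each of these variants amounts to replaying only the logged events that satisfy a predicate. A minimal sketch, assuming each log entry carries its recording time as epoch milliseconds (instead of the post's `java.util.Date`, to keep the example deterministic). `Logged`, `replayWhere`, `since`, `forTrade` and `forInstrument` are hypothetical names, and reading `Replay(date)` as "everything recorded at or after that instant" is an assumption.

```scala
sealed trait InstrumentType
case object EQ extends InstrumentType
case object FI extends InstrumentType
case class Instrument(isin: String, name: String, insType: InstrumentType)
case class Trade(id: Int, ref: String, ins: Instrument,
                 qty: Int, unitPrice: Int, taxFee: Int, net: Int)

// Hypothetical log entry: event kind, the trade it carried, recording time (epoch ms)
case class Logged(kind: String, trade: Trade, recordedAt: Long)

// Replay the matching events oldest-first (the log is kept newest-first)
def replayWhere(log: List[Logged])(p: Logged => Boolean): List[Logged] =
  log.reverse.filter(p)

// Hypothetical predicates mirroring Replay(date), Replay(trade), Replay(ins)
def since(instant: Long): Logged => Boolean         = _.recordedAt >= instant
def forTrade(t: Trade): Logged => Boolean           = _.trade.id == t.id
def forInstrument(i: Instrument): Logged => Boolean = _.trade.ins == i

// Illustrative data only
val ibm  = Instrument("ISIN_1", "IBM", EQ)
val bond = Instrument("ISIN_2", "BOND", FI)
val t1 = Trade(1, "ref-1", ibm, 100, 20, 0, 0)
val t2 = Trade(2, "ref-2", bond, 50, 99, 0, 0)

val log = List(                      // newest first, as built by prepending
  Logged("ValueTrade", t2, 3000L),
  Logged("EnrichTrade", t2, 2000L),
  Logged("EnrichTrade", t1, 1000L))
```

For example, `replayWhere(log)(forInstrument(bond))` yields the two `bond` events in the order they were recorded. Passing a predicate keeps the processor generic: new filters need no new message types.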
Let us see what `Replay` gives us ..
```scala
scala> e ! Replay
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)
```
As expected, `Replay` replays the event list and re-runs the processing; with a filtered variant, it would do so only for the events matching the condition. This opens up a world of options: computations can be replayed, situations can be simulated, and exceptions can be monitored, visualized and debugged more efficiently, which is one of the vital mandates of an STP trading system.
And now for some extreme fun ..
Let us look at this seemingly innocuous passage, excerpted from Martin Fowler's article ..

"So this discussion has made the assumption that the application processing the events stays the same. Clearly that's not going to be the case. Events handle changes to data, what about changes to code?"

As Martin goes on to mention in the same article, code changes may occur due to the addition of new features, defect fixes and temporal logic. There may be scenarios where we need to process already-occurred events with the newly introduced fix. Event Sourcing provides a great way out in such situations: in our trading application, exception monitors often need to peek at the event queue, pick up the event at fault and re-process it using the new version of the service introduced as the fix. Martin Fowler defines a Retroactive Event as one that can be used to "(a)utomatically correct the consequences of an incorrect event that's already been processed."
Scala actors support hotswapping of code; can we take advantage of this feature and process retroactive events using Event Sourcing? The real fun begins here. We need to add the capability of executing event processing with retroactive effect, i.e. each event in the event queue will be processed by the processor that existed at the time of its original processing. We can implement hotswapping in Scala by passing state to actors in a recursive loop. And since we need to store the service that existed at each point in time, we need to pass that as state as well, like the following ..
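```scala
import scala.actors.Actor

class EventProcessor extends Actor {
  def act(): Unit = loop(tradingService_1, Nil)

  // ts: the service currently in effect
  // events: the log, newest first, each event paired with the service that handled it
  def loop(ts: Actor, events: List[(Actor, TradeProcessingEvent)]): Unit = {
    react {
      case Replay =>
        // replay each event with the service that originally processed it
        events.reverse.foreach { case (service, event) => service ! event }
        loop(ts, events)
      case event: TradeProcessingEvent =>
        ts ! event
        loop(ts, (ts, event) :: events)
      case Show =>
        events.reverse.foreach(println)
        loop(ts, events)
      case HotSwap(s) =>
        // from now on, new events go to the new service
        loop(s, events)
    }
  }
}

case class HotSwap(s: Actor) extends Event
```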
Here we pass the service as an additional piece of state and store it in the event queue alongside each event. The storage can be made more efficient in production code, but the idea should be clear from the above implementation.
So let us implement another version of the service, hotswap it in, and watch the events being processed retroactively ..
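One possible way to make that storage leaner, sketched here under assumptions: log a small version number with each event and keep a separate registry from version to service, so a hotswap just changes the current version. Services are modeled as plain functions rather than actors so the sketch runs without the `scala.actors` library. `Service`, `registry`, `Processor`, `process` and `hotSwap` are hypothetical names, and `Trade` is trimmed to the fields the two services touch.

```scala
// Trimmed-down trade (assumption, for brevity)
case class Trade(id: Int, taxFee: Int, net: Int)

sealed trait TradeEvent { def trade: Trade }
case class EnrichTrade(trade: Trade) extends TradeEvent
case class ValueTrade(trade: Trade) extends TradeEvent

// A service version modeled as a pure function instead of an actor
type Service = TradeEvent => Trade

val service1: Service = {
  case EnrichTrade(t) => t.copy(taxFee = 100)
  case ValueTrade(t)  => t.copy(net = 1000)
}
val service2: Service = {
  case EnrichTrade(t) => t.copy(taxFee = 200)
  case ValueTrade(t)  => t.copy(net = 2000)
}

// Each version's code is stored once; events carry only the version number
val registry: Map[Int, Service] = Map(1 -> service1, 2 -> service2)

// Processor state: current version plus the log, newest first
case class Processor(current: Int, log: List[(Int, TradeEvent)]) {
  def process(e: TradeEvent): (Processor, Trade) =
    (copy(log = (current, e) :: log), registry(current)(e))
  def hotSwap(version: Int): Processor = {
    require(registry.contains(version), s"unknown service version $version")
    copy(current = version)
  }
  // Replay each event with the version that originally processed it
  def replay: List[Trade] = log.reverse.map { case (v, e) => registry(v)(e) }
}

// Usage mirroring the session in the post
val t1 = Trade(1, 0, 0)
val (p1, _) = Processor(1, Nil).process(EnrichTrade(t1))
val (p2, _) = p1.process(ValueTrade(t1))
val (p3, _) = p2.hotSwap(2).process(EnrichTrade(t1))
val (p4, _) = p3.process(ValueTrade(t1))
```

`p4.replay` produces the same taxFee/net sequence (100, 1000, 200, 2000) as the replay in the post, while the log itself holds only integers.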
```scala
// Version 2 of the service: different placeholder values for taxFee and net
val tradingService_2 = actor {
  loop {
    react {
      case e @ EnrichTrade(t) =>
        println("processing event " + e + " result trade: " + t.copy(taxFee = 200))
      case v @ ValueTrade(t) =>
        println("processing event " + v + " result trade: " + t.copy(net = 2000))
    }
  }
}
```
The following session demonstrates how events can be processed retroactively when the code changes. The event queue stays consistent and has everything needed to replay as of the past. This is an extremely powerful idiom of Event Sourcing for implementing real-time application analysis. If a problem or bug is detected in your code base, the faulty version can be swapped out in favor of an earlier one, and the event queue gives you the power of replaying every event with the version of code that you want.
```scala
scala> val e = new EventProcessor
e: org.dg.event.EventSourcing.EventProcessor = org.dg.event.EventSourcing$EventProcessor@88a970
scala> e.start
res2: scala.actors.Actor = org.dg.event.EventSourcing$EventProcessor@88a970
scala> e ! EnrichTrade(t1)
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
scala> e ! ValueTrade(t1)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)
scala> e ! Show
(scala.actors.Actor$$anon$1@825cf3,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@825cf3,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
scala> e ! HotSwap(tradingService_2)
scala> e ! EnrichTrade(t1)
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,200,0)
scala> e ! ValueTrade(t1)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,2000)
scala> e ! Show
(scala.actors.Actor$$anon$1@825cf3,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@825cf3,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@1f6f3dc,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@1f6f3dc,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
scala> e ! Replay
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,200,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,2000)
```
In the above, the first two replayed events run with the earlier version of the service, since they were originally processed before we hotswapped `tradingService_1` for `tradingService_2`. The last two replays reflect this change and run with the newer version of the service.
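This replay reproduces history faithfully: each event runs through the version that originally handled it. The retroactive correction quoted from Martin Fowler earlier asks a slightly different question: what would the results have been had the fix been in place all along? The sketch below answers it by diffing the as-recorded results against a replay of the same events through one chosen version. Services are again plain functions rather than actors, `Trade` is trimmed for brevity, and `replayAsRecorded`, `replayWith` and `corrections` are hypothetical names.

```scala
// Trimmed-down trade (assumption, for brevity)
case class Trade(id: Int, taxFee: Int, net: Int)

sealed trait TradeEvent { def trade: Trade }
case class EnrichTrade(trade: Trade) extends TradeEvent
case class ValueTrade(trade: Trade) extends TradeEvent

type Service = TradeEvent => Trade

val service1: Service = {
  case EnrichTrade(t) => t.copy(taxFee = 100)
  case ValueTrade(t)  => t.copy(net = 1000)
}
val service2: Service = {
  case EnrichTrade(t) => t.copy(taxFee = 200)
  case ValueTrade(t)  => t.copy(net = 2000)
}

// Log in arrival order, each event paired with the service that handled it
val t1 = Trade(1, 0, 0)
val log: List[(Service, TradeEvent)] = List(
  service1 -> EnrichTrade(t1), service1 -> ValueTrade(t1),
  service2 -> EnrichTrade(t1), service2 -> ValueTrade(t1))

// As recorded: every event through its original service
def replayAsRecorded(log: List[(Service, TradeEvent)]): List[Trade] =
  log.map { case (s, e) => s(e) }

// What-if: every event through one chosen service, e.g. a bug fix
def replayWith(s: Service, log: List[(Service, TradeEvent)]): List[Trade] =
  log.map { case (_, e) => s(e) }

// Events whose outcome would change under the chosen service:
// (event, result as recorded, result under the chosen service)
def corrections(s: Service, log: List[(Service, TradeEvent)]): List[(TradeEvent, Trade, Trade)] =
  log.collect { case (orig, e) if orig(e) != s(e) => (e, orig(e), s(e)) }
```

`corrections(service2, log)` picks out exactly the two events that were processed by `service1`, together with the before and after results, which is the kind of list an exception monitor could present to the trading desk.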