
Tuesday, September 27, 2011

Non blocking composition using Redis and Futures

scala-redis now supports pooling of Redis clients. Using RedisClientPool you can do some cool stuff in non blocking mode and get an improved throughput for your application.

Suppose you have a bunch of operations that you can theoretically execute in parallel, maybe a few disjoint list operations and a few operations on key/values .. like the following snippets ..

import com.redis._

val clients = new RedisClientPool("localhost", 6379)

// left push to a list
def lp(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.foreach(client.lpush("list-l", _))
    client.llen("list-l")
  }}
}

// right push to another list
def rp(msgs: List[String]) = {
  clients.withClient {client => {
    msgs.foreach(client.rpush("list-r", _))
    client.llen("list-r")
  }}
}

// key/set operations
def set(msgs: List[String]) = {
  clients.withClient {client => {
    var i = 0
    msgs.foreach { v =>
      client.set("key-%d".format(i), v)
      i += 1
    }
    Some(1000) // some dummy
  }}
}

Since Redis is single threaded, you can use client pooling to allocate multiple clients and fork these operations concurrently .. Here's a snippet that does these operations asynchronously using Scala futures ..

// generate some arbitrary values
val l = (0 until 5000).map(_.toString).toList

// prepare the list of functions to invoke
val fns = List[List[String] => Option[Int]](lp, rp, set)

// schedule the futures
val tasks = fns map (fn => scala.actors.Futures.future { fn(l) })

// wait for results
val results = tasks map (future => future.apply())

And while we are on this topic of using futures for non blocking redis operations, Twitter has a cool library, finagle, that offers lots of composition goodness on Futures and other non blocking RPC mechanisms. Over the weekend I used some of them to implement scatter/gather algorithms over Redis. I am not going into the details of what I did, but here's a sample dummy example of the stuff you can do with RedisClientPool and the Future implementation of Finagle ..

The essential idea is to be able to compose futures and write non blocking code all the way down. This is made possible through monadic non-blocking map and flatMap operations and a host of other utility functions that use them. Here's an example ..

def collect[A](fs: Seq[Future[A]]): Future[Seq[A]] = { //..

It uses flatMap and map to collect the results from the given list of futures into a new future of Seq[A].
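
To get a feel for how this kind of combinator is put together, here is a minimal sketch (my own illustration, not Finagle's actual implementation) of how a collect-like function could be built purely out of flatMap and map over Twitter futures ..

import com.twitter.util.Future

// naive version for illustration only: folds the futures left to right,
// flatMapping the accumulated Future[Seq[A]] and mapping each new result in
def collectNaive[A](fs: Seq[Future[A]]): Future[Seq[A]] =
  fs.foldLeft(Future.value(Seq.empty[A])) { (accF, f) =>
    accF flatMap { acc => f map { a => acc :+ a } }
  }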

Let's have a look at a specific example where we push a number of elements into 100 lists concurrently using a pool of futures, backed by an ExecutorService. This is the scatter phase of the algorithm. The function listPush (a sketch of which appears after the snippets below) actually does the push using a RedisClientPool, and each of these operations is done within a Future. FuturePool gives you a Future for which you can specify timeouts and exception handlers using Scala closures.

Note how we use the combinator collect for concurrent composition of the futures. The resulting future that collect returns will be complete when all the underlying futures have completed.

After the scatter phase we prepare for the gather phase by pipelining the future computation using flatMap. Unlike collect, flatMap is a combinator for sequential composition. In the following snippet, once allPushes completes, the result is pipelined into the following closure, which generates another Future. The whole operation completes only when both the futures have completed, or when either of them raises an exception.

For more details on how to use these combinators on Future abstractions, have a look at the tutorial that the Twitter guys published recently.

import java.util.concurrent.Executors
import com.twitter.conversions.time._
import com.twitter.util.{Future, FuturePool, JavaTimer, TimeoutException}

implicit val timer = new JavaTimer

// set up Executors
val futures = FuturePool(Executors.newFixedThreadPool(8))

// abstracting the flow with future
private[this] def flow[A](noOfRecipients: Int, opsPerClient: Int, fn: (Int, String) => A) = {
  val fs = (1 to noOfRecipients) map {i => 
    futures {
      fn(opsPerClient, "list_" + i)
    }.within(40.seconds) handle {
      case _: TimeoutException => null.asInstanceOf[A]
    }
  }
  Future.collect(fs)
}

// scatter across clients and gather them to do a sum
def scatterGatherWithList(opsPerClient: Int)(implicit clients: RedisClientPool) = {
  // scatter
  val allPushes: Future[Seq[String]] = flow(100, opsPerClient, listPush)
  val allSum = allPushes flatMap {result =>
    // gather
    val allPops: Future[Seq[Long]] = flow(100, opsPerClient, listPop)
    allPops map {members => members.sum}
  }
  allSum.apply
}
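
The functions listPush and listPop referred to above are not shown here. The following is a minimal sketch of what they might look like - the names, signatures and return values are my assumptions for illustration, not the actual implementations from the repo ..

// hypothetical sketches: push 'count' elements onto a list and pop them back,
// using the pooled client; exact scala-redis return types may differ
def listPush(count: Int, key: String)(implicit clients: RedisClientPool): String =
  clients.withClient { client =>
    (1 to count) foreach (i => client.lpush(key, i.toString))
    key
  }

def listPop(count: Int, key: String)(implicit clients: RedisClientPool): Long =
  clients.withClient { client =>
    // pop 'count' elements and report how many actually came back
    (1 to count).flatMap(_ => client.lpop(key)).size.toLong
  }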

For the complete example implementations of these patterns like scatter/gather using Redis, have a look at the github repo for scala-redis.

Monday, April 13, 2009

Objects as Actors ?

Tony Arcieri, creator of Reia, recently brought up an interesting topic on unifying actors and objects. Talking about Scala and his dislike of Scala's implementation of actors as an additional entity on top of objects, he says it would have been a more useful abstraction to model all objects as actors. Doing it that way would eschew many of the overlapping functions that the object and actor semantics both implement today. In Reia, which is supposed to run on top of BEAM (the Erlang VM), he has decided to make all objects actors.

The way I look at it, this is mostly a decision of the philosophy of the language design. Scala is targeted to be a general purpose programming language, where concurrency and distribution are not the central concerns to address as part of the core language design. The entire actor model has hence been implemented as a library that integrates seamlessly with the rest of Scala's core object/functional engineering. This is a design decision which the language designers took upfront - hence objects in Scala, by default, bind to local invocation semantics, which enables them to take advantage of all the optimizations and efficiencies of being collocated in the same process.

The actor model was designed primarily to address the concerns of distributed programming. As Jonas Boner recently said on Twitter - "The main benefit of the Actor model is not simpler concurrency but fault-tolerance and reliability". And for fault tolerance you need to have at least two machines running your programs. We all know the awesome capabilities of fault tolerance that the Erlang actor model offers through supervisors, linked actors and transparent restarts. Hence languages like Erlang, which address the concerns of concurrency and distribution as part of the core, have decided to implement actors as their basic building block of abstraction. This was done with the vision that the Erlang programming style would be based on simple primitives of process spawning and message passing, both of which are implemented as low overhead primitives in the virtual machine. The philosophy of Scala is, however, a bit different. Still, it is not that difficult to implement the Active Object pattern on top of the Scala actors platform.
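
Here is a minimal sketch of that idea - my own illustration, not from Reia or any particular library. The caller gets back a future immediately, while a backing actor does the work asynchronously ..

import scala.actors.Actor._

object Calculator {
  // the backing actor that does the actual work
  private val worker = actor {
    loop {
      react {
        case (x: Int, y: Int) => reply(x + y)
      }
    }
  }

  // asynchronous method invocation: returns a future instead of blocking
  def add(x: Int, y: Int) = worker !! ((x, y))
}

// usage: fire the call, do other things, block only when the result is needed
val sum = Calculator.add(20, 22)
println(sum())   // applying the future waits for and yields the reply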

Erlang allows you to write programs that will run without any change in a regular non-distributed Erlang session, on two different Erlang nodes running on the same computer, as well as on Erlang nodes running on two physically separated computers either in the same LAN or over the internet. It can do this because the language designers decided to map the concurrency model naturally to distributed deployments, extending the actor model beyond VM boundaries.

Another language, Clojure, which also has strong concurrency support, decided to go the Scala way in addressing distribution concerns. Distribution is not something that Rich Hickey decided to hardwire into the core of the language. Here is what he says about it ..

"In Erlang the concurrency model is (always) a distributed one and in Clojure it is not. I have some reservations about unifying the distributed and non-distributed models [..], and have decided not to do so in Clojure, but I think Erlang, in doing so, does the right thing in forcing programmers to work as if the processes are distributed even when they are not, in order to allow the possibility of transparent distribution later, e.g. in the failure modes, the messaging system etc. However, issues related to latency, bandwidth, timeouts, chattiness, and costs of certain data structures etc remain."

And finally, on the JVM, there are a host of options that enable distribution of your programs, which is yet another reason not to go for language specific solutions. If you are implementing your language on top of the Erlang VM, it's only natural to leverage the awesome cross virtual machine distribution capabilities that it offers, while on the JVM, distribution is better left to specialized frameworks.

Sunday, March 15, 2009

Real world event based solutions using MINA and Scala Actors

In the QCon London track, Architectures in Financial Applications, Oracle presented an interesting solution that seeks to transform the reconciliation process from an inherently batch process to an incremental event driven asynchronous one. They presented a solution based on Coherence, their data grid that addresses the capacity challenge by spreading trades across servers and the availability challenge through the resilience of continuous replication that Coherence supports. As part of the presentation, the Coherence based solution touched upon one important paradigm, which can be adopted even outside grid based architectures to improve system performance and throughput.

Asynchronous processing ..


As the Oracle presentation rightly pointed out, one of the recurring problems in today's investment management solutions is the time pressure that the back-office faces dealing with regular overnight batch programs (like reconciliations, confirmations etc.) and struggling to complete them before the next day's trading starts. Reconciliation is a typical example which needs to be executed at various levels between entities like traders, brokers, banks and custodians, and with varying periodicities. The main bottleneck is that these processes are executed as monolithic batch programs that operate on monstrous end-of-day databases containing millions of records that need to be processed, joined, validated and finally aggregated for reconciliation.

Enter Event based Reconciliation ..


This solution is not about grids; it is about a poor man's asynchronous event based processing that transforms the end-of-day batch job into incremental MapReduce style progressions that move forward with every event, do real time processing and raise alerts and events for investment managers to respond to and take more informed decisions. One of the main goals of reconciliation being risk mitigation, I suppose such real time progressions will result in better risk handling and management. Also you save on the time pressure that mandatory completion of today's batch processes implies, along with a more predictable and uniform load distribution across your array of servers.

And this proposed solution uses commodity tools and frameworks, good old proven techniques of asynchronous event processing, and never claims to be as scalable as the grid based one proposed by Coherence.

Think asynchronous, think events, think abstraction of transport, think selectors, think message queues, think managed thread-pools and thousands of open sockets and connections. After all you need to have a managed infrastructure that will be able to process your jobs incrementally based on events.

Asynchronous IO abstractions, MINA and Scala actors ..


Instead of one batch process for every reconciliation, we can think of handling reconciliation events. For example, loading STREET trades is an event that can trigger reconciliation with HOUSE trades. Or receipt of Trade Confirmations is an event for reconciliation with our placed Orders. We can set this up nicely using generic socket based end-points that listen for events .. and using event dispatch callbacks that do all the necessary processing. MINA provides nice abstractions for registering sockets, managing IoSessions, handling IO events that happen on your registered sockets and provides all the necessary glue to handle protocol encoding and decoding.

Here is a brief thought sequence ..



We can use MINA's asynchronous I/O service that abstracts the underlying transport's connection.

import java.util.concurrent.Executors
import org.apache.mina.transport.socket.nio.{NioProcessor, NioSocketAcceptor}

// set up the thread pool
val executor = Executors.newCachedThreadPool()

// set up the socket acceptor with the thread pool
val acceptor = new NioSocketAcceptor(executor, new NioProcessor(executor))

//.. set up other MINA stuff


Using MINA we can decouple protocol encoding / decoding from the I/O, and have separate abstractions for codec construction. Note how the responsibilities are nicely separated between the I/O session handling, protocol filters and event handling.

// add protocol encoder / decoder
acceptor.getFilterChain.addLast("codec", 
  new ProtocolCodecFilter(//.., //..))
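
Just for illustration, here is what a concrete (if simplistic) setup might look like, assuming a plain line-oriented text protocol and a hypothetical ReconRequest message type that the reconciliation handler further down will consume ..

import java.nio.charset.Charset
import org.apache.mina.filter.codec.ProtocolCodecFilter
import org.apache.mina.filter.codec.textline.TextLineCodecFactory

// a simplistic stand-in codec; a real deployment would register a custom
// ProtocolCodecFactory that decodes the wire format into ReconRequest messages
acceptor.getFilterChain.addLast("codec",
  new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))))

// hypothetical message type assumed by the reconciliation handler below
case class ReconRequest(header: String, payload: String)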


Now we need to register an IoHandler, which will handle the events and call the various callbacks like messageReceived(), sessionClosed() etc. Here we would like to be more abstract so that we do not have to handle all complexities of thread and lock management ourselves. We can delegate the event handling to Scala actors, which again can optimize on thread usage and help make the model scale.

// set the IoHandler that delegates event handling to the underlying actor
acceptor.setHandler(
  new IoHandlerActorAdapter(session => new ReconHandler(session, ...)))

// bind and listen
acceptor.bind(new InetSocketAddress(address, port))


So, we have a socket endpoint where clients can push messages which result in MINA events that get routed through the IoHandler implementation, IoHandlerActorAdapter and translated to messages which our Scala actor ReconHandler can react to.

The class IoHandlerActorAdapter is adapted from the naggati DSL for protocol decoding from Twitter ..


class IoHandlerActorAdapter(val actorFactory: (IoSession) => Actor) 
  extends IoHandler {
  //..

  //.. callback
  def messageReceived(session: IoSession, message: AnyRef) = 
    send(session, MinaMessage.MessageReceived(message))

  // send a message to the actor associated with this session
  def send(session: IoSession, message: MinaMessage) = {
    val info = IoHandlerActorAdapter.sessionInfo(session)
    for (actor <- info.actor; if info.filter contains MinaMessage.classOfObj(message)) {
      actor ! message
    }
  }
  //..
}




and the class ReconHandler is the actor that handles reconciliation messages ..

class ReconHandler(val session: IoSession, ...) 
  extends Actor {

  //..
  //..

  def act = {
    loop {
      react {
        case MinaMessage.MessageReceived(msg) =>
          // note we have registered a ProtocolCodecFilter
          // hence we get msg as an instance of specific
          // reconciliation message
          doRecon(msg.asInstanceOf[ReconRequest])

        case MinaMessage.ExceptionCaught(cause) => 
          //.. handle

        case MinaMessage.SessionClosed =>
          //.. handle

        case MinaMessage.SessionIdle(status) =>
          //.. handle
      }
    }
  }

  private def doRecon(request: ReconRequest) = {
    request.header match {
      case "TRADE_CONF_ORDER" => 
        //.. handle reconciliation of confirmation with order

      case "TRADE_STREET_HOME" =>
        //.. handle reconciliation of street side with home trades

      //..
    }
  }
  //..
}


Note that the above strategy relies on incremental progress. It may not be the case that the entire process gets done upfront. We may have to wait till the closing of the trading hours until we receive the last batch of trades or position information from upstream components. But the difference with the big bang batch process is that, by that time we have progressed quite a bit and possibly have raised some alerts as well, which would not have been possible in the earlier strategy of execution. Another way to view it is as an implementation of MapReduce that gets processed incrementally throughout the day on a real time basis and comes up with the eventual result much earlier than a scheduled batch process.

Saturday, February 14, 2009

Retroactive Event Processing with Scala Actors

Martin Fowler talked about Event Sourcing. Greg Young mentioned using Event Sourcing in algorithmic trading applications. Very recently Jonas Boner hacked up Martin's example using asynchronous Scala actors. That's a heck of a precedent to conclude that there is enough meat in this lunch.

Let me summarize my thoughts on Event Sourcing using Scala actors, and how it can be used to perform retroactive monitoring and diagnostics in an environment that supports code hot-swapping. The thoughts are from the perspective of a real world application with real world requirements. And yes, this is also a trading application that processes security trades which pass through various state transitions in their lifecycle before finally being persisted in the database. In a trading application, tracking exceptions is one of the main components, and exception monitors play an important role in the way the trading desk would like to explore and visualize the processing pipeline. The whole idea is to prevent manual intervention. Hence trades falling off the straight-thru-processing pipeline because of exceptions need to be put back on rails, sometimes even digging into the past, patching up services in real time, and exploring the what-ifs and what-would-have-happened scenarios. In short, there is enough reason to do retroactive mining of the application through replays and hotswaps.

Without further ado, let us hack up something similar that accumulates events as they occur in the lifecycle of a trade .. but first, some trivial abstractions (of course elided for demonstration purposes) to set up the stage of the domain model ..

sealed trait InstrumentType
case object EQ extends InstrumentType
case object FI extends InstrumentType

// instrument to be traded  
case class Instrument(isin: String, name: String, insType: InstrumentType)

// trade domain model: immutable
case class Trade(id: Int, ref: String, ins: Instrument, 
  qty: Int, unitPrice: Int, taxFee: Int, net: Int)


The domain objects have been simplified beyond imagination, but still enough to serve the purpose of this post. The only point of note is that all of them have been modeled as immutable abstractions.

As I mentioned above, a trade goes through many state transitions in its lifecycle. We model each transition as being triggered through an event, which we would like to capture in an event queue.

import java.util.Date

// base class
sealed abstract class Event {
  val recorded = new Date
}

// any event that occurs in the lifecycle of a trade  
sealed abstract class TradeProcessingEvent(val trade: Trade) 
  extends Event

// enrich the trade with tax/fee and other values  
case class EnrichTrade(override val trade: Trade) 
  extends TradeProcessingEvent(trade)

// compute the net value of the trade using market practices and rules
case class ValueTrade(override val trade: Trade) 
  extends TradeProcessingEvent(trade)

// show all events    
case object Show


Once again, as per the best practices, the events are modeled as immutable objects. Also, we keep the events independent of the processing logic, and keep all processing in the layer that seems most natural for them - the domain service layer. And here, we model the service as an actor, that receives all events for the trade and does appropriate processing ..

import scala.actors.Actor
import scala.actors.Actor._

// a dummy service mainly for demonstration using mock values
val tradingService_1 = actor {
  loop {
    react {
      case e@EnrichTrade(t@trade) => 
        // fills up the tax/fee part
        println("processing event " + e + " out trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, 100, t.net))

      case v@ValueTrade(t@trade) =>
        // fills up the net value of the trade
        println("processing event " + v + " out trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, t.taxFee, 1000))
    }
  }
}


And finally the EventProcessor that sources event processing for all trades and acts as the facade to the layer above. EventProcessor is also modeled as an actor, for obvious reasons of scalability, eventual consistency and all the benefits that asynchronous models offer.

class EventProcessor extends Actor {
  def act = loop(tradingService_1, Nil)
    
  def loop(ts: Actor, events: List[TradeProcessingEvent]) {
    react {
      case Replay =>
        events.reverse.foreach(ts ! _)
        loop(ts, events)
          
      case event: TradeProcessingEvent =>
        ts ! event
        loop(ts, event :: events)
          
      case Show =>
        events.reverse.foreach(println)
        loop(ts, events)
    }
  }
}

// event for replaying from the queue  
case object Replay extends Event


Note that the loop in the actor above takes the list of events as the state and maintains it in each recursive call. The list gets appended to when we receive new events - this is an effective functional programming idiom for state management, which does not induce any mutable side effects. The service is also part of the state, and is used to process every event that comes its way. This will have a very important role to play, as we move along ..

REPLing it out


Ok, now with enough meat on our plates, let us hack the hell out. Let us fire up the scala REPL and see some of the above stuff in action ..


scala> import org.dg.event.EventSourcing._
import org.dg.event.EventSourcing._

scala> i1
res0: org.dg.event.EventSourcing.Instrument = Instrument(ISIN_1,IBM,EQ)

scala> t1
res1: org.dg.event.EventSourcing.Trade = Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)

scala> val e = new EventProcessor
e: org.dg.event.EventSourcing.EventProcessor = org.dg.event.EventSourcing$EventProcessor@6bb93c

scala> e.start
res2: scala.actors.Actor = org.dg.event.EventSourcing$EventProcessor@6bb93c



What we have done so far is some boilerplate stuff: defining an Instrument, a Trade, and starting up our EventProcessor. As the above session indicates, all are cool and healthy, as they should be .. ok .. next step .. let us fire some events and check the event queue ..


scala> e ! EnrichTrade(t1)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)


scala> e ! ValueTrade(t1)

processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)


scala> e ! Show

EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0))
ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0))



Two events have been fired, a trade has been enriched and valuated and the result trades have been printed in the REPL. For simplicity, these independent events operating on the same trade are not chained and the impact of one does not affect the source trade of another. In real life, this may as well be necessary .. So the 2 events above process the trades correctly and as the Show message indicates, there are 2 events in the queue that are sequenced properly.

Can we Replay the events ?


Sure, we can. That is one of the major benefits of the entire architecture. In fact, the event queue is a live snapshot of the sequence of activities that have happened on the trades flowing through the pipeline. It is right up there to be manipulated by multiple consumers in various ways for application visualization and monitoring.

In case of exceptions, the trader desk may need to rerun the sequence, often selectively. The Replay message does precisely that. The current implementation shows an unconditional replay, while in reality we can have variations like the following (a sketch of one such filtered replay appears after the list) ..


Replay(date: Date)        // replay all events filtered by the input date
Replay(trade: Trade)      // replay all events for the particular trade
Replay(ins: Instrument)   // replay all events for all trades on the input Instrument
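
As an illustration, a trade-scoped replay could be handled by one more case in the actor's loop, along these lines (just a sketch - ReplayFor is a hypothetical variant of the Replay message, not part of the implementation above) ..

// hypothetical filtered replay: only events concerning the given trade
case class ReplayFor(trade: Trade) extends Event

// the corresponding additional case inside EventProcessor.loop
case ReplayFor(t) =>
  events.reverse filter (_.trade == t) foreach (ts ! _)
  loop(ts, events)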



Let us see what Replay gives us ..


scala> e ! Replay

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)



As expected, Replay replays the event list and simulates the processing, optionally as per some filter condition. This actually opens up a world of options in that the computations can be replayed, situations can be simulated and exceptions can be monitored, visualized and debugged more efficiently, which is one of the vital mandates of an STP trading system.

And now for fun extrema ..



Let us look at this seemingly innocuous line, excerpted from Martin Fowler's article ..

So this discussion has made the assumption that the application processing the events stays the same. Clearly that's not going to be the case. Events handle changes to data, what about changes to code?

As Martin goes on to mention in the same article, code changes may occur due to the addition of new features, defect fixes and temporal logic. There may be scenarios where we need to process events that have already occurred with the newly introduced fix. Event sourcing provides a great way out for such practices - in our trading application, exception monitors often need to peek at the event queue, pick up the event at fault and re-process it using the new version of the service introduced as a fix. Martin Fowler defines a Retroactive Event as one that can be used to (a)utomatically correct the consequences of an incorrect event that's already been processed.

Scala actors support hotswapping of code; can we take advantage of this feature and process retroactive events using Event Sourcing? The real fun begins here: we need to add the capability of executing event processing with retroactive effect, i.e. each event in the event queue will be processed by the processor which existed at the time of its original processing. We can implement hotswapping in Scala through states being passed to actors in a recursive loop. And since we need to store the service that existed at that point in time, we need to pass that as state as well .. like the following ..

class EventProcessor extends Actor {
  def act = loop(tradingService_1, Nil)
    
  def loop(ts: Actor, events: List[(Actor, TradeProcessingEvent)]) {
    react {
      case Replay =>
        events.reverse.foreach(x => x._1 ! x._2)
        loop(ts, events)
          
      case event: TradeProcessingEvent =>
        ts ! event
        loop(ts, (ts, event) :: events)
          
      case Show =>
        events.reverse.foreach(println)
        loop(ts, events)

      case HotSwap(s) =>
        loop(s, events)
    }
  }
}

// event for hotswapping
case class HotSwap(s: Actor) extends Event


Here we are passing the service as an additional state and storing it in the event queue as well. The storage can be made more efficient in the production code, but I guess, the idea is clear in the above implementation.

So let us implement another version of the service, do a hotswap and watch the events being processed retroactively ..

// another version of the trading service
val tradingService_2 = actor {
  loop {
    react {
      case e@EnrichTrade(t@trade) =>
        // new mocked values, 200 for tax/fee
        println("processing event " + e + " result trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, 200, t.net))

      case v@ValueTrade(t@trade) =>
        // new mocked values, 2000 for net value
        println("processing event " + v + " result trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, t.taxFee, 2000))
    }
  }
}


The following session demonstrates how events can be processed retroactively when the code changes. The event queue is still consistent and has everything to replay as of the past. This is an extremely powerful idiom of Event Sourcing that can implement features for real time application analysis. Any problematic or buggy version of your code base can be swapped out in favor of an earlier version, and the event queue gives you the power of replaying every event with the version of the code that you want.


scala> val e = new EventProcessor
e: org.dg.event.EventSourcing.EventProcessor = org.dg.event.EventSourcing$EventProcessor@88a970

scala> e.start
res2: scala.actors.Actor = org.dg.event.EventSourcing$EventProcessor@88a970

scala> e ! EnrichTrade(t1)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)


scala> e ! ValueTrade(t1)

processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)


scala> e ! Show

(scala.actors.Actor$$anon$1@825cf3,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@825cf3,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))


scala> e ! HotSwap(tradingService_2)

scala> e ! EnrichTrade(t1)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,200,0)

scala> e ! ValueTrade(t1)

processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,2000)

scala> e ! Show
(scala.actors.Actor$$anon$1@825cf3,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@825cf3,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@1f6f3dc,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@1f6f3dc,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))


scala> e ! Replay
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,200,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,2000)




In the above, the first two events are replayed with the earlier version of the service, after which we hotswapped tradingService_1 with tradingService_2. The last two replays reflect this change and run with the newer version of the service.