Please refer to the FAQ, README.md, the Ideas and the Issues for background info on what's outstanding (aside from there being lots of room for more and better docs).
While there is no canonical book on doing event-sourced Domain Modelling, there are some must-read books that stand alone as timeless books in general but are also specifically relevant to the considerations involved in building such systems:
Domain Driven Design, Eric Evans, 2003 aka 'The Blue Book'; not a leisurely read but timeless and continues to be authoritative
Domain Modelling Made Functional, Scott Wlaschin, 2018; extremely well edited traversal of Domain Modelling in F# which is a pleasure to read. Does not talk specifically about event sourcing, but is still very relevant.
Implementing Domain Driven Design, Vaughn Vernon, 2013; aka 'The Red Book'. Worked examples and a slightly different take on DDD (the appendix on Functional Event sourcing is not far from what we have around these parts; which is not surprising given there's input from Jérémie Chassaing)
Functional Event Sourcing Decider, Jérémie Chassaing, 2021. Precursor to an excellent book covering this space more broadly. There are teasers with extensive code walk-through and discussion in this 2h45m video on Event Driven Information Systems
- Your link here - Please add notable materials that helped you on your journey here via PRs!
The following diagrams are based on the style defined in @simonbrowndotje's C4 model, rendered using @skleanthous's PlantUmlSkin. It's highly recommended to view the talk linked from c4model.com.
See README.md acknowledgments section
Equinox and Propulsion together provide a loosely related set of libraries that you can leverage in an application as you see fit. These diagrams are intended to give a rough orientation; what you actually build is up to you...
Equinox focuses on the Consistent Processing elements of building an event-sourced system, offering tailored components that interact with a specific Consistent Event Store, as laid out here in this C4 System Context Diagram:
☝️ Propulsion elements (which we consider External to Equinox) support the building of complementary facilities as part of an overall Application:
- Ingesters: read stuff from outside the Bounded Context of the System. This kind of service covers aspects such as feeding reference data into Read Models, ingesting changes into a consistent model via Consistent Processing. These services are not acting in reaction to events emanating from the Consistent Event Store, as opposed to...
- Publishers: react to events as they arrive from the Consistent Event Store by filtering, rendering and producing to feeds for downstreams. While these services may in some cases rely on synchronous queries via Consistent Processing, they never transact or drive follow-on work; which brings us to...
- Reactors: drive reactive actions triggered by either upstream feeds, or events observed in the Consistent Event Store. These services handle anything beyond the duties of Ingesters or Publishers, and will often drive follow-on processing via Process Managers and/or transacting via Consistent Processing. In some cases, a reactor app's function may be to progressively compose a notification for a Publisher to eventually publish.
The Systems and Components involved break out roughly like this:
Equinox encourages sticking with Test Pyramid principles: focus on unit testing things by default (based on calling `interpret`/`decide`, `initial` and `fold` from the Aggregate module).
However, the Equinox `MemoryStore` package can also be relevant as part of your overall testing strategy. The aims are to:
- provide a mechanism where one can provide an empty and/or specifically prepared set of streams initialized in ways that make sense for your test suite
- allow one to test with fully configured `Service` types if necessary
- enable one to test flows or scenarios (e.g. Process Managers) crossing multiple `Service` types
- allow one to validate that the above logic works well independent of the effects of any of the stores
- allow one to reduce reliance on mechanisms such as the CosmosDB simulator
NOTE: `MemoryStore` is a complement to testing with a real store - it's absolutely not a substitute for testing how your app really performs with your load against your actual store.
A primary supported pattern is to be able to define a test suite and then run the suite with the right store for the context - e.g.:
- for unit tests, you might opt to run some important scenarios with a `MemoryStore`
- for integration tests, you might run lots of iterations of a Property Based Test against a memory store, and a reduced number of iterations of the same test against your concrete store
- for acceptance tests, you'll likely primarily focus on using your concrete store
This diagram shows the high level building blocks used in constructing an integration test using `Equinox.MemoryStore`:
This breaks down the layout above into the actual internal structures involved:
From the point of view of Equinox, SqlStreamStore and EventStore have a lot in common in terms of how Equinox interacts with them. For this reason, it's safe to treat them as equivalent for the purposes of this overview.
This diagram walks through the basic sequence of operations, where:
- this node has not yet read this stream (i.e. there's nothing in the Cache)
- when we do read it, it's empty (no events):
Next, we extend the scenario to show:
- how the State held in the Cache influences the EventStore/SqlStreamStore APIs used
- how writes are managed:
  - when there's no conflict
  - when there's conflict and we're retrying (handle `WrongExpectedVersionException`, read the conflicting events, loop using those)
  - when there's conflict and we're giving up (throw `MaxAttemptsExceededException`; no need to read the conflicting events)
After the write, we circle back to illustrate the effect of the caching when we have correct state
In other processes (when a cache is not fully in sync), the sequence runs slightly differently:
This diagram walks through the basic sequence of operations, where:
- this node has not yet read this stream (i.e. there's nothing in the Cache)
- when we do read it, the Read call returns `404` (with a charge of `1 RU`)
Next, we extend the scenario to show:
- how state held in the Cache influences the Cosmos APIs used
- how reads work when a snapshot is held within the Tip
- how reads work when the state is built from the events via a Query
- how writes are managed:
  - when there's no conflict (`Sync` stored procedure returns no conflicting events)
  - when there's conflict and we're retrying (re-run the decision with the conflicting events the call to `Sync` yielded)
  - when there's conflict and we're giving up (throw `MaxAttemptsExceededException`)

After the write, we circle back to illustrate the effect of the caching when we have correct state (we get a `304 Not Modified` and pay only `1 RU`).
In other processes (when a cache is not fully in sync), the sequence runs slightly differently:
- we read the Tip document, and can work from that snapshot
- the same Loading Fallback sequence shown in the initial read will take place if no suitable snapshot that passes the `isOrigin` predicate is found within the Tip
Event Sourcing is easier and harder than you think. This document is not a tutorial, and you can and will make a mess on your first forays. This glossary attempts to map terminology from established external documentation to the terms used in this documentation.
Term | Description |
---|---|
Aggregate | Boundary within which a set of Invariants are to be preserved across a set of related Entities and Value Objects |
Append | Add Events reflecting a Decision to a Stream, contingent on an Optimistic Concurrency Check |
Bounded Context | Domain Driven Design term for a cohesive set of application functionality. Events should not pass directly between BCs (see Ingestion, Publishing) |
Command | Arguments supplied to one of a Stream's Decision functions; may result in Events being Appended |
CQRS | Command/Query Responsibility Segregation: Architectural principle critical to understand (but not necessarily slavishly follow) when building an Event Sourced System |
Decision | Application logic function representing the mapping of an Aggregate State together with arguments reflecting a Command. Yields a response and/or Events to Append to the Stream in order to manifest the intent implied; the rules it considers in doing so are in effect the Invariants of the Aggregate |
Event | Details representing the facts of something that has occurred, or a Decision that was made with regard to an Aggregate state as represented in a Stream |
Eventually Consistent | A Read Model can momentarily lag a Stream's current State as events are being Reacted to |
Fold | FP term used to describe the process of building State from the sequence of Events observed on a Stream |
Idempotent | Multiple executions have the same net effect; can safely be processed >1 time without adverse effects |
Ingestion | The act of importing and reconciling data from external systems into the Models and/or Read Models of a given Bounded Context |
Invariants | Rules that an Aggregate's Fold and Decision process work to uphold |
Optimistic Concurrency Check | (non-) Locking/transaction mechanism used to ensure that Appends to a Stream maintain the Aggregate's Invariants in the presence of multiple concurrent writers |
Projection | Umbrella term for the production, emission or synchronization of models or outputs from the Events being Appended to Streams. Lots of ways of skinning this cat, see Reactions, Read Model, Query, Synchronous Query, Publishing |
Publishing | Selectively rendering Rich Events for a downstream consumer to Ingest into an external Read Model (as opposed to Replication) |
Query | Eventually Consistent read from a Read Model managed via Projections. See also Synchronous Query |
Reactions | Work carried out as a Projection that drives ripple effects, including maintaining Read Models to support Queries or carrying out a chain of activities that conclude in the Publishing of Rich Events |
Read Models | Denormalized data maintained inside a Bounded Context as Reactions, honoring CQRS. As opposed to: Replication, Synchronous Query, Publishing |
Replication | Rendering Events as an unfiltered feed in order to facilitate generic consumption/syncing. Can be a useful tool to scale or decouple Publishing / Reactions from a Store's feed; BUT: can just as easily be abused to be functionally equivalent to Database Integration -- maintaining a Read Model as a Reaction and/or deliberately Publishing Rich Events is preferred |
Rich Events | Messages deliberately emitted from a Bounded Context (as opposed to Replication) via Publishing |
State | Information inferred from tracking the sequence of Events on a Stream in support of Decisions (and/or Synchronous Queries) |
Store | Holds Events for a Bounded Context as ordered Streams |
Stream | Ordered sequence of Events in a Store |
Synchronous Query | Consistent read direct from Stream State (breaking CQRS and coupling implementation to the State used to support the Decision process). See CQRS, Query, Reactions |
Term | Description |
---|---|
Change Feed | set of query patterns enabling the running of continuous queries reading Items (documents) in a Range (physical partition) in order of their last update |
Change Feed Processor | Library from Microsoft exposing facilities to Project from a Change Feed, maintaining Offsets per Range of the Monitored Container in a Lease Container |
Container | logical space in a CosmosDB holding [loosely] related Items (aka Documents). Items bear logical Partition Keys. Formerly Collection. Can be allocated Request Units. |
CosmosDB | Microsoft Azure's managed document database system |
Database | Group of Containers. Can be allocated Request Units. |
DocumentDb | Original offering of CosmosDB, now entitled the SQL Query Model, Microsoft.Azure.DocumentDb.Client[.Core] |
Document `id` | Identifier used to load a document (Item) directly as a point read, without a Query |
Lease Container | Container (separate from the Monitored Container to avoid feedback effects) that maintains a set of Offsets per Range, together with leases reflecting instances of the Change Feed Processors and their Range assignments (aka aux container) |
Partition Key | Logical key identifying a Stream (a Range is a set of logical partitions identified by such keys). A Logical Partition is limited to a max of 10GB (as is a Range) |
Projector | Process running a [set of] Change Feed Processors across the Ranges of a Monitored Container |
Query | Using indices to walk a set of relevant items in a Container, yielding Items (documents). Normally confined to a single Partition Key (unless one enters into the parallel universe of cross-partition queries) |
Range | Physical Partition managing a subset of the Partition Key space of a Container (based on hashing) consisting of colocated data running as an individual CosmosDB node. Can be split as part of scaling up the RUs allocated to a Container. Typically limited to a maximum capacity of 10 GB. |
Replay | The ability to re-run the processing of the Change Feed from the oldest Item (document) forward at will |
Request Units | Pre-provisioned virtual units used to govern the per-second capacity of a Range's query processor (while they are assigned at Container or Database level, the load shedding / rate limiting takes effect at the Range level) |
Request Charge | Number of Request Units charged for a specific action, apportioned against the RU capacity of the relevant Range for that second |
Stored Procedure | JavaScript code stored in a Container that (repeatedly) maps an input request to a set of actions to be transacted as a group within CosmosDB. Incurs equivalent Request Charges for work performed; can chain to a continuation internally after a read or write. Limited to 5 seconds of processing time per action. |
Term | Description |
---|---|
Table | Defined storage area in DynamoDB, defining a schema and (optionally), a Streams configuration. (There's no notion of a Database) |
Streams | Buffer used to record information about all changes with a 24h retention window |
Transactions | Feature allowing up to 100 atomic updates across multiple tables and logical partitions. Doubles the RU cost of a write |
DynamoStore Index | Custom implementation (in Propulsion) that indexes the DynamoDB Streams output to enable traversing all the events in the store akin to how the CosmosDB ChangeFeed enables that |
Export | A Table can be exported in full to an S3 bucket as a set of json files containing all items |
Term | Description |
---|---|
Category | Group of Streams bearing a common prefix {category}-{streamId} (events are indexed into $ec-{Category} by system projections) |
Event | json or blob payload, together with an Event Type name representing an Event |
EventStore | Open source Event Sourcing-optimized data store server and programming model with powerful integrated projection facilities |
Rolling Snapshot | Event written to an EventStore stream in order to ensure minimal store roundtrips when there is a Cache miss |
Stream | Core abstraction presented by the API - an ordered sequence of Events |
WrongExpectedVersion | Low level exception thrown to convey the occurrence of an Optimistic Concurrency Violation |
Term | Description |
---|---|
Cache | `System.Runtime.Caching.MemoryCache` or equivalent holding State and/or etag information for a Stream with a view to reducing roundtrips, latency and/or Request Charges |
Unfolds | Snapshot information, stored in an appropriate storage location (not as a Stream's actual Events), but represented as Events, to minimize Queries and the attendant Request Charges when there is a Cache miss |
Version | When a decision function is invoked, it's presented with a State derived from the Stream's Events and/or Unfolds up to a given position. If the newest event has Index 9 (or it was loaded from an Unfold with `i=9`), the Version is 10 |
NB this has lots of room for improvement, having started as a placeholder in #50; improvements are absolutely welcome, as this is intended for an audience with diverse levels of familiarity with event sourcing in general, and Equinox in particular.
All the code handling any given Aggregate's Invariants, Decisions and Synchronous Queries should be encapsulated within a single `module`. It's highly recommended to use the following canonical skeleton layout:
module Aggregate

let [<Literal>] private CategoryName = "category"
let private streamId = FsCodec.StreamId.gen Id.toString

(* Optionally (rarely) Helpers/Types *)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

    type Snapshotted = ... // NOTE: event body types past tense, with same name as the case
    type Event =
        | ...
        | [<DataMember(Name = "Snapshotted")>] Snapshotted of Snapshotted // NOTE: Snapshotted event explicitly named as a reminder that one can/should version it
    // optionally: `encode`, `tryDecode` (only if you're doing manual decoding)
    let codec = FsCodec ... Codec.Create<Event>(...)
Some notes about the intents being satisfied here:
- types and cases in `Events` cannot be used without prefixing with `Events.`
- while it can make sense to assert in terms of them in tests, in general sibling code in adjacent `module`s should not be using them directly (in general, interaction should be via the `type Service`)
✅ DO use tupled arguments for the `streamId` function

All the inputs of which the `StreamId` is composed should be represented as one argument:
✅ let streamId = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString
✅ let streamId struct (tenantId, clientId) = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString (tenantId, clientId)
❌ let streamId tenantId clientId = FsCodec.StreamId.gen2 TenantId.toString ClientId.toString (tenantId, clientId)
✅ DO keep `CategoryName` and `streamId` visibility `private`; present via `module Reactions`

If the composition of stream names is relevant for Reactions processing, expose relevant helpers in a `module Reactions` facade. For instance, rather than having external reaction logic refer to `Aggregate.CategoryName`, expose a facade such as:
module Reactions =

    let streamName = streamName
    let deletionNamePrefix tenantIdStr = $"%s{CategoryName}-%s{tenantIdStr}"

    let [<return: Struct>] (|For|_|) = Stream.tryDecode
    let [<return: Struct>] (|Decode|_|) = function
        | struct (For id, _) & Streams.Decode dec events -> ValueSome struct (id, events)
        | _ -> ValueNone
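As a purely illustrative sketch (the handler shape and its `struct (streamName, events)` input are assumptions for illustration, not a prescribed API), a downstream reactor can then pattern match via the facade without referencing the Aggregate's internals:

```fsharp
// Hypothetical reactor handler: the Decode Active Pattern extracts the id and
// the decoded events; anything that doesn't match is ignored
let handle struct (streamName, events) =
    match struct (streamName, events) with
    | Aggregate.Reactions.Decode (id, decoded) ->
        // ... react to the decoded events for the given id
        ignore (id, decoded)
    | _ -> ()
```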
module Fold =

    type State = ...
    let initial: State = ...

    module Snapshot =

        let generate (state: State): Events.Event =
            Events.Snapshotted { ... }
        let isOrigin = function Events.Snapshotted _ -> true | _ -> false
        let config = isOrigin, generate
        let hydrate (e: Events.Snapshotted): State = ...

    let private evolve state = function
        | Events.Snapshotted e -> Snapshot.hydrate e
        | Events.X -> (state update)
        | Events.Y -> (state update)
    let fold = Array.fold evolve
module Decisions =

    let interpretX ... (state: Fold.State): Events.Event[] = ...

    type Decision =
        | Accepted
        | Failed of Reason
    let decideY ... (state: Fold.State): Decision * Events.Event[] = ...
- `interpret`, `decide` and related input and output types / interfaces are public and top-level for use in unit tests (often unit tests will `open` the `module Fold` to use `initial` and `fold`)
In some cases, where surfacing the state in some way makes sense (it doesn't always; see CQRS), you'll have a:
module Queries =

    type XyzInfo = { ... }
    let renderXyz (s: State): XyzInfo =
        { ... }
The above functions can all be unit tested directly. All other tests should use the `Service` with a `MemoryStore`, via the `member`s on that:
type Service internal (resolve: Id -> Equinox.Decider<Events.Event, Fold.State>) =

    member _.Execute(id, command): Async<unit> =
        let decider = resolve id
        decider.Transact(Decisions.interpretX command)
    member _.Decide(id, inputs): Async<Decision> =
        let decider = resolve id
        decider.Transact(Decisions.decideY inputs)

    member private _.Query(id, maxAge, render): Async<Queries.XyzInfo> =
        let decider = resolve id
        decider.Query(render, Equinox.LoadOption.AllowStale maxAge)
    member x.ReadCachedXyz(id): Async<Queries.XyzInfo> =
        x.Query(id, TimeSpan.FromSeconds 10., Queries.renderXyz)

let create category = Service(streamId >> Equinox.Decider.forStream Serilog.Log.Logger category)
- `Service`'s constructor is `internal`; `create` is the main way in which one wires things up (using either a concrete store or a `MemoryStore`) - there should not be a need to have it implement an interface and/or go down mocking rabbit holes.
While not every section will be present in every Aggregate, significant thought and discussion has gone into arriving at this layout. Having everything laid out consistently is a big win, so avoid customizing your layout / grouping until you have at least 3 representative aggregates of your own implemented.
Over the top of the Aggregate Module structure, one then binds this to a concrete storage subsystem. For example:
Depending on how you structure your app, you may opt to maintain such modules either within the `module Aggregate`, or somewhere outside, closer to the Composition Root.
let defaultCacheDuration = System.TimeSpan.FromMinutes 20.
let cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration)

module EventStore =

    let accessStrategy = Equinox.EventStoreDb.AccessStrategy.RollingSnapshots Fold.Snapshot.config
    let category (context, cache) =
        Equinox.EventStoreDb.EventStoreCategory(context, CategoryName, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)

module Cosmos =

    let accessStrategy = Equinox.CosmosStore.AccessStrategy.Snapshot Fold.Snapshot.config
    let category (context, cache) =
        Equinox.CosmosStore.CosmosStoreCategory(context, CategoryName, Events.codec, Fold.fold, Fold.initial, accessStrategy, cacheStrategy cache)
For integration testing higher level functionality in Application Services (straddling multiple Domain `Service`s and/or layering behavior over them), you can use the `MemoryStore` in the context of your tests:
module MemoryStore =

    let category (store: Equinox.MemoryStore.VolatileStore) =
        Equinox.MemoryStore.MemoryStoreCategory(store, CategoryName, Events.codec, Fold.fold, Fold.initial)
This is key to being able to test `Service` behavior without having to use Mocks.
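As a sketch (assuming the `create` function from the canonical layout above), the same `Service` can then be wired to either a concrete store or the `MemoryStore`:

```fsharp
// Sketch only: a test suite wires the Service against a MemoryStore...
let createMemoryService () =
    let store = Equinox.MemoryStore.VolatileStore()
    MemoryStore.category store |> create

// ... while the app's Composition Root wires the same Service to a concrete store
let createCosmosService (context, cache) =
    Cosmos.category (context, cache) |> create
```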
In F#, independent of the Store being used, the Equinox programming model involves (largely by convention, see FAQ), per aggregation of events on a given category of stream:
- `CategoryName`: the common part of the Stream Name, i.e., the `"Favorites"` part of the `"Favorites-clientId"` StreamName
- `streamId`: function responsible for mapping from the input elements that define the Aggregate's identity to the `streamId` portion of the `{categoryName}-{streamId}` StreamName that's used within the concrete store. In general, the inputs should be strongly typed ids
- `'event`: a discriminated union representing all the possible Events from which a state can be `evolve`d (see events and unfolds in the Storage Model). Typically the mapping of the json to an `'event` case is driven by a `UnionContractEncoder`
- `'state`: the rolling state maintained to enable Decisions or Queries to be made given a command and/or other context (not expected to be serializable or stored directly in a Store; can be held in a .NET `MemoryCache`)
- `initial: 'state`: the [implied] state of an empty stream. See also Null Object Pattern, Identity element
- `fold: 'state -> 'event[] -> 'state`: function used to fold one or more loaded (or proposed) events (real ones and/or unfolded ones) into a given running persistent data structure of type `'state`
- `evolve: 'state -> 'event -> 'state`: the folder function from which `fold` is built, representing the application of a single delta that the `'event` implies for the model to the `state`. Note: `evolve` is an implementation detail of a given Aggregate; `fold` is the function used in tests and used to parameterize the Category's storage configuration. Sometimes named `apply`
- `interpret: (context/command etc ->) 'state -> 'event[]` or `decide: (context/command etc ->) 'state -> 'result * 'event[]`: responsible for Deciding (in an idempotent manner) how the intention represented by `context/command` should be mapped with regard to the provided `state` in terms of: a) the `'event`s that should be written to the stream to record the decision b) (for the `'result` in the `decide` signature) any response to be returned to the invoker (NB returning a result likely represents a violation of the CQS and/or CQRS principles, see Synchronous Query in the Glossary)
When using a Store with support for synchronous unfolds and/or snapshots, one will typically implement two further functions in order to avoid having every `'event` in the stream be loaded and processed in order to build the `'state` per Decision or Query (versus a single cheap point read from CosmosDB to read the tip):
- `isOrigin: 'event -> bool`: predicate indicating whether a given `'event` is sufficient as a starting point, i.e., provides sufficient information for the `evolve` function to yield a correct `state` without any preceding event being supplied to the `evolve`/`fold` functions
- `unfold: 'state -> 'event seq`: function used to render events representing the `'state` which facilitate short circuiting the building of `state`, i.e., `isOrigin` should be able to yield `true` when presented with such an `'event`. (In some cases, the store implementation will provide a custom `AccessStrategy` where the `unfold` function should only produce a single `event`; where this is the case, typically it is referred to as `toSnapshot: 'state -> 'event`.)
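For instance, anticipating the Favorites example below (where a `Snapshotted` case carries the complete set of items), a minimal pair might look like this sketch:

```fsharp
// Sketch: isOrigin recognizes the snapshot event as a sufficient starting point;
// toSnapshot renders the complete state as that single event
let isOrigin = function Snapshotted _ -> true | _ -> false
let toSnapshot (state: Item list): Event = Snapshotted (Array.ofList state)
```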
When running a decision process, we have the following stages:
1. establish a known `'state` (as at a given Position in the stream of Events)
2. present the request/command and the `state` to the `interpret` function in order to determine appropriate events (can be many, or none) that represent the decision in terms of events
3. append to the stream, contingent on the stream still being in the same State/Position it was in step 1:
   a. if there is no conflict (nobody else decided anything since we decided what we'd do given that command and state), append the events to the stream (retaining the updated position and etag)
   b. if there is a conflict, obtain the conflicting events [that other writers have produced] since the Position used in step 1, `fold` them into our `state`, and go back to 2 (aside: the CosmosDB stored procedure can send them back immediately at zero cost or latency, and there is a proposal for EventStore to afford the same facility)
4. [if it makes sense for our scenario], hold the state, position and etag in our cache. When a Decision or Synchronous Query is needed, do a point-read of the tip and jump straight to step 2 if nothing has been modified.
See Cosmos Storage Model for a more detailed discussion of the role of the Sync Stored Procedure in step 3
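To make the stages concrete, here's an illustrative sketch of the loop; the `load` and `trySync` signatures are assumptions standing in for the store integration, not the actual Equinox internals:

```fsharp
exception MaxResyncsExhaustedException of int

// Illustrative only: the Optimistic Concurrency retry loop described above
let transact maxAttempts
        (load: unit -> Async<'state>)                                  // step 1: establish a known state
        (interpret: 'state -> 'event[])                                // step 2: decide
        (trySync: 'state -> 'event[] -> Async<Result<unit, 'state>>) = // step 3: append at the expected Position
    let rec loop attempt state = async {
        let events = interpret state
        if Array.isEmpty events then return () // intent already in effect: no write roundtrip
        else
            match! trySync state events with
            | Ok () -> return () // no conflict; cache the updated state/position/etag (step 4)
            | Error stateWithConflicts when attempt < maxAttempts ->
                // conflicting events have been folded into the state; re-run the decision
                return! loop (attempt + 1) stateWithConflicts
            | Error _ -> return raise (MaxResyncsExhaustedException attempt) }
    async {
        let! originState = load ()
        return! loop 1 originState }
```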
The following example is a minimal version of the Favorites model, with shortcuts for brevity, that implements all the relevant functions above:
(* Event stream naming + schemas *)

let [<Literal>] private CategoryName = "Favorites"
let private streamId = FsCodec.StreamId.gen ClientId.toString

type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
    | Added of Item
    | Removed of itemId: int
    | Snapshotted of items: Item[]

(* State types/helpers *)

type State = Item list // NB IRL don't mix Event and State types
let is id x = x.id = id

(* Folding functions to build state from events *)

let evolve state = function
    | Snapshotted items -> List.ofArray items
    | Added item -> item :: state
    | Removed id -> state |> List.filter (is id >> not)
let fold = Array.fold evolve

let has id state = state |> List.exists (is id)
let decideAdd item state =
    if state |> has item.id then [||]
    else [| Added item |]
let decideRemove id state =
    if state |> has id then [| Removed id |]
    else [||]
(*
 * Optional: Snapshot/Unfold-related functions to allow establishing state
 * efficiently, without having to read/fold all Events in a Stream
 *)
let toSnapshot state = [| Event.Snapshotted (Array.ofList state) |]
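// NOTE (illustrative addition, not part of the original sample): the matching
// isOrigin predicate described in the Programming Model section above
let isOrigin = function Snapshotted _ -> true | _ -> false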
(*
* The Service defines operations in business terms, neutral to any concrete
* store selection or implementation supplied only a `resolve` function that can
* be used to map from ids (as supplied to the `streamId` function) to an
* Equinox.Decider; Typically the service should be a stateless Singleton
*)
type Service internal (resolve: ClientId -> Equinox.Decider<Event, State>) =

    member _.Favorite(clientId, item) =
        let decider = resolve clientId
        decider.Transact(decideAdd item)
    member _.Unfavorite(clientId, itemId) =
        let decider = resolve clientId
        decider.Transact(decideRemove itemId)
    member _.List clientId: Async<Item list> =
        let decider = resolve clientId
        decider.Query id

let create resolve: Service =
    Service(streamId >> resolve)
The most terse walkthrough of what's involved in using Equinox to do a Synchronous Query and/or Execute a Decision process is in the Programming Model section. In this section we'll walk through how one implements common usage patterns using the Equinox `Decider` API in more detail.
There are a plethora of basics underlying Event Sourcing and its cousin, the CQRS architectural style. There are also myriad ways of arranging event driven processing across a system.
The goal of CQRS is to enable representation of the same data using multiple models. It’s not a panacea, and most definitely not a top level architecture, but you should at least be considering whether it’s relevant in your context. There are various ways of applying the general CQRS concept as part of an architecture.
There are many trade-offs to be considered along the journey from an initial proof of concept implementation to a working system and evolving that over time to a successful and maintainable model. There is no official magical combination of CQRS and ES that is always correct, so it’s important to look at the following information as a map of all the facilities available - it’s certainly not a checklist; not all achievements must be unlocked.
At a high level we have:
- Aggregates - a set of information (Entities and Value Objects) within which Invariants need to be maintained, leading us to logically group them in order to consider them when making a Decision
- Events - Events that have been accepted into the model as part of a Transaction represent the basic Facts from which State or Projections can be derived
- Commands/requests/Decisions - taking intent implied by an upstream request
or other impetus
(e.g., automated synchronization based on an upstream data source) driving a
need to sync a system’s model to reflect that implied need (while upholding
the Aggregate's Invariants). The Decision process is responsible proposing
Events to be appended to the Stream representing the relevant events in the
timeline of that Aggregate.
NOTE: you'll find 90% of event sourcing samples use a Command Pattern that employs a class hierarchy (or, if the language supports it, a discriminated union). You'll also find lots of examples of pipelines with Mediators etc. that illustrate how this enables you to hook in cross-cutting concerns. In practice, the negatives of applying such a straitjacket outweigh the benefits. Hence, it is advised to prefer having individual methods (`member`s) on `type Service`, and to pass the relevant inputs via the argument list.
- State - the State at any point is inferred by folding the events in order; this state typically feeds into the Decision with the goal of ensuring Idempotent handling (if it's a retry and/or the desired state already pertained, typically one would expect the decision to be "no-op, no Events to emit")
- Projections/Reactions/Notifications - while most State we'll fold from the events will be dictated by what we need (in addition to a request's arguments) to be able to make the Decision implied by the command, the same Events that are necessary for Decision processing to be able to uphold the Invariants can also be used as the basis for various summaries at the single aggregate level, Rich Events exposed as notification feeds at the boundary of the Bounded Context, and/or as denormalized representations forming a Materialized View.
- Queries - as part of the processing, one might wish to expose the state before or after the Decision and/or a computation based on that to the caller as a result. In its simplest form (just reading the value and emitting it without any potential Decision applying), such a Synchronous Query is a violation of CQRS, which suggests that reads should always be served from a Read Model. Be sure you understand the trade-offs involved in applying CQRS before deciding to use it in All cases (or in None).
Equinox’s Transaction Handling consists of < 200 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch in a normal application are:
- `module Stream` - internal implementation of the Optimistic Concurrency Control / retry loop used by `Decider`. It's recommended to at least scan this file as it defines the Transaction semantics that are central to Equinox and the overall `Decider` concept.
- `type Decider` - surface API one uses to `Transact` or `Query` against a specific stream's state
- `type LoadOption` Discriminated Union - used to specify optimization overrides to be applied when a `Decider`'s `Query` or `Transact` operations need to establish the state of the stream
It's recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the two files convey much of what the thousands of words that follow describe!
type Equinox.Decider(...) =

    // Run interpret function with present state, retrying with Optimistic Concurrency
    member _.Transact(interpret: 'state -> 'event[]): Async<unit>

    // Run decide function with present state, retrying with Optimistic Concurrency, yielding result on exit
    member _.Transact(decide: 'state -> 'result * 'event[]): Async<'result>

    // Runs a Null Flow that simply yields a `projection` from the state
    member _.Query(projection: 'state -> 'view): Async<'view>
In this section, we’ll use possibly the simplest toy example: an unbounded list of items a user has 'favorited' (starred) in an e-commerce system.
See samples/Tutorial/Favorites.fsx. It’s recommended to load this in your IDE and feed it into the F# Interactive REPL to observe it step by step. Here, we'll skip some steps and annotate some aspects with regard to trade-offs that should be considered.
type Event =
    | Added of string
    | Removed of string

let initial: string list = []
let evolve state = function
    | Added sku -> sku :: state
    | Removed sku -> state |> List.filter (fun x -> x <> sku)
let fold = Array.fold evolve
Events are represented as an F# Discriminated Union; see the article on the `UnionContractEncoder` for information about how that's typically applied to map to/from an Event Type/Case in an underlying Event storage system.
The `evolve` function is responsible for computing the post-State that should result from taking a given State and incorporating the effect that a single Event implies in that context, yielding that result without mutating either input.
While the `evolve` function operates on a `state` and a single event, `fold` (named for the standard FP operation of that name) walks an array of events, propagating the running state into each `evolve` invocation. It is the `fold` operation that's typically used a) in tests and b) when passing a function to an Equinox `Resolver` to manage the behavior.
It should also be called out that Events represent Facts about things that have happened - an `evolve` or `fold` should not throw Exceptions or log. There should be absolutely minimal conditional logic.
In order to fulfill the without mutating either input constraint, typically `fold` and `evolve` either deep clone to a new mutable structure with space for the new events, or use a [persistent/immutable/incremental data structure, such as F#'s `list`](https://en.wikipedia.org/wiki/Persistent_data_structure). The reason this is necessary is that the result from `fold` can also be used for one of the following reasons:
- computing a 'proposed' state that never materializes due to a failure to save and/or an Optimistic Concurrency failure
- the store can sometimes take a `state` from the cache and `fold` in different `events` (when the conflicting events are loaded for the retry in the loop)
- concurrent executions against the stream may be taking place in parallel within the same process; this is permitted, Equinox makes no attempt to constrain the behavior in such a case
module Decisions =

    let add sku state = [|
        if not (state |> List.contains sku) then
            Added sku |]
    let remove sku state = [|
        if state |> List.contains sku then
            Removed sku |]
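A quick REPL-style check (using the `fold`/`initial` defined earlier) illustrates the idempotent behavior:

```fsharp
let state1 = fold initial (Decisions.add "sku-1" initial) // state1 = ["sku-1"]
let events2 = Decisions.add "sku-1" state1                // [||] - already present, so no event
```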
Transactions should almost invariably be implemented in an Idempotent fashion. Some would argue that a blind append may be an OK way to do this, perhaps where the operations are simple add/removes that are not problematic if repeated. However it should be noted that:
- each write roundtrip (i.e. each `Transact`) to the store is not free, and neither are the ripple effects resulting from all subscriptions having to process an event. As the cache can be used to validate whether an Event is actually necessary in the first instance, it's highly recommended to follow the convention as above and return no Events in the case where the intended state the request implied is already in effect.
- understanding the reasons for each event typically yields a more correct model and/or test suite, which pays off in more understandable code
- as in the above, if the events are clean and minimal, the Fold State can be simpler; if `add` did not deduplicate redundant additions as it does above, the model would need to change to use a `Set`, or consider whether the item was already present during every `evolve` operation involving an `Added` event
- under load, retries frequently bunch up, and being able to deduplicate them without hitting the store and causing a conflict can significantly reduce feedback effects that cause inordinate impacts on stability at the worst possible time
It should also be noted that, when executing a Decision, the `interpret` function is expected to behave statelessly; as with `fold`, multiple concurrent calls within the same process can occur.
A final consideration to mention is that, even when correct idempotent handling is in place, two writers can still produce conflicting events. In this instance, the `Transact` loop's Optimistic Concurrency control will cause the 'loser' to re-`interpret` the Decision with an updated `state` [incorporating the conflicting events the other writer (thread/process/machine) produced] as context.
type Service internal (resolve: string -> Equinox.Decider<Events.Event, Fold.State>) =

    member _.Favorite(clientId, sku) =
        let decider = resolve clientId
        decider.Transact(Decisions.add sku)
    member _.Unfavorite(clientId, sku) =
        let decider = resolve clientId
        decider.Transact(Decisions.remove sku)
    member _.List clientId: Async<string list> =
        let decider = resolve clientId
        decider.Query id
Note the `resolve` parameter affords one a sufficient seam that facilitates testing independently with the `MemoryStore`.
In general, you want to have as little logic as possible in your `type Service` - the ideal situation is for everything to be tested independent of storage, event encoding (and Equinox behaviors) by simply invoking functions from `module Decisions`.
Any logic that makes sense to put in `type Service` should be tested with the `MemoryStore` to the maximum degree possible. You will likely also opt to have some further tests that exercise the actual concrete store, but you want to maximize the amount of validation of the paths that can be performed against the in-memory store in order to have a fast-running test suite without the flakiness that's inevitable with even a store emulator.
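As an illustrative sketch (assuming a `create` function and a `MemoryStore.category` helper along the lines shown in the Store Bindings section; codec plumbing elided), such a test might look like:

```fsharp
// Sketch: exercising the Service end to end against a MemoryStore in a test
let ``favoriting twice is idempotent`` () = async {
    let store = Equinox.MemoryStore.VolatileStore()
    let service = MemoryStore.category store |> create
    do! service.Favorite("clientA", "sku-1")
    do! service.Favorite("clientA", "sku-1") // second call should produce no events
    let! items = service.List "clientA"
    if items <> ["sku-1"] then failwith "expected exactly one favorite" }
```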
`Favorite` and `Unfavorite` run an Optimistic Concurrency Controlled `Transact` loop in order to effect the intent of the [write-only] request. This involves:

1. establishing state
2. using the Decision Function (`Decisions.add`/`Decisions.remove`) to determine what (if any) Events need to be appended to manifest the intent
3. submitting the Events, if any, for appending
4. retrying 2/3 where there's a conflict (i.e., the version of the stream that pertained in 1 has been superseded)
5. after `maxAttempts` retries, a `MaxResyncsExhaustedException` is thrown, and an upstream can retry as necessary (depending on SLAs, a timeout may further constrain the number of retries that can occur)
Note that we've opted not to employ the Command pattern here; we have an individual Decision Function per action/request, and an associated `member` in the `Service` that takes the parameters and passes them to it (as opposed to the relevant parameters being stored in a Command class or Discriminated Union case).
- while the Command pattern can help clarify a high level flow, there's no substitute for representing actual business functions as well-named methods representing specific behaviors that are meaningful in the context of the application's Ubiquitous Language, and that can be documented and tested.
- the exact result type that makes sense for a given operation can vary; if you have a single handler, you're forced to use a highest common factor in the result type, i.e. you might end up using a result type that contains aspects that are redundant for some requests
`List` above will do a roundtrip to the Store in order to fetch the most recent state (in `AnyCachedValue` or `AllowStale` modes, the store roundtrip can be optimized out by reading through the cache). This Synchronous Read can be used to Read-your-writes to establish a state incorporating the effects of any request you know to have been completed.
It's important to consider that (unless you're trying to leverage the Read-your-writes guarantee), doing reads direct from an event-sourced store is generally not considered a best practice (the opposite, in fact). Any state you surface to a caller is by definition out of date the millisecond you obtain it, so in many cases a caller might as well use an eventually-consistent version of the same state obtained via an [eventually-consistent] Projection (see terms above).
That said, if you're in a situation where your cache hit ratio is going to be high and/or you have reason to believe the underlying Event-Streams are not going to be long, pretty good performance can be achieved nonetheless; just consider that taking this shortcut will impede scaling and, at worst, can result in you ending up with a model that's potentially both:
- overly simplistic - you're passing up the opportunity to provide a Read Model that directly models the requirement by providing a Materialized View
- unnecessarily complex - the increased complexity of the `fold` function and/or any output from `unfold` (and its storage cost) is a drag on one's ability to read, test, extend and generally maintain the Command Handling/Decision logic that can only live on the write side
See the TodoBackend.com sample for reference info regarding this sample, and the `.fsx` file from where this code is copied.
Note that the bulk of the design of the events stems from the nature of the TodoBackend spec; there are many aspects of the implementation that constitute less than ideal design; please note the provisos below...
module Events =

    type Todo = { id: int; order: int; title: string; completed: bool }
    type Event =
        | Added of Todo
        | Updated of Todo
        | Deleted of int
        | Cleared
        | Snapshotted of Todo[]
The fact that we have a `Cleared` Event stems from the fact that the spec defines such an operation. While one could implement this by emitting a `Deleted` event per currently existing item, there are many reasons to model this as a first class event:
- Events should reflect user intent in its most direct form possible; if the user clicked Delete All, it's not the same to implement that as a set of individual deletes that happen to be united by having timestamps within a very few ms of each other.
- Because the `Cleared` Event establishes a known State, one can have `isOrigin` flag the event as being the furthest one needs to search backwards before starting to `fold` events to establish the state. This also prevents the fact that the stream gets long (in terms of number of events) from impacting the efficiency of the processing
- While having a `Cleared` event happens to work, it also represents a technical trick in a toy domain and should not be considered some cure-all Pattern - real Todo apps don't have a 'declare bankruptcy' function. An example alternate approach might be to represent each Todo list as its own stream, and then have a `TodoLists` aggregate coordinating those.
The `Snapshotted` event is used to represent Rolling Snapshots (stored in-stream) and/or Unfolds (stored in the Tip document/Item); for a real Todo list, using this facility may well make sense - the State can fit in a reasonable space, and the likely number of Events may reach an interesting enough count to justify applying such a feature:
- it should also be noted that Caching may be a better answer - note `Snapshotted` is also an `isOrigin` event - there's no need to go back any further if you meet one.
- we use an Array in preference to a [F#] `list`; while there are `ListConverter`s out there (notably not in `FsCodec`), in this case an Array is better from a GC and memory-efficiency stance, and does not need any special consideration when using `Newtonsoft.Json` to serialize.
type State = { items: Todo list; nextId: int }
let initial = { items = []; nextId = 0 }

module Snapshot =

    let private generate state = Snapshotted (Array.ofList state.items)
    let private isOrigin = function Cleared | Snapshotted _ -> true | _ -> false
    let config = isOrigin, generate
    let hydrate items = { initial with items = List.ofArray items }

let private evolve s = function
    | Snapshotted items -> Snapshot.hydrate items
    | Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 }
    | Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) }
    | Deleted id -> { s with items = s.items |> List.filter (fun x -> x.id <> id) }
    | Cleared -> { s with items = [] }
let fold = Array.fold evolve
- for `State` we use records and `list`s, as the state needs to be a Persistent data structure.
- in general the `evolve` function is straightforward idiomatic F# - while there are plenty of ways to improve the efficiency (primarily by implementing `fold` using mutable state), in reality this would reduce the legibility and malleability of the code.
module Decisions =

    let add value (state: Fold.State) = [|
        Events.Added { value with id = state.nextId } |]
    let update (value: Events.Todo) (state: Fold.State) = [|
        match state.items |> List.tryFind (function { id = id } -> id = value.id) with
        | Some current when current <> value -> Events.Updated value
        | _ -> () |]
    let delete id (state: Fold.State) = [|
        if state.items |> List.exists (fun x -> x.id = id) then
            Events.Deleted id |]
    let clear (state: Fold.State) = [|
        if state.items |> List.isEmpty |> not then
            Events.Cleared |]
- Note `add` does not adhere to the normal idempotency constraint, being unconditional. If the spec provided an id or token to deduplicate requests, we'd track that in the `fold` and use it to rule out duplicate requests.
- For `update`, we can lean on structural equality in `when current <> value` to cleanly rule out redundant updates
- The current implementation is 'good enough' but there's always room to argue for adding more features. For `clear`, we could maintain a flag about whether we've just seen a clear, or have a request identifier to deduplicate, rather than risk omitting a chance to mark the stream clear and hence leverage the `isOrigin` aspect of having the event.
type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    member _.List(clientId): Async<Events.Todo seq> =
        let decider = resolve clientId
        decider.Query(fun s -> s.items |> Seq.ofList)
    member _.TryGet(clientId, id) =
        let decider = resolve clientId
        decider.Query(fun s -> s.items |> List.tryFind (fun x -> x.id = id))
    member _.Create(clientId, template: Events.Todo): Async<Events.Todo> =
        let decider = resolve clientId
        decider.Transact(Decisions.add template, fun s -> s.items |> List.head)
    member _.Patch(clientId, item: Events.Todo): Async<Events.Todo> =
        let decider = resolve clientId
        decider.Transact(Decisions.update item, fun s -> s.items |> List.find (fun x -> x.id = item.id))
    member _.Delete(clientId, id): Async<unit> =
        let decider = resolve clientId
        decider.Transact(Decisions.delete id)
    member _.Clear(clientId): Async<unit> =
        let decider = resolve clientId
        decider.Transact(Decisions.clear)
- `Create` and `Patch` represent a command processing flow where we (idempotently) manifest the intent that the incoming request represents (often referred to as the Command), but then also echo back a response based on the resulting state to the caller, as dictated by the needs of the call as specified in the TodoBackend spec. The `render` function we pass is presented with the post-state (folded from the input state and the events (if any) applied), and then projects from that
- While we could theoretically use Projections to service queries from an eventually consistent Read Model, this is not in alignment with the Read-your-writes expectation embodied in the tests (i.e. it would not pass the tests), and, more importantly, would not work correctly as a backend for the app.
- The main conclusion to be drawn from the Favorites and TodoBackend `Service` implementations' use of `Decider` Methods is that, while there can be commonality in terms of the sorts of transactions one might encapsulate in this manner, there are also It Depends factors; for instance:
  - the design doesn't provide complete idempotency and/or follow the CQRS style
  - the fact that this is a toy system with lots of artificial constraints and/or simplifications when compared to aspects that might present in a more complete implementation.
- the `streamId` helper (and optional `Decode` Active Patterns) provide succinct ways to map an incoming `clientId` (which is not a `string` in the real implementation, but instead an id using `FSharp.UMX`) in an unobtrusive manner.
Queries are handled by `Equinox.Decider`'s `Query` function.

A query projects a value from the `'state` of an Aggregate. Queries should be used sparingly, as loading and folding the events each time is against the general principle of Command Query Responsibility Segregation (CQRS). A query should not simply expose the `'state` of an aggregate, as this will inevitably lead to the leaking of decision logic outside of the Aggregate's `module`.
// Query function exposing part of the state
member _.ReadAddress(clientId) =
    let decider = resolve clientId
    decider.Query(fun state -> state.address)

// Return the entire state we hold for this aggregate (NOTE: generally not a good idea)
member _.Read(clientId) =
    let decider = resolve clientId
    decider.Query id
Commands or Decisions are handled via `Equinox.Decider`'s `Transact` method.

The normal command pattern involves taking into account the execution context (e.g., the principal on behalf of which the processing is happening), a command (with relevant parameters) reflecting the intent, and the present `'state` of the Aggregate, and mapping those to one or more Events that represent that intent as a decision on the stream.
In this case, the Decision Process is `interpret`ing the Command in the context of a `'state`.
The function signature is:
let interpret (context, command, args) state: Events.Event[]
Note the `'state` is the last parameter; it's computed and supplied by the Equinox Flow.

If the `interpret` function does not yield any events, there will be no write roundtrip to the store.
A request may be rejected by throwing from within the `interpret` function.

Note that emitting an event dictates that the processing may be rerun should a conflicting write have taken place since the loading of the state.
let interpret (context, command) state: Events.Event[] = [|
    match tryCommand context command state with
    | None ->
        () // not relevant / already in effect
    | Some eventDetails -> // accepted, mapped to event details record
        Events.HandledCommand eventDetails |]
type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    // Given the supplied context, apply the command for the specified clientId
    member _.Execute(clientId, context, command): Async<unit> =
        let decider = resolve clientId
        decider.Transact(interpret (context, command))

    // Given the supplied context, apply the command for the specified clientId
    // Throws if this client's data is marked Read Only
    member _.Execute(clientId, context, command): Async<unit> =
        let decider = resolve clientId
        decider.Transact(fun state ->
            if state.isReadOnly then raise (AccessDeniedException()) // Mapped to 403 externally
            interpretCommand (context, command) state)
In some cases, depending on the domain in question, it may be appropriate to record some details of the request (that are represented as Events that become Facts), even if the 'command' is logically ignored. In such cases, the necessary function is a hybrid of a projection and the preceding `interpret` signature: you're both potentially emitting events and yielding an outcome or projecting some of the `'state`.
In this case, the signature is: `let decide (context, args) state: 'result * Events.Event[]`

Note that the return value is a tuple of `('result, Events.Event[])`:
- the `fst` element is returned from `decider.Transact`
- the `snd` element of the tuple represents the events (if any) that should represent the state change implied by the request.
Note that if the decision function yields events and a conflict is detected, the flow may result in the `decide` function being rerun with the conflicting state until either no events are emitted, or there are no further conflicting writes supplied by competing writers.
let decide (context, command) state: int * Events.Event[] =
    // ... if `snd` contains events, they are written
    // `fst` (an `int` in this instance) is returned as the outcome to the caller
type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.State>) =

    // Given the supplied context, attempt to apply the command for the specified clientId
    // NOTE Try will return the `fst` of the tuple that `decide` returned
    // If >1 attempt was necessary (e.g., due to conflicting events), the `fst`
    // from the last attempt is the outcome
    member _.Try(clientId, context, command): Async<int> =
        let decider = resolve clientId
        decider.Transact(decide (context, command))
- Identify the Invariants you're seeking to maintain. Events are ordered, and updates consistency checked, for this reason; it'll also be an important part of how you test things.
- In general, you want to default to separating reads from writes for ease of understanding, testing, maintenance and scaling (see CQRS)
- Any command's processing should take into account the current `'state` of the aggregate, `interpret`ing the state in an idempotent manner; processing the same request twice should result in no events being written when the same logical request is made the second time.
- Writing blindly: blind writes (ignoring idempotence principles) are normally a design smell
- Mixing Commands and Queries - in general, the read and write paths should be separated as much as possible (see CQRS)
The canonical `interpret` and `decide` signatures above make unit testing possible without imposing the use of any support libraries or DSLs.

Given an opening `state` and a command, you can validate that the `interpret` handling is idempotent as follows:
let fold, initial = Aggregate.Fold.fold, Aggregate.Fold.initial
// Alternately: open Aggregate.Fold

let validateInterpret contextAndOrArgsAndOrCommand state =
    let events = interpret contextAndOrArgsAndOrCommand state
    // TODO assert/match against the events to validate correct events
    //      considering the contextAndOrArgsAndOrCommand
    let state' = fold state events
    // TODO assert the events, when `fold`ed, yield the correct successor state
    //      (in general, prefer asserting against `events` than `state'`)
    state'

// Validate handling is idempotent in nature
let validateIdempotent contextAndOrArgsAndOrCommand state' =
    let events' = interpret contextAndOrArgsAndOrCommand state'
    match events' with
    | [||] -> ()
    // TODO add clauses to validate edge cases that should still generate events on a re-run
    | xs -> failwithf "Not idempotent; Generated %A in response to %A" xs contextAndOrArgsAndOrCommand
With `FsCheck.Xunit`, to validate that a command is handled correctly (and idempotently) from the Aggregate's `initial` state:
let [<Property>] properties contextAndOrArgsAndOrCommand =
let state' = validateInterpret contextAndOrArgsAndOrCommand initial
validateIdempotent contextAndOrArgsAndOrCommand state'
With xUnit `TheoryData`:
type InterpretCases() as this =
    inherit TheoryData<Command>() // generic argument per the shape of your cases
    do this.Add( case1 )
    do this.Add( case2 )

let [<Theory; ClassData(typeof<InterpretCases>)>] examples args =
    let state' = validateInterpret args initial
    validateIdempotent args state'
In some cases, a Command is logically composed of separable actions against the
aggregate. In general, it's advisable to represent each aspect of the processing
in terms of the `interpret` function signature above; this allows each aspect
of the behavior to be unit tested cleanly. The overall chain of processing can
then be represented as a composed method which manages the overall transaction.
There's an example of such a case in the Cart's Domain Service:
// Fold over a sequence of interpret functions, evolving the state as we go, and accumulating all emitted events
let interpretMany fold (interpreters: seq<'state -> 'event[]>) (state: 'state): 'event[] = [|
let mutable state = state
for interpret in interpreters do
let events = interpret state
state <- fold state events
yield! events |]
type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.State>) =
member _.SyncItems(cartId, reqs: Item seq, ?prepare): Async<unit> =
let interpret state = async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpretSync reqs) state }
let decider = resolve cartId
decider.Transact(interpret)
NOTE: the following is an alternate approach, provided as a counterpoint - there's no need to read it, as the preceding approach is recommended as the default strategy.
An alternate approach is to encapsulate the folding (Equinox V1 exposed an interface that encouraged such patterns; this was removed in two steps, as code written using the idiomatic approach is intrinsically Simpler, even if it seems less Easy at first):
/// Maintains a rolling folded State while Accumulating Events pended as part
/// of a decision flow
type Accumulator<'event, 'state>(fold: 'state -> 'event[] -> 'state, originState: 'state) =
let accumulated = ResizeArray<'event>()
/// The Events that have thus far been pended via the `decide` functions
/// `Execute`/`Decide`d during the course of this flow
member _.Events: 'event[] =
accumulated.ToArray()
/// The current folded State, based on the Stream's `originState` + any
/// events that have been Accumulated during the decision flow
member _.State: 'state =
accumulated |> fold originState
/// Invoke a decision function, gathering the events (if any) that it
/// decides are necessary into the `Accumulated` sequence
member x.Transact(interpret: 'state -> 'event[]): unit =
interpret x.State |> accumulated.AddRange
/// Invoke an Async decision function, gathering the events (if any) that
/// it decides are necessary into the `Accumulated` sequence
member x.Transact(interpret: 'state -> Async<'event[]>): Async<unit> = async {
let! events = interpret x.State
accumulated.AddRange events }
/// Invoke a decision function, while also propagating a result yielded as
/// the fst of a (result, events) pair
member x.Transact(decide: 'state -> 'result * 'event[]): 'result =
let result, newEvents = decide x.State
accumulated.AddRange newEvents
result
/// Invoke a decision function, while also propagating a result yielded as
/// the fst of a (result, events) pair
member x.Transact(decide: 'state -> Async<'result * 'event[]>): Async<'result> = async {
let! result, newEvents = decide x.State
accumulated.AddRange newEvents
return result }
type Service ... =
member _.Run(cartId, reqs: Item seq, ?prepare): Async<unit> =
let interpret state = async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
for req in reqs do
acc.Transact(interpretSync req)
return acc.Events }
let decider = resolve cartId
decider.Transact(interpret)
There are virtually unlimited ways to build an event-sourced model. For any set of components to be useful, it's critical that they be designed in a manner where one combines small elements to compose a whole, versus trying to provide a hardwired end-to-end 'framework'.
While going the library route leaves plenty of seams needing to be tied together at the point of consumption (with resulting unavoidable complexity), it's the only viable way to provide a system that can work in the real world.
This section outlines key concerns that the Equinox Programming Model is specifically taking a view on, and those that it is going to particular ends to leave open.
F#, out of the box, has a very relevant feature set for building Domain models in an event-sourced fashion (DUs, persistent data structures, total matching, list comprehensions, async builders etc). However, there are still lots of ways to split the process of folding the events, encoding them, deciding the events to produce etc.
In the general case, it doesn't really matter which way one opts to model the events, folding and decision processing.
However, given one has a specific store (or set of stores) in mind for the events, a number of key aspects need to be taken into consideration:
- Coding/encoding events - Per aggregate or system, there is commonality in how one might wish to encode and/or deal with versioning of event representations. Per store, the most efficient way to bridge to that concern can vary. Per domain and encoding system, the degree to which one wants to unit or integration test this codec process will vary.
- Caching - Per store, there are different tradeoffs/benefits for Caching. Per system, caching may or may not even make sense. For some stores, it makes sense to integrate caching into the primary storage.
- Snapshotting - The store and/or the business need may provide a strong influence on whether or not (and how) one might employ a snapshotting mechanism.
This section enumerates key concerns feeding into how Stores in general, and specific concrete Stores bind to the Programming Model:
TL;DR caching not really needed, storing snapshots has many considerations in play, projections built in
Overview: EventStore is a mature and complete system, explicitly designed to address key aspects of building an event-sourced system. There are myriad bindings for multiple languages and various programming models. The docs present various ways to do snapshotting. The projection system provides ways in which to manage snapshotting, projections, building read models etc.
Key aspects relevant to the Equinox programming model:
- In general, EventStore provides excellent caching and performance characteristics intrinsically by virtue of its design.
- Projections can be managed by the `Propulsion.EventStoreDb` library; there is also an `eqx project stats es` feature.
- In general, event streams should be considered append-only, with no mutations or deletes.
- For snapshotting, one can either maintain a separate stream with a maximum count or TTL rule, or include faux Compaction events in the normal streams (to make it possible to combine reading of events and a snapshot in a single roundtrip); the latter is presently implemented in `Equinox.EventStore` (see the sketch after this list).
- While there is no generic querying facility, the APIs are designed in such a manner that it's generally possible to achieve any typically useful event access pattern in an optimal fashion (projections, catchup subscriptions, backward reads, caching).
- While EventStore allows either json or binary data, it's generally accepted that json (presented as UTF-8 byte arrays) is a good default for reasons of interoperability (the projections facility also strongly implies json).
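As a concrete illustration of the faux Compaction event approach, here's a minimal hedged sketch (all names are hypothetical): the snapshot travels in the stream as just another Event case, and an `isOrigin` predicate tells the loading logic when reading backwards can stop:

```fsharp
module Events =
    type ItemData = { sku: string; quantity: int }
    type Event =
        | Added of ItemData
        | Compacted of ItemData[] // faux Compaction event: a snapshot stored inline in the stream

module Fold =
    type State = Events.ItemData list
    let initial: State = []
    let evolve (state: State) = function
        | Events.Added item -> item :: state
        | Events.Compacted items -> List.ofArray items // the snapshot supersedes all preceding events
    let fold: State -> Events.Event seq -> State = Seq.fold evolve
    // reading backwards can stop as soon as a Compacted event is encountered
    let isOrigin = function Events.Compacted _ -> true | _ -> false
```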
TL;DR caching can optimize RU consumption significantly. Due to the intrinsic ability to mutate easily, the potential to integrate rolling snapshots into core storage is clear. Providing ways to cache and snapshot matter a lot on CosmosDB, as lowest-common-denominator queries loading lots of events cost in performance and cash. The specifics of how you use the changefeed matter more than one might think from the CosmosDB high level docs.
Overview: CosmosDB has been in production for >5 years and is a mature Document database. The initial DocumentDb offering is at this point a mere projected programming model atop a generic Document data store. Its changefeed mechanism affords a base upon which one can manage projections, but there is no directly provided mechanism that lends itself to building Projections that map directly to EventStore's facilities in this regard (i.e., there is nowhere to maintain consumer offsets in the store itself).
Key aspects relevant to the Equinox programming model:
- CosmosDB has pervasive optimization feedback per call in the form of a Request Charge attached to each and every action. Working to optimize one's request charges per scenario is critical, both in terms of the effect it has on the amount of Request Units/s you need to pre-provision (which translates directly to costs on your bill), and in order to live predictably within that budget without being throttled with 429 responses. In general, the request charging structure can be considered a very strong mechanical sympathy feedback signal.
- Point reads of single documents based on their identifier are charged as 1 RU plus a price per KB, and are optimal. Queries, even ones returning that same single document, have significant overhead and hence are to be avoided.
- One key mechanism CosmosDB provides to allow one to work efficiently is that any point-read request where one supplies a valid `etag` is charged at 1 RU, regardless of the size one would be transferring in the case of a cache miss (the other key benefits are that it avoids unnecessary clogging of the bandwidth, and yields optimal latencies due to no unnecessary data transfers); see the caching sketch after this list.
- Indexing things surfaces in terms of increased request charges; at scale, each index hence needs to be justified.
- Similarly to EventStore, the default ARS encoding CosmosDB provides, together with interoperability concerns, means that straight json makes sense as an encoding form for events (UTF-8 arrays).
- Collectively, the above implies (arguably counter-intuitively) that using the powerful generic querying facility that CosmosDB provides should actually be a last resort.
- See Cosmos Storage Model for further information on the specific encoding used, informed by these concerns.
- Because reads, writes and updates of items in the Tip document are charged based on the size of the item in units of 1KB, it's worth compressing and/or storing snapshots outside of the Tip document (while those factors are also a concern with EventStore, the key difference is their direct effect on charges in this case).
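To make the etag-contingent read pattern concrete, here's a hedged wiring sketch (the `Cache` constructor shape is per recent Equinox releases; the namespace housing `CachingStrategy` has moved between major versions, so treat its placement as an assumption):

```fsharp
// a shared cache instance, sized in MB; Equinox retains the folded State *and* the Tip's _etag
let cache = Equinox.Cache("MyApp", sizeMb = 50)
// reads of a cached stream become etag-contingent point reads:
// 1 RU and no payload transfer when the store answers NotModified
let caching = Equinox.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
```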
How the changefeed mechanism works also has implications for how events and snapshots should be encoded and/or stored:
- Each write results in a potential cost per changefeed consumer, hence one should minimize the number of changefeed consumers.
- Each update of a document can have the same effect in terms of Request Charges incurred in tracking the changefeed (each write results in a document "moving to the tail" in the consumption order - if multiple writes occur within a polling period, you'll only see the last one).
- The ChangeFeedProcessor presents a programming model which needs to maintain a position. Typically one should store that in an auxiliary collection in order to avoid feedback and/or interaction between the changefeed and those writes.
It can be useful to consider keeping snapshots in the auxiliary collection employed by the changefeed in order to optimize the interrelated concerns of not reading data redundantly, and not feeding back into the changefeed itself (although having separate roundtrips obviously has implications).
This article provides a walkthrough of how Equinox.CosmosStore
encodes, writes and
reads records from a stream under its control.
The code (see source) contains lots of comments and is intended to be read - this just provides some background.
Events are stored in immutable batches consisting of:
- `p`artitionKey: `string` // stream identifier, e.g. "Cart-{guid}"
- `i`ndex: `int64` // base index position of this batch (`0` for the first event in a stream)
- `n`extIndex: `int64` // base index ('i') position value of the next record in the stream - NB this always corresponds to `i` + `e.length` (in the case of the `Tip` record, there won't actually be such a record yet)
- `id`: `string` // same as `i` (CosmosDB forces every item (document) to have one[, and it must be a `string`])
- `e`vents: `Event[]` // (see next section) typically there is one item in the array (can be many if events are small, for RU and performance/efficiency reasons; RU charges are per 1024 byte block)
- `ts` // CosmosDB-intrinsic last updated date for this record (changes when replicated etc, hence see `t` below)
Per `Event`, we have the following:
- `c`ase - the case of this union in the Discriminated Union of Events this stream bears (aka Event Type)
- `d`ata - json data (CosmosDB maintains it as actual json; you are free to index it and/or query based on that if desired)
- `m`etadata - carries ancillary information for an event; also json
- `t` - creation timestamp
The tip is always readable via a point-read, as the `id` has a fixed, well-known value: `"-1"`. It uses the same base layout as the aforementioned Batch (`Tip` is-a `Batch`), adding the following:
- `id`: always `-1` so one can reference it in a point-read GET request and not pay the cost and latency associated with a full indexed query
- `_etag`: CosmosDB-managed field updated per-touch (facilitates `NotModified` result, see below)
- `u`: Array of _unfold_ed events based on a point-in-time state (see State, Snapshots, Events and Unfolds, Unfolded Events and `unfold` in the programming model section). Not indexed. While the data is json, the actual `d`ata and `m`etadata fields are compressed and encoded as base64 (and hence can not be queried in any reasonable manner).
In an Event Sourced system, we typically distinguish between the following basic elements:
- Events - Domain Events representing real world events that have occurred (always past-tense; it's happened and is not up for debate), reflecting the domain as understood by domain experts - see Event Storming. Examples: The customer favorited the item, the customer added SKU Y to their saved-for-later list, A charge of $200 was submitted successfully with transaction id X.
- State - derived representations established from Events. A given set of code in an environment will, in service of some decision making process, interpret the Events as implying particular state changes in a model. If we change the code slightly or add a field, you wouldn't necessarily expect a version of your code from a year ago to generate equivalent state that you can simply blast into your object model and go. (But you can easily and safely hold a copy in memory as long as your process runs, as this presents no such interoperability or versioning concerns.) State is not necessarily always serializable, nor should it be.
- Snapshots - A snapshot is an intentionally roundtrippable version of a State that can be saved and restored. Typically one would do this to save the (latency, roundtrips, RUs, deserialization and folding) cost of loading all the Events in a long running sequence of Events to re-establish the State. The EventStore folks have a great walkthrough on Rolling Snapshots.
- Projections - the term projection is heavily overloaded, meaning anything from the proceeds of a SELECT statement, the result of a `map` operation, an EventStore projection, to an event being propagated via Kafka (no, further examples are not required!).
.... and:
- Unfolds - the term `unfold` is based on the well known 'standard' FP function of that name, bearing the signature `'state -> 'event seq`. => For `Equinox.CosmosStore`, one might say `unfold` yields _projections_ as _events_ to snapshot the state as at that position in the stream.
Periodically, along with writing the events that a decision function yields
to represent the implications of a command given the present state, we also
`unfold` the resulting `state'` and supply those to the `Sync` function too.
The `unfold` function takes the `state` and projects one or more
snapshot-events that can be used to re-establish a state equivalent to what we
have thus far derived from watching the events on the stream. Unlike normal
events, `unfold`ed events do not get replicated to other systems, and can also
be jettisoned at will (we also compress them rather than storing them as fully
expanded json).
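A minimal hedged sketch of such an `unfold` pairing (names are hypothetical; per the conventions used in the Equinox templates, a single snapshot is typically expressed as a `toSnapshot`/`isOrigin` pair in the `Fold` module):

```fsharp
module Events =
    type Event =
        | Incremented
        | Snapshotted of {| count: int |} // the unfolded event; never replicated, can be discarded

module Fold =
    type State = { count: int }
    let initial = { count = 0 }
    let evolve (state: State) = function
        | Events.Incremented -> { count = state.count + 1 }
        | Events.Snapshotted s -> { count = s.count } // a snapshot is folded like any other event
    let fold: State -> Events.Event seq -> State = Seq.fold evolve
    // loading can start from the snapshot, rather than the start of the stream
    let isOrigin = function Events.Snapshotted _ -> true | _ -> false
    // the 'unfold': project the current state as a single snapshot-event for the Tip
    let toSnapshot (state: State) = Events.Snapshotted {| count = state.count |}
```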
NB, in the present implementation, `unfold`s are generated, transmitted and
updated upon every write; this makes no difference from a Request Charge
perspective, but is clearly suboptimal due to the extra computational effort
and network bandwidth consumption. This will likely be optimized by exposing
controls on the frequency at which `unfold`s are triggered.
The dominant pattern is that reads request the Tip with an `IfNoneMatch`
precondition citing the etag it bore when we last saw it. That, when combined
with a cache, means one of the following happens when a reader is trying to
establish the state of a stream prior to processing a Command:
- `NotModified` (depending on workload, can be the dominant case) - for 1 RU, minimal latency and close-to-0 network bandwidth, we know the present state
- `NotFound` (there's nothing in the stream) - for equivalently low cost (1 RU), we know the state is `initial`
- `Found` - (if there are multiple writers and/or we don't have a cached version) - for the minimal possible cost (a point read, not a query), we have all we need to establish the state:
  - `i`: a version number
  - `e`: events since that version number
  - `u`: unfolded (auxiliary) events computed at the same time as the batch of events was sent (aka informally as snapshots) - these enable us to establish the `state` without further queries or roundtrips to load and fold all preceding events
Given a stream with:
[
{ "id":0, "i":0, "e": [{"c":"c1", "d":"d1"}]},
{ "id":1, "i":1, "e": [{"c":"c2", "d":"d2"}]},
{ "id":2, "i":2, "e": [{"c":"c2", "d":"d3"}]},
{ "id":3, "i":3, "e": [{"c":"c1", "d":"d4"}]},
{ "id":-1,
"i": 4,
"e": [{"i":4, "c":"c3", "d":"d5"}],
"u": [{"i":4, "c":"s1", "d":"s5Compressed"}, {"i":3, "c":"s2", "d":"s4Compressed"}],
"_etag": "etagXYZ"
}
]
If we have `state4` based on the events up to `{i:3, c:c1, d:d4}` and the `Tip`
Item-document, we can produce the `state` by folding in a variety of ways:
- `fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ]` (but would need a query to load the first 2 batches, with associated RUs and roundtrips)
- `fold state4 [ C3 d5 ]` (only need to pay to transport the tip document as a point read)
- (if `isOrigin (S1 s5)` = `true`): `fold initial [S1 s5]` (point read + transport + decompress `s5`)
- (if `isOrigin (S2 s4)` = `true`): `fold initial [S2 s4; C3 d5]` (only need to pay to transport the tip document as a point read and decompress `s4` and `s5`)
If we have `state3` based on the events up to `{i:2, c:c2, d:d3}`, we can
produce the `state` by folding in a variety of ways:
- `fold initial [ C1 d1; C2 d2; C2 d3; C1 d4; C3 d5 ]` (but query, round-trips)
- `fold state3 [C1 d4; C3 d5]` (only pay for point read+transport)
- `fold initial [S2 s4; C3 d5]` (only pay for point read+transport)
- (if `isOrigin (S1 s5)` = `true`): `fold initial [S1 s5]` (point read + transport + decompress `s5`)
- (if `isOrigin (S2 s4)` = `true`): `fold initial [S2 s4; C3 d5]` (only need to pay to transport the tip document as a point read and decompress `s4` and `s5`)
If we have `state5` based on the events up to `C3 d5`, and (being the writer,
or a recent reader) have the etag `etagXYZ`, we can do an HTTP GET with
`If-None-Match: etagXYZ`, which will return `304 Not Modified` with < 1K of
data, and a charge of `1.00` RU, allowing us to derive the state as: `state5`
See Programming Model for what happens in the application based on the events presented.
This covers what the V3 implementation of the JS Stored Procedure (see source) does when presented with a batch to be written.
The `sync` stored procedure takes as input a document that is almost identical
to the format of the `Tip` batch (in fact, if the stream is found to be
empty, it is pretty much the template for the first document created in the
stream). The request includes the following elements:
- `expectedVersion`: the position the requester has based their [proposed] events on (no, providing an `etag` to save on Request Charges is not possible in the Stored Proc)
- `expectedEtag`: enables competing writers to maintain and update `u`nfold data in a consistent fashion (backing off and retrying in the case of conflict, without any events being written per state change) (See `AccessStrategy.RollingState`, `AccessStrategy.Custom`)
- `e`: array of Events (see Event, above) to append if, and only if, the expectedVersion check is fulfilled
- `u`: array of `unfold`ed events (aka snapshots) that supersede items with equivalent `c`ase values
- `maxEventsInTip`: the maximum number of events permitted to be retained in the Tip (subject to that not exceeding the `maxStringifyLen` rule). For example:
  - if `e` contains 2 events, the tip document's `e` has 2 events and the `maxEventsInTip` is `5`, the events get appended onto the tip's `e`vents
  - if the total length including the new `e`vents would exceed `maxEventsInTip`, the Tip is 'renamed' (gets its `id` set to `i.toString()`) to become a batch (with the new `e`vents included in that calved Batch), and the new Tip has a zero-length `e`vents array as a `Batch`, and a set of `u`nfolds (as an atomic transaction on the server side)
- `maxStringifyLen`: secondary constraint on the events retained in the tip (in addition to the `maxEventsInTip` constraint) - constrains the maximum length of the events being buffered in the Tip by applying a size limit in characters (as computed via `JSON.stringify(events).length`)
- (PROPOSAL/FUTURE) `thirdPartyUnfoldRetention`: how many events to keep before the base (`i`) of the batch if required by lagging `u`nfolds which would otherwise fall out of scope as a result of the appends in this batch (this will default to `0`, so for example if a writer says maxEvents `10` and there is an `u`nfold based on an event more than `10` old, it will be removed as part of the appending process)
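For orientation only, a request body for the `sync` procedure might look roughly like the following (a hand-written, hypothetical illustration rather than a captured payload; the field values and event shapes are invented):

```json
{
  "expectedVersion": 4,
  "e": [{ "c": "ItemAdded", "d": { "sku": "X", "quantity": 1 }, "t": "2021-01-01T00:00:00Z" }],
  "u": [{ "i": 5, "c": "Snapshotted", "d": "<compressed+base64>" }],
  "maxEventsInTip": 5,
  "maxStringifyLen": 32768
}
```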
The `Equinox.CosmosStore.Core` namespace provides a lower level API that can be used
to manipulate events stored within an Azure CosmosDB container using optimized native
access patterns.
The higher level APIs (i.e. not `Core`), as demonstrated by the `dotnet new`
templates, are recommended for use in the general case, as they provide the
following key benefits:
- Domain logic is store-agnostic, leaving it easy to:
  - Unit Test in isolation (verifying decisions produce correct events)
  - Integration test using the `MemoryStore`, where relevant
- Decouples encoding/decoding of events from the decision process of what events to write (means your Domain layer does not couple to a specific storage layer or encoding mechanism)
- Enables efficient caching and/or snapshotting (providing Equinox with `fold`, `initial`, `isOrigin`, `unfold` and a codec allows it to manage this efficiently)
- Provides Optimistic Concurrency Control with retries in the case of conflicting events
open Equinox.CosmosStore.Core
type EventData with
static member FromT eventType value =
FsCodec.Core.EventData.Create(eventType, Json.toBytes value)
// Load connection string from your Key Vault (example here is the CosmosDB
// simulator's well known key)
// see https://github.com/jet/equinox#provisioning-cosmosdb
let connectionString: string =
"AccountEndpoint=https://localhost:8081;AccountKey=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==;"
// Forward to Log (use `Log.Logger` if your app already uses Serilog)
let outputLog = LoggerConfiguration().WriteTo.Console().CreateLogger()
// Serilog has a `ForContext<T>()`, but if you are using a `module` for the
// wiring, you might create a tagged logger like this:
let gatewayLog =
outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox")
let connector: Equinox.CosmosStore.CosmosStoreConnector =
CosmosStoreConnector(
Equinox.CosmosStore.Discovery.ConnectionString connectionString,
requestTimeout = TimeSpan.FromSeconds 5.,
maxRetryAttemptsOnRateLimitedRequests = 1,
maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let context = CosmosStoreContext(client, databaseId, containerId)
let ctx = EventsContext(context, gatewayLog)
//
// Write an event
//
let expectedSequenceNumber = 0 // new stream
let streamName, eventType, eventJson = "stream-1", "myEvent", Request.ToJson event
let eventData = EventData.FromT eventType eventJson |> Array.singleton
let! res =
Events.append
ctx
streamName
expectedSequenceNumber
eventData
match res with
| AppendResult.Ok -> ()
| c -> c |> failwithf "conflict %A"
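Reading raw events back through the same low-level surface looks roughly like the following (hedged: the exact `Events` module function names and signatures vary across Equinox versions, so treat this as a sketch rather than a definitive API reference):

```fsharp
async {
    // read up to 100 events forward from the start of the stream
    let! events = Events.get ctx streamName 0L 100
    for e in events do
        // each event surfaces its index and event type (the union case name)
        printfn "%d: %s" e.Index e.EventType }
```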
An Access Strategy defines any optimizations regarding how one arrives at a State of an Aggregate based on the Events stored in a Stream in a Store.
The specifics of an Access Strategy depend on what makes sense for a given
Store, i.e. Equinox.CosmosStore
necessarily has a significantly different set of
strategies than Equinox.EventStore
(although there is an intersection).
Access Strategies only affect performance; you should still be able to infer
the state of the aggregate based on the `fold` of all the events ever written
on top of an `initial` state.
NOTE: it's not important to select a strategy until you've actually modelled your aggregate; see what if I change my access strategy
TL;DR for `Equinox.CosmosStore` (see also: the storage model for a deep dive, and the glossary below the table for definitions of terms):
- keeps all the Events for a Stream in a single CosmosDB logical partition
- Transaction guarantees are provided at the logical partition level only. As for most typical Event Stores, the mechanism is based on Optimistic Concurrency Control. There's no holding of a lock involved - it's based on conveying your premise alongside the proposed change; in terms of what we are doing, you observe a `state`, propose `events`, and the store is responsible for applying the change, or rejecting it if the `state` turns out to no longer be the case when you get around to `sync`ing the change. The change is rejected if your premise (things have not changed since I saw them) is invalidated (whereupon you loop, working from the updated state).
- always has a special 'index' document (we term it the `Tip` document) per logical partition/stream that is accessible via an efficient point read (it always has a CosmosDB `id` value of `"-1"`)
- Events are stored in Batches in immutable documents in the logical partition; the `Tip` document is the only document that's ever updated (it's always updated, as it happens...).
- As all writes touch the `Tip`, we have a natural way to invalidate any cached State for a given Stream; we retain the `_etag` of the `Tip` document, and updates (or consistent reads) are contingent on it not having changed.
- The (optimistic) concurrency control of updates is by virtue of the fact that every update to the `Tip` touches the document and thus alters (invalidates) the `_etag` value. This means that, in contrast to how many SQL based stores (and most CosmosDB based ones) implement concurrency control, we don't rely on a primary key constraint to prevent two writers writing conflicting events to the same stream.
- A secondary benefit of not basing consistency control on a primary key constraint or equivalent is that we no longer have to insert an Event every time we are updating something. (This fact is crucial for the `RollingState` and `Custom` strategies.)
- The `interpret`/`decide` function is expected to deduplicate writes by not producing `events` if the `state` implies such updates would be redundant. Equinox does not have any internal mechanism to deduplicate events, thus correct deduplication is the key to reducing round-trips and hence minimizing RU consumption (and the adverse effects of the retry cycles due to contention, which will most likely arise when load is at its highest).
- The `unfolds` maintained in the `Tip` have their bodies (the `d` and `m` fields) 1) deflated 2) base64 encoded (as everyone is reading the Tip, it's worthwhile having the writer take on the burden of compressing, with the payback being that write amplification effects are reduced by readers paying fewer RUs to read them). The snapshots can be inspected securely via the `eqx` tool's `dump` facility, or insecurely online via the decode button on `jgraph`'s `drawio-tools` at https://jgraph.github.io/drawio-tools/tools/convert.html, if the data is not sensitive.
- The `Tip` document (and the fact we hold its `_etag` in our cache alongside the State we have derived from the Events) is at the heart of why consistent reads are guaranteed to be efficient (Equinox does the read of the `Tip` document contingent on the `_etag` not having changed; a read of any size costs only 1 RU if the result is `304 Not Modified`)
- Specific Access Strategies:
  - define what we put in the `Tip`
  - control how we short circuit the process of loading all the Events and `fold`ing them from the start, if we encounter a Snapshot or Reset Event
  - allow one to post-process the events we are writing as required for reasons of optimization
| Strategy | TL;DR | Tip document maintains | Best suited for |
|---|---|---|---|
| `Unoptimized` | Keep Events only (but there's still an (empty) Tip document) | Count of events (and ability to have any insertion invalidate the cache for any reader) | No load, event counts or event sizes worth talking about. Initial implementations. |
| `LatestKnownEvent` | Special mode for when every event is completely independent, so we completely short-circuit loading the events and folding them and instead only use the latest event (if any) | A copy of the most recent event, together with the count | 1) Maintaining a local copy of a Summary Event representing information obtained from a partner service that is authoritative for that data 2) Tracking changes to a document where we're not really modelling events as such, but with optimal efficiency (every read is a point read of a single document) |
| `Snapshot` | Keep a (single) snapshot in Tip at all times, guaranteed to include all events | A single `unfold` produced by `toSnapshot` based on the `state'` (i.e., including any events being written) every time, together with the event count | Typical event-sourced usage patterns. Good default approach. Very applicable where you have lots of small 'delta' events that you only consider collectively. |
| `MultiSnapshot` | As per `Snapshot`, but `toSnapshots` can return arbitrary (0, one or many) `unfold`s | Multiple (consistent) snapshots (and `_etag`/event count for concurrency control as per other strategies) | Blue-green deployments where an old version and a new version of the app cannot share a single snapshot schema |
| `RollingState` | "No event sourcing" mode - no events, just `state`. Perf as good as `Snapshot`, but we don't even store the events, so we will never hit CosmosDB stream size limits | `toSnapshot state'`: the squashed state + events which replace the incoming state | Maintaining state of an Aggregate with lots of changes a) that you don't need a record of the individual changes of yet b) you would like to model, test and develop as if one did. DO NOT use if a) you want to be able to debug state transitions by looking at individual change events b) you need to react to and/or project events relating to individual updates (CosmosDB does not provide a way to provide a notification of every single update, even if it does have a guarantee of always showing the final state on the Change Feed eventually) |
| `Custom` | General form that all the preceding strategies are implemented in terms of | Anything `transmute` yields as the `fst` of its result (but, typically, the equivalent of what `Snapshot` writes) | Limited by your imagination, e.g. emitting events once per hour but otherwise like `RollingState` |
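A hedged sketch of wiring one of these strategies when constructing a category (constructor argument order has shifted across Equinox major versions; consult the `dotnet new` templates for your version, and treat the exact shape below as an assumption):

```fsharp
// Snapshot strategy: isOrigin identifies a usable snapshot; toSnapshot produces the unfold
let access = Equinox.CosmosStore.AccessStrategy.Snapshot (Fold.isOrigin, Fold.toSnapshot)
let category =
    Equinox.CosmosStore.CosmosStoreCategory(
        context, "Cart", Events.codec, Fold.fold, Fold.initial, access, caching)
```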
- `decide`/`interpret`: Application function that inspects a `state` in order to propose `events`, under control of a `Transact` loop.
- `Transact` loop: Equinox Core function that runs your `decide`/`interpret`, and then `sync`s any generated `events` to the Store.
- `state`: The (potentially cached) version of the State of the Aggregate that `Transact` supplied to your `decide`/`interpret`.
- `events`: The changes the `decide`/`interpret` generated (that `Transact` is trying to `sync` to the Store).
- `fold`: standard function, supplied per Aggregate, which is used to apply Events to a given State in order to `evolve` the State per the implications of each Event that has occurred.
- `state'`: The State of an Aggregate (post-state in FP terms), derived from the current `state` + the proposed `events` being `sync`ed.
- `Tip`: Special document stored alongside the Events (in the same logical partition) which holds the `unfolds` associated with the current State of the stream.
- `sync`: Stored procedure we use to manage consistent update of the Tip alongside insertion of Event Batch Documents (contingent on the Tip's `_etag` not having changed).
- `unfold`s: JSON objects maintained in the `Tip`, which represent Snapshots taken at a given point in the event timeline for this stream.
  - the `fold`/`evolve` function is presented Snapshots as if they were yet-another-Event - the only differences are:
    - they are not stored in (immutable) Event documents as other Events are
    - every write replaces all the `unfold`s in `Tip` with the result of the `toSnapshot`(s) function as defined in the given Access Strategy.
- `isOrigin`: A predicate function supplied to an Access Strategy that defines the starting point from which we'll build a `state`.
  - Must yield `true` for relevant Snapshots or Reset Events.
- `initial`: The (Application-defined) state value all loaded events `fold` into, if an `isOrigin` event is not encountered while walking back through the `unfolds` and Events and we instead hit the start of the stream.
- Snapshot: a single serializable representation of the `state'`.
  - Facilitates optimal retrieval patterns when a stream contains a significant number of events.
  - NOTE: Snapshots should not ever yield an observable difference in the `state` when compared to building it from the timeline of events; it should be solely a behavior-preserving optimization.
- Reset Event: an event (i.e. a permanent event, not a Snapshot) for which an `isOrigin` predicate yields `true`.
  - e.g., for a Cart aggregate, a CartCleared event means there is no point in looking at any preceding events in order to determine what's in the cart; we can start `fold`ing from that point.
  - Multiple Reset Event Types are possible per Category, and a stream can often have multiple reset points (e.g., each time a Cart is `Cleared`, we enter a known state).
  - A Tombstone Event can also be viewed as a Reset Event, e.g. if you have a (long running) bank account represented as a Stream per year, one might annually write a `TrancheCarriedForwardAndClosed` event which a) bears everything we care about (the final balance) b) signifies the fact that this tranche has now transitioned to read-only mode. Conversely, a `Closed` event is not by itself a Tombstone Event - while you can infer the Open/Closed mode aspect of the Stream's State, you would still need to look further back through its history to be able to determine the balance that applied at the point the period was marked `Closed`.
| Strategy | Reads involve | Writes involve |
|---|---|---|
| `Unoptimized` | Querying for, and `fold`ing, all events (although the cache means it only reads events it has not seen); the Tip is never read, even e.g. if someone previously put a snapshot in there | 1) Insert a document with the events 2) Update Tip to reflect updated event count (as a transaction, as with all updates) |
| `LatestKnownEvent` | Reading the Tip (never the events) | 1) Inserting a document with the new event 2) Updating the Tip to a) up count/invalidate the `_etag` b) CC the event for efficient access |
| `Snapshot` | 1) read Tip; stop if `isOrigin` accepts a snapshot from within 2) read backwards until the provided `isOrigin` function returns `true` for an Event, or we hit start of stream | 1) Produce proposed `state'` 2) write events to new document + `toSnapshot state'` result into Tip with new event count |
| `MultiSnapshot` | As per `Snapshot`; stop if `isOrigin` yields `true` for any `unfold` (then fall back to folding from a base event or a reset event) | 1) Produce `state'` 2) Write events to new document + `toSnapshots state'` to Tip (could be 0 or many, vs exactly one) |
| `RollingState` | Read Tip (can fall back to building from events as per `Snapshot` mode if nothing in Tip, but normally there are no events) | 1) produce `state'` 2) update Tip with `toSnapshot state'` 3) no events are written 4) Concurrency Control is based on the `_etag` of the Tip, not an `expectedVersion`/event count |
| `Custom` | As per `Snapshot` or `MultiSnapshot`: 1) see if any `unfold`s pass the `isOrigin` test 2) Otherwise, work backward until a Reset Event or start of stream | 1) produce `state'` 2) use `transmute events state` to determine a) the `unfold`s (if any) to write b) the events (if any) to emit 3) execute the insert and/or upsert operations, contingent on the `_etag` of the opening state |
Document stores share many common traits. Thus, applying Mechanical Sympathy
in optimizing the storage representation and access patterns will naturally yield an overlapping set of access patterns that work well.
This commonality definitely applies when contrasting CosmosDB and DynamoDB.
As a result, `Equinox.DynamoStore` can and does implement pretty much the same feature set, API patterns and access strategies as `Equinox.CosmosStore`.
The implementation uses the excellent FSharp.AWS.DynamoDB
library 🙏 @eiriktsarpalis @samritchie,
which wraps the standard AWS AWSSDK.DynamoDBv2
SDK Package. It also leans on significant preparatory research carried out under the fsharp.org mentorship program 🙏 @pierregoudjo.
The following focuses on explaining the differences in terms of low level technical detail; the actual usage experience is identical.
The vast majority of the API design and implementation concerns detailed regarding CosmosStore
apply (scroll up ☝️)
In broad terms, the charging structure and rate limiting scheme in DynamoDB have only minor differences that manifest in
terms of the call patterns that Equinox uses. The most relevant variance in the charge structure is that a `TransactWriteItems`
request costs twice the (Read and) Write Capacity Units of equivalent single-item `PutItem`/`UpdateItem` calls;
therefore its use needs to be considered carefully.
- Good article with specifics on the charging structures: How to Calculate a DynamoDB Item’s Size and Consumed Capacity
The Append operation adds events and/or updates the unfolds in the Tip. Instead of using a Stored Procedure as CosmosStore
does, the implementation involves conditional PutItem
and
UpdateItem
requests to accumulate
events in the Tip (where there is space available).
At the point where the Tip exceeds any of the configured and/or implicit limits, a TransactWriteItems
request is used (see implementation in FSharp.AWS.DynamoDB
),
to Calve a Batch of items from the Tip. The calving is triggered by any of:
- maximum event count (not limited by default)
- maximum accumulated event size (default 32KiB)
- DynamoDB Item Size Limit (hard limit of 400KiB)
Further information:
- DynamoDB Transactions: Use Cases and Examples by Alex DeBrie provides a thorough review of the `TransactWriteItems` facility (TL;DR: it's far more general than the stream level atomic transactions afforded by CosmosDB's Stored Procedures)
- while it doesn't provide deeper insight into the API from a usage perspective, Distributed Transactions at Scale in Amazon DynamoDB is a great deep dive into how the facility is implemented.
- DynamoDB does not support an etag-checked Read API, which means a cache hit is not as efficient in RC terms as it is on CosmosDB (and the data travels and is deserialized unnecessarily).
- Concurrency conflicts necessitate an additional roundtrip to resync, as the DynamoDB Service does not yield the item in the event of a `ConditionalCheckFailedException`.
- `DynamoStore`'s `Position` structure (in the `StreamToken` held by Equinox while a `Transact` call is in flight) includes the (compressed) events in the Tip, and holds byte counts for use in the size calculations required to guarantee a Calve request will happen if the `400KiB` size limit (or, more realistically, lower limits dictated by the increasing charges based on Item size) is due to be breached. The `CosmosStore` equivalent consists of only the `_etag` and the event count (the stored procedure uses `JSON.stringify` to compute an indicative size, but the 2 MB UTF-8 JSON size limit is ample given the fact that RU costs get prohibitive long before you hit such a limit).
`CosmosStore` dictates (as of V4) that event bodies be supplied as `System.Text.Json.JsonElement`s, in order that events
can be included in the Documents/Items as JSON directly. This also underscores the fact that the only reasonable format
to use is valid JSON; binary data would need to be base64 encoded.
DynamoStore
accepts and yields event bodies as arbitrary ReadOnlyMemory<byte>
BLOBs (the AWS SDK round-trips such blobs
as a MemoryStream
and does not impose any restrictions on the blobs in terms of required format)
`CosmosStore` defaults to compressing (with `System.IO.Compression.DeflateStream`) the event bodies for Unfolds;
`DynamoStore` round-trips an `Encoding` field per blob (one for the data, one for the metadata), for both Events and Unfolds,
in order to enable the `IEventCodec` to decompress the blobs as required. In both cases, minimizing Request Charges is imperative:
request size directly maps to financial charges, 429s, reduced throughput and a lowered scaling ceiling.
Equinox.Cosmos.Core.Events.appendAtEnd
/NonIdempotentAppend
has not been ported (there's no obvious clean and efficient way to do a conditional insert/update/split as the CosmosDB stored proc can, and this is a low usage feature)
Azure CosmosDB's ChangeFeed mechanism naturally supports replays of all the documents/Items in a Store (with ordering
guarantees at stream level, but not across streams). The API implementation spins up a querying loop per consumer, which
can efficiently do a `SELECT *` query based on its intrinsic ordering to cover both bulk reading and tailing requirements.
On the other hand, the DynamoDB Streams facility retains 24h of individual insert/update/delete records with concurrent readers capped at ~2.
In order to provide facilities equivalent to those of `Propulsion.CosmosStore`'s `CosmosStoreReader` (a lightweight
wrapper over the CosmosDB ChangeFeed API), there are ancillary components that collectively provide equivalent functionality.
CosmosDB intrinsically maintains and surfaces the documents/Items (and physical partition metadata as that shifts over time) in such a manner that any number of consumers can concurrently walk all the data across all the physical partitions and be guaranteed to have traversed every change (though notably not including deletes; this is of course fine as our model is append-only) when one has reached the current 'end' of each physical partition (even in the face of physical partition splits and/or document updates during the walk).
It should be noted that these walks are not free; each reader induces RU consumption on the Container that impacts the capacity available for other reads and writes. There is also an amplification effect: each write immediately triggers N reads of the same size.
DynamoDB does not provide an equivalent mechanism for an online traversal that's guaranteed to see all Items. (You can do an export to an S3 bucket that is guaranteed to include all items at a point in time, but you'd then need to supplement that with subsequent changes via DynamoDB Streams).
The mechanism employed is to provision DynamoDB Streams for the Table holding the events, which is fed to an AWS Lambda via a DynamoDB Streams Trigger that is configured to reliably ingest the writes in the order of their occurrence at the stream level via an Indexer.
Relevant background articles from Amazon:
- High level architecture with diagrams: How to perform ordered data replication between applications by using Amazon DynamoDB Streams
- Tutorial with `aws` CLI invocations and JavaScript, covering the key AWS artifacts involved: Tutorial: Process New Items with DynamoDB Streams and Lambda
- Scannable slide deck with some excellent diagrams not found elsewhere: Slide deck: Real-time Data Processing with Amazon DynamoDB Streams and AWS Lambda (yes, despite dating back to 2015)
The `DynamoStoreIndexer` represents each batch (1-10,000 DDB Streams Records fed to the Lambda) as an `Ingested` event
in a sequence of `$AppendsEpoch-<trancheId>-<epoch>` streams in near real time after they are Appended.
Each `Span` in an `Ingested` event within an `AppendsEpoch` stream consists of:
- the Stream Name
- the Index/offset within that stream
- the Event Types
NOTE: as illustrated in the slide deck, as DynamoDB varies the number of Shards, the number of concurrent instances of the Lambda can also rise. In the present implementation, multiple Lambda instances will be competing to write to the Tip of a single chain of epochs.
It's conceivable that one might internally partition the overall Index (based on the StreamName), but the current
implementation does not address this; hence the value of the `FeedTrancheId` is always `0`.
`$AppendsIndex-0` maintains the epoch number where the Indexer will be appending.
`Propulsion.DynamoStore.DynamoStoreSource` fulfils the role that `CosmosStoreSource` fulfils when using `Propulsion.CosmosStore`:
catching up on any events a consumer group has not yet seen (in a batched fashion), and thereafter tailing the index at sub-second intervals.
The reader logic in `DynamoStoreSource` walks the chain of Index Epoch Streams in sequence.
There is a configurable `LoadMode`, which offers:
- `All`: the `Propulsion` `StreamsProjector`'s Scheduler/Dispatch loop is passed the `Stream Name`, `Event Type` and `Index` of every event
- `Filtered`: As per `All`, but you can filter which Streams are to be passed into the Scheduler, reducing resource consumption
  - NOTE both `All` and `Filtered` perform identical I/O - they trigger a single DynamoDB Query reading Items beyond the current Read Position in a given `$AppendsEpoch` stream, but they do not need to read the Table that stores the events themselves
- `Hydrated`: as per `Filtered`, but the Reader loads the full `ITimelineEvent` information, adding the event body: `Data`, `Meta` and `Timestamp`
  - NOTE this obviously induces latency in the reading process (which may be OK, as that runs concurrently with processing of batches that have already been hydrated), and consumes Read Request Capacity on your DynamoDB Table.
As the Propulsion Projector completes the processing of the full set of items in a batch submitted to it,
checkpointing (asynchronously) moves the Position in the `$ReaderCheckpoint` forward.
`$ReaderCheckpoint` maintains the current Position (as an `int64` that encodes the Epoch index and the offset
within that Epoch). Each consumer group has a single position.
The state lives in its own stream in the Index Table, named `$ReaderCheckpoint-dynamoStore_0_<consumerGroup>`, where:
- `dynamoStore` is the well-known `Feed.SourceId` (another example is `eventStoreDb`)
- `0` is the well-known `Feed.TrancheId`
- `consumerGroup` is the application-supplied consumer group (equivalent to the `ProcessorName` in CosmosDB ChangeFeedProcessor parlance, or a consumer group name in Kafka terms)
By convention, the `$AppendsIndex-0`, `$AppendsEpoch-0_<epoch>` and `$ReaderCheckpoint-dynamoStore_0_<consumerGroup>` streams are maintained in a separate `<tableName>-index` Table in DynamoDB. The reasons for this separation include:
- being able to independently scale (and ring-fence) the activity in order to vary the ingestion and consumption capacity
- removing ripple effects where an inactive system keeps reading checkpoint update events, which then self-perpetuate as the updated position is written
- (future proofing: e.g. being able to re-index an entire dataset based on an S3 backup dataset)
Taking an example case where you have a single Container in CosmosDB / Table in DynamoDB, and 5 reactor applications that consume events as they are written, we can illustrate the differences by looking at the moving parts involved.
With CosmosDB's ChangeFeed and `Propulsion.CosmosStoreSource`:
- an `-aux` Container that maintains 5 sets of checkpoints and leases (a document per physical partition within the container).
- a polling loop that reads every document from the (single) Events Container (there is an individual loop per physical partition). NOTE the leasing mechanism and the fact that processing is split by physical partition also means one can distribute processing activity across multiple processing nodes.
- because the CosmosDB ChangeFeed read APIs don't provide for filtering etc, every document that's updated triggers 5 sets of reads of the entire document (even if you only appended one event or only updated an unfold). This implies you need to be particularly careful about limiting how many readers you have and/or how large documents/Items in the store get.
- if you use Azure Functions to process the ChangeFeed, it's pretty much the same equation in terms of activity, charges and scaling, but you can conceptually service all your reactions without a hosting application.
- There is nothing special you need to do to enable the ChangeFeed.
- There is no buffering of events involved at any level; it's all query loops driven by consumers.
- Architecture Astronauts frequently jump to the incorrect conclusion that the single correct use of the ChangeFeed is thus to push events to Kafka.
With DynamoDB Streams, `Propulsion.DynamoStore.Lambda` and `Propulsion.DynamoStore.DynamoStoreSource`:
- The DynamoDB Table that stores the events must have DynamoDB Streams configured (in either `NEW_IMAGE` or `NEW_AND_OLD_IMAGES` mode).
- The DynamoDB Index Table does not require DynamoDB Streams configured (if you wanted to use it to drive replication, you'd simply use `DynamoStoreSource` to tail the epochs directly).
- The Lambda package must be provisioned, with permissions to write to the Index Table (and alerting, so you will know if its `IteratorAge` metric exceeds your latency requirements and/or is approaching the 24h limit after which the guaranteed delivery of notifications would be lost).
- The Lambda must be coupled to the Events Table by establishing a DynamoDB Streams Trigger.
- The indexing activity runs in AWS Lambda. Aside from throughput being limited by the read and write capacity of the Index Table, it's self managing. The running costs are pretty trivial.
- Each of the 5 reactor applications reads from the Index Table. Because the format is pretty compact, tens or hundreds of competing readers are conceivable, as long as you configure the table to provide the relevant read capacity units (AutoScale mode may make sense).
- You will ideally want to have your reactions be predicated on just Stream Names and Event Types. Failing that, the use of `Hydrated` mode will induce load on the Events Table in proportion to the number of reactor applications that require it (although a low number of readers doing that is unlikely to be problematic).
As described in AWS Lambda Supports Parallelization Factor for Kinesis and DynamoDB Event Sources and New AWS Lambda scaling controls for Kinesis and DynamoDB event sources, there is a powerful feature that enables one to configure a Lambda per stream to process reactions in parallel. This works up to a point, and clearly has fewer moving parts; it should thus be seriously considered. The key weaknesses of the approach, relative to a more idiomatic event sourcing approach, are:
- the lack of ability to replay
- the fact that the DynamoDB Streams facility realistically limits you to 1-2 such Lambdas.
The stores supported by Equinox are primarily intended to house Domain Events (Facts) from an event-sourced model. Such events are retained indefinitely in an immutable form.
Often, the management of Ephemeral Events (that one might equivalently record on a bus, queue or a topic in systems such as Apache Kafka) involves needs that overlap significantly with those of managing Domain Events. However, there's a point at which maintaining equivalent levels of access to such data is of significantly lesser value than it is for Domain Events.
In theory, it can be argued that events with an ephemeral aspect are not True Event-Sourcing Events, and as such should be considered entirely separately.
In practice, for myriad reasons, stores such as EventStoreDB, CosmosDB and SqlStreamStore become candidates for and/or victims of the blurring of the divide between ephemeral events and Domain Events.
For the above reasons, a key aspect of designing, maintaining and evolving an event-sourced system involves the management of the overall set of events comprising the system's state:
- grouping events into streams in accordance with the goals of the system as a whole (i.e. how one models the system in terms of aggregates), with consideration for how well a given structure aligns with the characteristics of a given Store
- implementing policies reflecting the relevance of a stream and/or its events over time via various mechanisms: from shifting them to lower performance storage, archiving them to a separated store that's not accessible from the current online system all the way to outright deletion
- drawing the line with regard to ephemeral events representing state that truly does not belong alongside your Domain Events
While the store's capabilities and restrictions are where the rubber meets the road in your streams/events layout, they should not be the primary driver.
When considering which events should be united in a given stream-category, some key factors are:
- is there an invariant that the Aggregate is seeking to uphold? (remember, the stream is the principal unit of consistency control)
- do all the events relate to a meaningful atomic structure within your system?
- when making decisions based on grouping the events in a given way, is the resulting state a reasonable size? (this feeds into whether it's feasible to snapshot the state)
- is the state cohesive, or is it possible to partition the grouping even further? (think SRP and ISP)
- is there a natural characteristic of the aggregate that bounds the number of events that will occur over its lifetime? (e.g., "the appointments" vs splitting by day/month/year/facility)
When you don't load and fold events to arrive at a state.
In some cases, a stream may not even have a meaningful state, invariant or a business process that it's supporting:
- example: a stream is used to queue up commands and/or post outcomes as part of some process. In such a case, the 'state' boils down to checkpointing how far a given consumer has walked along the topic (as opposed to maintaining a rolling state derived primarily from the events that one queries, renders or uses to support a decision flow).
Such topic-streams are not aggregates as such, and are not addressed as a primary use case in the Equinox Programming Model.
However, such topic-streams are nonetheless subject to almost identical considerations in terms of how we deal with managing the lifetime of the data.
Across both Aggregate and Topic use cases, there are specific facilities afforded (and restrictions imposed) by the specific store you're using. For instance:
- Stream size limits - EventStoreDB: EventStore does not impose any limitations on the maximum size or event count that a single stream can bear. This allows one to maintain a perpetual queue and/or an ordered sequence of events, with or without using a retention policy to control the trimming of expired/excess events.
- Stream size limits - CosmosDB: The total size of the events and Tip-document of a stream must adhere to the CosmosDB logical partition limit of 20GB.
- Retention policies - EventStoreDB: Streams can have retention policies defined via each stream's metadata stream. The server cluster manages the application of these rules. The scavenging process removes the events, compacting the data by rewriting chunks with deleted, extraneous or aged-out events elided.
- Retention policies - CosmosDB: the CosmosDB TTL facility allows one to define a TTL at the document level. CosmosDB removes expired items automatically (whenever residual RU capacity allows).
NOTE: Equinox does not presently expose specific controls to allow specification of either a CosmosDB TTL or EventStoreDB stream metadata.
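In the meantime, such policies can be applied directly via the store's own client. For example, here's a sketch of setting a 30-day `$maxAge` on a stream via the EventStoreDB gRPC client (API shape per the `EventStore.Client` package; verify against the version you're using):

```fsharp
open System
open EventStore.Client

// Sets a retention policy on a single stream's metadata; events older than the
// maxAge become eligible for removal on the next scavenge
let setRetention (client : EventStoreClient) (stream : string) =
    client.SetStreamMetadataAsync(
        stream,
        StreamState.Any,
        StreamMetadata(maxAge = Nullable(TimeSpan.FromDays 30.)))
```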
You don't rewrite events or streams in a Store, for reasons
For Domain Events in an event-sourced model, their permanence and immutability is typically considered axiomatic; readers expect to be able to cache them forever, rely on their index on a stream remaining fixed etc. Some (rare) corner cases where one might wish to deviate from such axioms in terms of Domain Events in a model include:
- rewriting streams as an expedient solution to a bug etc.: as with rewriting history in git, the first rule is: DON'T. (But it's technically possible, and in some cases this nuclear option can solve a problem)
- intentionally removing data: for GDPR or CCPA reasons, you may opt to mutate or remove events as part of addressing a need to conclusively end-of-life some data (many better solutions are available...)
It should be noted with regard to such requirements:
- EventStoreDB does not present any APIs for mutation of events; deleting events is a fully supported operation (though it can be restricted). Rewrites are typically approached by doing an offline database rebuild.
- `Equinox.CosmosStore` includes support for pruning events (only) from the head of a stream. Obviously, there's nothing stopping you deleting or altering the Batch documents out of band via the underlying CosmosDB APIs directly. (Note, however, that the ordering semantics of documents within a logical partition mean it's strongly advised not to mutate any event Batch documents: doing so changes their ordering relative to other events, invalidating a key tenet that Change Feed Processors rely on.)
No matter what the vendor tells you, it's literally not going to scale linearly...
A more typical concern for an event-sourced model is managing what should happen when an Aggregate falls out of scope. For instance, a pick ticket entity in a warehouse is only of historical interest after a certain period of time (the customer's Order History maintains long-lived state pertaining to orders and/or associated returns etc.)
With regard to such needs, here are some store-specific considerations:
- EventStoreDB caches only in-use streams and events. Hosting streams that are no longer relevant is considered a completely normal use case:
- streams and/or regions of streams that are no longer relevant don't induce a major cost on the system
- each client maintains a single connection to the server cluster; there is no incremental cost in terms of the potential network or client process resource consumption related directly to the size of your dataset
- however, there is a non-zero cost; the overall dataset needs to be colocated and backed up as a whole (there are also internal index structures maintained alongside the event chunk files, with rebuild times directly related to the store's event count etc).
- For CosmosDB, the costs and impacts of retaining events and/or streams that are no longer relevant are more direct; ways they manifest include:
- the billing model imposes a linear cost per GB that applies equally to all event-batch documents in your store, plus the size of the associated indexes (which strongly relate to the number of items stored). These costs are multiplied by the number of regions to which you replicate.
- the total size of your dataset affects the minimum number of nodes across which the data will spread. i.e. 1 TB of data will require at least 10,000 RU/s to be allocated to it regardless of traffic
- the more nodes you have, the more TCP connections and other related fixed resources each client instance requires
- the RU/s allocated to your container can only be spread equally across all nodes. Thus, if you have 100GB spread over 5 nodes and allocate 10,000 RU/s to the Container, each node gets 2,000 RU/s; callers receive 429 responses whenever consumption against a given node exceeds that allocation within a given second (with significant latency impact, as each rate-limited client must back off for >= 1s per rate-limited attempt)
- the cost of over-provisioning to ensure appropriate capacity for spikes in load and/or to handle hotspots (where one node happens to host a stream that's accessed disproportionately heavily relative to data on other nodes) is multiplied by the number of nodes. Example: if you have a single node with 5GB of data and 2,000 RU/s allocated and want to double the peak capacity, you simply assign it 4,000 RU/s; if you have 100GB over 5 nodes, you need to go from 5x2,000 to 5x4,000 to achieve the same effect (see the worked arithmetic after this list)
- there are significant jumps in the indexing cost of writes as the number of items in a logical partition increases (empirically derived data, subject to change: inserts of a minimal (<100 bytes) event that initially cost ~20RU become >40RU with 128 items, >50RU with 1600 items, >60RU at 2800 items and >110RU at 4900 items; similar jumps occur as snapshot or event sizes hit certain thresholds).
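To make the spreading and over-provisioning arithmetic concrete, a back-of-envelope sketch (all figures illustrative, mirroring the example above):

```fsharp
// Back-of-envelope: provisioned RU/s is divided equally across physical partitions,
// so buying headroom for one hot node means buying it for every node
let provisionedRu = 10_000.                             // RU/s allocated to the Container
let physicalPartitions = 5.                             // e.g. ~100GB spread over 5 nodes
let ruPerPartition = provisionedRu / physicalPartitions // = 2,000 RU/s per node
// any second in which one node's consumption exceeds ruPerPartition yields 429s for
// callers hitting that node, even if the other four nodes are idle; doubling that
// node's headroom requires doubling the Container's total allocation:
let totalForDoubleHeadroom = 2. * provisionedRu         // = 20,000 RU/s
```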
There are myriad approaches to resolving these forces. Let's examine the trade-offs of some relevant ones...
Perhaps we can just leave it all behind and switch to a new blank database?
In some systems, where there's a relevant natural cycle in the domain, the answer to managing database growth may be simpler than you think. For instance:
- you may be able to start with a blank database for each trading day for the bulk of the events your system operates on.
- your domain may have a natural end of year business process that dictates a formal closing of accounts with selective migration of relevant summarized data to be carried forward into a successor epoch. In such instances, each closed year can be managed as a separated (effectively read-only) dataset.
As a fresh epoch of data becomes the active dataset, other options open up:
- one might migrate the now-of-secondary-importance data to cheaper hardware or network resources
- one might archive the database once the transition has been validated as complete
Replace a perpetual stream with a series of finite epoch-streams, allowing superseded ones to be archived or deleted
As covered above, long streams bring associated costs. A key one that hasn't been mentioned is that, because the unit of storage is a stream, there's no easy way to distinguish historic events from current ones. This has various effects on processing costs, such as (for Aggregate streams) that of loading and folding the state (or generating a snapshot).
Analogous to how data can be retired (as described in Database epochs), it may be possible to manage the growth cycle of continuous streams by having readers and writers coordinate the state of a given stream cooperatively via the following elements:
- one Series aggregate: maintains the current active epoch id for the series
- many Epoch streams: independent streams (sharing a root name), suffixed by the epoch id
- having a deterministic way of coordinating to ensure each (independent) writer will recognize that a given epoch is closed (e.g., based on event count, elapsed time since the epoch started, total event payload bytes, etc.)
Depending on whether there's state associated with a given stream, the system periodically transitions the Series to a new Epoch via mechanisms such as:
- Topic-stream: write a `Closed` event; have all writes be contingent on no such event preceding any write to an epoch-stream
- Aggregate stream:
  - write a `Closed` event to the outgoing epoch-stream, followed by (as a separate action with idempotent semantics) ...
  - write a `CarriedForward` event to open the new Epoch (again, all writers follow the same rules in order to be able to make writes idempotent even in the face of concurrent writers)
The writing of the event that moves the active Epoch id forward in the Series aggregate can take place at any point after the `Closed` event has been written to the outgoing epoch-stream (including concurrently with the writing of the `CarriedForward` event); this is because the active epoch can always be inferred by walking forward from any given epoch until one arrives at an epoch that's not `Closed`.
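To ground the above, here's a minimal F# sketch of an Aggregate-style epoch-stream's events, state and ingestion decision (all names and the size-based closing criterion are illustrative; this is not drawn from the Equinox codebase):

```fsharp
type ItemId = string

type Event =
    | Ingested of items : ItemId[]
    | Closed                               // no further writes may be accepted into this epoch
    | CarriedForward of initial : ItemId[] // seeds the successor epoch's state

type State = { closed : bool; items : ItemId[] }
let initial = { closed = false; items = [||] }
let evolve state = function
    | Ingested xs       -> { state with items = Array.append state.items xs }
    | Closed            -> { state with closed = true }
    | CarriedForward xs -> { closed = false; items = xs }
let fold : State -> Event seq -> State = Seq.fold evolve

// Writers only ingest into an open epoch; crossing the (illustrative) size limit
// appends Closed, and the dedupe filter renders concurrent retries idempotent
let maxItemsPerEpoch = 1000
let decideIngest (candidates : ItemId[]) state =
    if state.closed then [] // caller routes the items to the successor epoch instead
    else
        let fresh = candidates |> Array.filter (fun x -> not (Array.contains x state.items))
        [ if fresh.Length > 0 then Ingested fresh
          if state.items.Length + fresh.Length >= maxItemsPerEpoch then Closed ]
```

The dedupe-by-content filter is what makes concurrent writers safe: replaying the same decision against the post-write state produces no further events.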
There is a WIP implementation of a `dotnet new` template illustrating the Stream Epochs approach.
Move or delete out-of-scope data from a primary (hot) dataset to a cheaper (warm/cold) store
As with 'Database epochs', once a given 'Stream epoch' has been marked active in a Series, we gain options as to what to do with the preceding ones:
- we may opt to retain them in order to enable replaying of projections for currently-unknown reasons
- if we intend to retain them for a significant period: we can replicate/sync/mirror/archive them to an archive store, then prune them from the primary dataset
- if they are only relevant to assist troubleshooting over some short term: we can delete them after a given period (without copying them anywhere)
When writing to an archive store, there's also an opportunity to vary the writing process from that forced by the constraints that apply during normal online transaction processing:
- it will often make sense to have the archiver add a minimal placeholder to the archive store regardless of whether a given stream is currently being archived; that placeholder can then be used to drive the walk of the primary, instead of placing avoidable load on the primary by continually looping over all its data to re-assess archival criteria over time
- when copying from primary to archive, there's an opportunity to optimally pack events into batches (for instance, in `Equinox.CosmosStore`, batching writes yields fewer documents, which reduces the document count, per-document overhead, and the overall data and index size in the container, and hence query costs)
- when writing to warm archival storage, it may make sense to compress the events (under normal circumstances, compressing event data is rarely considered a worthwhile tradeoff); see the sketch after this list
- where the nature of traffic on the system has peaks and troughs, there's an opportunity to shift the process of traversing the data for archival purposes to a window outside of the peak load period (although, in general, the impact of reads for the purposes of archival won't be significant enough to warrant optimizing this factor)
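As an illustration of the compression point above, a minimal sketch (not an Equinox facility) of gzip-compressing an event body before writing it to a warm tier:

```fsharp
open System.IO
open System.IO.Compression

// Compress an event body for archive-tier storage; for hot-path writes, the CPU
// and read-amplification cost of decoding on every load typically outweighs the saving
let compress (data : byte[]) : byte[] =
    use output = new MemoryStream()
    do
        use gzip = new GZipStream(output, CompressionLevel.Optimal)
        gzip.Write(data, 0, data.Length)
    output.ToArray() // ToArray remains valid after the MemoryStream is closed

let decompress (data : byte[]) : byte[] =
    use input = new GZipStream(new MemoryStream(data), CompressionMode.Decompress)
    use output = new MemoryStream()
    input.CopyTo output
    output.ToArray()
```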
Outlining the roles of the `proArchiver` and `proPruner` templates
It's conceivable that one might establish a single service combining the activities of:
- copying (archiving) to the archive store in reaction to changes in the primary
- pruning from the primary when the copying is complete
- deleting immediately (where events don't warrant retention in an archive)
- continually visiting all streams in the primary in order to archive and/or prune streams that have fallen out of use
However, splitting the work into two distinct facilities:
- clarifies the relative responsibilities (and allows them to be considered individually)
- allows the load on the primary dataset to be more closely controlled (deletes can be costly in RU terms on CosmosDB)
An archiver tails a monitored store and bears the following responsibilities:
- minimizing the load on the source it's monitoring
- listening to all event writes (via `$all` in the case of EventStoreDB, or a ChangeFeed Processor in the case of CosmosDB)
- ensuring the archive becomes aware of all new streams (especially in the case of `Equinox.CosmosStore` streams in `AccessStrategy.RollingState` mode, which do not yield a new event-batch per write)
The pruner cyclically (i.e., when it reaches the end, it loops back to the start) walks the archive store:
- visiting each stream, identifying the current write position in the archive
- using that as input into a decision as to whether / how many events can be trimmed from the primary (deletion does not need to take place right away; Equinox will deal with events spread over a Primary/Archive pair of Containers via the Loading Fallback mechanism)
- (for `Equinox.CosmosStore`) optimizing the packing of the events (e.g., if the most recent 4 events have arrived as 2 batches, the pruner can merge the two batches to minimize storage and index size; when writing to a primary collection, batches are never mutated for packing purposes, due both to write costs and to read amplification)
- (for `Equinox.CosmosStore`) opting to delete from the primary if one or more full Batches have been copied to the archive (note the unit of deletion is a Batch; mutating a Batch in order to remove an event would trigger a reordering of the document's position within the logical partition)
This is a very loose laundry list of items that have occurred to us to do, given infinite time. No conclusions of likelihood of starting, finishing, or even committing to adding a feature should be inferred, but most represent things that would be likely to be accepted into the codebase.
- Extend samples and templates; see #57
- Provide a low-level F# API for walking events, akin to `Equinox.CosmosStore.Core.Events`; this would allow consumers to jump from direct use of `EventStore.Client` -> `Equinox.EventStore.Core.Events` -> `Equinox.Decider` (with the potential to swap stores once one gets to using `Equinox.Decider`)
- Get conflict handling as efficient and predictable as for `Equinox.CosmosStore` (#38)
- Provide for snapshots to be stored out of the stream, and loaded in a customizable manner, analogous to the proposed comparable `Equinox.CosmosStore` facility
- Provide a facility in `FsCodec.IEventCodec` to walk the Event DU to generate a list of event types; use that to generate the server-side event loading filter, e.g. when a Decider uses a highly selective subset of the known Event Types
- (If the server started to support it) provide a hint when loading as to the `isOrigin` Event Type, so backward load can stop when it meets an Embedded Snapshot or Reset (e.g. `CartCleared`) event
- Port `MessageDb.AccessStrategy.AdjacentSnapshots`, automatically configuring the metadata of the snapshots stream to a `maxCount` of 1 event for automatic purging of superseded snapshots
- Provide support for an `isOrigin` overload that works on the event type string; implement a two-phase load algorithm that loads the events first, and then the bodies only from the origin point forward
- Switching to using MS V4 SDK eventually (Parked in #197). See also #232
- Refactor the usage of the Stored Proc to instead use the V3 API's Transactional Batch support (will likely happen as a backport from `Equinox.DynamoStore`)
- Enable snapshots to be stored outside of the main collection in `Equinox.CosmosStore` (#61)
- Multiple writers support for unfolds (at present, a `sync` completely replaces the unfolds in the Tip; this will be extended by having the stored proc maintain the union of the unfolds in play, both for semi-related services and for blue/green deploy scenarios; TBD how we decide when a union that's no longer in use gets removed) (#108)
- Low-level performance improvements in loading logic (reducing allocations etc.)
- `Propulsion.CosmosStore`: provide a Serverless mode that can be used with Azure Functions to execute a batch of projections based on a set of documents from the change feed