Notes from integrating RavenDB with Akka.NET Persistence

by Karmel Indych

This article will explore the development journey of building the plugin. If you are interested in how to actually use the RavenDB plugin, you can find it here.

You can also find the complete integration project on GitHub if you want to dig into the code right away.

This article is written from the point of view of a developer who is implementing Akka.NET persistence using RavenDB. It assumes a certain level of familiarity with RavenDB and mostly deals with the implementation details and the approach to building the integration.

Introduction

Akka.NET is a powerful framework for building concurrent and distributed applications on the .NET platform, offering actors as a model for managing concurrency and resilience. RavenDB, on the other hand, is a NoSQL database known for its simplicity and scalability in handling document-oriented data. In the realm of Akka.NET, persistence plugins play a crucial role in ensuring the durability of actor state, allowing applications to recover from failures and maintain state across restarts. This article delves into the process of developing a custom persistence plugin for Akka.NET using RavenDB, providing an insight into the integration of these two technologies.

Before going into the implementation details and the design choices, I would first like to briefly introduce all the pieces of the puzzle that we need to take into account:

  • AsyncWriteJournal, the backbone interface for persisting messages to durable storage, ensuring reliable state recovery and fault tolerance in distributed applications.
  • SnapshotStore, for optimizing recovery performance by reducing the amount of state replay needed from the journal.
  • The various interfaces for querying and retrieving live or historical data streams from persistent actors.

While reading about those pieces, keep in mind that RavenDB is a distributed database, so every method we implement will need to be crafted to deal with that distributed nature.

If you are already familiar with those concepts, you can skip directly to the implementation details.

Knowledge Requirements

To effectively follow this article, readers should have a basic understanding of Akka.NET concepts, including actors and persistence, as well as basic database concepts.

AsyncWriteJournal

AsyncWriteJournal is the backbone actor for implementing asynchronous journaling within Akka.NET’s persistence framework. It defines methods for persisting messages asynchronously to durable storage. This actor is essential for ensuring that messages sent to actors can be reliably stored and recovered in case of system failures or restarts, thereby maintaining the consistency and durability of actor state across distributed systems.

Each such message is immutable and represented by IPersistentRepresentation; its most important parts are:

// The ID of the actor.
string PersistenceId;
// The event data that we want to store. Can be of type 'Tagged',
// which indicates that this message has additional tags attached.
object Payload;
// The sequence number of this message. Serves as a gap-less
// unique identifier and ensures the order and consistency of the actor's state.
long SequenceNr;
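
To ground this, here is a minimal sketch of a persistent actor producing such messages (the class, state, and ID are illustrative, not part of the plugin):

using Akka.Persistence;

// Illustrative actor: every persisted int becomes an
// IPersistentRepresentation with PersistenceId "counter-1"
// and an automatically incremented SequenceNr.
public class Counter : ReceivePersistentActor
{
    private int _state;

    public override string PersistenceId => "counter-1";

    public Counter()
    {
        // Replayed on recovery, in sequence-number order.
        Recover<int>(delta => _state += delta);

        // Persist writes the event to the journal before applying it.
        Command<int>(delta => Persist(delta, d => _state += d));
    }
}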

Let’s have a brief overview of what methods are required here:

// Atomically persist the messages for a given actor.
Task<IImmutableList<Exception?>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages);

// Upon actor recovery we need to read the message with 
// the highest sequence number of a given actor.
Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr);

// Recover the state of a specific actor.
Task ReplayMessagesAsync(
            IActorContext context,
            string persistenceId,
            long fromSequenceNr,
            long toSequenceNr,
            long max,
            Action<IPersistentRepresentation> recoveryCallback);

// Delete messages for an actor up to a given sequence number.
Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr);

SnapshotStore

SnapshotStore is a component responsible for managing the storage and retrieval of snapshots of actor state. Actors, especially those with large or complex states, should periodically create snapshots to capture their current state. These snapshots offer a more efficient mechanism for recovering actor state compared to replaying all past events from the journal.

The methods that we need to implement here are:

// Load the latest snapshot according to the given criteria
Task<SelectedSnapshot> LoadAsync(string persistenceId, 
                        SnapshotSelectionCriteria criteria);

// Store the snapshot
Task SaveAsync(SnapshotMetadata metadata, object snapshot);

// Delete the specific snapshot
Task DeleteAsync(SnapshotMetadata metadata);

// Delete all snapshots by the given criteria
Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria);
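
For context, here is a hedged sketch of how an actor typically drives these methods, extending the Counter example from earlier (snapshotting every 100 events is an arbitrary policy, not part of the plugin):

// Inside the Counter constructor:
Command<int>(delta => Persist(delta, d =>
{
    _state += d;
    // SaveSnapshot ends up in SnapshotStore.SaveAsync.
    if (LastSequenceNr % 100 == 0)
        SaveSnapshot(_state);
}));

// On recovery, the latest snapshot (loaded via LoadAsync) arrives first,
// followed by any newer events replayed from the journal.
Recover<SnapshotOffer>(offer => _state = (int)offer.Snapshot);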

IReadJournal interfaces

The IReadJournal interfaces allow querying and retrieving live or historical data streams from persistent actors. They are a powerful tool for performing real-time analytics, monitoring, and querying the state of actors and their persistent messages. Let’s explore the interfaces and concepts related to IReadJournal. All queries come in two flavors, “Real-Time” and “Current”.

  • “Current” returns the query results that exist at querying time.
  • “Real-Time”, however, is a long-running query that yields the current results plus any new results that become available for the given query.

Among the common queries you’ll find:

  • PersistenceIds: retrieves the IDs of all persistent actors.
  • EventsByPersistenceId: retrieves a stream of events (messages) for a specific persistence ID, starting from a given sequence number.
  • EventsByTag: retrieves a stream of events (messages) tagged with a specific tag.
  • AllEvents: retrieves a stream of all events (messages) stored across all persistence IDs.

Another thing worth mentioning here is that for each query we can pass an offset, which indicates a position within a stream of events or messages from which the query can start.
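
For illustration, here is a hedged sketch of consuming such a query through Akka.Persistence.Query (the plugin identifier string and the surrounding actor system are assumptions for this example):

using Akka.Persistence.Query;
using Akka.Streams;

var materializer = system.Materializer();

// Resolve the read journal; the configuration path is assumed here.
var readJournal = PersistenceQuery.Get(system)
    .ReadJournalFor<IEventsByTagQuery>("akka.persistence.query.ravendb");

// "Real-Time" flavor: keeps pushing newly persisted "blue" events.
readJournal.EventsByTag("blue", Offset.NoOffset())
    .RunForeach(env => Console.WriteLine(env.SequenceNr), materializer);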

The Details

Once we have all the necessary components from Akka.NET, our focus shifts to integrating with RavenDB.

RavenDB is a distributed database, so writes, reads and queries can target different nodes. Depending on the scenario, strict consistency may be crucial for recovering the latest actor state, while for queries, the consistency model can be relaxed.

The main challenge in building the RavenDB Akka.NET persistence plugin was making sure that, in a distributed system and in the presence of failures, we can still abide by the consistency that Akka.NET requires. In the following sections, we’ll explore some of the challenges we had to deal with when building this feature.

Complexity of dealing with distributed consistent state

We began the development process with the desire to be performant and available, so we started with single-node transactions. However, ensuring consistency across nodes requires additional effort. Writes can go to different nodes and lead to a situation where Akka.NET writes e1, e2, e3, e4, but e1 & e3 go to node A, while e2 & e4 are sent to node B (node C gets the messages via replication from both nodes).

This situation is problematic because it can result not only in an outdated actor state during recovery but also in an incorrect one, since a state produced by the combination of just e1 & e3 never actually existed.

We can add a check at commit time to ensure that writes are rejected if dependent writes haven’t been processed (e.g. reject persisting e3 if we haven’t seen e2), aiming for a state where each node holds a gap-less prefix of the event stream.

However, recovery to the most up-to-date state can’t be performed from node B or C, so we needed to execute ReadHighestSequenceNrAsync against all nodes, take the maximum, and wait for that message to be propagated.

Okay, but what if node A is down? We can ensure writing to a majority of the database nodes, but what if the majority of nodes is down? Should we wait for the write to be propagated to all nodes?

Another challenge arises when, for example, Akka.NET attempts to write e5. Initially the message is sent to node A, which accepts and persists it but fails to return an OK status. In that case, the RavenDB client will retry and fail over, sending the request to node B.

The nodes will eventually sync and figure out that they have a conflict on e5. In this particular case the conflict doesn’t really matter, because the message content is identical. But in an edge case where we fail to return OK and then recover the actor, the result can be a conflict that does matter and can cause an inconsistent actor state.

To address this concern, we would need to introduce even more checks to resolve the potential conflicts. At this point it starts to get way too complex to handle things properly with this approach. Akka.NET assumes global consistency in order to operate. RavenDB’s single-node transaction model is meant to favor availability over consistency, and trying to “fix” that is going to end in tears.

However, RavenDB also has a global consistency model; you can pick and choose which model you want on a case-by-case basis. Using cluster-wide transactions, RavenDB can easily ensure global consistency and match the requirements of Akka.NET Persistence.
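
A minimal sketch of what opting into that model looks like with the RavenDB client (the store and event instances are assumed to exist):

using Raven.Client.Documents.Session;

// All writes in this session go through the Raft consensus,
// yielding a single global order across the cluster.
using var session = store.OpenAsyncSession(new SessionOptions
{
    TransactionMode = TransactionMode.ClusterWide
});

await session.StoreAsync(@event);
await session.SaveChangesAsync();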

After all, consistency here eclipses any other requirement, like availability or performance. The most important thing is to ensure the actor’s correctness upon recovery.

With the usage of cluster-wide transactions, there are two important caveats to remember:

  1. You must have a connected majority of nodes in the cluster; otherwise you will not be able to persist any new events.
  2. Theoretically, upon an AsyncWriteJournal crash and restart, the very first request may still read something other than the latest message. Even then, RavenDB’s atomic guards will prevent us from persisting any new messages while we aren’t aware of the current state of the system, which in turn will cause the actor to crash and restart itself again. Eventually we will recover to the latest state.

I’m pretty proud of this behavior, because we didn’t need to implement anything special for a fairly complex and rare scenario. Even if you manage to end up in a pretty bad state, RavenDB will ensure that all invariants hold, and the system will be brought back fully once the failure is fixed, without any manual intervention.

With the persisting process consolidated, we still need to focus a bit more on recovery; it also has to be as fast as possible.

Suppose you have millions of stored events from many different actors. All of them are stored in the same collection, but now we need to recover actor p-1 from sequence number 123 onwards.

We could use an index on the sequence number, but RavenDB indexes are BASE and can therefore be stale. However, we can elegantly work around this by using a smart ID format for the events, getting optimal performance by reading the events directly from RavenDB’s main B+ Tree.

We format the event ids like so: <Collection-Name>/<ActorID>/<Leading-Zero-Sequence-Number>

So for event number 123 of the actor p-1, the ID would be: events/p-1/0000000000000000123
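
A minimal sketch of building such an ID (the helper name and the exact format string are assumptions; 19 digits is enough to hold long.MaxValue):

// Hypothetical helper: builds a lexicographically sortable event ID.
static string ToEventId(string persistenceId, long sequenceNr) =>
    $"events/{persistenceId}/{sequenceNr:D19}";

// ToEventId("p-1", 123) => "events/p-1/0000000000000000123"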

We need the leading zeros to ensure the proper lexicographical order of the events at the B+ Tree level. Consider the following events:

  • events/p-1/1
  • events/p-1/10
  • events/p-1/2

If we fetch them by the prefix events/p-1/, we get them back out of numerical order; the leading zeros sort this out.

  • events/p-1/0000000000000000001
  • events/p-1/0000000000000000002
  • …
  • events/p-1/0000000000000000010

RavenDB supports efficient queries using ID prefixes, and we take full advantage of this capability to ensure quick recovery of actors after a restart.

Here is the usage of the streaming API to fetch all of the events of the actor p-1 that come after sequence number 123:

var results = session.Advanced.Stream<Event>(
    startsWith: "events/p-1/",
    startAfter: "events/p-1/0000000000000000123");

The startsWith argument is the document ID prefix, and startAfter sets the cursor to be just after the given value.

So far we have covered persisting and recovering through the AsyncWriteJournal. The SnapshotStore is no different in that regard, and the same logic applies to snapshots.

Query Events

Now it is time to discuss reading, or querying, events via the IReadJournal.
Akka.NET offers a few interfaces to do so; the most interesting ones are EventsByTag and EventsByPersistenceId, each of which comes in two flavors.

The “Current” flavor simply returns everything you have right now, and it is easy to implement: just send the query and return the results. The “Real-Time” flavor, however, means returning the current results and then continually returning any new results that answer the query.

To achieve the “Real-Time” mode, we subscribe to the Changes API on any node and continuously fetch new results when they become available. Here is what this looks like in code:

var mre = new AsyncManualResetEvent(false);

// Signal whenever the server stores a document with our prefix.
using var changes = documentStore.Instance
    .Changes()
    .ForDocumentsStartingWith("events/" + persistenceId + "/")
    .Subscribe(_ => mre.Set());

while (true)
{
    mre.Reset();
    await QueryAsync().ConfigureAwait(false);

    // Sleep until the Changes API tells us there is something new.
    await mre.WaitAsync().ConfigureAwait(false);
}

We call this approach a Continuous Query; it is similar to, but distinct from, a feature that RavenDB already has: Subscriptions.

RavenDB has native support for subscriptions: a query that you define, which will send you all matching results as soon as they are persisted. The difference between a Continuous Query and a Subscription is that the latter is meant for background processing, with each change being reported once and the entire state being held on the server.

A Continuous Query, on the other hand, may be run any number of times, so the state is managed on the client, and it is entirely possible to have concurrent queries on the same topic going on at once (for different purposes).
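
For contrast, here is roughly what the native Subscriptions API looks like on the client side (a sketch; the filter and names are illustrative):

// Server-side state: RavenDB tracks the subscription's position for us.
var name = await store.Subscriptions.CreateAsync<Event>(
    e => e.Tags.Contains("blue"));

using var worker = store.Subscriptions.GetSubscriptionWorker<Event>(name);

// Runs until the worker is disposed; each matching document is
// delivered once, and acknowledgement is kept on the server.
await worker.Run(batch =>
{
    foreach (var item in batch.Items)
        Console.WriteLine(item.Id);
});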

Every “Real-Time” interface employs the Continuous Query, so let’s have a look at the most interesting and challenging implementations.

EventsByPersistenceId

From the documentation of this interface:

Source<EventEnvelope, NotUsed> EventsByPersistenceId(
  string persistenceId, 
  long fromSequenceNr, 
  long toSequenceNr);

Query events for a specific actor identified by persistenceId. It can retrieve a subset of all events by specifying fromSequenceNr and toSequenceNr or use 0L and long.MaxValue respectively to retrieve all events. The returned event stream must be ordered by sequence number. The stream is not completed when it reaches the end of the currently stored events, but it continues to push new events when new events are persisted.

Remember the ID format that we chose for the events? It is perfect here, since order is guaranteed and we can use the prefix trick once more, combining it with the Changes API on documents starting with the actor prefix, e.g. events/p-1/.

The Changes() code in the previous snippet shows how we can wait for new results using .ForDocumentsStartingWith("events/" + persistenceId + "/"). This notifies us every time the server stores a document with such a prefix. Once notified, we simply start streaming from the last sequence number we hold in memory.
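
Here is a hedged sketch of that query step (the Push helper and the fields are assumptions for this example):

// Stream everything the actor persisted after the last sequence
// number we have already delivered downstream.
async Task QueryAsync()
{
    using var session = store.OpenAsyncSession();
    var results = await session.Advanced.StreamAsync<Event>(
        startsWith: $"events/{persistenceId}/",
        startAfter: $"events/{persistenceId}/{lastSequenceNr:D19}");

    while (await results.MoveNextAsync())
    {
        var @event = results.Current.Document;
        Push(@event);                        // hand off to the stream stage (assumed)
        lastSequenceNr = @event.SequenceNr;  // remember our position
    }
}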

EventsByTag

The most interesting and challenging method to implement is EventsByTag. In Akka.NET you can tag events and later query on them, regardless of which actor they belong to.

Source<EventEnvelope, NotUsed> EventsByTag(string tag, Offset offset);

You can resume the event query stream from a given offset after a crash or restart.

In order to be able to query by tag we obviously need an index, so let’s create one:

public class EventsByTag : AbstractIndexCreationTask<Types.Event>
{
    public EventsByTag()
    {
        Map = events =>
            from e in events
            select new
            {
                e.Tags,
            };
    }
}

The index definition is simple: we index the Tags array so we can run a query like the following:

from index 'EventsByTag' where Tags in ("blue")

This was easy enough, but there is another argument in that method that we need to take into account: the offset.

A naive approach here would be to simply use skip. Since we are using cluster-wide transactions, that could even work, because the order of events is guaranteed to be identical on all nodes; skip 5 on node A would (eventually) yield the exact same result as on node B.

But what happens if we delete events? If we had 6 events, skip 5 would yield one event; but if we then delete the first 5 events, skip 5 would yield zero events and we would miss the sixth one. This is obviously not what we want, so we need a way to query from the last document that we saw.

Why not simply use timestamps and query for the events after the last timestamp seen?

If you remember IPersistentRepresentation, it even has a Timestamp property, and ordering by it would work if we could guarantee a single AsyncWriteJournal actor and a stable clock, or at least no clock-drift correction. Since we can’t guarantee that, we need something else.

Partial and Total Order in RavenDB

Essentially, we need a way to get the events in a deterministic order, and ordering in distributed systems is notoriously challenging. By default, RavenDB prefers availability over consistency and uses Change Vectors (a mixture of a version vector and a vector clock) to keep track of each document’s version and maintain a partial order. Personally, I find it a fascinating topic, but since we are using cluster-wide transactions in the Akka.NET plugin, we will focus on the total order that RavenDB provides via the Raft consensus algorithm.

Each document that is persisted via a cluster-wide transaction gets a globally stable change vector in the following format:

RAFT:100, TRXN:150

RAFT is the number of cluster-wide documents so far, a strictly monotonically increasing value, while TRXN is basically the transaction number. Going back to the offset requirement, we simply need to remember the last RAFT value and query for anything beyond it. Easier said than done.

The real change vector looks a bit different:

RAFT:100-kyXmGSx4DUCgDY7iOjmeXA, TRXN:50-7+RfhJSq60mTCxyPV5M3Aw

The RAFT & TRXN tags aren’t uniquely identifiable across databases and clusters, so in order to identify them uniquely we need to use the Base64 part, which we extract from the change vector at indexing time. The query that we want to generate at the end should look like this:

from index 'EventsByTag' where Tags in ("blue") and 'kyXmGSx4DUCgDY7iOjmeXA' > 100

Our ChangeVectorOffset class holds the last-seen change vector elements:

public class ChangeVectorOffset : Offset
{
    // Maps each database's Base64 ID to the last-seen etag.
    public Dictionary<string, long> Elements;
}

As a first step, we need a method that parses the change vector string. We can easily do that with a simple regex:

public static class ChangeVectorAnalyzer
{
    // Matches elements like "RAFT:100-kyXmGSx4DUCgDY7iOjmeXA":
    // group 1 is the etag, group 2 is the 22-character Base64 database ID.
    public static Regex Pattern = new Regex(
            @"\w{1,4}:(\d+)-(.{22})", RegexOptions.Compiled);

    public static Dictionary<string, long> ToDictionary(string changeVector)
    {
        var dic = new Dictionary<string, long>();
        var matches = Pattern.Matches(changeVector);
        foreach (Match match in matches)
        {
            var databaseId = match.Groups[2].Value;
            var etag = long.Parse(match.Groups[1].Value);
            dic.Add(databaseId, etag);
        }
        return dic;
    }
}
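
A quick sanity check of the parser against the change vector shown earlier:

var elements = ChangeVectorAnalyzer.ToDictionary(
    "RAFT:100-kyXmGSx4DUCgDY7iOjmeXA, TRXN:50-7+RfhJSq60mTCxyPV5M3Aw");

// elements["kyXmGSx4DUCgDY7iOjmeXA"] == 100
// elements["7+RfhJSq60mTCxyPV5M3Aw"] == 50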

But we have a problem: we can’t know in advance what the Base64 ID will be, and that is exactly the field we want to query on. Luckily, RavenDB allows us to create dynamic index fields at runtime with CreateField(fieldName, value).

Putting all the pieces together, this is the index definition:

public class EventsByTagAndChangeVector : AbstractIndexCreationTask<Event>
{
  public EventsByTagAndChangeVector()
  {
    Map = events =>
      from e in events
      let changeVector = MetadataFor(e).Value<string>("@change-vector")
      select new
      {
        e.Tags,
        _ = ChangeVectorAnalyzer.ToDictionary(changeVector)
                .Select(x => CreateField(x.Key, x.Value))
      };
    AdditionalSources = new Dictionary<string, string>()
    {
        { "ChangeVectorAnalyzer", ChangeVectorOffset.Code }
    };
  }
}

With the _ = ChangeVectorAnalyzer.ToDictionary(changeVector).Select(x => CreateField(x.Key, x.Value)) line we create the index fields dynamically: one field per element in the change vector.

We use AdditionalSources to upload the ChangeVectorAnalyzer class to RavenDB, so RavenDB can recognize our ChangeVectorAnalyzer.ToDictionary method for parsing the change vector.

So we break the change vector down and index it; now it is time to query. Here, too, we need to build the query dynamically, and we do so with the low-level DocumentQuery API. First, let’s create the find-by-tag part:

var q = session.Advanced.DocumentQuery<Event>(nameof(EventsByTagAndChangeVector))
                      .ContainsAny(e => e.Tags, new[] { _tag });

Translating this to RQL gives us:

from index 'EventsByTagAndChangeVector' where Tags in ("blue")

Now let’s add the change vector part:

// OR together a greater-than clause for each change vector element,
// wrapped in a single AND-ed subclause.
var first = true;
foreach (var element in changeVectorOffset.Elements)
{
    if (first)
    {
        q = q.AndAlso().OpenSubclause();
        first = false;
    }
    else
    {
        q = q.OrElse();
    }
    q.WhereGreaterThan(element.Key, element.Value);
}
if (changeVectorOffset.Elements.Count > 0)
{
    q = q.CloseSubclause();
}

The above code dynamically builds queries in the following format:

from index 'EventsByTagAndChangeVector' 
where Tags in ("blue") and (
  'kyXmGSx4DUCgDY7iOjmeXA' > 100 or 
  '7+RfhJSq60mTCxyPV5M3Aw' > 50
)

So now we can properly query with an offset in a distributed RavenDB database.
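
To keep the offset current between iterations of the Continuous Query, we can read the change vector off each streamed result; here is a sketch under the same assumptions as before:

// Stream the query results and track the change vector of the
// last event we saw, so the next iteration resumes after it.
var results = session.Advanced.Stream(q);
while (results.MoveNext())
{
    var item = results.Current;
    offset = new ChangeVectorOffset
    {
        Elements = ChangeVectorAnalyzer.ToDictionary(item.ChangeVector)
    };
    // ... emit item.Document downstream ...
}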

There are a few other query interfaces that Akka.NET offers, but all of them reuse the techniques shown here and/or in the EventsByPersistenceId section.

Final Words

As RavenDB developers, we typically focus on server-side aspects such as debugging, performance bottlenecks, and optimizing for the garbage collector. However, engaging with RavenDB from the client side proved to be remarkably enjoyable.

Implementing Akka.NET persistence was a thrilling journey for us, and we’re pleased that, thanks to our product’s feature set, we navigated through it smoothly without major hurdles or complex workarounds.

We believe RavenDB users will find this addition valuable and welcome any feedback or further contribution to the project.
