Skip to content

Akka.Cluster.Sharding: if remember-entities is enabled, shard won't start after a transient persistence failure during initial recovery #7399

Closed
@marcotod1410

Description

Version Information
Reproduced with Akka.NET versions 1.5.27.1 and 1.5.31.

Describe the bug
If remember-entities is enabled and the remember-entities store actor fails its recovery because of a transient persistence failure, shard actors are never able to interact with it afterwards, even once the persistence backend is working again.

To Reproduce
Paste the following code into a console application and run:

using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Snapshot;
using Akka.Persistence;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;

namespace TestAkkaRememberEntitiesErrors
{
    /// <summary>
    /// Reproduction for the remember-entities recovery bug. It starts a shard region while the
    /// journal and snapshot store are failing, re-enables them after a delay, and then asks an
    /// entity for a reply.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Runs the reproduction. When the bug is present, the final <c>Ask</c> never completes.
        /// </summary>
        static async Task Main(string[] args)
        {
            // Put both the journal and the snapshot store into their failing state before the
            // actor system (and therefore the remember-entities store) is started.
            FailingJournal.Working = false;
            FailingSnapshot.Working = false;

            var system = ActorSystem.Create("MySystem", Config);

            // Start the shard region. An error message is expected here, because sharding tries to
            // read which entities need to be remembered from the (currently failing) store.
            var shard = await ClusterSharding.Get(system).StartAsync(
                typeName: "shard-test",
                entityProps: Props.Create(() => new ShardingActor()),
                messageExtractor: new MessageExtractor(100),
                settings: ClusterShardingSettings.Create(system));

            // Keep persistence failing for a short while before bringing it back.
            var seconds = 10;
            Console.WriteLine("Waiting {0} seconds...", seconds);

            await Task.Delay(TimeSpan.FromSeconds(seconds));

            // The simulated outage is over: journal and snapshot store succeed from now on.
            FailingJournal.Working = true;
            FailingSnapshot.Working = true;

            // Ask an entity inside the shard for a reply.
            Console.WriteLine("Asking message to shard");
            var responseTask = shard.Ask<string>(new ShardingActor.TestMessage("test-entity", "test-content"));
            Console.WriteLine("Waiting for a response...");
            // With the bug present this waits indefinitely: according to the report, the
            // ShardCoordinator cannot talk to the remember-entities store whose recovery failed.
            // Ask is issued without a timeout, so the program hangs rather than throwing.
            var response = await responseTask;

            Console.WriteLine("\n\nResponse: {0}\n\n", response);

            await system.Terminate();
        }

        /// <summary>
        /// HOCON configuration for a single-node cluster listening on 127.0.0.1:12345. Sharding uses
        /// remember-entities with the event-sourced store, backed by <see cref="FailingJournal"/> and
        /// <see cref="FailingSnapshot"/>.
        /// </summary>
        /// <remarks>
        /// The seed node is this node's own address (system name "MySystem" plus the remote hostname
        /// and port configured below), so the single node joins itself and forms the cluster.
        /// </remarks>
        public static readonly Config Config = @$"
akka.loglevel = DEBUG
akka.persistence.journal.plugin = ""akka.persistence.journal.failure""
akka.persistence.journal.failure.class = ""{typeof(FailingJournal).AssemblyQualifiedName}""
                            
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.failure""
akka.persistence.snapshot-store.failure.class = ""{typeof(FailingSnapshot).AssemblyQualifiedName}""

akka.cluster.sharding {{
    journal-plugin-id = akka.persistence.journal.failure
    snapshot-plugin-id = akka.persistence.snapshot-store.failure

    remember-entities = on
    state-store-mode = ddata
    remember-entities-store = eventsourced
    distributed-data.durable.keys = []
}}

akka.actor.provider = cluster

akka.remote.dot-netty.tcp {{
    port = 12345
    hostname = 127.0.0.1
}}

akka.cluster {{
    seed-nodes = [""akka.tcp://MySystem@127.0.0.1:12345""]
}}


";
    }

    #region Actors

    /// <summary>
    /// Message extractor for the "shard-test" region. It maps each
    /// <see cref="ShardingActor.TestMessage"/> to the entity named by its <c>EntityId</c>. Shard-id
    /// computation is inherited from <c>HashCodeMessageExtractor</c>.
    /// </summary>
    class MessageExtractor : HashCodeMessageExtractor
    {
        /// <param name="maxNumberOfShards">Maximum number of shards, passed straight to the base extractor.</param>
        public MessageExtractor(int maxNumberOfShards)
            : base(maxNumberOfShards)
        {
        }

        /// <summary>
        /// Returns the target entity id of a <see cref="ShardingActor.TestMessage"/>, or <c>null</c>
        /// for any other message.
        /// </summary>
        public override string EntityId(object message)
        {
            if (message is ShardingActor.TestMessage testMessage)
            {
                return testMessage.EntityId;
            }

            return null;
        }
    }

    /// <summary>
    /// Sharded entity that replies to every <see cref="TestMessage"/> with the message's
    /// <c>Content</c>, sent back to the current sender.
    /// </summary>
    class ShardingActor : ReceiveActor
    {
        /// <summary>
        /// Request routed to the entity <c>EntityId</c>; <c>Content</c> is echoed back as the reply.
        /// </summary>
        public record TestMessage(string EntityId, string Content);

        public ShardingActor()
        {
            Receive<TestMessage>(request => Echo(request));
        }

        // Replies to whoever sent the request with the request's content.
        private void Echo(TestMessage request)
        {
            var reply = request.Content;
            Sender.Tell(reply);
        }
    }

    #endregion

    #region Failing journals
    /// <summary>
    /// In-memory journal (built on <c>SharedMemoryJournal</c>) whose replay and write operations can
    /// be switched into a failing state to simulate a transient persistence outage.
    /// </summary>
    public class FailingJournal : SharedMemoryJournal
    {
        /// <summary>
        /// When <c>false</c>, replays and writes throw <see cref="ApplicationException"/>.
        /// </summary>
        /// <remarks>
        /// <c>Program.Main</c> toggles this flag, while the journal reads it during operation, which
        /// presumably happens on other threads. It is <c>volatile</c> so the journal reliably sees
        /// the update.
        /// </remarks>
        public static volatile bool Working = false;

        public override Task ReplayMessagesAsync(IActorContext context, string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> recoveryCallback)
        {
            ThrowIfNotWorking();

            return base.ReplayMessagesAsync(context, persistenceId, fromSequenceNr, toSequenceNr, max, recoveryCallback);
        }

        protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages)
        {
            ThrowIfNotWorking();

            return base.WriteMessagesAsync(messages);
        }

        /// <summary>
        /// Throws <see cref="ApplicationException"/> synchronously (not as a faulted task) while
        /// <see cref="Working"/> is <c>false</c>.
        /// </summary>
        private static void ThrowIfNotWorking()
        {
            if (!Working)
            {
                throw new ApplicationException("Failed");
            }
        }
    }

    /// <summary>
    /// Snapshot store that never holds any snapshots and can be switched into a failing state to
    /// simulate a transient persistence outage.
    /// </summary>
    public class FailingSnapshot : SnapshotStore
    {
        /// <summary>
        /// When <c>false</c>, every operation fails with <see cref="ApplicationException"/>.
        /// </summary>
        /// <remarks>
        /// <c>Program.Main</c> toggles this flag, while the store reads it during operation, which
        /// presumably happens on other threads. It is <c>volatile</c> so the store reliably sees the
        /// update.
        /// </remarks>
        public static volatile bool Working = false;

        protected override Task DeleteAsync(SnapshotMetadata metadata)
        {
            ThrowIfNotWorking();

            return Task.CompletedTask;
        }

        protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria)
        {
            ThrowIfNotWorking();

            return Task.CompletedTask;
        }

        /// <summary>
        /// Returns "no snapshot" (<c>null</c>) while working. When failing, returns a task faulted
        /// with <see cref="ApplicationException"/>.
        /// </summary>
        /// <remarks>
        /// The method is deliberately not <c>async</c>, because there is nothing to await (which
        /// avoids CS1998). The failure is still surfaced as a faulted task rather than a synchronous
        /// throw, which is the same result an <c>async</c> method would produce.
        /// </remarks>
        protected override Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
        {
            return Working
                ? Task.FromResult<SelectedSnapshot>(null)
                : Task.FromException<SelectedSnapshot>(new ApplicationException("Failed"));
        }

        protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot)
        {
            ThrowIfNotWorking();

            return Task.CompletedTask;
        }

        /// <summary>
        /// Throws <see cref="ApplicationException"/> synchronously while <see cref="Working"/> is
        /// <c>false</c>.
        /// </summary>
        private static void ThrowIfNotWorking()
        {
            if (!Working)
            {
                throw new ApplicationException("Failed");
            }
        }
    }
    #endregion
}

Expected behavior
The console should print `Response: test-content`, meaning that the entity correctly starts and replies to the test message.

Actual behavior
It hangs indefinitely, repeatedly logging the following warning and error:

[WARNING][11/29/2024 11:51:17.723Z][Thread 0006][akka.tcp://MySystem@127.0.0.1:12345/system/sharding/shard-test] shard-test: Requested shard homes [71] from coordinator at [[akka://MySystem/system/sharding/shard-testCoordinator/singleton/coordinator#2081868844]]. [1] total buffered messages.
[ERROR][11/29/2024 11:51:19.654Z][Thread 0016][akka.tcp://MySystem@127.0.0.1:12345/system/sharding/shard-testCoordinator/singleton/coordinator] shard-test: The ShardCoordinator was unable to update remembered shard [71] within 'updating-state-timeout': 00:00:05 millis, retrying

Environment
Console application running on Windows 10, .NET 6.

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions