Akka.Cluster.Sharding: if remember-entities is enabled, shard won't start for initial persistence transient failures #7399
Closed
Description
Version Information
Reproduced with Akka.NET version 1.5.27.1, 1.5.31
Describe the bug
If remember-entities is on and the remember-entities store actor fails its recovery process because of a transient failure, it remains unable to interact with the shard actors, even after the failure has cleared.
To Reproduce
Paste the following code into a console application and run:
using Akka.Actor;
using Akka.Cluster.Sharding;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Snapshot;
using Akka.Persistence;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
namespace TestAkkaRememberEntitiesErrors
{
/// <summary>
/// Console repro: starts a sharded entity type while the journal and snapshot store are
/// failing, re-enables them after a delay, and then asks an entity for a reply.
/// </summary>
internal class Program
{
    /// <summary>
    /// Runs the repro scenario end to end and prints the entity's response, if one ever arrives.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // set both journal and snapshot store in failing state
        FailingJournal.Working = false;
        FailingSnapshot.Working = false;

        var system = ActorSystem.Create("MySystem", Config);

        // start the shard: an error message is expected, since it tries to read which entity needs to be remembered
        var shard = await ClusterSharding.Get(system).StartAsync(
            typeName: "shard-test",
            entityProps: Props.Create(() => new ShardingActor()),
            messageExtractor: new MessageExtractor(100),
            settings: ClusterShardingSettings.Create(system));

        // wait a short period of time before reactivating journal and snapshot
        var seconds = 10;
        Console.WriteLine("Waiting {0} seconds...", seconds);
        await Task.Delay(TimeSpan.FromSeconds(seconds));

        // enable both journal and snapshot
        FailingJournal.Working = true;
        FailingSnapshot.Working = true;

        // try to ask something to an entity inside the shard
        Console.WriteLine("Asking message to shard");
        var responseTask = shard.Ask<string>(new ShardingActor.TestMessage("test-entity", "test-content"));
        Console.WriteLine("Waiting for a response..."); // here it waits indefinitely since the ShardCoordinator is not able to talk to the remember-entities store, which failed
        var response = await responseTask;
        Console.WriteLine("\n\nResponse: {0}\n\n", response);

        await system.Terminate();
    }

    /// <summary>
    /// HOCON configuration for a single-node cluster that uses the failing journal and snapshot
    /// store plugins, with remember-entities enabled and an event-sourced remember-entities store.
    /// The seed node must match the actor system name ("MySystem") and the remoting
    /// hostname/port configured below, otherwise the node cannot join itself.
    /// </summary>
    public static readonly Config Config = @$"
akka.loglevel = DEBUG
akka.persistence.journal.plugin = ""akka.persistence.journal.failure""
akka.persistence.journal.failure.class = ""{typeof(FailingJournal).AssemblyQualifiedName}""
akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.failure""
akka.persistence.snapshot-store.failure.class = ""{typeof(FailingSnapshot).AssemblyQualifiedName}""
akka.cluster.sharding {{
journal-plugin-id = akka.persistence.journal.failure
snapshot-plugin-id = akka.persistence.snapshot-store.failure
remember-entities = on
state-store-mode = ddata
remember-entities-store = eventsourced
distributed-data.durable.keys = []
}}
akka.actor.provider = cluster
akka.remote.dot-netty.tcp {{
port = 12345
hostname = 127.0.0.1
}}
akka.cluster {{
seed-nodes = [""akka.tcp://MySystem@127.0.0.1:12345""]
}}
";
}
#region Actors
/// <summary>
/// Message extractor for the repro: resolves the entity id from
/// <see cref="ShardingActor.TestMessage"/> and leaves the rest to the
/// <see cref="HashCodeMessageExtractor"/> base class.
/// </summary>
class MessageExtractor : HashCodeMessageExtractor
{
    /// <summary>Passes the maximum number of shards straight to the base extractor.</summary>
    /// <param name="maxNumberOfShards">Value forwarded to the base constructor.</param>
    public MessageExtractor(int maxNumberOfShards)
        : base(maxNumberOfShards)
    {
    }

    /// <summary>
    /// Returns the entity id carried by a <see cref="ShardingActor.TestMessage"/>.
    /// </summary>
    /// <param name="message">The incoming message.</param>
    /// <returns>The message's entity id, or <c>null</c> for any other message.</returns>
    public override string EntityId(object message)
    {
        if (message is ShardingActor.TestMessage testMessage)
        {
            return testMessage.EntityId;
        }

        // Any other message type (or null) has no entity id.
        return null;
    }
}
/// <summary>
/// Entity actor used by the repro: replies to each <see cref="TestMessage"/> with its content.
/// </summary>
class ShardingActor : ReceiveActor
{
    /// <summary>Request addressed to one entity, carrying the text to send back.</summary>
    /// <param name="EntityId">Id of the target entity.</param>
    /// <param name="Content">Text the entity sends back to the sender.</param>
    public record TestMessage(string EntityId, string Content);

    /// <summary>Registers the handler that sends the message content back to the sender.</summary>
    public ShardingActor()
    {
        Receive<TestMessage>(request => Sender.Tell(request.Content));
    }
}
#endregion
#region Failing journals
/// <summary>
/// Journal whose replay and write operations throw an <see cref="ApplicationException"/>
/// while <see cref="Working"/> is <c>false</c>, and otherwise delegate to
/// <see cref="SharedMemoryJournal"/>.
/// </summary>
public class FailingJournal : SharedMemoryJournal
{
    /// <summary>Switch toggled by the repro: <c>false</c> makes replay and write fail.</summary>
    public static bool Working = false;

    /// <summary>
    /// Replays messages through the base journal, or throws when the journal is switched off.
    /// </summary>
    /// <exception cref="ApplicationException">Thrown when <see cref="Working"/> is <c>false</c>.</exception>
    public override Task ReplayMessagesAsync(
        IActorContext context,
        string persistenceId,
        long fromSequenceNr,
        long toSequenceNr,
        long max,
        Action<IPersistentRepresentation> recoveryCallback) =>
        Working
            ? base.ReplayMessagesAsync(context, persistenceId, fromSequenceNr, toSequenceNr, max, recoveryCallback)
            : throw new ApplicationException("Failed");

    /// <summary>
    /// Writes messages through the base journal, or throws when the journal is switched off.
    /// </summary>
    /// <exception cref="ApplicationException">Thrown when <see cref="Working"/> is <c>false</c>.</exception>
    protected override Task<IImmutableList<Exception>> WriteMessagesAsync(IEnumerable<AtomicWrite> messages) =>
        Working
            ? base.WriteMessagesAsync(messages)
            : throw new ApplicationException("Failed");
}
/// <summary>
/// Snapshot store stub for the repro. While <see cref="Working"/> is <c>false</c> every
/// operation fails with an <see cref="ApplicationException"/>; once it is <c>true</c>,
/// deletes and saves complete immediately and loads return <c>null</c>.
/// </summary>
public class FailingSnapshot : SnapshotStore
{
    /// <summary>Switch toggled by the repro: <c>false</c> makes every operation fail.</summary>
    public static bool Working = false;

    /// <summary>Completes immediately when working; throws otherwise.</summary>
    /// <exception cref="ApplicationException">Thrown when <see cref="Working"/> is <c>false</c>.</exception>
    protected override Task DeleteAsync(SnapshotMetadata metadata) =>
        Working ? Task.CompletedTask : throw new ApplicationException("Failed");

    /// <summary>Completes immediately when working; throws otherwise.</summary>
    /// <exception cref="ApplicationException">Thrown when <see cref="Working"/> is <c>false</c>.</exception>
    protected override Task DeleteAsync(string persistenceId, SnapshotSelectionCriteria criteria) =>
        Working ? Task.CompletedTask : throw new ApplicationException("Failed");

    /// <summary>Returns <c>null</c> when working; fails otherwise.</summary>
    /// <exception cref="ApplicationException">Raised when <see cref="Working"/> is <c>false</c>.</exception>
    protected override async Task<SelectedSnapshot> LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria)
    {
        // Kept as an async method so the failure is raised from inside the async body,
        // exactly as before, rather than thrown synchronously to the caller.
        if (Working)
        {
            return null;
        }

        throw new ApplicationException("Failed");
    }

    /// <summary>Completes immediately when working; throws otherwise.</summary>
    /// <exception cref="ApplicationException">Thrown when <see cref="Working"/> is <c>false</c>.</exception>
    protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) =>
        Working ? Task.CompletedTask : throw new ApplicationException("Failed");
}
#endregion
}
Expected behavior
The console should print `Response: test-content`, meaning that the entity starts correctly and replies to the test message.
Actual behavior
It hangs indefinitely with two different error log messages, repeated:
[WARNING][11/29/2024 11:51:17.723Z][Thread 0006][akka.tcp://MySystem@127.0.0.1:12345/system/sharding/shard-test] shard-test: Requested shard homes [71] from coordinator at [[akka://MySystem/system/sharding/shard-testCoordinator/singleton/coordinator#2081868844]]. [1] total buffered messages.
[ERROR][11/29/2024 11:51:19.654Z][Thread 0016][akka.tcp://MySystem@127.0.0.1:12345/system/sharding/shard-testCoordinator/singleton/coordinator] shard-test: The ShardCoordinator was unable to update remembered shard [71] within 'updating-state-timeout': 00:00:05 millis, retrying
Environment
Console application running on Windows 10, dotnet 6.