Skip to content

Commit

Permalink
Fixed: IWrappedMessage + IDeadLetterSuppression handling (#7414)
Browse files Browse the repository at this point in the history
* WIP dead letter suppression testing

* added `WrappedMessage.IsDeadLetterSuppressedAnywhere`

* cleaned up message-handling for `DeadLetterActorRef`

* added checks for wrapped messages in DeadLetter handling

* fixed issue with recursive dead letter detection

* added test cases for `ActorSelection` also

* checking in `SuppressedDeadLetter` API changes
  • Loading branch information
Aaronontheweb authored Dec 20, 2024
1 parent 96645a5 commit 0cf567a
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3686,7 +3686,7 @@ namespace Akka.Event
}
public sealed class SuppressedDeadLetter : Akka.Event.AllDeadLetters
{
public SuppressedDeadLetter(Akka.Event.IDeadLetterSuppression message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public SuppressedDeadLetter(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
}
public class TraceLogger : Akka.Actor.UntypedActor
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3676,7 +3676,7 @@ namespace Akka.Event
}
public sealed class SuppressedDeadLetter : Akka.Event.AllDeadLetters
{
public SuppressedDeadLetter(Akka.Event.IDeadLetterSuppression message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
public SuppressedDeadLetter(object message, Akka.Actor.IActorRef sender, Akka.Actor.IActorRef recipient) { }
}
public class TraceLogger : Akka.Actor.UntypedActor
{
Expand Down
47 changes: 44 additions & 3 deletions src/core/Akka.Tests/Actor/DeadLettersSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

namespace Akka.Tests
{

public class DeadLettersSpec : AkkaSpec
{
[Fact]
Expand All @@ -23,6 +22,48 @@ public async Task Can_send_messages_to_dead_letters()
Sys.DeadLetters.Tell("foobar");
await ExpectMsgAsync<DeadLetter>(deadLetter=>deadLetter.Message.Equals("foobar"));
}
}
}

private sealed record WrappedClass(object Message) : IWrappedMessage;

private sealed class SuppressedMessage : IDeadLetterSuppression
{

}

[Fact]
public async Task ShouldLogNormalWrappedMessages()
{
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
Sys.DeadLetters.Tell(new WrappedClass("chocolate-beans"));
await ExpectMsgAsync<DeadLetter>();
}

[Fact]
public async Task ShouldNotLogWrappedMessagesWithDeadLetterSuppression()
{
Sys.EventStream.Subscribe(TestActor, typeof(AllDeadLetters));
Sys.DeadLetters.Tell(new WrappedClass(new SuppressedMessage()));
var msg = await ExpectMsgAsync<SuppressedDeadLetter>();
msg.Message.ToString()!.Contains("SuppressedMessage").ShouldBeTrue();
}

[Fact]
public async Task ShouldLogNormalActorSelectionWrappedMessages()
{
Sys.EventStream.Subscribe(TestActor, typeof(DeadLetter));
var selection = Sys.ActorSelection("/user/foobar");
selection.Tell(new WrappedClass("chocolate-beans"));
await ExpectMsgAsync<DeadLetter>();
}

[Fact]
public async Task ShouldNotLogActorSelectionWrappedMessagesWithDeadLetterSuppression()
{
Sys.EventStream.Subscribe(TestActor, typeof(AllDeadLetters));
var selection = Sys.ActorSelection("/user/foobar");
selection.Tell(new WrappedClass(new SuppressedMessage()));
var msg = await ExpectMsgAsync<SuppressedDeadLetter>();
msg.Message.ToString()!.Contains("SuppressedMessage").ShouldBeTrue();
}
}
}
54 changes: 54 additions & 0 deletions src/core/Akka.Tests/Actor/WrappedMessagesSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// -----------------------------------------------------------------------
// <copyright file="WrappedMessagesSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using Akka.Actor;
using Akka.Event;
using Akka.TestKit;
using Xunit;

namespace Akka.Tests;

public class WrappedMessagesSpec
{
private sealed record WrappedClass(object Message) : IWrappedMessage;

private sealed record WrappedSuppressedClass(object Message) : IWrappedMessage, IDeadLetterSuppression;

private sealed class SuppressedMessage : IDeadLetterSuppression
{

}


[Fact]
public void ShouldUnwrapWrappedMessage()
{
var message = new WrappedClass("chocolate-beans");
var unwrapped = WrappedMessage.Unwrap(message);
unwrapped.ShouldBe("chocolate-beans");
}

public static readonly TheoryData<object, bool> SuppressedMessages = new()
{
{new SuppressedMessage(), true},
{new WrappedClass(new SuppressedMessage()), true},
{new WrappedClass(new WrappedClass(new SuppressedMessage())), true},
{new WrappedClass(new WrappedClass("chocolate-beans")), false},
{new WrappedSuppressedClass("foo"), true},
{new WrappedClass(new WrappedSuppressedClass("chocolate-beans")), true},
{new WrappedClass("chocolate-beans"), false},
{"chocolate-beans", false}
};

[Theory]
[MemberData(nameof(SuppressedMessages))]
public void ShouldDetectIfWrappedMessageIsSuppressed(object message, bool shouldBeSuppressed)
{
var isSuppressed = WrappedMessage.IsDeadLetterSuppressedAnywhere(message);
isSuppressed.ShouldBe(shouldBeSuppressed);
}
}
35 changes: 24 additions & 11 deletions src/core/Akka/Actor/BuiltInActors.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,18 @@ public static object Unwrap(object message)
}
return message;
}

internal static bool IsDeadLetterSuppressedAnywhere(object message)
{
var isSuppressed = message is IDeadLetterSuppression;
while(!isSuppressed && message is IWrappedMessage wm)
{
message = wm.Message;
isSuppressed = message is IDeadLetterSuppression;
}

return isSuppressed;
}
}

/// <summary>
Expand Down Expand Up @@ -226,19 +238,20 @@ public DeadLetterActorRef(IActorRefProvider provider, ActorPath path, EventStrea
/// <exception cref="InvalidMessageException">This exception is thrown if the given <paramref name="message"/> is undefined.</exception>
protected override void TellInternal(object message, IActorRef sender)
{
if (message == null) throw new InvalidMessageException("Message is null");
var i = message as Identify;
if (i != null)
switch (message)
{
sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody));
return;
}
var d = message as DeadLetter;
if (d != null)
{
if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); }
return;
case null:
throw new InvalidMessageException("Message is null");
case Identify i:
sender.Tell(new ActorIdentity(i.MessageId, ActorRefs.Nobody));
return;
case DeadLetter d:
{
if (!SpecialHandle(d.Message, d.Sender)) { _eventStream.Publish(d); }
return;
}
}

if (!SpecialHandle(message, sender)) { _eventStream.Publish(new DeadLetter(message, sender.IsNobody() ? Provider.DeadLetters : sender, this)); }
}

Expand Down
8 changes: 6 additions & 2 deletions src/core/Akka/Actor/DeadLetterMailbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ public DeadLetterMessageQueue(IActorRef deadLetters)
/// <param name="envelope">TBD</param>
public void Enqueue(IActorRef receiver, Envelope envelope)
{
if (envelope.Message is DeadLetter)
if (envelope.Message is AllDeadLetters)
{
// actor subscribing to DeadLetter. Drop it.
/* We're receiving a DeadLetter sent to us by someone else (which is not normal - usually only happens
* if we were explicitly subscribed to DeadLetters on the EventStream).
*
* Have to terminate here in order to prevent a stack overflow.
*/
return;
}

Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka/Actor/EmptyLocalActorRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ protected virtual bool SpecialHandle(object message, IActorRef sender)
}
else
{
if (actorSelectionMessage.Message is IDeadLetterSuppression selectionDeadLetterSuppression)
if (WrappedMessage.IsDeadLetterSuppressedAnywhere(actorSelectionMessage.Message))
{
PublishSupressedDeadLetter(selectionDeadLetterSuppression, sender);
PublishSupressedDeadLetter(actorSelectionMessage.Message, sender);
}
else
{
Expand All @@ -123,16 +123,16 @@ protected virtual bool SpecialHandle(object message, IActorRef sender)
return true;
}

if (message is IDeadLetterSuppression deadLetterSuppression)
if (WrappedMessage.IsDeadLetterSuppressedAnywhere(message))
{
PublishSupressedDeadLetter(deadLetterSuppression, sender);
PublishSupressedDeadLetter(message, sender);
return true;
}

return false;
}

private void PublishSupressedDeadLetter(IDeadLetterSuppression msg, IActorRef sender)
private void PublishSupressedDeadLetter(object msg, IActorRef sender)
{
_eventStream.Publish(new SuppressedDeadLetter(msg, sender.IsNobody() ? _provider.DeadLetters : sender, this));
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Event/DeadLetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public sealed class SuppressedDeadLetter : AllDeadLetters
/// <exception cref="ArgumentNullException">
/// This exception is thrown when either the sender or the recipient is undefined.
/// </exception>
public SuppressedDeadLetter(IDeadLetterSuppression message, IActorRef sender, IActorRef recipient) : base(message, sender, recipient)
public SuppressedDeadLetter(object message, IActorRef sender, IActorRef recipient) : base(message, sender, recipient)
{
if (sender == null) throw new ArgumentNullException(nameof(sender), "SuppressedDeadLetter sender may not be null");
if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null");
Expand Down

0 comments on commit 0cf567a

Please sign in to comment.