cfrenzel/Eventfully


Eventfully .NET

Lightweight Reliable Messaging Framework using Outbox Pattern / EFCore / AzureServiceBus

Eventfully provides a gentle way for enterprises to begin incorporating asynchronous messaging patterns, long-running workflows, and eventual consistency into their projects using familiar transactional patterns.

NuGet version (Eventfully.Core) Build status

Why

  • Dispatch Messages within a Transaction/UnitOfWork using an Outbox within your database
  • Simple Configuration
  • Advanced Retry Logic for all your eventual consistency needs
  • No requirement for shared message classes between apps/services
  • EFCore SqlServer support
  • Azure Service Bus support
  • Dependency Injection support
  • Delayed Dispatch and Timeouts
  • Easy to customize message deserialization
  • Encryption support (AES)
    • Azure Key Vault support out of the box
  • Simple Sagas
  • Configurable Message Processing Pipeline
  • Pluggable Transports, Outboxes, MessageHandling, Encryption, Dependency Injection
  • Supports Events, Command/Reply

Events

Events implement IIntegrationEvent. A base class IntegrationEvent is provided. Overriding MessageType provides a unique identifier for our new Event type.

 public class OrderCreated : IntegrationEvent
 {
     public override string MessageType => "Sales.OrderCreated";
     public Guid OrderId { get; private set; }
     public Decimal TotalDue { get; private set; }
     public string CurrencyCode { get; private set; }
 }

Event Handlers

Event handlers implement IMessageHandler<Event>.

public class OrderCreatedHandler : IMessageHandler<OrderCreated>
{
    public Task Handle(OrderCreated ev, MessageContext context)
    {
        Console.WriteLine("Received OrderCreated Event");
        Console.WriteLine($"\tOrderId: {ev.OrderId}");
        Console.WriteLine($"\tTotal Due: {ev.TotalDue} {ev.CurrencyCode}");
        return Task.CompletedTask;
    }
}

Publishing Events

To publish an OrderCreated event only if saving the Order entity succeeds, inject an IMessagingClient into your constructor and use it to publish the event before calling DbContext.SaveChanges. This saves the event to the Outbox within the same transaction as the Order. The framework then tries (and retries) to publish the event to the configured Transport in the background.

  public class OrderCreator
  {
      private readonly ApplicationDbContext _db;
      private readonly ILogger<OrderCreator> _log;
      private readonly IMessagingClient _messagingClient;

      public OrderCreator(ApplicationDbContext db, ILogger<OrderCreator> log, IMessagingClient messagingClient)
      {
          _db = db;
          _log = log;
          _messagingClient = messagingClient;
      }

      public async Task<Guid?> CreateOrder(CreateOrderCommand command, CancellationToken cancellationToken)
      {
          try
          {
              Order newOrder = new Order(command.Amount, command.OrderedAtUtc);
              _db.Add(newOrder);

              // Stage the event in the Outbox; it is committed together with the Order below
              _messagingClient.Publish(
                  new OrderCreated.Event(newOrder.Id, newOrder.Amount, "USD", null)
              );

              await _db.SaveChangesAsync(cancellationToken);
              return newOrder.Id;
          }
          catch (Exception exc)
          {
              _log.LogError(exc, "Error creating order");
              return null;
          }
      }
  }
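
The idea behind staging the event before SaveChanges can be illustrated without Eventfully at all. The following is a minimal in-memory sketch of the Outbox pattern, not the framework's implementation: InMemoryStore, OutboxMessage, and every method on them are hypothetical names chosen for illustration. It shows that a published message only becomes visible to the dispatcher once the save succeeds, and that a message stays in the outbox until the send delegate returns.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; none of these are part of Eventfully.
public sealed record OutboxMessage(Guid Id, string MessageType, string Body);

public sealed class InMemoryStore
{
    // Staged changes that have not been committed yet.
    private readonly List<string> _pendingOrders = new();
    private readonly List<OutboxMessage> _pendingMessages = new();

    // "Committed" state: business rows and the outbox table.
    public List<string> Orders { get; } = new();
    public Queue<OutboxMessage> Outbox { get; } = new();

    public void AddOrder(string order) => _pendingOrders.Add(order);

    // Publishing only stages the message; nothing leaves the process yet.
    public void Publish(string messageType, string body) =>
        _pendingMessages.Add(new OutboxMessage(Guid.NewGuid(), messageType, body));

    // Commits staged orders and staged messages together, or neither.
    public void SaveChanges(bool simulateFailure = false)
    {
        if (simulateFailure)
        {
            _pendingOrders.Clear();
            _pendingMessages.Clear();
            throw new InvalidOperationException("Simulated save failure");
        }

        Orders.AddRange(_pendingOrders);
        foreach (var message in _pendingMessages) Outbox.Enqueue(message);
        _pendingOrders.Clear();
        _pendingMessages.Clear();
    }

    // Background step: hand committed messages to a transport delegate.
    // A message is removed only after send returns, so a throwing send
    // leaves it in the outbox for a later retry.
    public int Dispatch(Action<OutboxMessage> send)
    {
        var sent = 0;
        while (Outbox.Count > 0)
        {
            send(Outbox.Peek());
            Outbox.Dequeue();
            sent++;
        }
        return sent;
    }
}
```

Calling AddOrder and Publish followed by SaveChanges(simulateFailure: true) leaves both Orders and Outbox empty, which is the guarantee the transactional outbox is meant to provide.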

Configure Messaging

The simplest way to configure Transports (think AzureServiceBus) and Endpoints (think a specific queue or topic) is to implement a Profile.

  • Configure an Endpoint by providing a name: "Events"

  • Specify whether the endpoint is

    • Inbound - we want to receive and handle messages from it
    • Outbound - we want to write messages to it
    • InboundOutbound - we want to do both. Useful for apps that asynchronously talk to themselves
  • For Outbound endpoints we can bind specific Message Types to the endpoint. This allows us to publish Events without specifying an Endpoint

    • .BindEvent<T>() to bind a specific Event type to the endpoint
    • .AsEventDefault() to make the endpoint the default for all Events
    • Convention: use Endpoint names: "Events", "Commands", "Replies" to automatically make the endpoint a default

  • Specify a Transport

    • .UseAzureServiceBusTransport()
    • .UseLocalTransport() only uses the Outbox and dispatches messages locally without a service bus

public class MessagingProfile : Profile
{
    public MessagingProfile(Microsoft.Extensions.Configuration.IConfiguration config)
    {
        ConfigureEndpoint("Events")
            .AsInboundOutbound() // in this example we both read from and write to this endpoint
            .BindEvent<PaymentMethodCreated>()
            .UseLocalTransport();
    }
}
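
To make the binding rules above concrete, here is a small stand-alone sketch of how an outbound endpoint might be resolved for an event type. It is an assumption about the general idea, not Eventfully's code: EndpointRouter, Bind, UseEventDefault, and Resolve are hypothetical names, and the precedence shown (an explicit binding wins over the default) is assumed rather than taken from the framework.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical router for illustration; not an Eventfully type.
public sealed class EndpointRouter
{
    private readonly Dictionary<Type, string> _bindings = new();
    private string _eventDefault;

    // Analogous in spirit to binding an event type to a named endpoint.
    public EndpointRouter Bind<TEvent>(string endpoint)
    {
        _bindings[typeof(TEvent)] = endpoint;
        return this;
    }

    // Analogous in spirit to marking an endpoint as the default for all events.
    public EndpointRouter UseEventDefault(string endpoint)
    {
        _eventDefault = endpoint;
        return this;
    }

    // Assumed precedence: explicit binding first, then the default endpoint.
    public string Resolve(Type eventType)
    {
        if (_bindings.TryGetValue(eventType, out var bound)) return bound;
        return _eventDefault
            ?? throw new InvalidOperationException($"No endpoint bound for {eventType.Name}");
    }
}
```

With a router like this, publishing code never has to name an endpoint: the message type alone decides where the message goes.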

Configuring Transient Dispatch for EFCore and SqlServer

Often when you're sending a small number of messages within a Transaction, it makes sense for the messages to dispatch immediately after committing to the outbox. This avoids the delay normally incurred by polling the outbox. This feature is enabled by default and is referred to as TransientDispatch. Until we figure out a better way to detect a successful save in EFCore, you'll need to help out by implementing ISupportTransientDispatch in your DbContext. Simply raise a C# event when the changes are saved.

public class ApplicationDbContext : DbContext, ISupportTransientDispatch
{
    public event EventHandler ChangesPersisted;

    protected override void OnModelCreating(ModelBuilder builder)
    {
        base.OnModelCreating(builder);

        /*** Add Outbox Entities ***/
        builder.AddEFCoreOutbox();
    }

    public override int SaveChanges()
    {
        var res = base.SaveChanges();
        _postSaveChanges();
        return res;
    }

    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        var res = await base.SaveChangesAsync(cancellationToken);
        _postSaveChanges();
        return res;
    }

    // Raised only after the base save has returned without throwing
    private void _postSaveChanges()
    {
        this.ChangesPersisted?.Invoke(this, EventArgs.Empty);
    }
}
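
The mechanism here is an ordinary C# event: something subscribes to ChangesPersisted and reacts as soon as a save completes instead of waiting for the next polling cycle. The sketch below shows that wiring with plain classes. FakeContext and TransientDispatcher are hypothetical stand-ins invented for this example, and how Eventfully itself subscribes is not shown in this document.

```csharp
using System;

// Hypothetical stand-in for a DbContext that raises an event after saving.
public class FakeContext
{
    public event EventHandler ChangesPersisted;

    public int SaveChanges()
    {
        // A real context would write to the database here and could throw,
        // in which case the event below would never be raised.
        ChangesPersisted?.Invoke(this, EventArgs.Empty);
        return 1;
    }
}

// Hypothetical listener that counts how often it was asked to dispatch.
public class TransientDispatcher
{
    public int DispatchCount { get; private set; }

    public void Attach(FakeContext context) =>
        context.ChangesPersisted += OnChangesPersisted;

    // Runs synchronously inside SaveChanges, right after the save.
    private void OnChangesPersisted(object sender, EventArgs e) => DispatchCount++;
}
```

Because the handler runs only after the save returns, a listener wired this way never sees messages from a save that failed.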

Registering with Container at Startup

Eventfully plugs into your DI framework. For Microsoft.Extensions.DependencyInjection:

  • services.AddMessaging
  • .WithEFCoreOutbox<ApplicationDbContext> - configure an outbox
  • _serviceProvider.UseMessagingHost() - to enable processing of Inbound endpoints and Outbox

    services.AddMessaging(
        new MessagingProfile(_config),
        typeof(Program).GetTypeInfo().Assembly
    )
    .WithEFCoreOutbox<ApplicationDbContext>(settings =>
    {
        settings.DisableTransientDispatch = false;
        settings.MaxConcurrency = 1;
        settings.SqlConnectionString = _config.GetConnectionString("ApplicationConnection");
    });

    // start message processing from the outbox and inbound endpoints
    _serviceProvider = services.BuildServiceProvider();
    _serviceProvider.UseMessagingHost();

Encryption for an Event Type

    ConfigureEndpoint("Events")
        .AsInboundOutbound()
        .BindEvent<PaymentMethodCreated>()
            .UseAesEncryption(config.GetSection("SampleAESKey").Value)
        .UseLocalTransport();

Encryption with AzureKeyVault KeyProvider

    .UseAesEncryption("keyName", new AzureKeyVaultKeyProvider(config.GetSection("KeyVaultUrl").Value))

Configuring Messages with MessageMetaData

  • Add arbitrary data to a message
   var meta = new MessageMetaData();
   meta.Add("CustomProp", "CustomValue");
  • Predefined Meta Data
    • DispatchDelay
    • CorrelationId
    • SessionId
    • MessageId
    • SkipTransient

Delayed Dispatch

   await client.Publish(new OrderCreated.Event(Guid.NewGuid(), 722.99M, "USD", null),
        new MessageMetaData(delay: TimeSpan.FromSeconds(30))
   );
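
One way to think about a dispatch delay is as a "not before" time stored alongside the message, which the outbox reader compares against the clock. The sketch below illustrates that idea only. PendingMessage, DueAtUtc, and DelayedDispatch.ReadyAt are hypothetical names, and this document does not say how Eventfully stores or evaluates the delay.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical outbox row for illustration; not an Eventfully type.
public sealed record PendingMessage(string MessageType, DateTime CreatedAtUtc, TimeSpan? Delay)
{
    // A message without a delay is due as soon as it is created.
    public DateTime DueAtUtc => CreatedAtUtc + (Delay ?? TimeSpan.Zero);
}

public static class DelayedDispatch
{
    // Returns the messages whose due time has passed, oldest due time first.
    public static IReadOnlyList<PendingMessage> ReadyAt(
        IEnumerable<PendingMessage> outbox, DateTime nowUtc) =>
        outbox.Where(m => m.DueAtUtc <= nowUtc)
              .OrderBy(m => m.DueAtUtc)
              .ToList();
}
```

A message created with a 30 second delay is skipped by ReadyAt until the supplied clock value reaches its creation time plus 30 seconds.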

Publishing Raw Message Data

    dynamic json = new ExpandoObject();
    json.OrderId = Guid.NewGuid();
    json.TotalDue = 622.99M;
    json.CurrencyCode = "USD";
    json.ShippingAddress = new
    {
        Line1 = "456 Peachtree St",
        Line2 = "Suite A",
        City = "Atlanta",
        StateCode = "GA",
        Zip = "30319"
    };

    await client.Publish(
        "Sales.OrderCreated",
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(json))
    ); 
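
Publishing raw bytes is what removes the need for shared message classes: the receiver only has to agree on the JSON field names. The following sketch shows a consumer-side class, declared independently of the publisher, reading such a payload. It uses System.Text.Json instead of the Newtonsoft.Json calls used above so that it needs no extra package. OrderCreatedView and RawMessage are hypothetical names and not part of Eventfully.

```csharp
using System;
using System.Text.Json;

// Consumer-side view of the event; hypothetical, declared independently of the publisher.
public sealed class OrderCreatedView
{
    public Guid OrderId { get; set; }
    public decimal TotalDue { get; set; }
    public string CurrencyCode { get; set; } = "";
}

public static class RawMessage
{
    // Publisher side: build the payload from an anonymous object, no shared class needed.
    public static byte[] Encode(Guid orderId, decimal totalDue, string currencyCode) =>
        JsonSerializer.SerializeToUtf8Bytes(new
        {
            OrderId = orderId,
            TotalDue = totalDue,
            CurrencyCode = currencyCode,
            ShippingAddress = new { City = "Atlanta", StateCode = "GA" }
        });

    // Consumer side: fields the view does not declare (ShippingAddress) are ignored.
    public static OrderCreatedView Decode(byte[] data) =>
        JsonSerializer.Deserialize<OrderCreatedView>(data)
            ?? throw new JsonException("Payload was JSON null");
}
```

The consumer picks up only the fields it cares about, so the publisher can add new fields without breaking existing receivers.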

Custom Message Extractor

Within your event class, implement IMessageExtractor to control how the raw message bytes are turned into the event.

    public class Event : IntegrationEvent, IMessageExtractor
    {
        public override string MessageType => "Organization.Reputation";

        public Guid OrganizationId { get; set; }
        public string EventType { get; set; }
        public string Details { get; set; }

        private Event() { }
        public Event(Guid id, string type, string details)
        {
            this.OrganizationId = id;
            this.EventType = type;
            this.Details = details;
        }

        public IIntegrationMessage Extract(byte[] data)
        {
            var textData = Encoding.UTF8.GetString(data);
            dynamic json = JValue.Parse(textData);

            // Map wire-format fields onto this class; their names need not match the properties
            Guid id = json.OrganizationId;
            string type = json.EventType;
            string details = json.EventDetails;
            var @event = new OrganizationReputation.Event(id, type, details);
            return @event;
        }
    }

Bypassing the Outbox - Non Transactional

 await client.PublishSynchronously(new OrderCreated.Event(Guid.NewGuid(), 722.99M, "USD", null));
