Skip to content

Instantly share code, notes, and snippets.

@jboner
Created November 10, 2017 09:06
Show Gist options
  • Save jboner/4361c710d2e2386be8bddc39edb0528a to your computer and use it in GitHub Desktop.
Save jboner/4361c710d2e2386be8bddc39edb0528a to your computer and use it in GitHub Desktop.

Revisions

  1. jboner created this gist Nov 10, 2017.
    335 changes: 335 additions & 0 deletions OrderManagement.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,335 @@
    package sample.eventdriven.java;

    import akka.actor.*;
    import akka.persistence.AbstractPersistentActor;
    import scala.concurrent.duration.Duration;

    import java.io.Serializable;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // ===============================================================
    // Demo of an Event-driven Architecture in Akka and Java.
    //
    // Show-casing:
    // 1. Events-first Domain Driven Design using Commands and Events
    // 2. Asynchronous communication through an Event Stream
    // 3. Asynchronous Process Manager driving the workflow
    // 4. Event-sourced Aggregates
    //
    // Used in my talk on 'How Events Are Reshaping Modern Systems'.
    //
    // NOTE: This is a very much simplified and dumbed down sample
    // that is by no means a template for production use.
    // F.e. in a real-world app I would not use Serializable
    // but a JSON, Protobuf, Avro, or some other good lib.
    // I would not use Akka's built in EventStream, but
    // probably Kafka or Kinesis. Etc.
    // ===============================================================

    // =========================================================
    // Commands
    // =========================================================
    interface Command extends Serializable {
    }

    class CreateOrder implements Command {
    final int userId;
    final int productId;

    CreateOrder(int userId, int productId) {
    this.userId = userId;
    this.productId = productId;
    }
    }

    class ReserveProduct implements Command {
    final int userId;
    final int productId;

    ReserveProduct(int userId, int productId) {
    this.userId = userId;
    this.productId = productId;
    }
    }

    class SubmitPayment implements Command {
    final int userId;
    final int productId;

    SubmitPayment(int userId, int productId) {
    this.userId = userId;
    this.productId = productId;
    }
    }

    class ShipProduct implements Command {
    final int userId;
    final int txId;

    ShipProduct(int userId, int txId) {
    this.userId = userId;
    this.txId = txId;
    }
    }

    // =========================================================
    // Events
    // =========================================================
    interface Event extends Serializable {
    }

    class ProductReserved implements Event {
    final int userId;
    final int txId;

    ProductReserved(int userId, int txId) {
    this.userId = userId;
    this.txId = txId;
    }
    }

    class ProductOutOfStock implements Event {
    final int userId;
    final int productId;

    ProductOutOfStock(int userId, int productId) {
    this.userId = userId;
    this.productId = productId;
    }
    }

    class PaymentAuthorized implements Event {
    final int userId;
    final int txId;

    PaymentAuthorized(int userId, int txId) {
    this.userId = userId;
    this.txId = txId;
    }
    }

    class PaymentDeclined implements Event {
    final int userId;
    final int txId;

    PaymentDeclined(int userId, int txId) {
    this.userId = userId;
    this.txId = txId;
    }
    }

    class ProductShipped implements Event {
    final int userId;
    final int txId;

    ProductShipped(int userId, int txId) {
    this.userId = userId;
    this.txId = txId;
    }
    }

    class OrderCompleted implements Event {
    final int userId;
    final int txId;

    OrderCompleted(int userId, int txId) {
    this.userId = userId;
    this.txId = txId;
    }
    }

    // =========================================================
    // Top-level service functioning as a Process Manager
    // Coordinating the workflow on behalf of the Client
    // =========================================================
    class Orders extends AbstractActor {

    final ActorRef client;
    final ActorRef inventory;
    final ActorRef payment;

    public Orders(ActorRef client, ActorRef inventory, ActorRef payment) {
    this.client = client;
    this.inventory = inventory;
    this.payment = payment;
    }

    @Override
    public Receive createReceive() {
    return receiveBuilder()
    .match(CreateOrder.class, cmd -> {
    System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
    inventory.tell(new ReserveProduct(cmd.userId, cmd.productId), getSelf());
    })
    .match(ProductReserved.class, evt -> {
    System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name());
    payment.tell(new SubmitPayment(evt.userId, evt.txId), getSelf());
    })
    .match(PaymentAuthorized.class, evt -> {
    System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name());
    inventory.tell(new ShipProduct(evt.userId, evt.txId), getSelf());
    })
    .match(ProductShipped.class, evt -> {
    System.out.println("EVENT:\t\t\t" + evt + " => " + getSelf().path().name());
    client.tell(new OrderCompleted(evt.userId, evt.txId), getSelf());
    })
    .build();
    }

    @Override
    public void preStart() {
    // Subscribe to Events from the Event Stream
    getContext().system().eventStream().subscribe(getSelf(), ProductReserved.class);
    getContext().system().eventStream().subscribe(getSelf(), ProductOutOfStock.class);
    getContext().system().eventStream().subscribe(getSelf(), ProductShipped.class);
    getContext().system().eventStream().subscribe(getSelf(), PaymentAuthorized.class);
    getContext().system().eventStream().subscribe(getSelf(), PaymentDeclined.class);
    }
    }


    // =========================================================
    // Event Sourced Aggregate
    // =========================================================
    class Inventory extends AbstractPersistentActor {

    @Override
    public String persistenceId() {
    return "inventory";
    }

    int nrOfProductsShipped = 0; // Mutable state, persisted in memory (AKA Memory Image)

    Event reserveProduct(int userId, int productId) {
    System.out.println("SIDE-EFFECT:\tReserving Product => " + getSelf().path().name());
    return new ProductReserved(userId, productId);
    }

    Event shipProduct(int userId, int txId) {
    nrOfProductsShipped += 1; // Update internal state
    System.out.println("SIDE-EFFECT:\tShipping Product => " + getSelf().path().name() +
    " - ProductsShipped: " + nrOfProductsShipped);
    return new ProductShipped(userId, txId);
    }

    @Override
    public Receive createReceive() {
    return receiveBuilder()
    .match(ReserveProduct.class, cmd -> { // Receive ReserveProduct Command
    System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
    Event productStatus = reserveProduct(cmd.userId, cmd.productId); // Try to reserve the product
    persist(productStatus, evt -> { // Try to persist the Event
    getContext().system().eventStream().publish(evt); // Publish Event to Event Stream
    });

    })
    .match(ShipProduct.class, cmd -> { // Receive ShipProduct Command
    System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
    Event shippingStatus = shipProduct(cmd.userId, cmd.txId); // Try to ship the product
    persist(shippingStatus, evt -> { // Try to persist the Event
    getContext().system().eventStream().publish(evt); // Publish Event to Event Stream
    });
    })
    .build();
    }

    @Override
    public Receive createReceiveRecover() {
    return receiveBuilder()
    .match(ProductReserved.class, evt -> { // Replay ProductReserved
    System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name());
    })
    .match(ProductShipped.class, evt -> { // Replay ProductShipped
    nrOfProductsShipped += 1; // Update the internal state
    System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name() +
    " - ProductsShipped: " + nrOfProductsShipped);
    })
    .build();
    }

    }


    // =========================================================
    // Event Sourced Aggregate
    // =========================================================
    class Payment extends AbstractPersistentActor {

    @Override
    public String persistenceId() {
    return "payment";
    }

    int uniqueTransactionNr = 0; // Mutable state, persisted in memory (AKA Memory Image)

    Event processPayment(int userId, int txId) {
    uniqueTransactionNr += 1; // Update the internal state
    System.out.println("SIDE-EFFECT:\tProcessing payment => " + getSelf().path().name() +
    " - TxNumber: " + uniqueTransactionNr);
    return new PaymentAuthorized(userId, uniqueTransactionNr);
    }

    @Override
    public Receive createReceive() {
    return receiveBuilder()
    .match(SubmitPayment.class, cmd -> { // Receive SubmitPayment Command
    System.out.println("COMMAND:\t\t" + cmd + " => " + getSelf().path().name());
    Event paymentStatus = processPayment(cmd.userId, cmd.productId); // Try to pay product
    persist(paymentStatus, evt -> { // Try to persist the Event
    getContext().system().eventStream().publish(evt); // Publish Event to Event Stream
    });

    })
    .build();
    }


    @Override
    public Receive createReceiveRecover() {
    return receiveBuilder()
    .match(PaymentAuthorized.class, evt -> { // Replay PaymentAuthorized
    uniqueTransactionNr += 1; // Update the internal state
    System.out.println("EVENT (REPLAY):\t" + evt + " => " + getSelf().path().name() +
    " - TxNumber: " + uniqueTransactionNr);
    })
    .build();
    }
    }

    // =========================================================
    // Running the Order Management simulation
    // =========================================================
    public class OrderManagement {
    public static void main(String... args) throws Exception {

    // Create the Order Management actor system
    final ActorSystem system = ActorSystem.create("OrderManagement");

    // Plumbing for "client"
    final Inbox clientInbox = Inbox.create(system);
    final ActorRef client = clientInbox.getRef();


    // Create the services
    final ActorRef inventory = system.actorOf(Props.create(Inventory.class), "Inventory");
    final ActorRef payment = system.actorOf(Props.create(Payment.class), "Payment");
    final ActorRef orders = system.actorOf(Props.create(Orders.class, client, inventory, payment), "Orders");

    // Send a CreateOrder Command to the Orders service
    clientInbox.send(orders, new CreateOrder(9, 1337));

    try {
    // Wait for the order confirmation
    Object confirmation = clientInbox.receive(Duration.create(5, TimeUnit.SECONDS));
    System.out.println("EVENT:\t\t\t" + confirmation + " => Client");

    } catch (TimeoutException e) {
    System.out.println("Waited 5 seconds for the OrderCompleted event, giving up...");
    }

    System.out.println("Order completed. Shutting down system.");

    system.terminate();
    }
    }