SEN PRODUCT BLOG

千株式会社のエンジニアによるブログ

これから始めるイベントソーシング:保育クラスの活動記録で学ぶCQRS+ES×アクターモデル

はじめに

こんにちは|こんばんは|メリークリスマス!
千株式会社CTOのytakeです。

この記事は、SENアドベントカレンダー 2024の25日目、最終日の記事です!

我々のようなEdtech関連企業がデジタルトランスフォーメーションを進めるための、
ドメインイベントやそれらを実現するためのアーキテクチャについて少しだけお話ししたいと思います。

来年の展望などについてはまた別の機会にお話しできればと思います。
現在の組織の取り組みについては前日の記事をご覧ください。

ドメインイベントとは

まずおさらいとして、ドメインイベントとはドメインモデルの中で発生する重要な出来事を表すイベントです。

EdTech、とくに幼保や学校といった領域で考えると、下記のような例が挙げられます。

  • 子どもが散歩に行った
  • 子どもがご飯を食べた
  • 保護者から欠席の連絡があった
  • 保育士が子どもの成長を記録した

これらはビジネスにおける重要な「出来事」そのものであり、
イベントストーミングなどを通して洗い出すことで、ドメインモデルを具体化できます。

加えて、ドメインイベントは利用者とのタッチポイントを明確にする手がかりにもなります。

保護者アプリでの連絡タイミング、保育士がどの場面で何を記録するか… こうした情報は、
プロダクトデザインやサービスデザインのアイデアを得るうえでも大変重要です。

ドメインイベントのシステム観点

システムの観点から見ると、ドメインイベントは「ドメインモデルの中で起きる重要な出来事」であると同時に、
継続的なイベントストリームとして扱うことができます。

これにより、蓄積されたイベントから「現在の状態」を導き出したり、
過去の出来事を参照しながら分析を行ったりできます。

たとえば、「子どもが散歩に行った」というイベントを記録しておけば、

  • 子どもごとにどのくらいの頻度で散歩に行っているか
  • どんな天候のときによく行っているか
  • どの場所に行くことが多いか

などを、後から集計・分析することが可能になります。

さらにはクラス全体・園全体での散歩頻度や傾向を把握したり、天候や曜日との関連を見たり、
さまざまな切り口でタイムリーな洞察を得ることができます。

従来の状態保存アプローチとの比較

ドメインイベントを用いず、現在の状態(最新の在庫数や人数、記録など)だけをデータベースに保存する場合は、
細かな変化や過去の履歴を振り返るには別途ログや履歴テーブルなどが必要になりがちです。

また、頻度やパターンを分析しようとするたびに、追加の仕組みを整備しなければならない場合もあります。

一方でドメインイベントを中心に据えた設計であれば、積み重ねられたイベントをそのまま活用することで、

  • 細かな出来事の追跡
  • 新しい分析観点の追加
  • 状態の再構築(いつでも「このときの状態」に戻せる)

といった恩恵を受けやすくなります。
これにより、たとえば子どもの小さな変化を捉えやすくなったり、
短期間のイベントの偏りを見つけられるなど、
教育・保育現場の質を向上させるための示唆を得ることができます。

つまりドメインイベントは、幼保や学校などのEdTech領域においても、「ビジネス上の大切な出来事」を明確にして、
モデリングやサービスデザインの土台となる重要な要素です。

イベントストリームとして蓄積されたデータは、分析・可視化・将来の再解釈など、多方面に活用できます。
従来の「状態を上書き保存する」アプローチでは見えにくかった微細な変化や履歴を、
イベントドリブンな設計を採用することで捉えやすくなります。

こうした観点は、EdTech領域以外でも有効ですが、日々の細かな行動や記録が重要視される保育・教育では、
とくにドメインイベント中心のアプローチが大きなメリットをもたらすのが理解できるかと思います。

では実際にどうやって実装するか?

ドメインイベントを実装するためには、いくつかの方法があります。
せっかくなのでここでは
CQRS+ESとアクターモデルの組み合わせについて、
簡単に触れてみたいと思います。

保育クラスの活動をProto Actor(Go)で管理してみよう

このサンプルでは、以下のようなドメインモデルを想定しています。

  • 保育クラス(ひまわり組、もも組など)
  • クラス内の園児の活動(観察記録・昼寝の記録・食事の記録)

そして、これらの「活動(アクティビティ)」をイベント(ドメインイベント)として管理し、 Proto Actorの永続化(persistence)機能を使ってイベントの蓄積と
イベントのリプレイできるようにしています。

さらに、CQRSの構成の一部として、コマンドが発行された際のドメインイベントを
読み取りモデル(Read Model)へ伝達するための仕組み
(ここではメッセージの返信先としてのreplyToとそれを受け取るstream(アクター)を利用)を簡易的に実装します。

コードを見ながら、CQRS+ESの基本的な流れを確認していきましょう。

1. 主要な要素の概観

(A) ChildcareClassActor

ひまわり組などクラスを表すアクターで、
コマンド(AddObservation, AddNap, AddMealなど)を受け取り、
対応するイベント(ObservationAdded, NapAdded, MealAdded)を発行しイベントストアに保存します。

イベントを保存したら、状態を更新(applyEvent)しさらにread model updaterアクター的存在にイベントを転送しています。

package main

import (
    "fmt"
    "time"

    console "github.com/asynkron/goconsole"
    "github.com/asynkron/protoactor-go/actor"
    "github.com/asynkron/protoactor-go/persistence"
    "github.com/asynkron/protoactor-go/stream"
    "github.com/acme/sample/command"
    "github.com/acme/sample/event"
    "google.golang.org/protobuf/proto"
    "google.golang.org/protobuf/types/known/anypb"
    "google.golang.org/protobuf/types/known/timestamppb"
)

func NewChildcareClassActor() actor.Actor {
    return &ChildcareClassActor{
        state: new(event.ActivityAdded),
    }
}

type ChildcareClassActor struct {
    persistence.Mixin
    state *event.ActivityAdded
}

// Receive is the entry point for messages
func (a *ChildcareClassActor) Receive(ctx actor.Context) {
    switch msg := ctx.Message().(type) {
    case *persistence.RequestSnapshot:
        a.PersistSnapshot(a.state)
    case *persistence.ReplayComplete:
        fmt.Printf("Replay complete for Child %s\n", ctx.Self().Id)
    case *command.AddObservation:
        evt := &event.ObservationAdded{
            ChildID:  msg.ChildID,
            Note:     msg.Note,
            DateTime: timestamppb.New(msg.DateTime),
        }
        sender := ctx.Sender()
        a.applyEvent(evt)
        ctx.Send(msg.ReplyTo, evt)
        ctx.Send(sender, "Observation recorded.")
    case *command.AddNap:
        evt := &event.NapAdded{
            ChildID:         msg.ChildID,
            DurationMinutes: msg.DurationMinutes,
            StartTime:       timestamppb.New(msg.StartTime),
        }
        sender := ctx.Sender()
        a.applyEvent(evt)
        ctx.Send(msg.ReplyTo, evt)
        ctx.Send(sender, "Nap recorded.")
    case *command.AddMeal:
        evt := &event.MealAdded{
            ChildID:  msg.ChildID,
            MealType: msg.MealType,
            DateTime: timestamppb.New(msg.DateTime),
            Menu:     msg.Menu,
        }
        sender := ctx.Sender()
        a.applyEvent(evt)
        ctx.Send(msg.ReplyTo, evt)
        ctx.Send(sender, "Meal recorded.")
    case *command.PrintActivities:
        fmt.Println(a.state.Details)
    case *event.ObservationAdded, *event.NapAdded, *event.MealAdded:
        a.applyEvent(msg.(proto.Message))
    }
}

func (a *ChildcareClassActor) applyEvent(msg proto.Message) {
    if !a.Recovering() {
        a.PersistReceive(msg)
    }
    switch e := msg.(type) {
    case *event.ObservationAdded:
        // 実際にはドメインロジックなりいろんな処理が必要です
        r, _ := anypb.New(e)
        a.state.Details = append(a.state.Details, r)
    case *event.NapAdded:
        r, _ := anypb.New(e)
        a.state.Details = append(a.state.Details, r)
    case *event.MealAdded:
        r, _ := anypb.New(e)
        a.state.Details = append(a.state.Details, r)
    }
}

構成はこのようなイメージです。

構成のイメージ

(B) Provider (InMemoryPersistence)

  • Proto Actorのpersistenceを使うためのプロバイダーです。
  • 本番環境ではDBやイベントストアを使いますが、このサンプルではインメモリで動作させ、スナップショットのタイミングやイベントの蓄積をシミュレーションしています。
type Provider struct {
    providerState persistence.ProviderState
}

func NewProvider(snapshotInterval int) *Provider {
    return &Provider{
        providerState: persistence.NewInMemoryProvider(snapshotInterval),
    }
}

func (p *Provider) GetState() persistence.ProviderState {
    return p.providerState
}

(C) main() 内での流れ

func main() {
    system := actor.NewActorSystem()
    pipe := stream.NewTypedStream[proto.Message](system)
    pp := pipe.PID()
    provider := NewProvider(3)
    props := actor.PropsFromProducer(NewChildcareClassActor,
        actor.WithReceiverMiddleware(persistence.Using(provider)))
    pid, _ := system.Root.SpawnNamed(props, "ひまわり")
    now := time.Now()
    go func() {
        // 観察記録の追加
        result, _ := system.Root.RequestFuture(
            pid,
            &command.AddObservation{
                ChildID:  "child-123",
                Note:     "砂場で元気に遊んでいた",
                DateTime: now,
                ReplyTo:  pp,
            },
            1*time.Second,
        ).Result()
        // お昼寝の追加
        napResponse, _ := system.Root.RequestFuture(
            pid,
            &command.AddNap{
                ChildID:         "child-123",
                DurationMinutes: 90,
                StartTime:       now.Add(1 * time.Hour),
                ReplyTo:         pp,
            },
            1*time.Second,
        ).Result()
        fmt.Println(napResponse)
        // 給食の記録を追加
        mealResponse, _ := system.Root.RequestFuture(
            pid,
            &command.AddMeal{
                ChildID:  "child-123",
                MealType: "昼食",
                DateTime: now.Add(2 * time.Hour),
                Menu:     "カレーライス",
                ReplyTo:  pp,
            },
            1*time.Second,
        ).Result()
        fmt.Println(mealResponse)
        result, _ = system.Root.RequestFuture(
            pid,
            &command.PrintActivities{},
            1*time.Second,
        ).Result()
        // ひまわり組アクターを停止しリソースを開放
        system.Root.Stop(pid)
        time.Sleep(1 * time.Second)
        // ひまわり組アクターを再起動しイベントのリプレイが行われ、ひまわり組の活動記録を再構築
        pid, _ = system.Root.SpawnNamed(props, "ひまわり")
        result, _ = system.Root.RequestFuture(
            pid,
            &command.PrintActivities{},
            1*time.Second,
        ).Result()
        fmt.Println(result)
    }()
    for range [3]int{} {
        // read model updater(Actor)に送信することで、read modelを更新できる
        fmt.Println(<-pipe.C())
    }
    _, _ = console.ReadLine()
}
  1. ActorSystem、InMemory Providerを初期化
  2. ChildcareClassActorをひまわり組命名してアクターを生成
  3. ChildcareClassActorにコマンドを送信
    • AddObservation(観察記録を追加)
    • AddNap(お昼寝の記録を追加)
    • AddMeal(食事の記録を追加)
    • PrintActivities(現在の活動一覧を表示)
  4. いったんアクターを停止し、再度起動(リプレイ)しても、過去のイベントが復元される様子を確認
  5. replyToに指定されたアクター(ここではpipeというTypedStream)にイベントが送信されイベントを受け取る

ここまでがこのサンプルにおける
読み取りモデルの更新までに相当するイメージです。

2. コマンドとイベントのやり取り

コマンド送信

コマンドは、アクターに対して「何かを行うように指示する」ためのメッセージですので、
下記の構造体を定義して、アクターに送信します。

package command

import (
    "time"

    "github.com/asynkron/protoactor-go/actor"
)

// AddObservation is a struct for add observation.
// 観察記録の追加
type AddObservation struct {
    ChildID  string
    Note     string
    DateTime time.Time
    ReplyTo  *actor.PID
}

// AddNap is a struct for add nap.
// 昼寝記録の追加
type AddNap struct {
    ChildID         string
    DurationMinutes int32
    StartTime       time.Time
    ReplyTo         *actor.PID
}

// AddMeal is a struct for add meal.
// 食事記録の追加
type AddMeal struct {
    ChildID  string
    MealType string // 朝食/昼食/おやつなど
    DateTime time.Time
    Menu     string // 食事内容の記録など
    ReplyTo  *actor.PID
}

type PrintActivities struct {
}

実際には次のようにして、コマンドを送信します。

&command.AddObservation{
    ChildID:  "child-123",
    Note:     "砂場で元気に遊んでいた",
    DateTime: now,
    ReplyTo:  pp, // ここが「read model updater(やpipe)」への転送先PID
}

ChildIDやDateTimeなどのドメイン上必要な情報を詰め込んで
AddObservationコマンドを発行します。

ReplyToにはイベント受信者のPID(ここではpp = Pipe)を指定し、
イベントが起きたらそこにも送ってほしいと依頼します。

イベントの生成・永続化

イベントの永続化を行う場合、
Proto ActorではProtocol Buffersを使ってイベントの構造を定義する必要があります。

syntax = "proto3";

package protobuf;

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

option go_package = "github.com/acme/sample/event";

message ObservationAdded {
  string childID = 1;
  string note = 2;
  google.protobuf.Timestamp date_time = 3;
}

message NapAdded {
  string childID = 1;
  int32 duration_minutes = 2;
  google.protobuf.Timestamp start_time = 3;
}

message MealAdded {
  string childID = 1;
  string meal_type = 2;
  google.protobuf.Timestamp date_time = 3;
  string menu = 4;
}

message ActivityAdded {
  repeated google.protobuf.Any details = 2;
}

protocを使ってGoのコードを生成するとそれぞれのイベントに対応する構造体が生成されます。

コマンドを受け取ると、対応するイベント(例: ObservationAdded)を作成します
このイベントはapplyEvent(evt) 内でPersistReceive(evt) を呼び出し、
イベントをイベントストア(InMemory)に永続化します。
同時に、アクター自身のstate(Actor内部の現在状態)を更新します。

ここまでがイベントの永続化とアクターの状態更新の流れです。
アクターを再起動すると、過去のイベントがリプレイされ、状態が復元されることを確認できるようになります。

読み取りモデル更新への転送

イベントを永続化したら、それを読み取りモデル(Read Model)に反映させる必要があります。
QueryモデルやViewモデルなどとも呼ばれるこの部分は、
CQRSの一部として、コマンドが発行された際にイベントを受け取り、
それを元に状態を更新する役割を担います。

前述のサンプルコードではイベントを永続化したら、
ReplyToに設定されたアクター(今回はpipe)へ送信しています。
CQRS + ESの世界では、このイベントを受けた別アクターがリードモデルを更新したり、
レポートや通知を行ったりするパターンを取ります。

// この部分が「read model updater」への転送に相当します
ctx.Send(msg.ReplyTo, evt) 

3. TypedStream を用いた簡易的なRead Model受信

pipe := stream.NewTypedStream[proto.Message](system)
pp := pipe.PID()

pipeはProto Actor GoのTypedStream機能で、
特定の型(ここではproto.Message)を受け取るストリームを生成します。
ppはこのpipe自身のPID(アクターID) で、
コマンドにReplyTo: ppを指定することで、イベントがpipeに流れてくるようになります。

for range [3]int{} {
    // pipeから3回分メッセージを受信し、出力
    fmt.Println(<-pipe.C())
}

送られてきたイベントをチャネル経由で取り出していますが、
ここではprintlnしているだけです。
本番では、この部分が 「read model updaterアクター」 などに置き換えられ、
イベントを受け取り次第、読み取りデータベースを更新したりダッシュボードに反映したりする仕組みになります。

ここまでは下記のような流れになります。

4. イベントのリプレイと状態再構築

system.Root.Stop(pid) // アクター停止
// ...
pid, _ = system.Root.SpawnNamed(props, "ひまわり") // 再度起動(Spawn)
system.Root.RequestFuture(pid, &command.PrintActivities{}, 1*time.Second).Result()

一度アクターを停止し、もう一度同じ名前(“ひまわり”)で再起動しています。
そうすると、Proto Actorのpersistenceが過去に保存したイベントをリプレイし、
applyEvent()を再実行して同じ状態を再構築します。

アクターは他アクターとリソース共有などは行わない仕組みとなっていますので、
"もも""あじさい" などのアクターを生成すると、
それぞれのクラス毎の活動記録を永続化することができます。

これがイベントソーシング(Event Sourcing)+アクターモデルの強みで、
システム再起動時や障害復旧時でも、イベントログを辿って状態を再現できるわけです。

5. コードのポイントまとめ

  1. CQRS + ESライクな構成
    • Commandが来る → Eventを永続化 → Stateを更新 → ReplyToへイベント転送
  2. Actorのstateはイベントを適用して構築
    • applyEvent() 内で、スナップショットやイベントの追加ロジックを管理
  3. Read Model Updater(またはそれ相当)アクター
    • 今回はシンプルにTypedStreamを使った実装例になっています
    • 実運用ではこの部分を外部DB更新アクターやダッシュボード通知アクターなどに置き換えが可能です
  4. リプレイ機能
    • PersistReceiveで保存しておいたイベントが、再起動時にReplayCompleteイベントを介して再適用され、クラスアクターの状態を復元できる。

このサンプルを応用すれば、保育園・学校などのクラス単位で起きる出来事(ドメインイベント)を軸にすることで、
新たな分析やサービス拡張に対応できる柔軟なシステムを作ることができます。
(アクターモデルを活用した設計方法はこれ以外にもたくさんあります)

このようなアーキテクチャを活用することで、
多様な領域におけるデジタルトランスフォーメーションを推進していくための一助になれば幸いです。

良いお年を!

大作になってしまった・・・