恐竜本舗

エンジニアをしている恐竜の徒然日記です。

アクターモデルの位置透過性を Kotlin/Pekko で試してみる

はじめに

年始にアクターモデルについての理解を深めたく、Kotlin を用いて Pekko でアクターモデルの位置透過性に触れてみたので、その内容をメモしていきます。

Kotlin で pekko-cluster を用いて試した実際のサンプルはこちらです。

github.com

まず、そもそもアクターモデルとは何なのでしょうか?

世の中の要求変化から見ていきます。

世の中の要求の変遷

CPU 危機

90 年代ごろまで、アプリケーションはシングルスレッドで動くのが一般的でした。

そのため、世の中としては高速なCPUが市場に出回るのを待っていたような時代です。(筆者としては生まれて間もないのであまりこの頃のことはよく分かってません)

しかし、2005 年頃に CPU のクロック速度の向上は限界に達しているという論文が提唱されました。

  • The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software
    • もうCPU 待ちでフリーランチを食べてる場合じゃないよ、というなかなかおしゃれなタイトルです
    • アプリケーションの高速化のためには、並行処理に転換していかないといけないよ、といったことが書かれています

同時期(2005 年)に、クラスター化されたマルチプロセッサーサーバ上でアプリケーションを動かす企業が出てきました。マルチコアの登場です。

しかし、マルチコアは単一コアが低速になる傾向があるため、アプリケーションの並行度が低いとパフォーマンスが低下するという問題が出てきました。 では、スレッドプログラミングにすればよいのでは?となると、下記のような問題が発生すると言われてます。

  • デッドロックのリスク
    • 複数のスレッドが互いのリソースの開放を待ち続けるリスク。ロックの順序管理などのしんどさが辛い
  • データの競合状態(race condition) のリスク
    • 複数スレッドが共有メモリ空間にアクセスするため、この問題が生まれやすく、リソースアクセス制御の難易度が高い
  • エラーハンドリングとデバッグの複雑性
    • 複数スレッドで並行するので、これらも同じく辛い

C10K 問題

CPU 危機の傍ら、世の中のクライアント数は膨大に増えていきました。

アクセスするクライアント数が1万を超えると、サーバーのスレッド数が増えて、サーバメモリのリソース不足が発生してしまう問題です。

処理能力に余力があっても、サーバ台数を増やさないといけないという課題が出てきました。

これに対して出てきた解決策として、ノンブロッキングI/O、イベントループで対応したのがNode.js (2009年〜)になります。

Akka(Pekko) が提供するアクターモデルもまた、このような問題を解決するためのものとなっています。

アクターモデルとは?

Node.js はシングルスレッドででノンブロッキングI/O とイベントループを行うことでこれらの問題を解決しましたが、アクターモデルでは、下記のような仕組みでこうした問題を解決しています。

  • マルチスレッド + アクター
    • Akkaはマルチスレッド環境で動作し、多数のアクターを並行して実行します
  • アクター
    • アクターは独立した計算の単位であり、それぞれが自身の状態とメールボックスを持ちます
  • メッセージパッシング
    • アクター間の通信は、非同期的なメッセージパッシングによって行われます

マルチコアを有効活用でき、アクターそれぞれは独立しているため、障害時などの回復性に強いと言われています。

Akka/Pekko とは

Akka(Pekko) は、Lightbend社によって提供されたアクタープログラミングモデルとランタイム、それらに必要な補助ツールを提供する、開発ツールキットです。

Akka/Pekko の違い

Akka は Akka 2.7 からライセンスが Business Source License(BSL) に変わり、本番環境での稼働にはLightbend社の有償ラインセンスが必要になりました。

Pekkoは、 Akka 2.6 からフォークされたオープンソースプロジェクトになります。

ライセンスの話については、こちらの記事がとても参考になりました。

creators-note.chatwork.com

Akka/Pekko の位置透過性について

アクターは、状態と振る舞いをカプセル化するオブジェクトであり、受信者のメールボックスにメッセージを置くことでメッセージを交換し、排他的に通信します。 ある意味では、アクターはオブジェクト指向プログラミングの中で最も厳格な形式ですが、人間にとってはより理解しやすいものです。

akka-ja-2411-translated.netlify.app

Akka/Pekko は各アクター内に状態と振る舞いを持っていて、アクター同士で排他的にメッセージのやり取りをします。

図で示すと下記のような形です。

アクターは

  • メッセージ駆動で動作する
  • 親アクターから子アクターにメッセージを渡すことが可能
  • 障害時は子アクターからのエスカレーションを見ていくことで耐障害性を担保している

というのがざっくりした概観といったところでしょうか?

そして、ようやく本題のAkka/Pekko の位置透過性についてです。

上図で示したこれらのアクターは、どこのサーバで実行されるのかをアプリケーション自体はほとんど意識せずに構築することができるそうです。

Akka/Pekkoは、送受信したメッセージを

  • 有効なスレッドでローカルで処理する
  • 別のサーバに投げてリモートで処理する

というのをあとから設定することができ、設計時にあまり意識せずに実装することが可能です。

これには、pekko-cluster というものを用います。

イメージ図としては下記のような感じで、アプリケーション自体はサーバの状態が左なのか、右なのかといった関心を保つ必要がなく、シンプルな設定だけでサーバ間での分散処理を可能としています。

テキストだけだとあまりイメージがつかないので、実際にやってみます。

やってみる

シンプルにテキストメッセージの送受信をするアプリケーションをKotlinで書き、pekko-cluster を用いて別サーバへのメッセージ送信をやってみます。

Java のスレッドを用いて、並列処理を試す

まずは、Java のスレッドを用いてシンプルに非同期処理を作成します。

下記の記事が非常に参考になりました。

tech-lab.sios.jp

ひとまず作成したJavaスレッドをベースとした並行処理は、ほとんど上記の写経なのでここでは割愛します。

実装は下記の commit 時点を参考ください。

github.com

比較対象として、Koroutine を使った例も作ってみました。

github.com

Pekko に変えて、複数コンテナで起動し、別NodeでReceiveする

では、この処理をPekko に変えて、複数コンテナでのメッセージ送受信を Pekko Cluster を用いて実現してみます。

Pekko Cluster は、 複数のアクターシステムをクラスタとしてまとめ、あたかも単一のアクターシステム上で動作しているかのように振る舞えるようにするためのもの 、になります。

pekko.apache.org

まずは、アプリケーションを複数コンテナで立ち上がるように、Dockerを書き換えます。

Dockerfile

FROM gradle:8.12.0-jdk17 AS builder

WORKDIR /workspace
COPY apps/build.gradle.kts apps/settings.gradle.kts ./
COPY apps/src ./src

RUN gradle clean build --no-daemon

FROM openjdk:17-slim AS base
WORKDIR /app

# ビルド成果物をコピー
COPY --from=builder /workspace/build/libs/*-all.jar /app/app.jar

FROM base AS node1
EXPOSE 2551
CMD ["java", "-jar", "app.jar", "2551"]

FROM base AS node2
EXPOSE 2552
CMD ["java", "-jar", "app.jar", "2552"]

FROM base AS node3
EXPOSE 2553
CMD ["java", "-jar", "app.jar", "2553"]

compose.yaml

name: kotlin-actor-sample

services:
  seed:
    container_name: pekko-cluster-seed
    build:
      context: ..
      dockerfile: docker/Dockerfile
      target: node1
    ports:
      - "2551:2551"
    environment:
      CLUSTER_PORT: 2551
      CLUSTER_IP: seed
  c1:
    container_name: pekko-cluster-c1
    build:
      context: ..
      dockerfile: docker/Dockerfile
      target: node2
    ports:
      - "2552:2552"
    environment:
      CLUSTER_PORT: 2552
      CLUSTER_IP: c1
  c2:
    container_name: pekko-cluster-c2
    build:
      context: ..
      dockerfile: docker/Dockerfile
      target: node3
    ports:
      - "2553:2553"
    environment:
      CLUSTER_PORT: 2553
      CLUSTER_IP: c2

Javaスレッドで試した方では、src をボリュームマウントしてたのですが、同一コードを複数コンテナで見ようとするとファイルハッシュが一緒で競合エラーとなってしまったので、一旦脳死で それぞれ .jar にビルドして実行することにしました。 (並行処理を試す際の開発環境でのいい方法がきっとあるはず・・・)

そして、Application.kt を下記のように変えます。

Application.kt

package com.github.daitasu

import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.cluster.typed.Cluster
import org.apache.pekko.cluster.typed.Join
import com.typesafe.config.ConfigFactory

fun main() {
    val port = System.getenv("CLUSTER_PORT")?.toIntOrNull() ?: 2551
    val hostname = System.getenv("CLUSTER_IP") ?: "127.0.0.1"

    // Pekkoのプロパティを設定
    System.setProperty("pekko.remote.artery.canonical.hostname", hostname)
    System.setProperty("pekko.remote.artery.canonical.port", port.toString())

    // application.conf を読み込む
    val config = ConfigFactory.load()

    // アクターシステムの起動
    val system = ActorSystem.create(
        GreetingActor.create(),
        "MyActorSystem",
        config
    )

    // クラスタにノードをJOIN する
    Cluster.get(system).manager().tell(Join.create(Cluster.get(system).selfMember().address()))

    if (port != 2551) {
        Thread.sleep(5000)
        val message = GreetingMessage("Hello World")
        system.tell(message)
    }

    println("Started ${Cluster.get(system).selfMember().address()}")
}

GreetingActor.kt

package com.github.daitasu

import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.javadsl.Behaviors
import org.apache.pekko.cluster.typed.Cluster

data class GreetingMessage(val message: String)

object GreetingActor {
  fun create(): Behavior<GreetingMessage> =
    Behaviors.receive<GreetingMessage> { context, message ->
      val member = Cluster.get(context.system).selfMember()
      println("[Address: ${member.address()}] Status: ${member.status()} Message: ${message.message}")

      Behaviors.same<GreetingMessage>()
  }
}

分かりやすさのため、2ファイルに分けていますが、 Application.kt では下記を行っています。

  • アクターシステムのためのいくつかの設定の追加
  • Cluster に各コンテナで動いているnode ã‚’JOIN する
  • seed node でなければ、Messageを送信する

GreetingActor.kt では、単に他のアクターから受け取ったメッセージを自身のアクターの状態などと一緒に println しています。

上記はアプリケーションの挙動ですが、 val config = ConfigFactory.load() で読み込んでいる部分が重要で、ここでpekko-cluster の設定をしています。

src/main/resources/application.conf

pekko {
  loglevel = debug
  version = "1.1.2"
  actor {
    provider = "cluster"
    debug {
      receive = off
      lifecycle = off
    }
  }
  remote {
    log-remote-lifecycle-events = on
    artery {
      canonical {
        hostname = ${clustering.ip}
        port = ${clustering.port}
      }
    }
  }
  cluster {
    seed-nodes = [
       "pekko://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
    ]
  }
}

clustering {
 ip = seed
 ip = ${?CLUSTER_IP}
 port = 2551
 port = ${?CLUSTER_PORT}
 seed-ip = seed
 seed-port = 2551
 cluster.name = MyActorSystem
}

この設定に追加していくだけで、seed-node の指定と、同一Cluster上に remote-node を設定することができます。

では実際に、 docker compose up してみましょう。

log をかいつまんでみます。

$docker compose up

pekko-cluster-c1    | [MyActorSystem-pekko.actor.default-dispatcher-5] INFO org.apache.pekko.cluster.Cluster -  pekkoMemberChanged Cluster Node [pekko://MyActorSystem@c1:2552] - Node [pekko://MyActorSystem@c1:2552] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
pekko-cluster-c1    | [MyActorSystem-pekko.actor.default-dispatcher-5] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@c1:2552] - is the new leader among reachable nodes (more leaders may exist)
pekko-cluster-c1    | [MyActorSystem-pekko.actor.default-dispatcher-5] INFO org.apache.pekko.cluster.Cluster -  pekkoMemberChanged Cluster Node [pekko://MyActorSystem@c1:2552] - Leader is moving node [pekko://MyActorSystem@c1:2552] to [Up]
pekko-cluster-c2    | [MyActorSystem-pekko.actor.default-dispatcher-3] INFO org.apache.pekko.cluster.Cluster -  pekkoMemberChanged Cluster Node [pekko://MyActorSystem@c2:2553] - Leader is moving node [pekko://MyActorSystem@c2:2553] to [Up]
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Received InitJoin message from [Actor[pekko://MyActorSystem@c1:2552/system/cluster/core/daemon/joinSeedNodeProcess-1#1074045501]] to [pekko://MyActorSystem@seed:2551]
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Sending InitJoinAck message from node [pekko://MyActorSystem@seed:2551] to [Actor[pekko://MyActorSystem@c1:2552/system/cluster/core/daemon/joinSeedNodeProcess-1#1074045501]] (version [1.1.2])
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Received InitJoin message from [Actor[pekko://MyActorSystem@c2:2553/system/cluster/core/daemon/joinSeedNodeProcess-1#1181724472]] to [pekko://MyActorSystem@seed:2551]
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Sending InitJoinAck message from node [pekko://MyActorSystem@seed:2551] to [Actor[pekko://MyActorSystem@c2:2553/system/cluster/core/daemon/joinSeedNodeProcess-1#1181724472]] (version [1.1.2])
pekko-cluster-c2    | Started pekko://MyActorSystem@c2:2553
pekko-cluster-c2    | [Address: pekko://MyActorSystem@c2:2553] Status: Up Message: Hello World
pekko-cluster-c1    | Started pekko://MyActorSystem@c1:2552
pekko-cluster-c1    | [Address: pekko://MyActorSystem@c1:2552] Status: Up Message: Hello World

無事に c1/c2 node から seed node にmessage が送られました。

その他にも、それぞれのnode がClusterのMemberになっているような log などが見えます。

アプリケーションコードの中で分散処理のための実装やリモートサーバを意識した実装などは要らず、 application.conf に設定した内容だけでほとんど実現することができました。

おわりに

この記事では、異なるサーバ間(Docker コンテナ間)をpekko-cluster を通して同一クラスタ上に置いて、メッセージの送受信をする例を作ってみました。

実際にはメッセージの種類で送るアクターを変えたりといったことはしないので、あまり実践的なイメージは湧きませんが、ここではサーバ間をシンプルにまたいでメッセージが送られていることが体感できれば良いかなと考えてます。

普段Node.js やフロントエンドをよく触っていて、JVM系言語自体が初心者なので結構苦戦しましたが、Node.js と違ったアプローチでの解決方法や、位置透過性の面白さなどが学べて良かったです。

アクターモデルは状態と振る舞いを内部にカプセル化しますが、状態の永続化は別途 Pekko Persistenceというものが必要です。

pekko.apache.org

この辺りまで理解しないと、アクターモデルの旨味は分からなそうなので、もう少し色々と調べて触っていきたいなと思います。

Reference

今回の理解、実装にあたり、下記の記事を参考にさせていただきました。

2024年に読んだ書籍まとめ

あまり読書家なほうではないと自分で思っているが、今年は仕事の関係もあって色々な書籍を読む機会が多かった。

まだ積ん読状態の書籍や気持ちだけ読んだ書籍も多々あるが、その辺は割愛して

  • そこそこちゃんと読んだ本達
  • リファレンス的にちょこちょこ読んでる本達

など、今年読んだ書籍をまとめてみる。

読んだ時系列が曖昧だが、だいたい読んだであろう順で書いてみた。

SQL ポケットガイド

www.oreilly.co.jp

しばらくNoSQLに触れてて、転職を機にRDB をProd 環境で久々に触ることになった。

RDBが久々だったので、リファレンスとして手元に置いておきたいなぁと買ったやつ。

基本的なSQL の振り返りはもちろんとして、window, view, sub query とか書こうとしたときに「あれ?」となったりしたので、ちょっとしたときにサッと開いたりして補助輪的に助かった。

プロダクトマネージャーのしごと 1日目から使える実践ガイド

www.oreilly.co.jp

JOINした仕事のチームで最初の頃は PdM が不在で、チーム作りを進めつつ、プロダクトの方向性整理や社内要望整理等のフローを見直したりしていた。

PdM の仕事でいうと、「ビルドトラップ本」を読んだことはあるが、いざ実践となったときにもう少し足元の動き方が知りたいと思って読んだ本。

PdMとしての core スキルに始まり、ハードスキル、ビジネス価値とユーザ価値に目を向けること、各所とのコミュニケーションの重要性などPdM としての動き方が書かれていた。

EM としての「何でもやる」とPdM としての「何でもやる」はクロスする部分もあるなと思いつつ、EM として考えてきた視点とは別の、「ユーザ」「ビジネス」を根幹に置いて考えるので、脳内に新しい視点が1個ストックされた気持ちになった。

でもじゃあ結局 PdM って何するの?を語れるかというと、やっぱり難しいなという思い。

Terraform の教科書

book.mynavi.jp

これも仕事関係。

仕事の中で、まるっと 0からシステムリアーキテクト していくということをやっていて、いままで Frontend を中心に触ってきた身なので、 インフラ周りの知見は痛感する程になかったので、個人でAWS を触りまくってまずは AWS で色々と構築をしていた。

その後、プロダクトとしてAWS の新システム構築に着手していったのだが、 さすがに現代において新規システム作るなら IaC 化してないと辛いので、 Terraform を詳しい人に聞きつつなんとか覚えた。

その時に、体系的に理解したり、レシピ的にも使える部分があって、今もちょこちょことリファレンスとして読んでいる。

AWS の構築に限らず、社内のSaaS アカウント整備とかそういうとこにも効かしたいなーとか汎用性が広いので、知識のブーストとして買ってよかったなと思っている本。

Terraform との悪戦苦闘は会社のブログに書いた。

productblog.sencorp.co.jp

Github CI/CD 実践ガイド

gihyo.jp

「自分が書くCI、美しくない!」という悔しさのもとで買った本。

昨年くらいから、個人開発でも monorepo 構成でモノを作ることが増えている。

そうしたときに、上記のような思いで買った。

一旦積ん読してたのだが、「Github Action でこういうことできないんだろうか?」と思ったときに読み返し始めてガっと読んだ。

workflow_call や workflow_dispatch 、paths や job の分け方など知ってはいるけど上手な使いこなしができていない知識に対して肉付けができて、この本も買ってよかったと思っているやつ。

運用しやすいワークフローの設計 、 アクションのオープンソース化 、各種のレシピがあって少しきれいなCIが書けるようになった。

Apacke Kafka / 分散メッセージングシステムの構築と活用

www.shoeisha.co.jp

社内の輪読会で扱った本。

CDC(Change Data Capture) を使っていく際に、メッセージングシステムの裏側にあるミドルウェアの仕組み、一般的な活用事例などについてを体系的に理解できる。

この本を読んでいるときに、Kafka について実際に触って試したいというのがあり、 Kafka.js でのCDC についてブログを書いた。

daitasu.hatenablog.jp

この本を読んでいて、後述する本にも関わるが、JVM 言語への再燃が湧いて年末の最近は Scala や Kotlin を書いている。

失敗から学ぶ RDB の正しい歩き方

gihyo.jp

こちらも仕事に直結しての課題感からになるが、RDB のアンチパターンについて全然知見が浅く、DB を根本からガっと変えましょう!といったときにもう少し視座が欲しいと思って一気読みした本。

soudai1025 さんが書かれるブログや スライドなどがDB関係に限らず個人的に好きでいつも勝手にたくさん学ばせて頂いている。

そのそーだいさんが執筆された本なのだが、アンチパターンについて事例と起こり得る事故がストーリー的に書かれていて、フロントエンドエンジニアをやっていた自分でも頭の中にとても入っていきやすく、何度か読み直しているがとても理解度が深まって良い。

ユーザビリティエンジニアリング 第2版

shop.ohmsha.co.jp

社内の輪読会で読んだ本。

この本に限った話ではないが、今年は仕事の中でデザイナーさん達と関わる機会が非常に増えた。

今までやってきたデザインシステムの立ち上げ期の議論などでのやりとりもそうだが、デザイナーさんのマネジメントにも関わったこともあり、

  • デザイナーが見ている視点とは?
  • デザイナーのキャリアとは?
  • デザイナーにとってのより良いコミュニケーション/環境とは?

みたいなものが自分の中で命題としてホットになってきたので、今年の後半からはデザイナーの仕事やキャリア、著名人などについて漁ったりしている。

この本では、ユーザインタビューにおける心構えやスタンス、手法の紹介だけでなく、インタビュー先の選定はどうするのか、インタビュー前の事前準備をいかに入念にするか、その後の分析についてなど詳細に書かれている。

オブジェクト指向 UI デザイン

gihyo.jp

こちらも輪読会で読んだ本。(輪読会に読書を頼ってしまっている、、、)

前半はオブジェクト指向UIとタスク指向UI の違いについての説明から始まり、

  • OOUI だと何が良いのか
  • OOUI を考えていく際の具体的な手法

について書かれている。

後半はワークアウトがいくつかのレシピでなされており、これらを考えていくことで実践的に知識を得られるという本。

OOUI を学ぶと、Frontend の知識と紐づいていくのかなと考えながら読み進めていたが、どちらかというと 事業ドメインの理解 が必要で、

といった点が大きく関与するなと感じた。

ドメインモデリングからのアーキテクチャ設計、DB設計と割と対として考えた方が良いなという学びを得た。

UI と Backend が持つべき領域は異なるが、UI が区分するドメインとアーキテクチャ上のドメイン理解がずれて開発が進むとユーザに提供したい価値がチーム内で揃っていないことになる。

→ デザイナーとエンジニア間の連携が、 UI/UX デザイン上の話に限らず、事業ドメイン理解の部分から突き合わせて入っていく(ここにはBisuness サイドも入って深めていく) という工程の重要性を感じ、考え方が切り替わった部分があり、とても良本と感じた。

「財務3表のつながり」で見えて売る会計の勘所

shopping.bookoff.co.jp

新卒のときに読んだ本なのだが、久しぶりに本棚の奥から引っ張り出してきた。

今年はプロダクトマネジメントに関わったり、プロダクトとして考えていく中で、ビジネス側の方向性やマネタイズの解像度を上げていく機会が多かった。

P / L ってどう書くんだっけ?

利益構造の細分化をしたときの事業利益、販管費などの構造や定義が自分の中で曖昧になっていたので、改めて基本を理解しようと久しぶりに読んだ。

この本は図解がとても多く、だいぶ初心者向けなのだが、利益や費用の表し方と区分など、基本的なことを理解するにはおすすめの本で、結構古くて格安で中古本が売られていたりするので、エンジニアの方に推しておきたい一冊。

人事評価の教科書

www.rosei.jp

個人的に、目標設定・評価査定の構造にはとても強めの思いがあって、この辺の分野は定期的に調べたり読んだりしている。

目標設定/評価制度の基盤が整っていることで、事業としてもスケールする土壌になると思っていて、これは 個人目標 といったメンバーのケーパビリティを引き上げる役割に限らず、 事業全体をOKR を立てて組織的に追っていくといった、 組織的な目標設定 としてもこの辺の仕組みの整備は重要と思っている。

この本は2年前に読んだ本なのだが、そのときは理解しきれなかったので、またこの分野への感情が昂ったこのタイミングでまた読んだ。

エンジニア組織に限らない、一般的な人事評価の構造、等級制度や目標設定の存在意義、評価査定フローにそれらをどう落とし込んでいくかなど、手堅く詳細に書かれていて、この本である程度の「 人事制度ってこうあるべきだよね? 」という話はできるようになる。

最近はいろんな企業が公開している等級制度や各等級の定義、人事評価フローなどを調べたりしていて、自分の中でTOPクラスにホットな部分になっている。

来年はもう少し業務の中でもこの辺の知識を活かしてアウトプット側に繋げていきたい。

子どもが中心の「共主体」の保育へ: 日本の保育アップデート!

www.shogakukan.co.jp

まだ読み途中の書籍。

「幼保業界に入ったので、幼保業界のことを知らねば!」と思って読んでいる。

保育園という領域に踏み込むのが初というのはありつつ、自分の娘も今年の4月から保育園に通い始めたため当事者でもある。

保育園とはいっても千差万別で、そのステークホルダーが保育士や栄養士、看護師といった保育園内だけにとどまらず、保護者、地域、小学校との接続、高校・大学生との交流といった様々な連携が重要視されてきているということを知り、いろんな点で目から鱗がある。

まだちゃんと読めていないので、年末年始の課題図書としている。

最新教育動向2025

www.meijitosho.co.jp

幼保業界に入った、と言いつつ、自分ががっつり関わっているのは小中学校の分野だったりする。

直接的に教育分野と関わる機会はないが、それでも学校の先生をやっている友人などに話を聞くと、昨今の学校事情が自分たちが子どもの頃と全然異なっていることが多く、「今の学校を知りたい」という思いで手に取った。

こちらの本は1-2日で一気読みして、特に今後の学校教育に関するところでの国や各省庁が言及・提言している動向などが書かれていて上記同様目から鱗なことが多かった。

教員の働き方改革に始まり、部活動の廃止、探究学習・自由進度学習といった授業のあり方の変化、幼小連携についてなど、知らないことが多かった。

一方で、「提言」「言及」といった言葉が多くあるので、「実際の現場の声と実態はどうなんだろう?」とかは気になったりした。

これはまた知り合いなどに色々聞いてみたい。

なっとく!関数型プログラミング

www.shoeisha.co.jp

今年はFrontend という今までの主領域から大きく離れて、Frontend の実装はガッツリしつつも、Backend の議論だったり、Infra 構築だったり、CQRS の仕組みの理解だったり、デザイナーの目線、PdMの目線、人事の目線、マーケの目線など色々なことに手を出して色々とやったりした。

その中で、Frontend という領域についても自分の中で還元された部分が多分にあったりした。

そんな中、「TSKaigi Kansai」に出かけて、sadnessOjisan さんの下記の発表を聞く。

今年は一周回って学びになるという体験が多かったこともあり、個人的に自分の技術領域をもう少し拡げていきたいという思いから、関数型言語を触り始めた。

現在周りに JVM 言語について相談しやすい環境が割とあるので、始めるなら Scala かなと思って始めている。

この本は、弊CTO に相談しておすすめしてもらった本で、そもそも関数型言語のどんなところが良いの?というのを、考え方の切り替えから入ってくれるので、Scala の理解というよりは関数型言語についての理解として良かった。

まださっと一通り読んだところで、触ってみる中で何度も読む本になりそう。

実践 Scala 入門

gihyo.jp

こちらはScala の言語仕様について特にしっかりと書かれている本。

Scala2 ではあるが、実際書き始めていく中で知りたかった部分が早期に学べたので読めてよかったなと思っている。

こちらもScala を書いていく中で何度も読むことになりそうな印象。

おわりに

今年読んだ書籍をまとめていたつもりが、大半がそのときの仕事や自分の関心に直結していて、今年の行動の振り返りに近いような内容になった。

自分の読書の傾向的に、進む本は一気に読むが、読めない本は序盤ですぐに閉じてしまう。

雑多にいろんな物を読んでいくというよりは、 そのとき一番ホットなもの を手に取ってガっと読むほうが性に合っているのかもしれない。

数年経ってふとホットになるものもあるので難しい。。。

例年よりも書籍に限らずインプット量が増えて、やりたいことなども色々と出てきた。

今年の振り返りは別途書きたいが、結構振り返った気持ちになったので元気があれば書きたい。

Kafka.js + Debezium を使って、Change Data Capture(CDC)でDBの変更検知を行い、別DBに書き込む(後編)

はじめに

前回の続きです。

daitasu.hatenablog.jp

Kafka.jsを用いて、Kafkaの producer と consumer の仕組みを理解しました。

これをもとに、CDCツールである Debezium を用いて、PostgreSQLのデータ変更を検知して、別DBに書き込む様な流れを作ってみます。

debezium.io

やってみる

基盤となるコンテナの立ち上げ

まずは、前回同様に、 Kafka と zookeeper のコンテナ、そして、2つのPostgreSQLと Debezium のコンテナを作成します。

kafka は、コンテナ内部から見る kafka:9092 と、consumer.ts からアクセスするための localhost:29092 を準備しておきます。

compose.yaml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - "9092:9092"
      - "29092:29092"

  postgres1:
    image: postgres:16
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: originalDB
    ports:
      - "5432:5432"
    volumes:
      - ./postgres1_data:/var/lib/postgresql/data

  postgres2:
    image: postgres:16
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: newDB
    ports:
      - "5433:5432"
    volumes:
      - ./postgres2_data:/var/lib/postgresql/data

  debezium:
    image: debezium/connect:2.7
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - kafka
      - postgres1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: ""
    depends_on:
      - kafka

コンテナを立ち上げ、先にoriginalDB と newDB のそれぞれに、仮のテーブルを作っておきます。

CREATE TABLE Users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE

Debezuim コンテナにコネクタを作成する

次に、Debezuim コンテナに対して、コネクタを作成します。 connector-config.json という名前で下記を作成します。

{
  "name": "postgres-users-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "originalDB",
    "database.server.name": "dbserver1",
    "table.include.list": "public.users",
    "plugin.name": "pgoutput",
    "topic.prefix": "dbserver1"
  }
}

下記のコマンドで、コネクタを作成できます。

curl -X POST -H "Content-Type: application/json" --data @connector-config.json http://localhost:8083/connectors

もしコネクタを消して、再度作成したい場合は、下記のコマンドを入力して再度やり直してください。

curl -X DELETE http://localhost:8083/connectors/postgres-users-connector

kafka-console-consumer を用いて試す

consumer.ts で別DBへの書き込みを行う前に、Kafkaコンテナに組み込まれている kafka-console-consumer を用いてconsole 出力で正しく動くかを確認します。

まず、Users テーブルにデータを作成し、Topicが正しく追加されているかを見てみます。

DB への追加

$ psql "postgresql://postgres:password@localhost:5432/originalDB"
psql (14.12 (Homebrew), server 16.3 (Debian 16.3-1.pgdg120+1))
WARNING: psql major version 14, server major version 16.
         Some psql features might not work.
Type "help" for help.

originalDB=# INSERT INTO Users (name, email) VALUES ('daitasu', '[email protected]');
INSERT 0 1

topic の確認

[appuser@xxx ~]$ kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
dbserver1.public.users
my_connect_configs
my_connect_offsets
my_connect_statuses

dbserver1.public.users というtopic が確認できます。 この状態で、 Kafkaコンテナに組み込まれているkafka-console-consumer を実行して、message の購読を開始してみます。

$ docker compose exec kafka bash
[appuser@xxx ~]$ kafka-console-consumer   --bootstrap-server kafka:9092   --topic dbserver1.public.users  --from-beginning
{"schema":{"type":"struct","fields":[{ ...

message がすでに溜まっていれば、上記のように取得することができます。 特に、中身の payload.before 、 payload.after でDBの変更前後を追うことができます。

  "payload": {
    "before": null,
    "after": {
      "id": 18,
      "name": "daitasu",
      "email": "[email protected]",
      "created_at": 1723994488034439
    }
}

⚠ 注意点として、PostgreSQLの wal_level が logical でないと、 debezuim の接続が切れてしまいます。

事前に、 postgresql.conf の中身を wal_level = logical に書き換えておきましょう。 (これに気づかず、結構苦戦しました)

Kafka.js で新DBに書き込みを行う

さて、CDCからの message の購読までうまく行ったので、これを Kafka.js を用いて、実際に新DBに書き込むまでを行います。

consumer.ts

import { Kafka } from "kafkajs";
import { Client } from "pg";

const kafka = new Kafka({
  clientId: "cdc-client",
  brokers: ["localhost:29092"],
});

const consumer = kafka.consumer({
  groupId: "cdc-group",
});

const pgClient = new Client({
  user: "postgres",
  host: "localhost",
  database: "newDB",
  password: "password",
  port: 5433,
});

const run = async () => {
  await pgClient.connect();

  await consumer.connect();
  await consumer.subscribe({
    topic: "dbserver1.public.users",
    fromBeginning: true,
  });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const change = JSON.parse(message.value.toString());

      // CDC メッセージに基づいて、新DBに変更を反映させる処理
      if (change.payload && change.payload.after) {
        const { id, name, email, created_at } = change.payload.after;

        const validCreatedAt = isNaN(new Date(created_at).getTime())
          ? new Date()
          : new Date(created_at);

        const query = `
          INSERT INTO Users (id, name, email, created_at) 
          VALUES ($1, $2, $3, $4)
          ON CONFLICT (id) DO UPDATE 
          SET name = EXCLUDED.name, email = EXCLUDED.email, created_at = EXCLUDED.created_at;
        `;
        await pgClient.query(query, [id, name, email, validCreatedAt]);
      }
    },
  });
};

run().catch(console.error);

const { id, name, email, created_at } = change.payload.after で、さきほどのDBの変更後のデータを取得することができます。 このデータをもとに、新DBに書き込みを行う、といった形です。

さて、では新DBの中身を見に行ってみましょう!

psql "postgresql://postgres:password@localhost:5433/newDB"
psql (14.12 (Homebrew), server 16.3 (Debian 16.3-1.pgdg120+1))
WARNING: psql major version 14, server major version 16.
         Some psql features might not work.
Type "help" for help.

newDB=# select * from Users where name = 'daitasu';
 id |  name   |        email        |        created_at
----+---------+---------------------+--------------------------
 18 | daitasu | [email protected] | 56601-03-19 06:47:14.439

実際に、DBへの書き込みがされました!

まとめ

今回、Debezium + Kafka.js を用いて、ローカル環境でのCDCによる別DBへの書き込み処理を試しました。

普段のアプリケーション開発の中でCDCを使う機会がなく、理解になかなか苦戦しましたが、この手法を用いればDB移行などの幅が広がりそうです。

今後は、Amazon MSK などを用いて、より実際の動きに近い形で調査等続けていこうと思います!

今回のコードは、下記リポジトリに置いています。 前回の記事に引き続きで作成しており、本記事については、 packages/04_devezium_cdc/ 配下に配置しています。

github.com

Reference

今回の記事作成にあたり、下記を参考にさせて頂きました。

Kafka.js + Debezium を使って、Change Data Capture(CDC)でDBの変更検知を行い、別DBに書き込む(前編)

はじめに

プロダクト開発がスケールしていくと、事業ドメインが枝分かれしていき、1つに集約されていたデータベースを複数のデータベースへと切り分けていきたい要求が生まれたりします。

そうした場面でDBの移行をする手段はいくつかあります。

  1. 同一データベース内に新規テーブルを作成し、既存テーブルと並行して利用
  2. 旧DBと新DBに分けて、異なるアプリケーションが書き込みを行う
  3. Kafkaを使ったメッセージキューでの処理
  4. Change Data Capture (CDC) を利用してDBの変更を検知し、新DBに書き込む

こうしたパターンの中で、今回はCDCを用いたDB検知での新DBへの書き込みを試してみます。

Kafka とは?

Apache Kafkaは、分散型ストリーム処理プラットフォームです。

複数台のサーバーで大量のデータを処理する分散メッセージングシステムで、送られてきたメッセージ(データ)を受け取り、受け取ったメッセージを別のシステムやデバイスに送るために使われます。

複数存在するシステムやデバイスをつなぐための重要な役割を果たします。

Kafka のユースケース

いくつか、Kafkaのユースケースをあげてみます。

  • リアルタイムデータストリーム処理
    • ex) イベントデータのリアルタイム処理(クリックストリーム、IoTセンサーデータの収集と解析など)
  • データの統合
    • ex) 複数のデータソースからのデータを集約し、中央のデータベースやデータウェアハウスに送信
    • 今回やりたいことはこちらです
  • ロギングと監視
    • ex) サーバーログ、アプリケーションのログ、監視データの集約
  • イベント駆動アーキテクチャ
    • ex) マイクロサービスに置ける、サービス感の非同期通信のためのメッセージング基盤

Kafka の基本用語

まず最初に、Kafka の基本となる、データ中継のためのシステム論理構成について基本的な単語を整理します。

Kafkaシステム概要図

  • Broker
    • データを受診/配信するサービス
  • Message
    • Kafka内で扱うデータの最小単位
    • Message にはKeyとValueをもたせることができ、Message送信時のパーティショニングで利用される
  • Producer
    • データの送信元となり、BrokerへMessageを送信するアプリケーション
  • Consumer
    • Broker からMessage取得を行うアプリケーション
  • Topic
    • Message を種別(トピック)ごとに管理するためのストレージ。Broker上に配置され管理される
    • Producer ã‚„Consumerは特定のTopicを指定してMessageの送受信を行うことで、単一のKafkaクラスタで多種類のMessageの中継を実現する

Kafkaの詳細については、下記の書籍をご参照ください。

amzn.asia

Kafka.js を試す

01 基本の挙動

今回はクライアントとして、Kafka.js を用いて試してみます。

まずは特段細かな設定をせずに、 Dockerで Kafkaブローカーを立ち上げ、producer と consumer を作成してみます。

compose.yaml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

producer.ts

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "daitasu-producer",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

const runProducer = async () => {
  await producer.connect();
  console.log("Producer connected");

  // メッセージを送信
  await producer.send({
    topic: "test-topic",
    messages: [{ value: "こんにちは!" }, { value: "さようなら!" }],
  });

  console.log("Messages sent");
  await producer.disconnect();
};

runProducer().catch(console.error);

consumer.ts

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "daitasu-consumer",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const runConsumer = async () => {
  await consumer.connect();
  console.log("Consumer connected");

  // トピックの購読を開始
  await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

runConsumer().catch(console.error);

内容はとてもシンプルで、 new Kafka() で Broker のURLを指定します。

Brokerは複数台設置することができるので、配列として渡すようになっています。

今回は、Docker で立てた localhost を向き先にしていますが、Prod 環境では Amazon MSK(Managed Streaming for Apache Kafka)などを向き先にします。

実際に実行してみましょう!

package.json

{
  ...
  "scripts": {
    "start:producer": "tsx producer.ts",
    "start:consumer": "tsx consumer.ts"
  },
  "dependencies": {
    "kafkajs": "catalog:",
    "tsx": "catalog:"
  }
}

TypeScript は tsx で動かすので、上記のように依存パッケージの取得とscripts を作っておきます。

> tsx producer.ts

Producer connected
Messages sent

producer を実行し、上記のようになれば成功です。 送られたMessagesは、 Broker で受信し、Topic に永続化されます。

次に、consumer.ts から取り出します。

> tsx consumer.ts

Consumer connected
{"level":"INFO","timestamp":"2024-08-17T06:45:16.476Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2024-08-17T06:45:43.660Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"test-group","memberId":"daitasu-consumer-534d2799-4dc5-46a8-b1d3-2c0fcb1d812e","leaderId":"daitasu-consumer-534d2799-4dc5-46a8-b1d3-2c0fcb1d812e","isLeader":true,"memberAssignment":{"test-topic":[0]},"groupProtocol":"RoundRobinAssigner","duration":27184}
{ partition: 0, offset: '18', value: 'こんにちは!' }
{ partition: 0, offset: '19', value: 'さようなら!' }

無事に取得することができました。

02 Producer の設定

Producer の細かい設定は、 kafka.producer() で設定を付与できます。

const producer = kafka.producer({
  allowAutoTopicCreation: true,
  transactionTimeout: 30000,
  maxInFlightRequests: 1,
  idempotent: true,
});

一部の例をあげます。

  • allowAutoTopicCreation
    • メッセージを送信する際に、指定されたトピックが存在しない場合は自動的にトピックを作成
    • (開発環境で都度Topicを作成するのが手間なときは true に、本番では false、といった感じでしょうか?)
  • maxInFlightRequests
    • 同時に処理できる未解決のリクエストの最大数を指定する
    • 1 にしておくと順序の保証が担保されますが、スループットは落ちる
  • idempotent
    • 冪等性を確保します。同じメッセージが複数回送信されても、Kafkaはそれを1回だけ処理する

Producer のバッチ送信は、下記の2つで送信の制御が可能です。

  • batch.size: 一度に送信するメッセージの最大バイトサイズを指定します。
  • linger.ms: メッセージが送信されるまでの最大待機時間をミリ秒で指定します。

しかし、調べたところ Kafka.js だとこの設定がまだできないようで、試す場合は他のクライアント( Kafka Java)などを用いる必要がありそうでした。

また、acks については、各messeges のsend 時に設定します。

  await producer.send({
    topic: "test-topic",
    messages: [{ value: "こんにちは!" }, { value: "さようなら!" }],
    acks: -1,
  });

all が -1 になります。

kafka.js.org

03 Offset の設定

KafkaのConsumerでエラーが発生した場合、Offset を手動でコミットすることで、エラー発生時に巻き戻して再試行することができます。

試しに、特定のエラーメッセージのときだけ確率で失敗するようにして、エラー時に再試行するようにします。

consumer.ts

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "daitasu-consumer",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const runConsumer = async () => {
  await consumer.connect();
  console.log("Consumer connected");

  // トピックの購読を開始
  await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      try {
        // 特定のメッセージ内容でエラーを発生させる
        if (message.value.toString() === "エラーになるメッセージ") {
          if (Math.random() < 0.7) {
            throw new Error("Intentional error for testing");
          }
        }

        // メッセージの処理
        console.log({
          partition,
          offset: message.offset,
          value: message.value.toString(),
        });

        // メッセージ処理が成功した場合にのみ offset をcommit
        await consumer.commitOffsets([
          { topic, partition, offset: (Number(message.offset) + 1).toString() },
        ]);
        console.log(`Offset committed: ${message.offset}`);
      } catch (error) {
        // エラーが発生した場合、offset は commit しない
        console.error(`Error processing message: ${error}`);

        // エラーが発生した場合、特定のオフセットで再試行する
        await consumer.seek({ topic, partition, offset: message.offset });
        console.log(`Seeking to offset ${message.offset} for retry`);
      }
    },
  });
};

runConsumer().catch(console.error);

特定の offset からの再試行は、 consumer.seek() を用います。

producer で送るメッセージを修正します。

  await producer.send({
    topic: "test-topic",
    messages: [
      { value: "正常なメッセージ1" },
      { value: "エラーになるメッセージ" },
      { value: "正常なメッセージ2" },
    ],
  });

この状態で、再度実行すると、下記のような動きになります。

> tsx consumer.ts

Consumer connected
{"level":"INFO","timestamp":"2024-08-17T07:56:06.531Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2024-08-17T07:56:34.689Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"test-group","memberId":"daitasu-consumer-1100f629-c06b-4f73-b035-fe5c51527016","leaderId":"daitasu-consumer-1100f629-c06b-4f73-b035-fe5c51527016","isLeader":true,"memberAssignment":{"test-topic":[0]},"groupProtocol":"RoundRobinAssigner","duration":28157}
{ partition: 0, offset: '37', value: '正常なメッセージ1' }
Offset committed: 37
Error processing message: Error: Intentional error for testing
Seeking to offset 38 for retry
Error processing message: Error: Intentional error for testing
Seeking to offset 38 for retry
{ partition: 0, offset: '38', value: 'エラーになるメッセージ' }
Offset committed: 38
{ partition: 0, offset: '39', value: '正常なメッセージ2' }
Offset committed: 39

エラーになるメッセージのoffset で再試行が発生し、そこから継続しています。

ちなみに、現在のoffset にいるかは、 kafka-consumer-groups というCLIコマンドで見ることができます。

試しにDockerコンテナに入ってみてみましょう。

$ docker exec -it {container-id} bash
[appuser@xxx ~]$ kafka-consumer-groups --bootstrap-server localhost:9092 --list # consumer group の一覧を表示
test-group
[appuser@xxx ~]$ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --describe test-group # 特定のconsumer group の詳細表示
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
test-group      test-topic      0          40              40              0               daitasu-consumer-xxx /192.xxx.xx.x  daitasu-consumer

このようにして、現在のoffset を確認することが可能です。

kafka.js.org

おわりに

今回はKafka.js を用いて、Kafkaの基本的な仕組みの理解と、Messageの送信の流れを追いかけてみました。

この記事で扱ったコードは、下記に置いてます。

github.com

内容が長くなるので前後編に分けます!

後編では、実際にChange Data Capture(CDC) の仕組みを用いて、PostgreSQL のDB変更検知 → 別DB書き込み 、という流れをやってみようと思います!

daitasu.hatenablog.jp

千株式会社に入社しました!

2024/4/15 より、保育Tech サービスの開発をしている千株式会社さんに JOIN しました!

千株式会社とは

「人の心に火をつける。 世界を動かす会社を創る。」

というミッションを掲げている、保育Tech サービスの会社さんです。

運動会や発表会などの写真撮影からEC販売をトータルで担う インターネット写真販売「はいチーズ!フォト」 、 保育ICT「はいチーズ!システム」 などの保育DX領域を中心に、様々なサービスを展開しています。

docs.google.com

docs.google.com

8122.jp

何をしていくの?

プロダクト組織が数年前から大きく変革してきている組織のため、エンジニア組織としては様々な面で整備されています。

www.wantedly.com

sencorp.co.jp

とはいえ、組織としてもまだまだ発展途上にあり、システムとしても20年近く運用してきているプロダクトなので、課題も多く、コロナ禍というターニングポイントを起点に、事業が一層展開し、事業が求める期待値も高くなっている。

それに答えていくために、山程のやらなければならないことを、一層のスピード感を持ってやっていく必要がある、幅と深さを同時に引き伸ばしつつ、土台も並列で固めていくといったフェーズかなと感じています。

何をしていくのか。「自分に何ができるかな」から考えていかなければですが、フロントエンド周りの技術改善や組織の開発速度の向上・標準化などやれることは貢献しつつ、自分のキャパシティをストレッチしないと立ち行かないような課題もたくさん頂けそうなので、色々なことにチャレンジしていく予定です。

求む

入ってみてまだあまり日は経っていないのですが、社内に優秀なエンジニア・デザイナーの方々が多く、また、プロダクトの歴史の長さと比較して、物事を前に進めるスピードの速さ、現場に求められる期待値の高さがとても良いなと感じてます。

この期待速度の中で仕事をしていくのはとても面白そうです。

おそらく採用もしっかりと関わっていく、かつ、ぜひ一緒に開発組織を盛り上げていける方、プロダクトをより良くしていきたい方にぜひ来てほしいので、採用リンクも貼っておきます!

hrmos.co

今年はフロントやマネジメント業務もですが、バックエンドやインフラなど広い領域に色々と手を出していけそうなのでとても楽しみです。

インプットとアウトプットのギアを上げていかないとなので、0歳の娘が寝たら夜な夜な気合いを入れていこうかなと。。。

ちょうど今月4月から自分の娘も保育園に入り、我が身事でもある事業領域なので、業界にきちんとアウトカムを出せるように頑張っていこうと思います。

前職を退職しました & 3年遅れの新婚旅行に行ってきました

2024年2月を最終出社として、STORES 株式会社を退職しました。

また、3/8-18で、コロナ禍で行けなかった新婚旅行として、アメリカ西海岸のカリフォルニア(シリコンバレー、カリフォルニアディズニー)に行ってきましたので、2つ合わせて書こうと思います!

STORES を退職しました

2月末を最終出社として、4年9ヶ月お世話になった STORES 株式会社を退職しました!

STORES には、入社直後から本当に様々な仕事を経験させていただきました。 約5年の間で、自分が入社した頃より社員数は5倍の何百人という規模の会社になり、社名もたくさん変わったり、プロダクトとしても、組織としても基盤が整備されていき、非常に激しい速度の変遷の中で、面白い仕事をたくさんさせていただきました。

マネージャーになってからは、組織としての開発生産性や予算というものをより一層考えるようになり、中途採用・新卒採用で一喜一憂したり、目標設定・評価査定、人や組織の成長、外部発信、意思決定など、抜けゆく魂を口から入れ直すような日々もあったりと本当に良い挑戦をたくさんすることができました。

STORES は組織としての土壌が整ってきて、これから一層にスケールする面白いフェーズであり、また、今年もまた新しい非常に面白いお仕事を託して頂いたタイミングではあったのですが、昨年子どもが生まれて今後を見つけ直す機会となり、

  • 自分がこれから向き合いたい社会課題のベクトルに変化が生まれた
  • 新たにできた展望に向けて、今持ちうる力で新たな荒野で勝負をしてみたい

というのがあり、この度旅立つことを決心しました。

最後にはいっぱい送別会を頂いたり(子の関係であまり夜外に出れないので、むしろ何個かまとめてもらったり)、色紙や花束を頂き大変うれしかったです。 STORES はプロダクトとしても、技術者としての環境としても、カルチャーとしても好きなところがいっぱいで相当語れる自信はありますが、今回旅立つからには一層面白い人材になってネット上で活き活きと無事を伝えていこうと思います。

今後についてはまた別途記事を書こうと思います。

3年遅れの新婚旅行に行ってきました

そして、3/8-18 にはアメリカのカリフォルニア州へコロナ禍で行けなかった新婚旅行に行きました。

子どもが丁度1歳になる直前で、「子どもが自分で歩けるようになったら」「イヤイヤ期になったら」「2人目ができたら」難易度がとにかく高くなるなど聞き、この機を逃すと次はもうはるか未来になるだろうと、今回の旅を決心しました。

大半の航空会社だと、乳児の席=膝の上、で大人分の席しかないことが多いのですが、ZIPAIR (JALの子会社) というLCCの航空会社だとお安い上に乳幼児分の席が1席無料、かつ、重厚な後ろ向きチャイルドシートが装備されている、というのがあり、この便で行ける場所で決めました。

  • 旦那要望でサンフランシスコ(シリコンバレー)
  • 嫁要望でロサンゼルス(カリフォルニアディズニー)

という点でカリフォルニアへと出発。

受託手荷物 30kg のうち、23kg 超が子どもの荷物(ミルク、離乳食、おむつ等)という重量級装備で挑みました。 (他にも相当色々と調査と工夫を凝らしたので、これでブログ1本はたぶん書けます)

サンフランシスコでは3日 + 3日で6日間。

ロサンゼルスでは4日間。

  • ディズニーダウンタウンでの買い物
  • 「ディズニー・カリフォルニア・アドベンチャー・パーク」と「ディズニーランド・パーク」の2パーク巡り
  • グリーティングができるレストランでご飯

など、密度濃いめの10日間を過ごすことができました。

もともと昔バックパッカーをしていて、あまりツアーというものに慣れていないため、今回も「地球の歩き方」+「Google Map」を相棒に闊歩しました。

今までもアジア圏を中心に色んな国に行ってきましたが、この「カリフォルニア」という旅は自分の中でズンと突き刺さるものがあり、目からウロコのような、焦りのような色んな思いを持つことになる旅になりました。

思いを馳せる

今回の旅を機に、また新たに2つのやりたいことというか、願望・欲望が生まれたので、35歳を目処に叶えられるように動いていこうと思います。 1ヶ月ほどコードにあまり触れずにいたので結構な焦りを感じてますが、Google 等を見てきたことで一層やる気には満ちることができたので良かった?のかなぁと。

今後については、またブログに書きます。

PhaserJSでパンを撃って敵を討つゲームを作った

Summary

  • パンを撃って敵を討つシューティングゲームを作ってみた
  • いつもお世話になっているPdMや社内に向けたネタ作だが、割とちゃんと作った
  • Googleログインでランキング機能を入れている

あそぶ

こちらから遊べます。キーボード操作が必要なため、PCブラウザでのみ遊べます。 推奨環境はChrome。

  • URL

pan-shoot.fly.dev

github.com

Tech Stack

モチベーション

Viteの学習も兼ねて、ブラウザゲームを作ってViteでBuild した結果を、サーバから静的配信したい、というモチベーションでネタを探していた。

ちょこちょこ小ネタのゲームを作るので、今回も小ネタとして作ってみた。

感想もろもろ

Fly.io

個人開発の置き場を Heroku からどこに変えるかを悩んでいて、今回は Fly.io に置いてみた。

  • 無料枠内で DBサーバとして PostgreSQL も使える
  • flyctl でアプリの構築〜デプロイ〜管理すべてコマンドラインベースで勝手が良い
  • Dockerfileに記述しておき、Fly.io 上でDockerイメージを立ち上げてくれる

構成管理が toml なのが慣れないともやもやするが、簡単なサービスなら今後はこれで行こうかなと思った。

Stable Diffusion

今回のメインビジュアルは背景を Stable Diffusion で作ってみた。

ローカルやGoogle Colaboratory に置いて試すのが一般的だが、 mage.space というサービスで今回は作成した。

画像生成系モデルのAIについては全然詳しくないのだが、 Example も多いのでそのプロンプトを見つつ作ってみた。

いい感じのアラビアンな画像ができたと思う。

Google ログイン

もともとはViteでブラウザゲームをビルドしたいだけだったので Firebase Hosting あたりに置くつもりだった。

しかし、やっぱり展開するならランキング機能欲しいなという欲が出てきて、 Google ログインを突っ込むことにした。

Auth0や Firebase Authorication も検討したものの、アカウント情報を基本的には保持したくなかったため、IDと名前以外は持たないように内製した。

スコア保持

ゲームはすべてCanvas 上で動いている。

普段の仕事だとSPAのWEBアプリを作っているのだが、Canvas上での画面をまたぐ際の情報保持がイマイチ分からず苦戦した。

結局、CryptoJS を用いて暗号化して、SessionStorageに保持 → 画面遷移後に認証、スコアを複合→DB保存後、破棄 という手順を踏んだ。

https://github.com/daitasu/pan-shoot/blob/main/frontend/src/scenes/mypage.ts#L32-L46

もっといい方法がありそうだが、そんなに大事な情報でもないのでフロント側で管理することにした。

Static Server 配信のための src リプレイス

今回のVite側の静的ファイルのパス解決とGolangサーバ側のパス解決を合わせるのに中々詰まった。

今回のフロントエンドをビルドした結果は下記のようになる。

dist
â”” assets
   â”” main-xxxxxxxxx.js
â”” images
   └ xxx.png # 各種の画像
â”” index.html

このとき、

  • Golang側で静的ファイルサーバを / 直下には置かず、 /static 配下にしたかった
  • index.html から js ã‚’ <script type="module" src="/static/assets/main-xxx.js"></script> のように取得したい
  • 画像群もまた、 href="/static/images/chocopan.ico" のようにして取得したい

という条件があるのだが、Golang側で /static/ を静的ファイルサーバとして配信すると、画像ファイルは正常に上記のパスになる。

一方、Viteでビルドした時点で index.html の script タグは src="/assets/main-xxx.js" となっており、ここにずれがあった。

prefix をつけると画像側のパスが今度はずれてしまうため、ここだけ個別書き換えるようにした。

pan-shoot/buildPlugin.ts at main · daitasu/pan-shoot · GitHub

これを vite.config.ts 上でプラグインとして入れ込むことで解决した。

import updateIndexHtml from "./src/scripts/buildPlugin";

export default defineConfig({
  // ...
  build: {
    // ...
    rollupOptions: {
      input: {
        main: resolve(root, "index.html"),
      },
    },
  },
  plugins: [updateIndexHtml()],
  server: {
    port: 9000,
  },
});
' }) e.innerHTML = codeBlock; });