コネヒト開発者ブログ

コネヒト開発者ブログ

Dataformの開発環境を晒してみる(Visual Studio Code with Dev Containers)

みなさんこんにちは。たかぱい(@takapy0210)です。

今回は、コネヒトを支えるデータ基盤の後編ということで、Dataformの開発環境について晒そうと思います紹介しようと思います。

「ワイのところはこんな感じの環境で開発しているよ!」などあればコメントいただけると大変嬉しいです!

前回の記事はこちら

tech.connehito.com


目次


はじめに

前回の記事では、コネヒトが抱えていた課題感やそれを解消するためにDataformでどのようにデータ基盤を構築しているのか?について紹介しました。

本記事では、以下のトピックに焦点を当てて紹介します。

  • Visual Studio Code(VS Code) with Dev Containersによる開発環境
  • Dataform tools拡張機能の活用

Dataform toolsに関しては、Google Cloud上で行えたことは殆ど網羅していたり、SQLFluff によるフォーマットが使えたりと、開発体験はかなり良くなったと実感しています!

開発環境の課題と解決策

DataformはGoogle Cloudのコンソール上で開発を行うことができます。 Githubと連携することで、ワークスペースという、いわゆるブランチのようなものを切って開発し、PRを出す、といったフローを踏むことができます。

メリットとしては、特に開発環境などを整備しなくともSQLさえ知っていれば誰でも開発しやすい点が挙げられると思います。

しかし、運用していくと以下のような課題がでてきました。

  • エディタが使いづらい
    • タブの左右分割表示ができない
    • フォーマッターの自動フォーマットがいけてない
    • サジェスト機能が貧弱、など
  • コードレビュー時、毎回コンソールにアクセスし対象のワークスペースを開き、SQLのコンパイルなどが問題ないか確認する必要がある
  • Claude Codeなど、Coding Agentの恩恵を受けられない

特にこれからの時代、Coding Agentの恩恵を受けられないことは重要な課題と判断し、ローカルで開発できる環境を構築していきました。

VS Code with Dev Containersによる開発環境

上記の課題を解決するために、VS Code with Dev Containersで開発環境を整備しました。 メリットとしては、ざっくり以下のようなことが挙げられると思います。

  • チームメンバー全員が同じ環境で開発できる
  • Dockerイメージにツールがプリインストール済みなので、セットアップが容易
  • コンパイルチェックやデータリネージなど、VS Codeの拡張機能が使える

Dockerfileの構成

Dev Containerで利用している dockerfile は以下のようなイメージです。

FROM mcr.microsoft.com/devcontainers/javascript-node:20-bookworm

RUN apt-get update && apt-get install -y --no-install-recommends \
  ...
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*

# Google Cloud CLI のインストール
RUN curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg \
      | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg \
    && echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" \
      > /etc/apt/sources.list.d/google-cloud-sdk.list \
    && apt-get update \
    && apt-get install -y --no-install-recommends \
         google-cloud-cli=548.0.0-0 \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# Dataform CLI をインストール
RUN npm install -g @dataform/[email protected]

# SQLFluff のインストール
RUN python3 -m pip install sqlfluff==4.0.4

devcontainer.jsonの設定

devcontainer.json は以下です。

VS Codeの拡張機能であるDataform toolsをインストールして使えるようにしています。

{
  "name": "Dataform Dev Environment",
  "build": {
    "dockerfile": "Dockerfile"
  },
  "mounts": [
    ...
  ],
  // VS Codeの設定
  "customizations": {
    "vscode": {
      ...
      "extensions": [
        "ashishalex.dataform-lsp-vscode"
        ...
      ]
    }
  },
  ...
}

Dev Container起動後、以下の手順でGoogle Cloud認証を行えば、開発環境が整います。

# Google Cloud認証
gcloud auth application-default login
gcloud auth login

# プロジェクトの設定
gcloud config set project [PROJECT NAME]

Dataform tools拡張機能の活用

Dataform toolsとは、VS Code向けのDataform拡張機能です。以下のような機能を提供しています。

  • コンパイル後のクエリと Dry Runの統計情報の表示
  • 依存関係グラフ(Data Lineage)の表示
  • クエリ結果のプレビュー表示
  • CLI または Dataform APIを用いたジョブの実行
  • SQLFluff を使用して .sqlx ファイルのフォーマット

詳細は以下のページをご覧いただければと思いますが、中でもコンパイルやDry Runの統計情報がサクッと確認できたり、SQLFluff を利用してフォーマットできる点に助けられています。

dataformtools.com

コンパイル後のクエリと Dry Runの統計情報の表示

モザイク多めではありますが、VS Code上では以下のように情報を見ることができます。

コンパイル後のクエリはもちろん、Incremental / Non incrementalモードでこのsqlxを実行した時に、どのくらいのコストがかかるのか?も算出してくれています。

VS Code上での見え方

全sqlxに対して、コンパイルが行えるかのチェックも可能です。

# 全てのスキーマをコンパイルチェック
$ dataform compile

出力例

Compiled xxx action(s).
xxx dataset(s):
  hoge_table [table]
  fuga_table.bq_daily_scan_cost [incremental]
  ...

xxx assertion(s):
  hoge
  ...

xxx operation(s):
  fuga
  ...

また、dry-runに関してもまとめて実行することもできます。

# 全スキーマをdry-run
dataform run --dry-run

# タグを指定してdry-run
dataform run --tags [tag_name] --dry-run

# 特定のスキーマとその依存関係を含めてdry-run
dataform run --actions "hoge_table" --include-deps --dry-run

依存関係グラフ(Data Lineage)の表示

以下のように、依存関係グラフ(Data Lineage)を確認することもできます。

対象テーブルをBigQueryのコンソール上でシュッと確認できるように、テーブルURLへの遷移ボタンも付いています。

Dependency Graph

クエリ結果のプレビュー表示

Preview Dataボタンを押下することで、sqlxを実行したときに取得されるデータの中身を見ることもできます。

フィルターなども付いているので、データ確認が捗ります。

データプレビュー

CLI または Dataform APIを用いたジョブの実行

Google Cloudコンソールに行かずともVS Code上からジョブの実行を行うこともできます。

「Run(CLI)」と「Run(API)」の2つの方法があるのですが、それぞれで挙動が違うので注意が必要です。

ジョブの実行

Run(CLI)では、現在ローカルで編集しているファイルがそのまま実行されるのに対して、Run(API)では、現在のブランチをDataform上でコンパイルして実行します。そのためローカルで編集したファイルをPushしていない状態でRun(API)を押下すると、最新の変更が反映されない状態で実行されてしまいます。

Run(CLI)

  • 現在ローカルで編集しているファイルがそのまま実行される。
  • BigQueryのジョブエクスプローラーから、実行されたJOBを確認できる。
    • ただし、JOBが分かれて実行されることもあるので探すのはやや大変。
  • 挙動としては /usr/local/share/npm-global/bin/dataform run "/workspaces/gcp-dataform" --timeout=5m --actions "connehito-dwh.dataset.table" のようなコマンドが実行されるとの同じ。

Run(API)

  • 現在のブランチをDataform上でコンパイルし、実行される。
  • Google Cloud上にあるDataformの、Workflow Execution Logs から実行ログを見ることができる。

SQLFluff を使用して .sqlx ファイルのフォーマット

いつの間にか実装されていたのですが、SQLFluff を使ってフォーマットができます!(感動)

それまでは以下のようなプラグインを使っている方も多かったのではないでしょうか?

zenn.dev

VS Code上では以下の3ヶ所からフォーマットを実行することができます。

SQLFluff でのフォーマット

SQLFluffによるフォーマットを有効にするには .vscode-dataform-tools/.sqlfluff ファイルが必要になるのですが、その内容に以下のプレースホルダーを追記しないとエラーになるため、注意が必要です。

(エラーの理由ですが、Dataform tools拡張機能が一時ファイルを作成する際に、${ref()}などを数字(1,2,3など)に置き換えており、その結果がSQLとして不正になっているために発生しているようです)

...

[sqlfluff:templater:placeholder]
# Dataformの${...}構文をプレースホルダーとして扱う(ネストした{}を2レベルまでサポート)
param_regex = \$\{(?:[^{}]|\{(?:[^{}]|\{[^{}]*\})*\})*\}
# Dataform tools拡張機能が使用する数字プレースホルダーを定義
# 拡張機能は${ref()}などを数字に置き換えるため、それらをテーブル名として認識させる
1 = placeholder_table_1
2 = placeholder_table_2
3 = placeholder_table_3
4 = placeholder_table_4
5 = placeholder_table_5
6 = placeholder_table_6
7 = placeholder_table_7
8 = placeholder_table_8
9 = placeholder_table_9
10 = placeholder_table_10

最後に

本記事では、Dataformの開発環境をGoogle Cloud上のマネジメントコンソールから、ローカルのVS Code with Dev Containers環境に移行し、快適に開発が行えるようになったことについて紹介しました。

Dataformの事例はまだまだ少ないと思いますので、今後も定期的に発信していきたいと思います!

We Are Hiring 🤝

コネヒトではデータを用いてプロダクト・会社を成長させる機械学習エンジニアを募集しています!

興味のある方は以下よりご連絡お待ちしております!

herp.careers

コネヒトにおける機械学習、データ周辺業務に関しては以下の記事で紹介していますので、合わせてご覧ください!

tech.connehito.com

コネヒトは PHPerKaigi 2026 にシルバースポンサーとして協賛します!

こんにちは〜!2月にコネヒトに入社しました まきまき(@_mkmk884)です🦒

今回はコネヒトが協賛する PHPerKaigi 2026 を紹介します!

PHPerKaigi 2026に今年も協賛いたします

コネヒトではメインプロダクトである「ママリ」を始めとして、開発のメイン言語にPHPを活用しており、フレームワークはCakePHPを採用しています(その他、技術スタックを知りたい場合は Connehito Tech Vision をご覧ください)

その縁もあり、PHPerKaigi 2026 にシルバースポンサーとして協賛させていただくこととなりました!

PHPerKaigiにはゴールド・シルバーあわせて、2021年より毎年協賛をさせていただいております。

イベント概要

  • 日時:2026å¹´3月20日(金)〜 3月22日(日)
  • 場所:中野セントラルパークカンファレンス&ニコニコ生放送
  • 対象:PHPエンジニアおよびWeb技術のエンジニア
  • 主催:PHPerKaigi 2026 実行委員会

phperkaigi.jp

PHPerKaigiというイベントにつきまして、以下、公式サイトからの引用です。

PHPerKaigi(ペチパーカイギ)は、PHPer、つまり、 現在PHPを使用している方、過去にPHPを使用していた方、 これからPHPを使いたいと思っている方、そしてPHPが大好きな方たちが、 技術的なノウハウとPHP愛を共有するためのイベントです。

全国からPHPerが集まるウキウキワクワクなイベントですね🐘

どういう内容の登壇があるんだろ…と思った方はぜひタイムテーブルを覗いてみてください〜!

タイムテーブル | PHPerKaigi 2026 #phperkaigi - fortee.jp

ブース出展します

今年は企業ブースも出します!

個人の価値観を回答して、自分の価値観と世の中の価値観のギャップを知る、価値観診断アプリを企画として用意する予定です🌸

回答いただいた方にはノベルティを差し上げます!

ほかほか癒やしグッズや有名セレブグッズを配布予定…Comingsoon…

みなさんぜひコネヒトのブースに来てくださいね〜!!!

メンバーが登壇します

3月22日(日)の 15時55分より、わたくし、まきまきがLTで登壇します!(自分で宣伝) PHP開発で奮闘した経験を話します! ぜひお気軽に聞きに来てほしいです🙇‍♀️

fortee.jp

さいごに

弊社からは13名ほど参加する予定です!エンジニア以外のメンバーもおりますので、ぜひ様々な話題で参加者の皆様とお話したいと思っております🤝

ブースへの立ち寄りもぜひ!!!よろしくお願いします!!!

そして、 PHPerチャレンジ中の皆様、お目当てのPHPerトークンはこちらです!

#takapy

We Are Hiring

コネヒトではPHPerを積極採用中です!ご興味がある方は気軽にご連絡ください〜!

herp.careers

herp.careers

Firebase Analytics × BigQuery × Dataformで構築するデータ基盤

みなさんこんにちは。たかぱい(@takapy0210)です。

最近、ほぼ日手帳アプリで、毎日日記をつけるようにしています。 不思議なことに日記をつけるとなると、何かしら写真を撮りたくなってしまい、毎日何気ない風景などの写真を撮るようになりました。 まだ継続して1ヶ月強くらいですが、過去の日記を振り返るのが楽しいです。

さて今回は、コネヒトを支えるデータ基盤の現在地について前後編の2部構成で紹介していこうと思います。

前編となる本記事では、アーキテクチャの設計やFirebase AnalyticsのRAWデータをどのように扱っているのかについてご紹介します。

後編では、Dataformの開発環境について紹介する予定です。


目次


はじめに

コネヒトではママリというモバイルアプリを開発・運営しています。

ママリのログに関してはFirebase Analyticsに送信しており、BigQuery Export機能を使って、BigQueryにExportしています。

Firebase AnalyticsからBigQueryへのExport処理はざっくり以下のようなフローになっています。

Firebase AnalyticsからBigQueryへのExportフロー

このFirebaseからExportされるRAWデータは以下のような特徴があります。

  • event_paramsカラムがネスト構造になっている
  • event_paramsの型が動的(STRING、INT、DOUBLE、FLOATの4種類)
  • データ到着遅延がある(最大72時間程度)

これらの特徴もあり、RAWデータをそのままクエリするのは非効率で現実的ではないです。 このデータは、スケジュールクエリを使って分析しやすい形に整形はしていたのですが、これはこれで課題感が満載でした(後述)

そのような背景もあり、2024年あたりから少しずつデータ基盤の整備を進めてきました。

以降では、2024年当時の課題感と、それに対してDataformを導入した経緯、データモデルの設計思想について紹介しようと思います。

2024年当時に抱えていた課題

前述したように、RAWデータをそのままクエリするのは現実的ではなかったので、BigQueryにあるスケジュールクエリ機能を使い、分析しやすい形にTransformして分析を行っていました。

しかし、このスケジュールクエリに関しては以下のような課題がありました。

  • クエリ間の依存関係を制御できないので、「テーブルAの更新が終わったらテーブルBを更新する」という定義ができず、「Aã‚’2:00、Bã‚’2:30に実行」といった形で、時刻のバッファで管理していた
  • コード管理ができておらず、履歴管理もできていなかった
  • Data Lineageが見れず、「このテーブルの定義を変えたいけど、どこに影響が出るかわからない」といった、所謂データのサイロ化・ブラックボックス化が起こっていた
    • 特定のクエリが失敗した際のオペレーションとして、その下流にあるクエリだけを芋づる式に再実行する、という操作を手動でやる必要があったが、この影響範囲を探すのも一苦労だった

そこで、スケジュールクエリ撲滅PJを立ち上げ、徐々にデータパイプライン管理ツールであるDataformに移行させていきました。

スケジュールクエリで作られたテーブルがどのJOBから参照されているか?を特定しながら移行作業を行っていたのですが、その際には以下のようなsqlで泥臭く調査し、作業を進めていきました。

SELECT
  creation_time,
  user_email,
  job_type,
  statement_type,
  destination_table.project_id AS dest_project_id,
  destination_table.dataset_id AS dest_dataset_id,
  destination_table.table_id AS dest_table_id,
  query
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE
  creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
  -- 指定したテーブルを「参照している」ジョブだけに絞り込む
  AND EXISTS (
    SELECT 1
    FROM UNNEST(referenced_tables) AS rt
    WHERE rt.project_id = "sample_project"
      AND rt.dataset_id = "sample_dataset_id"
      AND rt.table_id = "sample_table_id"
  )
ORDER BY
  creation_time DESC

また、mixpanelを用いた分析も行っていたため、データのダブルスタンダード問題も発生していました。

なぜDataformにしたのか

よくETLツールで比較されるのがdbtかなと思います。

各ツールの比較に関しては多くの記事が公開されているためそちらに譲りますが、弊社に関しては以下のような背景がありました。

  • 専属データエンジニアがいない状況であり、なるべく学習・実装・運用コストを下げたい
  • 最低限の機能として、前述した「クエリの依存関係制御」「コード管理(Github)」「Data Lineageの把握」という課題が解決できればOK

Dataformに関しては、SQLベースで記述できる点や、Google Cloud上に統合しておりJOBの実行に関しては料金が発生しないなど(BigQueryのクエリ料金のみ)、導入までのハードルや運用コストの低さに関して優れており、スモールスタートしやすいといった観点で採用しました。

データモデルの設計

Dataformプロジェクトでは、概ね次の4層(+用途別)で整理しています。

  • sources:外部データソースの参照定義(declaration)
  • stg:生データの初期加工(扱いやすい形への整形)
  • warehouse:クレンジング・標準化済みの中間テーブル
  • mart:部門・ユースケース別の集計データマート
  • (必要に応じて)extra:MLや探索用途の作業場

Dataformのディレクトリイメージは以下です。 これはベストプラクティスや各社の事例を参考にしつつ、弊社の状況に合わせて設計しました。

definitions
  ├ dwh
  │   ├ app
  │   │   ├ extra(機械学習やデータプロダクト開発において使用されるデータを定義)
  │   │   │   ├ machine_learning
  │   │   │   │   └ ...
  │   │   │   └ ...
  │   │   ├ mart(データマートを定義)
  │   │   │   ├ marketing
  │   │   │   │   └ ...
  │   │   │   ├ product
  │   │   │   │   └ ...
  │   │   │   ├ sales
  │   │   │   │   └ ...
  │   │   │   └ ...
  │   │   ├ sources(データソースを定義)
  │   │   │   ├ firebase_analytics
  │   │   │   │   └ app_events.sqlx
  │   │   │   │   └ ...
  │   │   │   ├ google_analytics
  │   │   │   │   └ web_events.sqlx
  │   │   │   │   └ ...
  │   │   │   ├ mysql
  │   │   │   │   ├ users.sqlx
  │   │   │   │   └ ...
  │   │   │   └ ...
  │   │   ├ stg(中間テーブルを定義)
  │   │   │   ├ raw_fa_events_page_view.sqlx
  │   │   │   └ ...
  │   │   ├ warehouse(データ変換、クレンジング、メタ情報付与済みデータを定義)
  │   │   │   ├ fa_events_page_view.sqlx
  │   │   │   └ ...
  │   │   └ ...
  │   └ ...
  └ ...
      └ ...

概念図としては以下のようになっており、各種BIツールからはwarehouseまたはmartのデータのみを参照できるようにしています。

Firebase Analytics × BigQuery Export特有の事象についての対応方法

冒頭で挙げた以下の特徴について、どのように対応しているのかについて紹介します。

  • event_paramsがネスト構造になっている
  • event_paramsの型が動的(STRING、INT、DOUBLE、FLOATの4種類)
  • データ到着遅延がある(最大72時間程度)

event_paramsがネスト構造になっている

Firebase AnalyticsからBigQueryにエクスポートされるデータは、以下のようなスキーマになっています。

events_YYYYMMDD
├── event_date: STRING
├── event_timestamp: INT64
├── event_name: STRING
├── event_params: ARRAY<STRUCT<key STRING, value STRUCT<...>>>
├── user_pseudo_id: STRING
├── user_id: STRING
├── device: STRUCT<...>
├── geo: STRUCT<...>
├── app_info: STRUCT<...>
└── ...

この中でも重要なデータが格納されているが、扱いづらいのがevent_paramsです。これは以下のような構造になっています。

event_params: ARRAY<STRUCT<
  key STRING,
  value STRUCT<
    string_value STRING,
    int_value INT64,
    float_value FLOAT64,
    double_value FLOAT64
  >
>>

例えば、page_viewイベントのSTRING型であるpage_titleパラメータを取得するには、以下のようなクエリを記述する必要があります。

SELECT
  (SELECT value.string_value FROM UNNEST(event_params) WHERE key = 'page_title') AS page_title
FROM `dataset.schema.events_20260101`
WHERE event_name = 'page_view'

これに関してはstg層でフラットなテーブルに変換し、イベント毎にテーブルを分けて保存しています。

event_paramsの型が動的(STRING、INT、DOUBLE、FLOATの4種類)

event_paramsに関しては前述した通りARRAY型になっており、各値の型が動的になっています。

そのため、INT型だと思っていたら実はSTRING型でデータが送られてきており、うまくデータ取得できない、という問題が発生しかねません。

そこで以下のクエリをイベント単位で実行することで、このイベントのevent_paramsには、どんな値がどんな型で送信されてきているか?を確認することができます。

もちろん、ログ仕様は事前に定義して実装しているのですが、考慮漏れや実装ミスにより意図しないデータが送られている可能性もあるので、怪しいログデータなどがあった際にはこちらでチェックしています。

WITH base_table AS (
  SELECT
    eventParams.key AS key,
    max(eventParams.value.string_value) AS string_value,
    max(eventParams.value.int_value) AS int_value,
    max(eventParams.value.float_value) AS float_value,
    max(eventParams.value.double_value) AS double_value,
  FROM `PROJECT.DATASET.events_*`,
    UNNEST(event_params) AS eventParams
  WHERE 0 = 0
    AND _TABLE_SUFFIX >= FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL 7 DAY))
    AND event_name = 'YOUR_EVENT_NAME'
  GROUP BY key
)
SELECT *
FROM base_table
WHERE key NOT IN (
  'ga_session_id', 'ga_session_number', 'engaged_session_event',
  'firebase_screen_id', 'firebase_screen_class',
  'firebase_event_origin', 'firebase_event',
  'error_value', 'debug_event'
);

データ到着遅延がある

Firebase Analyticsは、イベントが発生してからBigQueryに到着するまで、最大72時間程度の遅延があります。そのため、単純なincremental更新では、遅延したデータが取り込まれません。

解決策として、過去N日分を毎回削除・再取得する方式を採用しています。

config {
  type: "incremental",
  ...
  columns: {
    ...
  },
  bigquery: {
    partitionBy: "meta_event_date",
    partitionExpirationDays: 4000,
    requirePartitionFilter: true
  },
}

js {
  const REFRESH_DAYS = 3; // 72時間遅延に対応するため
  const LOOKBACK_DAYS = 30
}

-- incrementalで実行する場合、過去REFRESH_DAYS日分を削除
pre_operations {
  ${when(incremental(), `
    DELETE FROM ${self()}
    WHERE meta_event_date >= DATE_SUB(CURRENT_DATE("Asia/Tokyo"), INTERVAL ${REFRESH_DAYS} DAY)
  `)}
}

SELECT
  ...
FROM
  ${ref("table_name")}
WHERE 0 = 0
  ...
  AND event_name = 'page_view'
  AND meta_event_date >= ${when(incremental(),
    // 増分更新時: 更新対象期間より前の最大日付を取得
    `DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL ${REFRESH_DAYS} DAY)`,
    // 初回実行時: 過去LOOKBACK_DAYS日分を取得
    `DATE_SUB(CURRENT_DATE('Asia/Tokyo'), INTERVAL ${LOOKBACK_DAYS} DAY)`
  )}

uniqueKeyを利用した MERGE(Upsert)を使えば良いのでは?

前述のコードを見て、DELETE→INSERTせずに、uniqueKeyを使ったMERGEを使えば良いのでは?と思った方もいると思います。

しかしここには罠があり、実際にMERGEで実装したところ、データ量が1.5倍〜2倍になってしまう、という事態が発生しました。

uniqueKeyを使う場合、「"event_date", "event_timestamp", "event_name", "user_pseudo_id", "event_bundle_sequence_id"」あたりのカラムをKeyに設定する必要があります。

冒頭でも述べた通り、Firebase AnalyticsからBigQueryへのExport処理はざっくり以下のようなフローになっています。 ここで、②で作られるデータと③で作られるデータに関して、event_timestampなどのカラムの値が微妙にずれることが分かりました。

デバイス
↓ ① ログを送信
Firebase Analytics
↓ ② リアルタイムにデータ同期(Streaming Exportで速報値が同期される)
BigQuery(events_intraday_yyyymmdd)
↓ ③ 1日1回、Firebase側でデータをクレンジング・確定させ、日次テーブルとして書き出す
BigQuery(events_yyyymmdd)

例えば、2026年1月1日の日中に②の処理で「events_intraday_20260101」テーブルにリアルタイムでデータ同期されていたとします。そのデータが③の処理で2026年1月2日に日次テーブルとして「events_20260101」に書き出されます。

この時の②、③のそれぞれの時点において、同じ日付データでも「event_timestamp」がずれることがありました。

その結果、前述したuniqueKeyを使ったMERGE処理を行うとデータ量が1.5倍〜2倍に膨れてしまう、ということが起きました。

よって現在は、仮に2日後に遅延して到着したデータがあっても拾えるように、DELETE→INSERT処理を行い、データの品質も担保するようにしています。

最後に

本記事では、Firebase AnalyticsのRAWデータの取り扱いや、Dataformを用いたアーキテクチャ設計についてお話ししました。

後日公開予定の後編では、Dataformの開発環境をGoogle Cloud上のマネジメントコンソールから、ローカルのVS Code with dev containe環境に移行し、快適に開発が行えるようになったことについて紹介しようと思います。

We Are Hiring

コネヒトではデータを用いてプロダクト・会社を成長させる機械学習エンジニアを募集しています!

興味のある方は以下よりご連絡お待ちしております! herp.careers

コネヒトにおける機械学習、データ周辺業務に関しては以下の記事で紹介していますので、合わせてご覧ください!

tech.connehito.com

スマイルリレーで繋ぐ学びの輪。マネージャーの私が「やりたいこと」を考え直して見つけたもの

こんにちは、コネヒトのさとやんです。

コネヒトには、私たちの技術や知見を外に発信する活動を支援する「スマイル制度」というちょっと素敵な仕組みがあります。今回は、そのスマイル制度から生まれた「スマイルリレー」というバトンについて、そしてそこから見えてきた「自分の欲求と会社のミッション」の意外な関係についてお話ししたいと思います。

以前書いたDifyのブログでは「AI導入の試行錯誤」というテクニカルな話をしましたが、今回はもう少し「人の内面」にフォーカスした、理想と現実のお話です。

「スマイルリレー」って? 学びを独り占めしない文化

まずは、ベースとなるスマイル制度について簡単にご紹介。 これは「技術コミュニティになくてはならない開発組織をつくる」ためのアウトプット支援制度です。 登壇やブログ執筆をすると「マイル」が貯まり、それをチームの共有資産として書籍や講座代に使えるという、アウトプットが次のインプットを生むサイクルを大切にしています。

このサイクルを、さらに「人の繋がり」でパワーアップさせたのが「スマイルリレー」です。弊社のエンジニアのaboyが考えてくれた仕組みです!

アウトプットでマイルを貯めた人が、「この本、すごく良かったから誰かに読んでほしいな」というものをマイルでプレゼントする。

プレゼントを受け取った人は、それをインプットしてまたアウトプットに繋げ、次の誰かへバトンを渡していく。

「誰かの学びを応援したい」という善意が連鎖していく、コネヒトらしい温かいリレーです。今回、私もこのリレーに参加し、一冊の本を贈ってもらいました。

届いたのは、自分の「本音」に向き合う一冊

私がバトンとして受け取ったのは、チームが自然に生まれ変わる「らしさ」を極めるリーダーシップ、内発的動機を探るためのガイド本でした。

本のメインメッセージは、「have to(やるべきこと)」を思い切って手放して、「want to(やりたいこと)」を軸に生きていこう!というもの。 「確かにそうだよなぁ」と納得する一方で、読み進めるうちにマネージャーとしての私は「うーん、でもこれって……」と、ちょっとした葛藤を感じるようになりました。

正直、マネジメントの現場では「難しい」ことも

本の内容は素晴らしいのですが、いざ日々の業務やメンバーとの向き合い方に当てはめようとすると、今までの経験からいくつか高い壁が思い出されます。

1. 「have to」をパッと捨てるのは実現できない時もある

本では「have to」を捨てよう!とありますが、組織の成果に責任を持つマネージャーとしては、やるべきことを完全にゼロにするのはなかなか現実的ではありません。書籍の中では外部委託や誰かに任せるという方法が提示されていましたが、これは組織の体制や予算などによっては、頑張っても実現できないことがあります。

2. メンバーの「want to」を引き出すのは、一筋縄ではいかない

「自分のやりたいこと」を探すのも大変ですが、他人のそれを引き出すのはもっと難しいです。 1on1でじっくり話しても、本にある通り人はどうしても「コンフォートゾーン(今の安心できる場所)」に居ようとします。

自分の深い本音を探るのは少し怖さもあるので、つい会話を流してしまう。

その場では盛り上がっても、忙しい日常に戻ると熱が冷めてしまう。

「理想はわかるけど、現実で動かすのってめちゃくちゃ難しい……」。これも以前のマネジメント経験から感じていた私は、「まずは自分がやってみるか」という結論に至りました。

自分の中の「好き」を棚卸ししてみた

まずは自分の中の「楽しい」「好き」を素直に書き出してみました。「have to」はすぐには無くせなくても、その中に「want to」を混ぜていければ最高ですよね。

私の「want to」リスト

  • 人を感動させたい!(「楽しい!」「面白い!」というポジティブな感情を届けたい)
  • 栄冠を勝ち取りたい!(難しい課題をクリアしたり、勝負事で一番を目指すワクワク)
  • 勝ちたい人を支えたい!(自分が出るだけでなく、目標を持つ人を支えるコーチのような役割、スポーツのコーチがイメージが近い)
「これって仕事の楽しさと同じだ」という発見

こうして書き出してみると、自分が普段「この仕事、面白いな」と感じている瞬間と重なる部分があることに気づきました。

  • メンバーの成長支援:目標に向かっている人を支え、その人が壁を越えた時の喜び!
  • ミッション攻略の作戦会議:難易度の高い課題に、「どうやって挑むか」を考えている時間は、まるでゲームの戦略を練っているような高揚感があります。
  • 採用チームでの活動:候補者の方の未来とコネヒトの未来をどう繋ぐか、その「実現方法」を模索すること自体が、私にとっての「want to」だった。

仕事だからやるのではなく、「攻略法を考えて、実現していくこと」そのものが、私のやりたいことだったんだ、と腑に落ちた瞬間でした。

会社のビジョンと、自分のwant toの重なる部分

最後に、この個人的な欲求を会社のビジョンと重ねてみました。コネヒトが大切にしているビジョン詳細には、こんな言葉があります。

「人の数だけ叶えたい未来があり、家族の数だけありたい姿があるはず」

この言葉の中には「人の叶えたい未来を、全力で応援する」という意味が含まれています。 それは、私が掲げた「何かを叶えたい人を支援したい」という欲求と重なる部分がありました。

その他に本を読みながら考えて気付いたこと:want toの「鮮度」

今回、本を読みながら自分と向き合う中で、もう一つ面白い気づきがありました。それは「want toは放っておくとhave toに化けることがある」し、「want toは時代とともに変わる」ということです。

例えば、私の筋トレの話。 もともとは「かっこいい体になりたい!」という純粋なwant toで始めたはずでした。 でも、いつの間にかそれが「健康のため、体型維持のためにやらなきゃいけない」というhave toに変わっていたんです。

昔は「かっこよくなりたい!」というwant toだったものが、今では「健康・体型維持」という形に変わっている。でも、この変化って、じっくり本を読んで振り返るまで自分でも気づかなかったんですよね。

自分のやっていることが「義務(have to)」になっていないか、今の自分にとっての本当の「やりたい(want to)」は何なのか鮮度を保つためにも、定期的な振り返りが必要だなと痛感しました。

今後について:自分を突き詰めることが、組織の力になる

本を読んで自分のwant toを再定義した今、これから意識していきたいことがあります。 それは、普段の業務において「良い意味で周りを気にしすぎず、自分のやりたいことを突き詰める」ことです。

一見すると「わがまま」に見えるかもしれませんが、自分のwant toが会社のミッションと重なっているのなら、自分が一番ワクワクする形で突き抜けることが、結果として会社や周囲のみんなへの最大の貢献になるはず。まずは私自身が、誰よりも楽しんでミッションを攻略する姿を見せていきたいと思っています。

最後に

コネヒトは今、さらなる成長のために事業も組織も変革が生まれるフェーズに入っています。 日々の忙しさに追われると、自分のやりたい事が薄れてしまい、何のために仕事をするのかを忘れがちになってしまうので この本で得た教訓を胸に自分のwant toを犠牲にせず、会社や一緒に働く仲間たちの成長を両立していきたいです。

そんなコネヒトでは、一緒に新たな事業の1ページを作っていく仲間を絶賛募集しているので ちょっとでも興味を持ってくれた方は、是非お気軽にご連絡ください!

herp.careers

最後まで読んでいただき、ありがとうございました。

Difyを使った問い合わせ回答 AI チャットBOT作成で見えた課題(ラスボスはデータ準備)

こんにちは、コネヒトのさとやんです。 私は社内でRun with Techという社内のAI導入やDX推進を行う活動を行っていました。 今年、その一環でAIを活用したチャットBOTをDifyで作成したので、その時の話をブログとして書きたいと思います。

また、こちらはコネヒトのアドベントカレンダーの23日目の投稿でもありますので アドベントカレンダーの他の記事も良ければご覧ください

Difyってなに?

Dify自体をご存知ない方もいらっしゃると思うので、簡単にDifyについて説明します。

Dify(ディファイ)とは、プログラミング知識がなくても、直感的な操作(ノーコード/ローコード)で生成AIアプリケーションを開発・運用できるオープンソースのプラットフォームです。

画像も貼っているので、そちらを見てもらえると分かりますが、Webの画面上で様々なパーツを組み合わせてAIを活用したアプリケーションを作ることができます。 それでは、ここから制作過程とその中で発生した課題について書いていきます。

プランA:最初に考えたAIチャットBOTの構成

この構成を考えた理由
  • エージェントブロックの活用: エージェントブロックにツールを設定し、MCPクライアントとMCPサーバーのような動作を実現できないかと考えました。
  • 1ブロックでの完結: 問い合わせたユーザーが具体的に何を知りたいのか、どんなキーワードで検索すればよいかをAIに自律的に考えさせることで、ひとつのブロックで処理が完結できると想定しました。
  • 検索先の振り分け: 社内の規定・マニュアルを検索すべきか、Web検索で一般的な情報を調べるべきかを、MCPクライアントがサーバーを選ぶようにAIに判断させられるか試したい意図がありました。
  • Notionを直接参照させればデータの最新化に追従できるのでAIが参照するデータ更新の必要性がなくなる
結果

下記の通り上手くいかなかったため、断念しました。

  • エージェントブロックの精度不足

    • 最初から用意されているエージェントブロックやプラグインの「エージェンシー戦略」では、期待していた賢さには程遠い結果となりました。
    • エージェントブロックを使用するには「エージェンシー戦略」の設定が必須なためプラグインで追加しましたが、プロンプトの指示を実行しなかったり、精度が低かったりしました。
    • 具体例:
      • 質問から検索用キーワードを抽出させようとしたが、質問内容と異なるキーワードを作成してしまう。
      • 内容に応じて「Notion検索」か「Web検索」かを判断するようプロンプトを記述したが、検索自体を行わずに回答してしまう。
  • 評判と代替案の失敗

    • 調査したところ、デフォルトのエージェントブロックは評判があまり良くなく、私と同様に精度の低さを指摘している人もいました。
    • 代替案として紹介されていた「エージェントブロック自体を自作し、そのAPIã‚’HTTPリクエストブロックで叩く」という方法も試しましたが、こちらも思考やアクションの精度が安定しなかったため、一旦不採用としました。
  • Notion連携の課題

    • 参照させるNotion DB内のページ構造がバラバラだったため、うまく検索ができませんでした。その結果、問い合わせ文からのキーワード抽出や作成が安定せず、取得してほしいページが検索結果に出てこない事象が発生しました。
    • DBプロパティの問題:
      • キーワードに合致する情報が含まれているはずのNotion DBページが、なぜか検索できないケースがありました。
      • 調査の結果、NotionツールのDB検索機能では「ページ本文」の内容は検索できるものの、「プロパティに登録されている情報」は検索対象外であることが発覚しました。(カスタマイズ要素であるプロパティの有無に合わせて実装されていない点は納得できますが、検索できないのは痛手でした)

プランB(採用した方法):Difyのナレッジを使う方式

これを作っていた当初では最も確実な方法を採用しました。Notion上にある規約やマニュアルをDifyの「ナレッジ」に取り込み、知識検索ブロックで「ハイブリッド検索(全文検索+ベクトル検索)」を行う設定です。 この検索結果を、AIエージェントを設定しているLLMブロックに渡して回答させる方式が、最も精度の高い回答を得られました。

作成過程

1. Difyナレッジの作成

全工程において最も大変だった作業です(正直、もうやりたくありません)。 前述の通り、ページ構成のばらつきや、DBプロパティ上の情報はナレッジに取り込めないという課題がありました。これを解消するために、以下の作業が必要となりました。

  1. Notion DB内ページのプロパティに書かれている情報を、ページ本文に転載する。
  2. ナレッジに取り込みやすく、AIが理解しやすい構造に整形する。

1に関しては、もはや手作業でやるしかありませんでした。対象のNotion DBの中から該当ページを見つけ、ひとつずつ修正していきました。

2に関して、すべて手作業で行うのは気が遠くなる作業でしたが、幸いナレッジに取り込みたいページがすべてNotion DBページだったため、Zapierを使って以下のように半自動化しました。

これで「ナレッジ取り込み専用のNotion DB」を作成し、これをDifyへ連携させました。あとはDify上でナレッジを作成する際にこのDBを指定し、チャンクを設定して取り込みを実施しました。

  • 親子分割モード(親): 設定を「全文」にしています。規約やマニュアルはページ全体でひとつの意味を成すデータであるため、後述する検索処理において文章全体を見て検索を行えるようにするためです。
  • 子チャンクの設定: 改行2つで段落の切れ目としてある程度うまく分割できたこと、また「512 char」であれば規約やマニュアルのひとつの段落が収まる範囲だったため、この設定にしました。

2. 検索設定(ハイブリッド検索)

検索設定には、全文検索とベクトル検索を両方使う「ハイブリッド検索」を採用しています。 質問文に含まれるキーワードを「全文検索」で拾いつつ、質問内容の意味合いを「ベクトル検索」で補完する組み合わせが、最も精度良く検索できました。

Rerankモデルは以下の動作を行い、ハイブリッド検索の長所を活かしてくれるため採用しています。

  1. キーワード検索で候補を探す。
  2. ベクトル検索で候補を探す。
  3. 「ユーザーの質問に最も適切なのはどれか?」を改めて採点し直し、並べ替える。

これにより、「キーワードは合っているが内容は無関係」といったノイズを除去し、本当に役立つ情報だけを回答に使うことができます。

  • トップK: 「検索結果の上位何件をAIに読ませるか」の設定です。数値が多すぎるとノイズとなるデータが増えてしまうため、実際に動かして検索ヒット数を見ながら調整しました。
  • スコア閾値: ヒットしたデータの「関連度がどれくらい高ければ参照させるか」の設定です。当初は設定していましたが、適切なデータであってもスコアが低く判定されることがあったため、途中で設定を外しました。
3. LLMブロック

こちらは特別な設定はしておらず、プロンプトもシンプルな内容です。 ごく稀に英語で回答することがあったため、日本語での回答を強制するとともに、ハルシネーション(嘘の回答)を防ぐため、「回答に使えるデータが見つからない場合は回答しない」という指示をプロンプトに含めています。

あなたは、社内規定に関する質問に答える優秀なAIアシスタントです。 提供されたコンテキスト(ナレッジ)に書かれている情報のみを使い、ユーザーの質問に日本語で 回答してください。 もしコンテキストに答えが書かれていない場合は、無理に答えを作らず 「その情報は見つかりませんでした。」と回答してください。

以上の手順と構成で、Difyを使ったAIチャットボットが作成できました。 実際の回答画面には社内規定などの情報が含まれるため画像は載せられませんが、試験版としては問題ない精度で回答してくれています。最後に、実際に作ってみて感じたことや課題をまとめておきます。

感じたことや課題

  • AIが参照するデータの原本が直接利用できない場合や、AIが理解しづらい構造の場合は、どうしても「参照用データ」を別途作成する必要があります。その場合、そのデータを最新化するための仕組み作りも必要になります(今回は試験的なものだったので作っていません)。
  • AIツールの作成において、参照データの準備(整形など)に手作業が発生する場合もあり、この「データ準備」が最も大変な作業であり壁となります。
  • どういったデータをどんなセグメント(塊)で参照させたいかによってチャンク設定が変わるため、作りたいものや期待する結果に合わせて適切なチャンク設計をすることが重要です。
  • ナレッジに追加データを投入した際、新規作成時のチャンク設定を引き継がずに取り込んでしまう仕様があるため、意図せず不適切なチャンク構造のデータが混入することがあります。
  • 現状のエージェントブロックは、Dify上でゼロから自作してもまだ安定性に欠ける印象です。LLMブロックで事足りる要件であれば、LLMブロックを使うのが無難だと感じました。

以上がDifyを活用したAIチャットBOT作成過程と課題でした。 データ準備が大変ですが、技術的には難しいものではありません。 今後は継続的な運用を見据えて、AIが参照するデータの最新化の仕組みを整えていきたいと思います。 ここまで読んでいただき、ありがとうございました。

【iOS】URLSessionWebSocketTaskを用いたリアルタイムチャット機能の実装パターン

本記事はコネヒト Advent Calendar 2025の17日目のエントリーになります。

adventar.org

こんにちは、iOSエンジニアのyoshitakaです!

この記事ではAdvent Calendar 2025の13日目のエントリー「API Gateway WebSocket API と Lambda で作る、ママリのリアルタイムチャット機能(サークル機能)を支えるサーバーレスなインフラ設計」で紹介された、ママリのリアルタイムチャット機能について、iOSアプリ開発の視点で知見を共有したいと思います。

tech.connehito.com

はじめに

ママリの「サークル機能」とは、出産育児に関するさまざまなテーマのサークルに参加し、ユーザー同士がチャット形式でコミュニケーションを取れるというものです。

ママリではQ&A形式でユーザー同士がやり取りできる機能を提供していますが、このサークル機能ではよりリアルタイム性を重視しました。

「API Gateway WebSocket API と Lambda で作る、ママリのリアルタイムチャット機能(サークル機能)を支えるサーバーレスなインフラ設計」で詳しく紹介されていますが、リアルタイムチャット機能にはWebSocketを使っています。

WebSocketを使ったリアルタイム通信をSwiftでどのように実装しているのか、具体的なコードとともに紹介します。

WebSocketをiOSアプリで使うには

iOS13以降ではURLSessionWebSocketTaskを実装することで実現できます。

developer.apple.com

WebSocketの接続と送受信周りの処理を紹介します。 なお、今回のサークル機能では常に1つのコネクションのみを使用する仕様のため、紹介するコードも複数接続は考慮していません。

ハンドシェイク(接続)部分

URLSessionでwebSocketTaskを生成しresumeすることで接続が開始できます。

webSocketTask(with:) | Apple Developer Documentation

func webSocketTask(with request: URLRequest) -> URLSessionWebSocketTask

今回の設計では接続はサークルごとに確立しています。サークルIDをリクエストに含めてwebSocketTaskを作成します。

func setup() {
        let request = WebSocketRequestProvider.createRequest(circleId: circleId)
        let session = URLSession(
            configuration: .default,
            delegate: self,
            delegateQueue: nil
        )
        socket = session.webSocketTask(with: request)
        socket?.resume()
    }

接続の結果はURLSessionWebSocketDelegateで受け取ります。

URLSessionWebSocketDelegate | Apple Developer Documentation

ハンドシェイクが成功した場合urlSession(_:webSocketTask:didOpenWithProtocol:)が呼ばれるので、接続状態のステートを更新します。

※ 異常系は接続管理部分で書きました

extension CircleChatWebSocketService: URLSessionWebSocketDelegate {
    nonisolated public func urlSession(
        _ session: URLSession,
        webSocketTask: URLSessionWebSocketTask,
        didOpenWithProtocol protocol: String?
    ) {

        // 接続成功
        Task { @MainActor in
            connectionState = .connected
            resetReconnectAttempts()
        }
    }

受信部分

無事接続が確立できると、次はWebSocketから値が流れてくるようになります。

値の受け取りはwebSocketTaskのreceive()を使用します。

注意点として、このメソッドは再帰的に呼ぶ必要があります。

func receive() async throws -> URLSessionWebSocketTask.Message

https:// https://developer.apple.com/documentation/foundation/urlsessionwebsockettask/receive()

completionHandler版の方には解説があります。

receive(completionHandler:) | Apple Developer Documentation

WebSocketから受け取る値は、String or Dataの形になります。

startReceivingは初回接続時以外にも接続が切れた際の再接続時には常に呼び出す形になります。

    private func startReceiving() {
        stopReceiving()

        receiveTask = Task { [weak self] in
            while !Task.isCancelled {
                do {
                    guard let webSocket = self?.socket else { break }

                    let message = try await webSocket.receive()

                    switch message {
                    case .string(let string):
                        self?.handleTextMessage(string)

                    case .data(let data):
                        self?.handleBinaryMessage(data)

                    @unknown default:
                        break
                    }
                } catch {

                    // エラーハンドリング
                    if !Task.isCancelled, self?.connectionState == .connected {
                        try? await Task.sleep(for: .seconds(1))
                        continue
                    }
                    break
                }
            }
        }
    }

    private func stopReceiving() {
        // 受信待機停止
        receiveTask?.cancel()
        receiveTask = nil
    }

送信部分

値の送信はwebSocketTaskのsend(_:)を使用します。

func send(_ message: URLSessionWebSocketTask.Message) async throws

send(_:) | Apple Developer Documentation

completionHandler版の方には解説があります

send(_:completionHandler:) | Apple Developer Documentation

送信前には接続を確認しておき、接続が切れている場合は再接続をするように処理を入れました。

    public func send(request: CircleWebSocketRequest) async throws {
        guard connectionState == .connected else {
            switch connectionState {
            case .disconnected:
                // リクエスト送信前に接続を確認、切断されている場合は再接続トライ
                connect()
                throw WebSocketError.notConnected
            case .connecting:
                throw WebSocketError.connecting
            default: return
            }
        }

        let dataString = try request.createDataString()

        try await socket?.send(.string(dataString))
    }

この送信処理では、REST APIとは異なりレスポンスを受け取りません。 つまり、送信したデータをサーバー側が受信したかどうかは別途確認応答を送ってもらう必要があります。

サークル機能ではテキストや写真を投稿できるため、投稿の成功・失敗をユーザーにフィードバックする必要があります。

そこで、すべての送信データにクライアント側でリクエストIDを発行し、そのIDを確認応答で受け取ることで成功・失敗を判定するようにしました。

接続管理

接続が確立してから一定期間送受信がないと接続がタイムアウトしてしまいます。 タイムアウトの時間はサーバー側の設定次第ですが、意図しないタイムアウトを避けるため、pingを一定間隔で送るようにします。

sendPing(pongReceiveHandler:) | Apple Developer Documentation

    func setupPingTimer() {
        pingTimerCancellables.removeAll()

        Timer.publish(every: pingInterval, on: .main, in: .common)
            .autoconnect()
            .sink { [weak self] _ in
                self?.ping()
            }
            .store(in: &pingTimerCancellables)
    }

    func ping() {
        // ping送信前に接続状況を確認、切断されている場合は再接続をトライ
        if connectionState == .disconnected {
            connect()
        }

        socket?
            .sendPing(pongReceiveHandler: { error in
                // 必要に応じてエラーハンドリング
            })
    }

また、意図しない切断が発生した場合は、再接続を行います。

接続が切断される場合は、接続成功時と同じくURLSessionWebSocketDelegateにて切断理由とともに値を受け取ることができます。

urlSession(_:webSocketTask:didCloseWith:reason:) | Apple Developer Documentation

また、電波遮断等が原因の切断を検知できるようにURLSessionTaskDelegateのdidCompleteWithErrorの実装も追加しておくと安心です。

urlSession(_:task:didCompleteWithError:) | Apple Developer Documentation

意図した切断以外は再接続をトライ(遅延を入れて数回行う)するようにしました。

    // サーバーからの切断
    nonisolated public func urlSession(
        _ session: URLSession,
        webSocketTask: URLSessionWebSocketTask,
        didCloseWith closeCode: URLSessionWebSocketTask.CloseCode,
        reason: Data?
    ) {
        let reasonString = reason.flatMap { String(data: $0, encoding: .utf8) } ?? "No reason"

        Task { @MainActor in
            let wasConnected = connectionState == .connected
            connectionState = .disconnected
            socket = nil

            if wasConnected && closeCode != .normalClosure {
                // 接続状態からの異常切断、再接続を試みる
                attemptReconnect()
            }
        }
    }

    // 電波断絶やタイムアウトなど
    nonisolated public func urlSession(
        _ session: URLSession,
        task: URLSessionTask,
        didCompleteWithError error: Error?
    ) {
        guard let error else { return }

        // 接続失敗時の再接続
        Task { @MainActor in
            let wasConnecting = connectionState == .connecting
            connectionState = .disconnected
            socket = nil

            if wasConnecting {
                attemptReconnect()
            }
        }
    }

おわりに

URLSessionWebSocketTaskを使った具体的なリアルタイムチャット機能の実装方法について紹介しました。

WebSocketを使ったデータのやり取りにおけるリクエスト成功/失敗の管理は想定よりもクライアント側でやることが多く大変でした。

リアルタイムチャット機能の開発ではUI側の実装で苦労した部分も多かったので、別の記事で紹介できればと思います!

API Gateway WebSocket API と Lambda で作る、ママリのリアルタイムチャット機能(サークル機能)を支えるサーバーレスなインフラ設計

本記事はコネヒト Advent Calendar 2025の13日目のエントリーになります。

adventar.org

こんにちは、コネヒトでインフラエンジニアをしております @sasashuuu です!

私ごとではありますが、一年前くらいに小田原へ引っ越しまして、自然に囲まれたのどかな生活を送っております。

地方の良さをしみじみと感じている今日この頃です!

はじめに

さて、本日は2025年にリリースされたママリのリアルタイムチャット、その名も「サークル機能」に関するインフラ設計に関する記事をお届けします。

※以下は最近の開発環境のキャプチャです。本機能のイメージを掴んでいただければと思います。

開発環境の UI

サークル機能の施策背景や詳細なコンセプト、アプリケーションの実装側に関する内容については、本記事では割愛させていただきますのでご了承ください。

また、メインで利用しているクラウドプラットフォームが AWS となりますので、AWS を用いた内容が中心となります。

これから AWS を用いたリアルタイム性の高いアプリケーションを作る方のご参考となれば幸いです。

アプリケーション開発の要件・想定するワークロード

複数人が参加する空間でのリアルタイムチャットを既存のアプリケーション実装していくというのがざっくりとした要件となります。悩めるママさん同士のつながりや、継続的な交流をより強く意識していくことがテーマとなっていたので、従来の掲示板形式のコミュニケーションからアップデートしていく必要がありました。

ワークロードに関しては、結論として正確に見積もるのが難しいという状況でした。そのため、PdM 主導のもと、一定の KPI(メッセージ投稿数、UU 投稿数等)を定める形とし、その指標を前提として耐えうる設計を進めていく形となりました。また、考慮すべき変数などはいくらかありつつも、開発時点のママリ登録者数約400万を最大値とし、既存のアクティブユーザー数に耐えうる基盤も視野に入れる必要がありました。

アーキテクチャ

はじめに全体のアーキテクチャをお見せします(※既存のママリのインフラに関しては一部を除き基本的に割愛しています)。

全体のインフラアーキテクチャ

今回、サークル機能のために採用した主要なマネージドサービスは以下となります。

  • API Gateway WebSocket API
  • Lambda
  • RDS Proxy
  • SNS(Simple Notification Service)

終端に API Gateway の WebSocket API(※以下、WebSocket API と称します)を置いて WebSocket 通信を行い、チャット機能そのものに関する API サーバーやデータストアには Lambda と DynamoDB を利用しています。データストアに一部ママリの既存データ(ユーザーの属性データ等)を保持している Aurora MySQL や Elasticache Redis を利用する必要があり、Aurora では特に負荷に対する懸念の対策を打ちたかったため、コネクションプールとして RDS Proxy を置いてデータベースへのコネクション数による負荷を軽減するという構成になっています。また、Push 通知には SNS を利用しています。

実際の処理を抜粋して解説すると以下のようなフローです(クライアントA、B がメッセージを送受信する例)。

接続時

  1. クライアント A、B から WebSocket API へ接続リクエスト
  2. Lambda Authorizer による認可処理(ステートフルのため認可処理はこのタイミングのみ)
  3. WebSocket API によるコネクションの確立
  4. WebSocket API への接続リクエスト成功をトリガーに Lambda が起動、コネクション確立時の処理の実行
    • DynamoDB へコネクション ID の保存等

メッセージ送受信時

  1. クライアント A から WebSocket API へメッセージ送信リクエスト(API サーバー向け)
  2. WebSocket API から Lambda へのルーティング、メッセージの処理
    • DynamoDB へメッセージの保存
    • DynamoDB からコネクション ID の取得
    • 対象のコネクション ID をもとに WebSocket API へメッセージ送信リクエスト(クライアント向け)
  3. クライアント B は WebSocket API を通じてメッセージを受信

切断時

  1. クライアント A、B によるコネクションの切断
  2. コネクション切断をトリガーに Lambda が起動、コネクション切断時の処理の実行
    • DynamoDB からコネクション ID の削除

※他にもユーザー同士のブロックに関する除外処理があるなど、もう少し細かな実装になっていますがここでは割愛しています。

少し補足をしておくと、WebSocket の接続そのものは WebSocket API を通じて、クライアントと WebSocket API 間で完結しており、具体のビジネスロジック(メッセージ送信等)は Lambda が担っているというイメージです。Lambda が接続中のクライアントに向けて REST API を WebSocket API へリクエストし、WebSocket API を経由してメッセージがクライアントに届くという仕組みになっています(ただし、接続に関するコネクション ID の管理という意味ではある種 Lambda も WebSocket の接続に関わってはいるかなと思います)。図解すると以下のようなイメージです。

WebSocket を用いた処理の補足

技術選定の背景

ここからは技術選定について解説をしていきます。データストア、WebSocket サーバー、API サーバーなどの観点から分けて解説します。

データストア

DynamoDB を採用しました。ママリで利用していた既存の Aurora MySQL に相乗りする形の案もあがっていました。社内で馴染みのある RDB を用いた方が開発速度が出せること、コンピューティングリソースの使用状況とリリースの初期フェーズで想定するワークロードを加味すると既存の RDB でも耐えられないことはないだろうという見解となっていました。しかし、書き込みに対する水平スケーリングへの課題感、他のサブシステムやプロダクトからも利用される共用の RDB であるという影響範囲の広さ、中長期で見た将来的な可用性やスケーリング、移行コストを考えるとこの時点で NoSQL をベースに作り込んでおいた方が良いだろうという結論となり、Aurora MySQL の採用は見送る結果となりました。

WebSocket サーバー

API Gateway の WebSocket API を採用しました。こちらは観点を分けて説明します。

そもそもの通信方法に関する観点

ポーリング形式で HTTP 通信を採用する案もあがっておりました(既存のママリの API である CakePHP に相乗りする形で実装するという方針)。しかし、ユーザー体験(メッセージ送受信の遅延)や将来を見据えた機能の拡張性(サーバー負荷等)を踏まえ、あらたに WebSocket 通信を採用し、最適な実行環境を選定していくという方針となりました。

どのように WebSocket 通信を実現するかの観点

社内に WebSocket を用いたプロダクト開発の知見が豊富ではなかったため、なるべく WebSocket に関する関心ごとの負担を減らし、プロダクト開発へよりフォーカスできるようにすることなども加味し、WebSocket API を採用しました(もちろん WebSocket に対する技術的好奇心はメンバー間で話題にあがっており、WebSocket そのものにフォーカスした技術検証なども楽しんで取り組んでいました)。

今回のケースで WebSocket API を利用することのメリットはざっくりと以下の点です。

  • WebSocket サーバーそのものの管理が不要
    • コンピューティングリソースの消費やスケーリング、可用性観点での懸念がない(※クォータの制限等は除く)
  • WebSocket の接続管理が不要
    • 既に用意されているルート(\$connect、\$disconnect)を元に簡単に WebSocket の接続・切断のハンドリングが可能
    • ルーティング先の Lambda のビジネスロジックの実装に集中できる

余談として他にも WebSocket 接続を抽象化してくれるマネージドサービスとして、AppSync や Cloud Firestore(データストアも含めた採用案) などの案もあがっていましたが、社内での採用実績がなかった GraphQL(※AppSync は GraphQL がベースとなる)のキャッチアップコストや、Firestore のコスト面の懸念の理由により、どちらも採用には至りませんでした。

API サーバー

Lambda を採用しました。既存のママリの API サーバー(CakePHP と Fargate)を使う案もありましたが、以下の点で見送り、Lambda を採用する方針となりました。

  • 常駐型のアプリケーションよりイベント駆動のようなアーキテクチャが適している
  • コスト面
  • WebSocket API に併せたサーバーレスな構成
  • 可用性のためのスケーリング速度

ちなみにランタイムは Go が採用されました。Lambda では Go のマネージドランタイムが非推奨となっていますが、OS 専用ランタイムを用いてバイナリの実行環境を用意すれば問題なく稼働することができるため、問題なく採用することができました。

その他のサブシステム

Push 通知には SNS を利用しています。SNS を用いた Push 通知自体は既にママリでの利用実績があるため、あえて他の選択肢を取るという判断にはなりませんでした。既に運用中である現行のPush 通知基盤に相乗りする形となりました。

コスト面について

本記事では詳しく解説しておりませんが、今回採用したマネージドサービスは、今回のようなケースではコストパフォーマンスに優れていると感じます(少なくともリリース初期〜安定するまでなど)。特に WebSocket 接続の部分に関しては、利用者数がはっきりと見込めず、ワークロードが安定しないようなシチュエーションであれば WebSocket API を軸に置いたアーキテクチャから始めるという選択肢は有力かと思います。もちろんトレードオフではあると思うので、初めから多額のコストがかかることが分かっている、開発工数や運用にそれなりのコストをかけられるという場合は WebSocket に特化したマネージドサービス以外の選択肢もあるかなと思います。また、データストアで採用した DynamoDB に関しては、オンデマンドとプロビジョニングといった料金体系の使い分けもありますので、まずはオンデマンドから始め、傾向が見えてきたらプロビジョニングへ切り替えるといった形で段階を踏むことでコスト対策を打っていけるかと思います。

おまけ

Distributed Load Testing を用いた負荷試験も行いました。本記事では詳細や使い方などは割愛させていただきますが、過去に弊社のテックブログでも取り上げられておりますので、詳細や使い方等気になる方はそちらもご覧ください。

tech.connehito.com

どの程度耐えうるのか限界性能試験を行いたかったのですが、API Gateway のクォータ制限に引っかかってしまい、ベストエフォートでの実施となりました。可能な範囲で行えた試験(クォータ制限の影響が出る前のテスト)を一部ご紹介します。

ざっくりとしたテスト内容

  • Distributed Load Testing 側の各種設定
    • TASK COUNT(負荷試験リクエスト元の Fargate タスク数):5
    • CONCURRENCY(タスクごとの同時接続数):100
    • Ramp Up(CONCURRENCY に達するまでの時間):5m
    • Hold for(CONCURRENCY の持続時間):1m
  • jmx ファイルのシナリオ
    1. コネクション接続
    2. 単調なメッセージ送信リクエスト(例:「これは負荷試験のテストです」等)
    3. Ping/Pong
    4. 1000ms 秒待機
    5. メッセージ削除リクエスト
    6. コネクション切断

結果

テスト項目 結果
リクエストの総数 434,648
成功したリクエストの総数 434,635
エラーの総数 13
秒間平均リクエスト 1,210

上記の試験では、99.99%以上の成功率となり、安定した稼働となっていました。

おわりに

WebSocket API と Lambda のクォータ(同時実行数等)や、Lambda に関するコールドスタートのパフォーマンス等については状況により気にする必要がありそうですが、基本的にはサーバーレス構成の恩恵を受けているので、コンピューティングリソースや可用性、スケーリング等に関する懸念が少なく済んでいます。また、リアルタイムチャットなどの開発における実績や知見が社内になかったことも相まって、皆で協力して意見を出し合い調査を行ったり、AWS ソリューションアーキテクトの方への技術相談を行ったりと開発のプロセス自体も価値のある取り込みだったと思います!以降のアドベントカレンダーではアプリケーション側の実装に関する記事も公開予定ですのでお楽しみに!