Data Contractに向けたProtocol Buffersの調査

背景: データ品質を担保するにはデータソースの品質が重要

私はデータエンジニアをしており、DWHやデータマートのデータ品質について考えることが多い。BigQueryなどにデータが取り込まれた後のレイヤリングやテスト、改善に向けたデータ品質の可視化について、以前発表した。

データが取り込まれた後の整理は進んでいるものの、やはりデータソースの品質が重要であることは言うまでもないだろう。

  • データソースの可用性が低ければ、後段の可用性はそれ以上にはできない
  • データソースのdescriptionが不十分であれば、後段のデータ活用は不正確になる場面が出てくる
  • データソースにカラム名の変更、型の変更、カラムの削除など、データ基盤チームに連携がなくスキーマに破壊的な変更があると大抵事故の原因になる
  • 値の取り得る範囲などが明示されていないと、DWHのビジネスロジックが正確に書き切れない

など、データソースの品質が担っている役割は大きい。

データソースの品質を担保する手段としてのData Contract

データソースに色々注文を付けたくなるのはデータエンジニアあるあるだが、一方でそれは暗黙的な期待になっていることも多い。ここでは、データソースを提供している側をデータ生産者、データソースを利用してDWHやデータマートを作っている側をデータ消費者と呼ぶことにする(データ消費者にはアナリストを含めて考えてもよい)。データ生産者の立場としては

  • ただの調査のためのログで出力しているデータをそんな大事な用途で使わないでよ...
  • 可用性の担保って言われても、分析のためだけに動いてるわけじゃなくて、プロダクト自体の可用性の担保とかもあるんだよ...
    • セキュリティやQAなどの文脈でもシフトレフト叫ばれていて、やらないといけないことはどんどん増えてるんだよ
  • 「XXXのデータが欲しい」って言われたからデータ出力するようにしたけど、監査ログ見たら全然見てないじゃん...
    • 見てないなら認知負荷下げるためにも消したいのに「いつか見るかもしれないから」って削除にはなかなか協力してくれないし...
  • 「鮮度が高いデータが大事!」っていうけど、それを使ったダッシュボード一日に何回見てるの?
    • dailyからニアリアルタイムに近づけるために運用どれだけ大変になるか分かってるの?
  • 色々データ出してるけど、分析でちゃんと価値出せてるの?
    • 工数をかけるに値するROIは出てるの?

などなどの心境になることもあるだろう*1

このように期待値が擦り合っていなかったり、暗黙的になっている場合、データ品質の改善は難しい。また、データエンジニアが作ったデータをCRMなどにData Activationする機会なども増えており、この場合はデータエンジニアがデータ生産者側になる。データエンジニアとCRMの活用者(データ消費者)とのコミュニケーションや期待値合わせが大事になってくる。

こういったことを背景に、近年Data Contractと呼ばれる枠組みがよく聞かれるようになってきた。詳細については、以下の書籍を読んで欲しい。

この本ではData Contractを以下のように定義している。

A data contract is an agreed interface between the generators of data and its consumers. It sets the expectations around that data, defines how it should be governed, and facilitates the explicit generation of quality data that meets the business requirements.

この定義には以下の4つの原則が含まれている。

  • An agreed interface between the generators of data, and its consumers
  • Setting expectations around that data
  • Defining how the data should be governed
  • Facilitating the explicit generation of quality data

Data Contractではチーム間の協力の体制や進め方などは当然重要なトピックになってくるが、それについては本書を読んでもらうとして、今回はData Contractを結ぶ表現方法としてどういうフォーマットややり方がいいかについて、自分で考えたことをまとめてみたい。

Data Contractの表現方法の一つとしてのProtocol Buffers

Data Contractは特定のツールやプラットフォームに依存した考え方ではないので、ぶっちゃげ何でcontractを結んでもよいが、実際にやるとなると

  • Protocol Buffers
  • Avro
  • Json Schema
  • yaml

などに帰着することが多い。では、どれを選ぶかという話になるが、私が所属している組織ではgRPCがWeb APIとして採用されており、Protocol Buffersでスキーマを定義する(データストアとしてはFirestoreを採用している)。

開発者としてはスキーマ言語をいくつも覚えたり、複数箇所で記述しないといけないのは大変であるから、Protocol BuffersをベースにData Contractをやる方法について考えてみる。なお、実際に組織としてやることが決定したわけではなく、仮にやるとなった場合にはどうできるか、の思考実験的な側面でこのエントリを書いている。

Data ContractとしてProtocol Buffersを使う

題材があったほうが分かりやすいため、何かしらのユーザー情報を扱うprotoを考えてみよう。例えばこういった感じである。

syntax = "proto3";

import "descriptor.proto";

enum UserRank {
  USER_RANK_UNSPECIFIED = 0 [(custom_options.enum_description) = "defaultのランク"];
  USER_RANK_TRIAL = 1 [(custom_options.enum_description) = "トライアル中のユーザーランク"];
  USER_RANK_FREE = 2 [(custom_options.enum_description) = "無料ユーザーのランク"];
  USER_RANK_PREMIUM = 3 [(custom_options.enum_description) = "プレミアウムユーザーのランク"];
}

message User {
  option (custom_options.message_description) = "ユーザーの情報を保持します";
  option (custom_options.owner_team) = "team-A";

  int32 id = 1 [
    (custom_options.field_description) = "ユーザーのIDです",
    (custom_options.unique) = true,
    (custom_options.not_null) = true
  ];
  string nickname = 2 [
    deprecated = true,
    (custom_options.field_description) = "ユーザーのニックネームです。現在は使われていないので、nameを代わりに使ってください"
  ];
  string name = 3 [
    (custom_options.field_description) = "ユーザーの名前です",
    (custom_options.not_null) = true
  ];

  UserRank rank = 4 [
    (custom_options.field_description) = "ユーザーのランクです",
    (custom_options.not_null) = true
  ];
  string email = 5 [
    (custom_options.field_description) = "ユーザーのメールアドレスです",
    (custom_options.not_null) = true,
    (custom_options.contains_pii) = true
  ];
}

プログラムが読み書きできる人なら、まあ見たままという感じだろう。

あえて触れる点としては、カスタムオプションの存在だろうか。各フィールドのdescriptionをコメントで書くこともできるが、コメントで記載してしまうと後からコードで触れない情報になってしまう。カスタムオプションとして定義しておけば、データ消費者も触れる情報となり、料理がしやすい。ここでは、分かりやすさのためだけにdescriptionなど簡単なものしか書いていないが、Data Contractに必要な情報は同じ要領で書いていけばいい。

Protocol Buffers自体にはfield_descriptionという定義はなく、自前のカスタムオプションとして用意する必要がある。例えばこういう形である。

syntax = "proto3";

package custom_options;

import "google/protobuf/descriptor.proto";

extend google.protobuf.MessageOptions {
  string message_description = 50000;
}

extend google.protobuf.MessageOptions {
  string owner_team = 50001;
}

extend google.protobuf.FieldOptions {
  string field_description = 50000;
}

extend google.protobuf.FieldOptions {
  bool contains_pii = 50001;
}

extend google.protobuf.FieldOptions {
  bool unique = 50002;
}

extend google.protobuf.FieldOptions {
  bool not_null = 50003;
}

extend google.protobuf.EnumValueOptions {
  string enum_description = 50000;
}

唐突に出てくる50000という謎の数字があるが、自前で拡張したフィールドは公式のものと被らないように50000から99999を使うことが推奨されているようだ。詳しくはyuguiさんのエントリを見るとよいだろう。

Protocol Buffers自体は特段Data Contractのためのものではないが、カスタムオプションの存在により、Data Contractの用途への拡張性を備えた存在になっている。また、データ生成者が型を書いていくその現場でData Contractに関する情報を記載できるため、SSoTを実現しやすいというのも重要である。

カスタムオプションのより強力な例としては、validateの情報を持たせ、その情報を使って実際にvalidateする拡張などがあった。特にPub/Subの用途などでは便利そうである。

データ生産者によってProtocol Buffersで記述された情報をData Contractとしてデータ消費者がコードから取り扱いたい場合、以下のようにコマンドを打つとuser_pb2.pyのようなデータ消費者側が扱いやすいコードが自動生成される。この例ではpythonファイルを出力させているが、Protocol Buffers自体は特定の言語に依存しているツールではないため、GoなりRubyなりデータ消費者側の得意な言語を使えばいいだろう。

protoc --python_out=. user.proto descriptor.proto

自動生成されたuser_pb2.pyと後述するちょっとしたpythonスクリプトを書いてあげれば、例えばdbtで使うデータソースのyamlに対して、Data Contractのメタデータをリッチに載せた状態で自動生成することができる。

version: 2
sources:
  - database: my_project
    schema: my_dataset
    tables:
      - name: User
        meta:
          owner: team-A
        description: ユーザーの情報を保持します
        columns:
          - name: id
            type: INT64
            data_tests:
              - not_null
              - unique
            description: ユーザーのIDです
          - name: nickname
            type: STRING
            meta:
              deprecated: true
            description: ユーザーのニックネームです。現在は使われていないので、nameを代わりに使ってください
          - name: name
            type: STRING
            data_tests:
              - not_null
            description: ユーザーの名前です
          - name: rank
            type: INT64
            data_tests:
              - accepted_values:
                  quote: false
                  values:
                    - 0 # USER_RANK_UNSPECIFIED (defaultのランク)
                    - 1 # USER_RANK_TRIAL (トライアル中のユーザーランク)
                    - 2 # USER_RANK_FREE (無料ユーザーのランク)
                    - 3 # USER_RANK_PREMIUM (プレミアウムユーザーのランク)
              - not_null
            description: ユーザーのランクです
          - name: email
            type: STRING
            meta:
              contains_pii: true
            data_tests:
              - not_null
            description: ユーザーのメールアドレスです

protoファイルからdbt用のyamlに自動変換するスクリプト(クリックで開きます)。

import sys
import user_pb2
import re
from google.protobuf.descriptor import FieldDescriptor
import descriptor_pb2 as custom_options_pb2
from ruamel.yaml import YAML
from ruamel.yaml.representer import SafeRepresenter
from ruamel.yaml.comments import CommentedMap, CommentedSeq


def protobuf_message_to_dict(message):
    return {field.name: field for field in message.DESCRIPTOR.fields}


def field_descriptor_to_dict(field):
    field_types = {
        FieldDescriptor.TYPE_DOUBLE: "FLOAT",
        FieldDescriptor.TYPE_FLOAT: "FLOAT",
        FieldDescriptor.TYPE_INT64: "INT64",
        FieldDescriptor.TYPE_UINT64: "INT64",
        FieldDescriptor.TYPE_INT32: "INT64",
        FieldDescriptor.TYPE_FIXED64: "INT64",
        FieldDescriptor.TYPE_FIXED32: "INT64",
        FieldDescriptor.TYPE_BOOL: "BOOLEAN",
        FieldDescriptor.TYPE_STRING: "STRING",
        FieldDescriptor.TYPE_GROUP: "RECORD",
        FieldDescriptor.TYPE_MESSAGE: "RECORD",
        FieldDescriptor.TYPE_BYTES: "BYTES",
        FieldDescriptor.TYPE_UINT32: "INT64",
        FieldDescriptor.TYPE_ENUM: "INT64",
        FieldDescriptor.TYPE_SFIXED32: "INT64",
        FieldDescriptor.TYPE_SFIXED64: "INT64",
        FieldDescriptor.TYPE_SINT32: "INT64",
        FieldDescriptor.TYPE_SINT64: "INT64"
    }

    field_dict = {
        "name": field.name,
        "type": field_types.get(field.type, "unknown"),
        "meta": {},
        "data_tests": [],
    }

    if field.GetOptions().HasExtension(custom_options_pb2.field_description):
        field_dict["description"] = field.GetOptions().Extensions[custom_options_pb2.field_description]

    if field.GetOptions().deprecated:
        field_dict["meta"] = {"deprecated": True}

    if field.GetOptions().HasExtension(custom_options_pb2.contains_pii):
        field_dict["meta"]["contains_pii"] = field.GetOptions().Extensions[custom_options_pb2.contains_pii]

    if field.type == FieldDescriptor.TYPE_ENUM:
        field_dict["data_tests"].append({
            "accepted_values": enum_descriptor_to_accepted_values(field.enum_type)
        })

    if field.GetOptions().HasExtension(custom_options_pb2.not_null):
        field_dict["data_tests"].append("not_null")

    if field.GetOptions().HasExtension(custom_options_pb2.unique):
        field_dict["data_tests"].append("unique")

    if not field_dict["meta"]:
        del field_dict["meta"]

    if not field_dict["data_tests"]:
        del field_dict["data_tests"]

    return field_dict


def enum_descriptor_to_accepted_values(enum):
    accepted_values = CommentedMap()
    accepted_values["quote"] = False
    accepted_values["values"] = CommentedSeq()

    for value in enum.values:
        comment = value.name
        if value.GetOptions().HasExtension(custom_options_pb2.enum_description):
            comment += f" ({value.GetOptions().Extensions[custom_options_pb2.enum_description]})"

        # 数値とコメントをセットで格納
        accepted_values["values"].append(value.number)
        accepted_values["values"].yaml_add_eol_comment(comment, key=value.number)

    return accepted_values


def protobuf_to_yaml_dict(message):
    message_dict = {
        "name": message.DESCRIPTOR.name,
        "meta": {},
    }
    if message.DESCRIPTOR.GetOptions().HasExtension(custom_options_pb2.message_description):
        message_dict["description"] = message.DESCRIPTOR.GetOptions().Extensions[custom_options_pb2.message_description]

    if message.DESCRIPTOR.GetOptions().HasExtension(custom_options_pb2.owner_team):
        message_dict["meta"]["owner"] = message.DESCRIPTOR.GetOptions().Extensions[custom_options_pb2.owner_team]

    message_dict["columns"] = [field_descriptor_to_dict(item) for item in protobuf_message_to_dict(message).values()]
    return message_dict


def represent_str(dumper, instance):
    if "\n" in instance:
        instance = re.sub(" +\n| +$", "\n", instance)
        return dumper.represent_scalar("tag:yaml.org,2002:str", instance, style="|")
    else:
        return dumper.represent_scalar("tag:yaml.org,2002:str", instance)


class CustomRepresenter(SafeRepresenter):
    def represent_str(self, data):
        if "\n" in data:
            data = re.sub(" +\n| +$", "\n", data)
            return self.represent_scalar("tag:yaml.org,2002:str", data, style="|")
        else:
            return self.represent_scalar("tag:yaml.org,2002:str", data)


yaml = YAML()
yaml.indent(mapping=2, sequence=4, offset=2)

user_message_yaml_dict = protobuf_to_yaml_dict(user_pb2.User())

result = {
    "version": 2,
    "sources": [
        {
            "database": "my_project",
            "schema": "my_dataset",
            "tables": [
                user_message_yaml_dict,
            ],
        }
    ],
}

yaml.dump(result, sys.stdout)

こうしたData Contract中心の世界のメリットは、例えば以下のようなものだ。

  • データについて一番詳しいのはデータ生産者だが、そのデータ生産者が書いた情報がDWHやデータマートを作るAnalytics Engineerに対してほぼ直接的に届く
    • Protocol Buffersの定義が更新されれば、yaml側にも自動的に反映できるため、データ生産者 / データ消費者の双方にとって楽であり、便利である
  • リッチな情報をyamlで表現できる
    • Enumで入ってくる情報はそもそもどういう種類を取り得るのか、などの情報も欲しい
    • それに加えて、それぞれの値がどういった意味なのかも知りたい
    • Protocol Buffersのカスタムオプションで定義されたdescriptionをyamlのコメント上にも反映できるため、dbtでSQLを書く人にとってはクエリを書く上で必要な情報がかなりyamlに集約できる
      • 余談だが、私はこういった必要な情報を人力でコピペして回っていた人間なので、こういった情報の転記が自動化されるのはめちゃくちゃ助かるのである
  • 自動生成される先が何かはデータ生産者が知る必要がない
    • 今回はdbtを例にしているが、Data Contractを結ぶ先をprotoファイルであると定義すれば、その先はデータ消費者の好きにすればよいし、データ生産者は気にする必要はない
    • 例: transformのツールをdbtからDataformに乗り換えたとしてもデータ生産者は特に何も変える必要はない
    • 例: データ消費者がSQLを書くエンドユーザーであれば、人間が読みやすいMarkdownファイルの自動生成にしてもよい

それぞれのチームが疎に動けるようになっていつつ、必要な情報はリッチな構造化された形でやり取りできるのが特に大きい。

今回のスクリプトのようにData Contractを中心とする入出力を色々なツールに対応させることに興味がある人はData Contract CLIなどを見ても面白いと思う。なお、本エントリを執筆時点ではData Contract CLIによるProtocol Buffersのimportは未対応だったため、自前でスクリプトを書くことにしたのであった。Data Contractがもっと当たり前になってくると、この辺りのエコシステムが充実してくるだろう。

データの入出力を一箇所に集約、Protocol Buffersで抑えるパターン

私の組織では、Protocol BuffersはgRPCをメインスコープとして使っているが、Data Contractを結ぶ際にはProtocol Buffersのいくつかの利用パターンがあると思ったので、それについて書く。


典型的なパターンとしては、KafkaやPub/Subのようなリアルタイムなデータのやり取りをする場合だろう。おおむね、以下のような流れになる。

  • データのスキーマ(仕様)をprotoファイルで書く
  • protoファイルを元に、データの送り手 / 受け手がそれぞれに必要な言語向けのシリアライズ / デシリアライズ用のコードを自動生成する
  • 自動生成されたコードを元にデータの送受信をデータ生成者 / データ消費者が行なう

こういったスキーマを様々なチームで使いやすくするツール(スキーマレジストリ)としては、Confluentが有名だ。Confluent自体はProtocol Buffersに限らず、AvroやJSON Schemaなどのスキーマ言語をサポートしており、スキーマのバージョン管理をできる。


バッチやファイルベースの場合、GCSやS3などにファイルを置いてやり取りすることも多いだろう。その場合でもProtocol Buffersを使って、シリアライズ / デシリアライズできる。BigQueryへのデータ取り込みを考えると、Avroのほうがいいケースなどはもちろんあるが、入出力をProtocol Buffersで抑えてしまえばData Contractはやりやすくなる。


また、Firestoreをストレージに使っている場合も同じ形として捉えることができる。Firestore自体はドキュメントデータベースであり、カチっとした型を持っているわけではない。しかし、アプリケーションとしては型を持ったオブジェクトとして扱いたいことが多いし、その場合はfromFirestore / toFirestore / withConverterなどのデータ変換用の関数を通じてデータを読み書きすることが多い。

FirestoreからBigQueryへの同期はgcloud firestore exportなどを使うことが多いと思うが、Firestoreに読み書きをする際の型をProtocol Buffersで定義し、必ずその型に従ってデータの読み書きをデータ提供者で行なうようにすれば、Protocol Buffersで定義されたData Contractに従ったデータをデータ消費者が手に入れることができる。データ生産者はスキーマを変えたかったら必ずProtocol Buffersを修正する必要があるし、データ消費者はSSoTであるProtocol Buffersの修正を見てBigQueryのスキーマの変更を知ることができる。

ストレージのスキーマをProtocol Buffersで抑えるパターン

「データの入出力を一箇所に集約、Protocol Buffersで抑えるパターン」は色々な場面で使えるが、難しい場面もある。RDBMSをBigQueryなどに同期し、それに対してData Contractを結ぶ場合、出力となるパターンだけでも例えば複数のテーブルをJOINさせる場合などが考えられ、Firestoreのように入出力を一箇所で抑え込むのは難しい。

MySQLなりPostgreSQLなりRDBMSを扱う場合、ストレージ側にスキーマが存在し、これをData Contractに使うやり方を考えたい。RDBMSはprimary keyがどれか、foreign keyはどれか、どういった制約があるか(check制約)、カラムの意味(カラムのdescription)といったメタデータをスキーマとして保持できる。

とはいえ、Data Contractを結ぶ際にもっとリッチにメタデータが欲しくなる場面もある。例えば

  • テーブルのオーナーはどのチームなのか
  • データの同期の頻度
  • DEPRECATEDなカラムかどうか
  • PIIな情報を含むカラムかどうか
  • Enumの値の意味

などなどだ。テーブルやカラムのdescriptionには自然言語を書くことができるので、これらの情報をdescriptionに収めていくこと自体は可能ではある。社内でData Contractが一箇所しか存在しないのであればdescriptionで妥協してもよいが、様々な箇所でData Contractをやる可能性があるならば、こうしたメタデータは構造化して共通の仕組みで処理できるようにしておきたい。

また、SSoTとなるスキーマ情報をどこにどう持たせるか、という問題がある。productsというテーブルにdescriptionというカラムを追加したい場合、ALTER TABLE products ADD COLUMN description text;のようなmigration用のDDLを発行することになるが、SSoTとなるスキーマ情報はRDBMS自体が持つことになる。悪くはないが、migration用のDDLは積み重ねていく差分の情報であるし、宣言的かつコードで取り扱いやすい形が望ましい。

世の中には便利なツールを作っている人がいて、Schemas as Codeを実現しているものがある。一例としてAtlasがある。

AtlasはTerraformで使われるHCLのフォーマットをしておりコードで取り扱いやすく、コードの一部を修正すればどういったスキーマの変更が起きるかterraform planのように分かったり、その修正に対応するようなmigrationクエリを生成してterraform applyのようにスキーマに適用することもできる。

やり方は色々あるだろうが、Protocol BuffersにData Contractの情報をまとめつつ、その情報からAtlasのHCLを自動生成して、RDBMSのスキーマと一致することを担保しつつ、Protocol BuffersをData Contractとしてデータ消費者に提供する、というパターンもありえるかもしれない。

発展的な話題 & 読書会の案内

今回は割とシンプルなパターンを想定したが、現実はもう少し複雑な場合があるだろう(私も以下のパターンは経験している)。例えば、こういったパターンを考えてみよう。

  • チームAに所属する開発者がProtocol Buffersを使いつつ、データはMySQLに書き込む
  • MySQLのbinlogの情報を元に、チームBではDebeziumを使ってBigQuery上でCDCを実現させたり、ある時点のスナップショットテーブルを作成する
  • BigQueryにきたデータを元に、チームCではdbtでデータマートを作成する

この場合、データに関する知識は圧倒的にチームAが詳しいであろう。しかし、CDCを実現する上で何かしらの制約で型の変換が起きてしまう場合やデータの可用性を担保するのはチームBであったりする。この場合はチームCはどうData Contractを結ぶのがよいだろうか。データの受け渡しとしてはチームBと結ぶべきであろうが、データのより中身に近い部分はチームAと結ぶのがよさそうである。データが複数のチームをまたいでやり取りされる場合、こうしたData Contractの結び方はまだまだ自明ではない。


Data Contractは最近活発になってきている概念であるが、まだまだ枯れているわけではなく、現実の組織やチーム構成でどう回していけるかはこれからベストプラックティスが出てくることになると思う。datatech-jpでは、冒頭で紹介したDriving Data Quality with Data Contractsの読書会が開催中であり、興味がある人は是非参加してみてはどうだろうか。

Data Contractは私もまだまだ分からないことだらけなので、本エントリに関連することなどで私とディスカッションしてみたい人は気軽に声をかけて欲しい。

参考文献

*1:注意: 私自身が思っているわけでもなく、所属組織でこれらが起きているというわけでもないです