肉球でキーボード

MLエンジニアの技術ブログです

AWS Glueでicebergテーブルのスキーマを動的に変更する


本記事は datatech-jp Advent Calendar 2024 の13日目の記事です。

はじめに

Glue Catalogに登録したiceberg tableにAWS GlueJobでデータの書き込みを行うケースは多いかと思います。

通常、Glue Catalogのスキーマに登録されていないカラムを持つデータをGlue Jobで書き込もうとするとエラーとなります。

そのため、新しいカラムを持つデータを書き込む前に alter table add columns をAthenaなどで実行し、iceberg tableのスキーマ変更を行う必要があります。
ALTER TABLE ADD COLUMNS - Amazon Athena

Glue Jobの処理で新しくカラムを追加した場合や、元データにカラムが追加された場合に毎回上記の対応を行うのは運用コストが高く、操作ミスの危険もあります。

本記事では、扱うデータのカラムに応じて動的にiceberg tableのカラムを追加する方法をまとめます。

Glue Job実行のためのAWSリソース準備

AWS GlueJobを実行するための、S3 Bucket・IAM Role・Glue Database・Glue Jobを作成するTerraformを作成しました。

code-for-blogpost/glue_job_iceberg_schema_change/terraform at main · nsakki55/code-for-blogpost · GitHub

$ cd terraform 
$ terraform init
$ terraform apply

AWS Glue jobでiceberg tableに対する操作を有効化する場合、Glue JobのSpark Configurationを適切に設定する必要があります。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue.

terraformのコードからglue jobのリソースを作成する部分を抜粋すると、以下のようになります。

resource "aws_glue_job" "update_iceberg_table_schema" {
  name              = "update_iceberg_table_schema"
  role_arn          = aws_iam_role.glue_job_role.arn
  glue_version      = "4.0"
  worker_type       = "G.1X"
  number_of_workers = 2
  max_retries       = 0
  execution_property {
    max_concurrent_runs = 10
  }

  command {
    script_location = "s3://${aws_s3_bucket.glue_job.bucket}/scripts/update_iceberg_table_schema.py"
  }

  default_arguments = {
    "--enable-glue-datacatalog"          = "true"
    "--TempDir"                          = "s3://${aws_s3_bucket.glue_job.bucket}/temporary/"
    "--spark-event-logs-path"            = "s3://${aws_s3_bucket.glue_job.bucket}/sparkHistoryLogs/"
    "--enable-job-insights"              = "false"
    "--enable-continuous-cloudwatch-log" = "true"
    "--datalake-formats"                 = "iceberg"
    # conf to enable iceberg format. ref: https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html
    "--conf" = "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse"
  }

Glue Jobで取り込んでicebergテーブルに書き込むためのサンプルデータを2つ用意しました。

https://github.com/nsakki55/code-for-blogpost/tree/main/glue_job_iceberg_schema_change/data

test_data.csv

col1 col2
aaa 1
bbb 2
ccc 3

test_data_new_column.csv

col2 col1 col3
444 ddd XXX
555 eee YYY
666 fff ZZZ

2つのテストデータの差は以下です

  • col1, col2のカラムの順序が異なる
  • col3という新しいカラムが追加されている

今回はS3 Bucketに2つのサンプルデータを配置して、Glue Jobで読み込む元データとします。

アップロード先のS3 Bucket名をterraform applyで生成したS3 Bucket名に変更して下さい。

$ aws s3 cp ./data/test_data.csv s3://schema-change-data-20241208085330834400000002/input/
$ aws s3 cp ./data/test_data_new_column.csv s3://schema-change-data-20241208085330834400000002/input/

icebergテーブルにデータを追加するGlueJobの作成

S3からcsvデータを読み込んだ後、以下の処理を行うGlue Jobを作成しました。

  • テーブルが存在しない場合
    • テーブルを作成
  • テーブルが存在する場合
    • テーブルにデータを追加
import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

S3_BUCKET = "schema-change-data-20241208085330834400000002"
TABLE_NAME = "test_table"
DATABASE_NAME = "test_database"
CATALOG_NAME = "glue_catalog"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    print(f"Start get dynamic frame from S3. {source_s3_path=}")
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
        },
    )
    print(f"Finished get dynamic frame from S3. {dyf.count()=}")
    return dyf

def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool:
    print(f"Start check table in database. {database_name=}, {table_name=}")
    tables = glue_context.spark_session.catalog.listTables(database_name)
    is_exist = table_name in [table.name for table in tables]
    print(f"Finished check table in database. {is_exist=}")
    return is_exist

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    glue_context = GlueContext(sc)
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)
    print(f"Start update iceberg table schema. {args=}")

    dyf = get_dynamic_frame_from_s3(
        glue_context=glue_context,
        source_s3_path=f"s3://{S3_BUCKET}/input/{args['file_name']}",
    )
    df = dyf.toDF()
    df.printSchema()

    is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME)

    table_path = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
    if is_exist:
        df.writeTo(table_path).append()
    else:
        df.writeTo(table_path).tableProperty("format-version", "2").tableProperty("location", f"s3://{S3_BUCKET}/output").create()

    print(f"Finished update iceberg table schema. {args=}")
    job.commit()

if __name__ == "__main__":

    args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"])
    main(args)

Glue Jobの実行スクリプトをS3にアップロードします。S3 Bucketをterraform applyで作成したS3 Bucket名に変更して下さい。

$ aws s3 cp ./src/update_iceberg_table_schema.py s3://glue-job-20241208085330834300000001/scripts/ 

1つ目のサンプルデータであるtest_data.csvを読み込むGlue Jobを実行します。

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data.csv"}'

Glue Catalogのスキーマを見ると、col1, col2が追加されています

glue catalog

icebergテーブルのスキーマを変更する

icebergでは動的にスキーマ変化できる、mergeSchemaというオプションが提供されています。
Writes - Apache Iceberg™.

mergeSchemaオプションを使用する場合、以下の挙動となります。

  • 新しいカラムがデータソースに存在するが、対象のテーブルにカラムが存在しない場合

    → 新しいカラムが対象のテーブルに追加される。既存のレコードの新しいカラムにはnullが設定される。

  • データソースにカラムが存在しないが、対象のテーブルにはカラムが存在する場合

    → 新しいレコードを追加・レコードを更新すると、対象のカラムにnullが設定される。

mergeSchemaオプションを使用するには、対象のテーブルのプロパティ設定に 'write.spark.accept-any-schema'='true' を追加する必要があります。

ALTER TABLE test_database.test_table SET TBLPROPERTIES (
  'write.spark.accept-any-schema'='true'
)

上記のプロパティ追加のクエリをAthena経由で実行すると、サポートされていないプロパティエラーがでます。

Unsupported table property key: write.spark.accept-any-schema

Athenaでは変更可能なicebergテーブルのプロパティに制限があります。
Create Iceberg tables - Amazon Athena

そのため、上記のプロパティ変更をAthena経由では行えません。

AWSが出してるicebergテーブルに関する記事で、'write.spark.accept-any-schema'='true' のプロパティ設定をspark経由で実行してるのを確認できます。
Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg | AWS Big Data Blog

本記事ではこの方法に習い、spark経由で必要なプロパティ設定を行います。

既存テーブルへのデータ追加の実装を以下のように変更しました。

sql = f"ALTER TABLE {table_path} SET TBLPROPERTIES ('write.spark.accept-any-schema' = 'true')"
glue_context.spark_session.sql(sql)
df.writeTo(table_path).option("mergeSchema","true").append()

新しいカラムを持つcsvデータを読み込むGlueJobを実行します

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'

しかし、上記の設定のGlueJobは失敗します。

原因はcol1, col2のカラムの順番がテーブルスキーマと元データで異なるためです。

2024-12-09 01:21:18,419 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
  File "/tmp/update_iceberg_table_schema.py", line 74, in <module>
    main(args)
  File "/tmp/update_iceberg_table_schema.py", line 63, in main
    df.writeTo(table_path).option("mergeSchema","true").append()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1460, in append
    self._jwriter.append()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: col1: optional string
  2: col2: optional string
  3: col3: optional string
}
Provided schema:
table {
  2: col2: optional string
  1: col1: optional string
  3: col3: optional string
}
Problems:
* col2 is out of order, before col1

icebergのspark optionには check-ordering という設定があります。

これは入力スキーマとテーブルスキーマが同じかチェックする設定で、デフォルトではTrueとなります。

icebergのGitHubレポジトリのmergeSchemaに関するissueで議論されているように、スキーマ変更するためにcheck-ordering をFalseにする必要があります。
Adding new columns (mergeSchema) · Issue #8908 · apache/iceberg · GitHub

GlueJobのリソースを作成するterraform設定に、以下のconfigを追加します。

--conf spark.sql.iceberg.check-ordering=false

GlueJobの変更を反映します

$ terraform apply

新しいカラムを持つcsvデータを読み込むGlue Jobを再び実行します。今度はJobが成功します。

$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'

Glue Catalogのスキーマを確認すると、スキーマが変更されcol3が追加されているのを確認できます。

glue catalog with new column

test_tableの中身を確認してみると、既存のレコードには新しく追加されたcol3のカラムにnullが設定されています。

test_table

GlueJobから動的にiceberg tableのスキーマを変更できることができるようになりました。

参考

Pythonプロジェクトでflat layoutではなくsrc layoutが推奨される理由を理解する

本文中コード
github.com

flat layoutとsrc layoutについて

Pythonプロジェクトのディレクトリ構成について調べてたところ、flat layoutとsrc layoutという2種類のディレクトリ構成が存在することを知りました。
src レイアウト対フラットレイアウト - Python Packaging User Guide

flat layout

flat layoutはパッケージフォルダをプロジェクトのルート直下に配置するスタイルです。
flat layoutの有名なpythonプロジェクトだと、 pytorch, django, tensorflow があります。

.
├── README.md
├── pyproject.toml
└── my_package/
    ├── __init__.py
    └── module.py

src layout

一方、src layoutはsrcサブディレクトリにパッケージフォルダを配置するスタイルです。
src layoutの有名なpythonプロジェクトだと、transfomers, flask, black があります。

├── README.md
├── pyproject.toml
└── src/
     └── my_package/
        ├── __init__.py
        └── module.py

Pythonパッケージを開発する上ではsrc layoutが推奨されている

pytestの公式ドキュメントでは、src layoutが推奨されています。
Good Integration Practices - pytest documentation

Generally, but especially if you use the default import mode prepend, it is strongly suggested to use a src layout. Here, your application root package resides in a sub-directory of your root, i.e. src/mypkg/ instead of mypkg.

PyPA(Python Packaging Authority)のPython Packaging User GuideのGitHubレポジトリでも、src layoutを好むユーザーが多いことが伺えます。
https://github.com/pypa/packaging.python.org/issues/320

Python Packaging User Guideからsrc layoutとflat layoutの特徴のポイントを抜粋すると、以下のように書かれています。
src レイアウト対フラットレイアウト - Python Packaging User Guide

  • ソースコードを実行するために、src layoutはインストールステップが必要となるが、flat layoutはインストールステップが不要
  • Pythonインタープリタがカレントワーキングディレクトリをインポートパスの先頭に含むため、flat layoutでは開発中のコードを使用してしまう危険があるが、src layoutではインストール済みパッケージを使用することが保証されている

自分はこれらの説明を読んだ時に、何となく雰囲気は分かるけど、自分事として理解できていないモヤモヤがありました。
実際にflat layoutとsrc layoutでパッケージ開発の流れを再現してみて、src layoutがパッケージテストの上で安全であることを理解してみようと思います。

flat layoutでパッケージ開発

パッケージ構成

code-for-blogpost/src_vs_flat_layout/flat_layout at main · nsakki55/code-for-blogpost · GitHub

.
├── mypkg_flat
│   ├── __init__.py
│   └── math.py
├── tests
│   ├── __init__.py
│   └── test_math.py
├── pyproject.toml
├── requirements-dev.txt
└── tox.ini

mypkg_flat というディレクトリをプロジェクトルート直下に作成しました。

パッケージ内のモジュールであるmath.py には、足し算と引き算を行うadd, substract関数を用意します。

def add(a: float, b: float) -> float:
    return a + b

def substract(a: float, b: float) -> float:
    return a - b

tests/test_math.py には mypkg_flat パッケージのテストを記述します。

from mypkg_flat.math import add, subtract

def test_add():
    assert add(2, 3) == 5

def test_subtract():
    assert subtract(5, 3) == 2

pyproject.tomlを使用してパッケージビルドを行います。
pyproject.tomlによるパッケージビルドの方法はnikkieさんの記事を参考にしました。
Pythonで自作ライブラリを作るとき、setup.pyに代えてpyproject.tomlを使ってみませんか? - nikkie-ftnextの日記
以下の内容のpyproject.tomlに記述します。mypkg_flat ディレクトリをビルド対象としています。

[project]
name = "mypkg-flat"
version = "0.1.0"
description = "Example package using flat layout"
requires-python = ">=3.11"
dependencies = [
    "pytest",
]

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
packages = ["mypkg_flat"]

mypkg_flat パッケージのビルドを行います。distフォルダ内にビルド済みのパッケージファイルが作成されます。

$ python -m build

$ ls dist/
> mypkg_flat-0.1.0-py3-none-any.whl  mypkg_flat-0.1.0.tar.gz

requirements-dev.txtに mypkg_flat を含めます。

pytest
mypkg_flat

パッケージの作成ができたので、toxで作った仮想環境にビルド済みパッケージをインストールしてテストを実行します。
tox.iniに skipdist=true を設定することで、requirements-dev.txtのインストール時にビルドが走らないようにします。
Configuration - tox

[tox]
envlist = py312
skipsdist = true

[testenv]
install_command = pip install --find-links=dist {opts} {packages}
deps = -r requirements-dev.txt
commands =
    pytest  tests

tox -r コマンドでテストを実行します。 -r オプションをつけて仮想環境を作り直してます。

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout
configfile: pyproject.toml
collected 2 items

tests/test_math.py ..                                                                                                                                                         [100%]

================================================================================= 2 passed in 0.00s =================================================================================
  py312: OK (1.07=setup[0.95]+cmd[0.12] seconds)
  congratulations :) (1.09 seconds)

開発中コードがパッケージ外から直接importされるのを確認する

パッケージに含まれない開発中のコードが意図せず使用される状況を再現してみます。

math.pyモジュールに掛け算を行うmultiple関数を開発中のコードとして追加します。

def add(a: float, b: float) -> float:
    return a + b

def substract(a: float, b: float) -> float:
    return a - b

def multiple(a: float, b: float) -> float:
    return a * b

パッケージをテストする tests/test_math.pyにmultiple関数のテストを追加します。

from mypkg_flat.math import add, substract, multiple

def test_add():
    assert add(2, 3) == 5

def test_subtract():
    assert substract(5, 3) == 2
    
def test_multiple():
    assert multiple(2, 5) == 10

パッケージのビルドを行っていない状態から、 mypkg_flat をインストールしてmultipleをimportしようとすると、パッケージに含まれていない関数を読み込もうとしてるのでエラーが発生します。

$ cd dist
$ pip install mypkg_flat-0.1.0.tar.gz 
$ python
>>> from mypkg_flat.math import multiple
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ImportError: cannot import name 'multiple' from 'mypkg_flat.math'

この状態でテストを実行すると奇妙なことが起きます。
開発中コードを含めてビルドを実行してないにも関わらず、テストが通ってしまいます。

$ cd .. # プロジェクトのルートディクトリに移動
$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout
configfile: pyproject.toml
collected 3 items

tests/test_math.py ...                                                                                                                                                        [100%]

================================================================================= 3 passed in 0.01s =================================================================================
  py312: OK (1.32=setup[1.19]+cmd[0.13] seconds)
  congratulations :) (1.36 seconds)

何が起きてるか確認します。
toxで作成されたpython仮想環境に入って、 mypkg_flat パッケージの読み込み先を見てみると、ライブラリディレクトリ内ではなく、mypkg_flatディレクトリ中のコードを直接読みに行ってることがわかります。

$ source .tox/py312/bin/activate 
$ python
>>> import mypkg_flat
>>> mypkg_flat.__file__
'/Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/mypkg_flat/__init__.py'

pythonのモジュール読み込みパス一覧を取得すると、カレントディレクトリが先頭にあるのを確認できます。

>>> import sys
>>> sys.path
['', '/Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312/lib/python3.12/site-packages']

ドキュメントに記載されているように、pythonではデフォルトのPYTHONPATH設定では、モジュールの読み込みはライブラリディレクトリより、カレントディレクトリが優先されます。
6. Modules — Python 3.13.0 documentation

The directory containing the script being run is placed at the beginning of the search path, ahead of the standard library path. This means that scripts in that directory will be loaded instead of modules of the same name in the library directory.

そのため、パッケージを読み込んでるつもりが、実は開発中のコードを直接読み込んでいる状態が発生してしまいます。
この問題をsrc layoutで解決できることを確認します。

src layoutでパッケージ開発

パッケージ構成

code-for-blogpost/src_vs_flat_layout/src_layout at main · nsakki55/code-for-blogpost · GitHub

.
├── src
│   └── mypkg_src
│       ├── __init__.py
│       └── math.py
├── tests
│   ├── __init__.py
│   └── test_math.py
├── pyproject.toml
├── requirements-dev.txt
└── tox.ini

mypkg_src というディレクトリをsrcサブディレクトリ以下に作成しました。
パッケージ内のコードとテストは mypkg_flat と同じにするので省略します。

pyproject.tomlは以下の設定としました。パッケージのビルド対象をsrcディレクトリにしています。

[project]
name = "mypkg-src"
version = "0.1.0"
description = "Example package using src layout"
requires-python = ">= 3.12"
dependencies = [
    "pytest",
]

[build-system]
requires = ["setuptools >= 61.0"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
package-dir = {"" = "src"}

mypkg_src パッケージのビルドを行います。

$ python -m build

$ ls dist/
mypkg_src-0.1.0-py3-none-any.whl  mypkg_src-0.1.0.tar.gz

requirements-dev.txtに mypkg_src を含めます。

pytest
mypkg_src

flat layoutの場合と同様の設定でtoxによるテストを実行します。
mypkg_src パッケージがインストールされ、pytestのコードから mypkg_src パッケージが読み込まれているを確認できます。

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout
configfile: pyproject.toml
collected 2 items

tests/test_math.py ..                                                                                                                                                         [100%]

================================================================================= 2 passed in 0.00s =================================================================================
  py312: OK (1.62=setup[1.50]+cmd[0.12] seconds)
  congratulations :) (1.64 seconds)

開発中コードをimportできずテストが失敗することを確認する

flat layoutと同様に、開発中のコードであるmultiple関数を加えた場合の挙動を確認します。

multiple関数を含めたパッケージビルドを行う前に、テストを実行します。
パッケージに含まれていないmultiple関数の読み込みエラーがでて、開発中のコードが使われないことを確認できます。

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout
configfile: pyproject.toml
collected 3 items

tests/test_math.py ..F                                                                                                                                                        [100%]

===================================================================================== FAILURES ======================================================================================
___________________________________________________________________________________ test_multiple ___________________________________________________________________________________

    def test_multiple():
>       assert multiple(2, 5) == 10
E       NameError: name 'multiple' is not defined

tests/test_math.py:10: NameError
============================================================================== short test summary info ==============================================================================
FAILED tests/test_math.py::test_multiple - NameError: name 'multiple' is not defined
============================================================================ 1 failed, 2 passed in 0.01s ============================================================================
py312: exit 1 (0.13 seconds) /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout> pytest tests pid=17455
  py312: FAIL code 1 (2.17=setup[2.04]+cmd[0.13] seconds)
  evaluation failed :( (2.20 seconds)

toxで作成された仮想環境に入って、 mypkg_src の読み取り先を見ると、ライブラリディレクトリ内からパッケージが読み込まれているのを確認できます。

$ source .tox/py312/bin/activate 

$ python
>>> import mypkg_src
>>> mypkg_src.__file__
'/Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/src_layout/.tox/py312/lib/python3.12/site-packages/mypkg_src/__init__.py'

パッケージをビルドし直してテストを実行すると、テストが成功します。

$ python -m build

$ tox -r
py312: remove tox env folder /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout/.tox/py312
py312: install_deps> pip install --find-links=dist -r requirements-dev.txt
py312: commands[0]> pytest tests
================================================================================ test session starts ================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
cachedir: .tox/py312/.pytest_cache
rootdir: /Users/satsuki/github/code-for-blogpost/src_vs_flat_layout/flat_layout
configfile: pyproject.toml
collected 3 items

tests/test_math.py ...                                                                                                                                                        [100%]

================================================================================= 3 passed in 0.00s =================================================================================
  py312: OK (1.92=setup[1.54]+cmd[0.38] seconds)
  congratulations :) (1.96 seconds)

src layoutではパッケージコードを実行するためにインストールが必要となり、開発中のコードが意図せず実行されるのを防げることを確認できました。

pythonプロジェクトはsrc layoutにすべきなのか?

pythonのパッケージ開発のテスト観点からは、src layoutの方が安全であることは分かりました。
パッケージ管理ツールであるpoetryでプロジェクト作成すると、 デフォルトではflat layoutで作成される一方、src layoutで作成するオプションも提供されています。
Commands | Documentation | Poetry - Python dependency management and packaging made easy

同じくpythonのパッケージ管理ツールであるuvでプロジェクトを作成すると、アプリケーションの場合はflat layout、パッケージの場合は src layoutで作成されます。
Projects | uv

uvの思想に従うなら

  • アプリケーションの場合 : flat layout
  • パッケージの場合 : src layout

で使い分けるのが今のpython界隈のデファクトスタンダードと言えるのでしょうか?

GitHub Star数が上位のpytonプロジェクトを見てみると flat layoutの構成をとってるものも多い印象です。
ML界隈でよく使われるプロジェクトで探してみると、以下のプロジェクトはflat layoutとなっていました。

これらのプロジェクトがflat layoutを採用してる思想を自分はまだ分かってないです。

参考

pythonのテスト環境作成ツールNoxを使う

本文中コード

github.com

Noxとは

Noxはテスト用のpython仮想環境を作成し、テストを自動化するコマンドラインツールです。

Welcome to Nox — Nox 2024.10.9 documentation

Noxを利用することで以下のことを実現できます。

  • チームメンバーのローカルPC上の環境差分の解消
  • CIとローカルPCでの環境差分の解消
  • 複数のPython・ライブラリバージョンでの動作確認

Pythonのテスト環境作成ツールとして tox が有名です。
tox

Noxとtoxの簡単な比較です。

tox Nox
リリース年 2010 2018
設定ファイルフォーマット INI python
GitHubスター数 3664 1310

GitHub Start数の推移を見ても、toxの方が知名度の高さが伺えます。

github star history (tox vs nox)

Noxとtoxの大きな違いは、設定ファイルをtoxはDSLで記述するtox.iniで管理するのに対し、Noxはpythonファイルであるnoxfile.pyで管理することです。

NoxはPythonを用いた柔軟なテスト自動化を行えるのが、toxと比較した特徴です。

GitHub - wntrblm/nox: Flexible test automation for Python

インストール

Noxはプロジェクトの仮想環境ではなく、グローバルの環境にインストールするよう設計されています。

そのためNoxの公式ドキュメントではpipxによるインストールが推奨されてます。

$ pipx install nox

自分はuvを使ってpython環境を用意してるので、uv toolとしてnoxをinstallしました。

$ uv tool install nox 

実行方法

以下の一連のテストをnoxで自動化してみます

  • ruff check / format
  • mypy
  • pytest

サンプルプロジェクトとしてロジスティック回帰でirisデータセットを学習するコードを用意しました。

nox-github-actions-example/src/train_lr.py at main · nsakki55/nox-github-actions-example · GitHub

作成したnoxfile.pyです

import nox

@nox.session(venv_backend="uv", python=["3.12"], tags=["lint"])
def lint(session):
    session.install("ruff")
    session.run("uv", "run", "ruff", "check")
    session.run("uv", "run", "ruff", "format")

@nox.session(venv_backend="uv", python=["3.12"], tags=["lint"])
def mypy(session):
    session.install("pyproject.toml")
    session.install("mypy")
    session.run("uv", "run", "mypy", "src")

@nox.session(venv_backend="uv", python=["3.12"], tags=["test"])
def test(session):
    session.run("uv", "sync", "--dev")

    if session.posargs:
        test_files = session.posargs
    else:
        test_files = ["tests"]

    session.run("uv", "run", "pytest", *test_files)

Noxでは@nox.sessionデコレータがついた関数を作成して、静的解析やpytestなど実行したいテストをそれぞれ記述します。
sessionごとに仮想環境が作成されるため、session単位で仮想環境の設定を変更できます。
noxは20240年3月のリリース以降、python仮想環境のバックエンドにuvを指定することができるようになりました。
Release 2024.03.02 · wntrblm/nox · GitHub
Configuration & API — Nox 2024.10.9 documentation

uvによるパッケージインストールのため、noxの仮想環境の立ち上げが高速になります。

Noxの実行はnoxfile.pyがあるディレクトリ直下で、 nox コマンドを実行することで行えます。
上記のnoxfile.pyが読み取られ、各セッションが実行されています。

ruffとpyproject.tomlのパッケージインストールが uv pip install で行われているのが分かります。
session.run(”uv”, “run”, “sync”, “—dev”) コマンドでpyproject.tomlからdev用のパッケージをインストールできます。

$ uvx nox
nox > Running session lint-3.12
nox > Creating virtual environment (uv) using python3.12 in .nox/lint-3-12
nox > uv pip install ruff
nox > uv run ruff check
   Built nox-sandbox @ file:///Users/satsuki/github/nox-sandbox
Uninstalled 1 package in 0.45ms
Installed 1 package in 0.71ms
All checks passed!
nox > uv run ruff format
1 file reformatted, 5 files left unchanged
nox > Session lint-3.12 was successful.
nox > Running session mypy-3.12
nox > Creating virtual environment (uv) using python3.12 in .nox/mypy-3-12
nox > uv pip install pyproject.toml
nox > uv pip install mypy
nox > uv run mypy src
Success: no issues found in 3 source files
nox > Session mypy-3.12 was successful.
nox > Running session test-3.12
nox > Creating virtual environment (uv) using python3.12 in .nox/test-3-12
nox > uv sync --dev
Resolved 17 packages in 0.39ms
Audited 15 packages in 0.02ms
nox > uv run pytest tests
================================================================================================================================================================================ test session starts =================================================================================================================================================================================
platform darwin -- Python 3.12.5, pytest-8.3.3, pluggy-1.5.0
rootdir: /Users/satsuki/github/nox-sandbox
configfile: pyproject.toml
collected 4 items

tests/test_train_lr.py ....                                                                                                                                                                                                                                                                                                                                                    [100%]

================================================================================================================================================================================= 4 passed in 0.46s ==================================================================================================================================================================================
nox > Session test-3.12 was successful.
nox > Ran multiple sessions:
nox > * lint-3.12: success
nox > * mypy-3.12: success
nox > * test-3.12: success

python仮想環境が瞬時に立ち上がっているのを確認できます。

run nox

tagをsessionごとに割り当てて、tag単位で実行を行えます。
https://nox.thea.codes/en/stable/tutorial.html#session-tags

上記の例だとruffとmypyのsessionをlintというタグに割り当てているため、以下のようにlintタグを指定することでruffとmypyのみ実行できます。

$ uvx nox -t lint

実行引数を渡すことができます。test sessionの例ではテスト対象のファイルを実行引数で渡せるようにしてます。
https://nox.thea.codes/en/stable/config.html#passing-arguments-into-sessions

$ uvx nox -t test -- tests/test_train_lr.py

GitHub Actionsで実行する

uv経由でnoxをインストール・実行するGitHub Actionsを作成しました。

name: Run nox tests

on: [push]

jobs:
  tests:
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up Python 3.12
        uses: actions/setup-python@v2
        with:
          python-version: 3.12
      - name: Install uv and nox
        run: |
          pip install --upgrade pip
          pip install uv
          uv tool install nox
      - name: Run Nox
        run: |
          uvx nox

nox実行部分が11秒で完了しています。

add check · nsakki55/nox-github-actions-example@94ea041 · GitHub

その他の便利な機能

公式ドキュメントを見ていて、自分が便利に思った機能をピックアップして紹介します。

session options 設定

コマンドライン引数で設定できる値を、noxfile.pyで設定できます。sessionごとにpythonバージョンや仮想環境のバックエンドを指定してましたが、まとめて行えます。

https://nox.thea.codes/en/stable/config.html#modifying-nox-s-behavior-in-the-noxfile

nox.options.python = "3.12"
nox.options.default_venv_backend = "uv"

実行順序の設定

session.notify をsessionの最後に追記することで、次に実行するsessionを指定できます。

https://nox.thea.codes/en/stable/config.html#nox.sessions.Session.notify

以下の例だとlint終了後にmypyが実行されます。

@nox.session()
def lint(session):
    session.install("ruff")
    session.run("uv", "run", "ruff", "check")
    session.run("uv", "run", "ruff", "format")

    session.notify("mypy")

@nox.session()
def mypy(session):
    session.install("pyproject.toml")
    session.install("mypy")
    session.run("uv", "run", "mypy", "src")

環境変数の設定

session.run コマンドの機能の一つですが、環境変数をrun単位で設定できます。

https://nox.thea.codes/en/stable/config.html#nox.sessions.Session.run

session.run(
    'bash', '-c', 'echo $SOME_ENV',
    env={'SOME_ENV': 'Hello'})

runコマンドでは、以下のように1文で実行コマンドを渡せないので注意が必要です。

session.run('pytest -k fast tests/')

セッションパラメータの設定

事前に設定したパラメータの値ごとsessionを実行できます。

https://nox.thea.codes/en/stable/config.html#parametrizing-sessions

@nox.session
@nox.parametrize('django', ['1.9', '2.0'])
@nox.parametrize('database', ['postgres', 'mysql'])
def tests(session, django, database):
    ...

Nox自体のバージョン指定

実行するNox自体のバージョンをnoxfile.py内で指定することができます。

https://nox.thea.codes/en/stable/config.html#nox-version-requirements

import nox

nox.needs_version = ">=2019.5.30"

@nox.session(name="test")  # name argument was added in 2019.5.30
def pytest(session):
    session.run("pytest")

Tox or Nox ?

ToxとNoxについて分かりやすく説明してくれたYouTube動画があるのでこちらが参考になると思います。
www.youtube.com

ToxとNoxどちらの方が優れているというわけではなく、それぞれの良さがあるという話です。

知名度で言うとToxの方があり、Toxに関する記事の方がNoxよりも豊富にあります。
NoxはPythonで記述できることによる柔軟性を特徴とする一方、Toxはエンジニアに広く馴染みのあるDSLで記述できるという特徴があります。
どちらを採用するかはチームやプロジェクト内容に依存するところが大きいのかなと思います。

参考

GlueJobのicebergテーブル処理テストをローカルで実行する

AWS Glue Jobによるicebergテーブル操作

AWS Glueではicebergテーブルフォーマットがサポートされています。
AWS Glue での Iceberg フレームワークの使用 - AWS Glue

iceberg形式は大規模なデータセットを効率的に処理を行うことができる、データレイクに保存されてるデータ処理に向いたテーブル形式です。

iceberg形式の主要な特徴として以下が挙げられます
Apache Iceberg - Apache Iceberg™

  • SQL操作
    データの追加・更新・削除をSQLコマンドで実行可能

  • スキーマ進化
    テーブルスキーマの柔軟な変更が可能

  • 隠れパーティショニング
    人手管理が不要なパーティションを自動で作成

  • タイムトラベルとロールバック
    特定時刻のテーブル状態の保持と切り替え

  • データ圧縮

architecture
引用: Spec - Apache Iceberg™
icebergテーブルは主に3つの要素で構成されます

Glue Jobでiceberg形式のテーブルへの処理を行う際、実際のAWSリソースへのアクセスを行わずに動作確認したい場合は2つの方法が検討できます

  • LocalStackのAWS Glueを使用 (Pro版のみ使用可 Glue | Docs)
  • ローカル環境に作成したicebergテーブルを使用

今回は2つ目のローカル環境で作成したicebergテーブルを使用する方法を試してみようと思います。

本文中コード
github.com

テスト対象のGlue Job

import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

from pyspark.sql import DataFrame

S3_ENDPOINT_URL = "http://s3.dev:4566"
S3_BUCKET = "test-job-bucket"
TABLE_NAME = "test_table"
DATABASE_NAME = "test_database"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "quoteChar": '"',
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
            "recurse": True,
        },
    )
    return dyf

def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool:
    tables_collection = glue_context.spark_session.catalog.listTables(database_name)
    return table_name in [table.name for table in tables_collection]

def append_iceberg_table(df: DataFrame, table_name: str, table_location) -> None:
    df.writeTo(table_name).tableProperty("format-version", "2").tableProperty("location", table_location).append()

def create_iceberg_table(df: DataFrame, table_name: str, table_location) -> None:
    df.writeTo(table_name).tableProperty("format-version", "2").tableProperty("location", table_location).create()

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    glue_context = GlueContext(sc)

    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/input")
    df = dyf.toDF()
    is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME)
    if is_exist:
        append_iceberg_table(df, TABLE_NAME, f"s3://{S3_BUCKET}/output")
    else:
        create_iceberg_table(df, TABLE_NAME, f"s3://{S3_BUCKET}/output")

    job.commit()

if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    main(args)

以下の処理を実行するGlueJobのスクリプトです。

  • S3からcsvデータを読み込み
  • テーブルが存在する場合→既存テーブルにデータを追加
  • テーブルが存在しない場合→新規テーブルを作成

ローカルのS3リソースを利用してテストする方法をこちらの記事で紹介してるので、S3からのデータ読み込み部分は本記事では割愛します。

LocalStackのS3環境を利用したAWS Glue Jobローカル実行・テスト方法 - 肉球でキーボード

テスト用Glue Contextの設定

from awsglue.context import GlueContext
from pyspark.sql import SparkSession

WAREHOUSE_PATH = "./spark-warehouse"

@pytest.fixture(scope="session")
def glue_context() -> GlueContext:
    spark = (
        SparkSession.builder.master("local[1]")
        # Configure for testing iceberg table operation
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.local.type", "hadoop")
        .config("spark.sql.catalog.local.warehouse", WAREHOUSE_PATH)
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0")
        .config("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true")
        .config("spark.sql.defaultCatalog", "local")
        # Configure for testing local hive metastore
        .config("spark.sql.hive.metastore.jars", "builtin")
        .config("spark.sql.hive.metastore.version", "2.3.9")
        .config("spark.sql.catalogImplementation", "in-memory")
        .getOrCreate()
    )

    yield GlueContext(spark.sparkContext)
    spark.stop()

ローカル環境で作成したicebergテーブルにアクセスする設定を追加してます。

設定はこちらの記事を参考にしています。

Navigating the Iceberg: unit testing iceberg tables with Pyspark | by Hannes De Smet | datamindedbe | Medium

ローカル開発用のGlueの公式imageには以下の内容の/home/glue_user/spark/conf/hive-site.xml が存在してます。

調べてみるとhive-site.xmlはDBの接続情報を管理するファイルらしく、デフォルトではこちらの設定を読み込んで、AWS Glue Catalogのメタデータにアクセスする挙動となります。

<configuration>
    <property>
        <name>hive.metastore.connect.retries</name>
        <value>15</value>
    </property>
    <property>
        <name>hive.metastore.client.factory.class</name>
        <value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
    </property>
</configuration>

pytestコード

@pytest.fixture(scope="session")
def cleanup_warehouse() -> None:
    yield
    if os.path.exists(WAREHOUSE_PATH):
        shutil.rmtree(WAREHOUSE_PATH)

@pytest.fixture(scope="module")
def test_table(glue_context: GlueContext, sample_dataframe: DataFrame) -> str:
    spark = glue_context.spark_session
    table_name = "test_table"
    try:
        sample_dataframe.writeTo(f"local.{table_name}").create()
        yield table_name
    finally:
        spark.sql(f"DROP TABLE IF EXISTS local.{table_name}")

def test_create_iceberg_table(glue_context: GlueContext, cleanup_warehouse: None, sample_dataframe: DataFrame) -> None:
    spark = glue_context.spark_session

    # new table setting
    table_name = "test_new_table"
    table_full_name = f"local.{table_name}"
    table_location = f"{WAREHOUSE_PATH}/{table_name}"

    create_iceberg_table(df=sample_dataframe, table_name=table_full_name, table_location=table_location)

    result_df = spark.table(table_full_name)
    assert result_df.collect() == sample_dataframe.collect()

def test_append_iceberg_table(
    glue_context: GlueContext, cleanup_warehouse: None, sample_dataframe: DataFrame, test_table: str
) -> None:
    spark = glue_context.spark_session

    table_full_name = f"local.{test_table}"
    table_location = f"{WAREHOUSE_PATH}/{test_table}"

    append_data = [
        ("val4", 4, "2000/01/04 04:00:00"),
        ("val5", 5, "2000/01/05 05:00:00"),
    ]
    append_df = spark.createDataFrame(append_data, sample_dataframe.schema)

    append_iceberg_table(df=append_df, table_name=table_full_name, table_location=table_location)

    result_df = spark.table(table_full_name)
    new_df = result_df.filter(~col("col1").isin(["val1", "val2", "val3"]))
    assert new_df.collect() == append_df.collect()
    assert result_df.count() == 5

以下のフィクスチャを利用しています。

  • cleanup_warehouse
    • ローカルのテーブルを永続化するフォルダの作成・削除
  • test_table
    • テスト用テーブルの作成・削除

外部のデータストアにアクセスせず、テーブルの新規作成・データ追加のテストができているのを確認できます。

参考

LocalStackのS3環境を利用したAWS Glue Jobローカル実行・テスト方法

Glue Jobのローカル開発

AWS Glue Jobをローカル環境で開発する際、AWS公式が提供してるDocker imageを活用する方法があります。

Developing and testing AWS Glue job scripts locally

Glue Jobを利用する場合、S3からデータを取得・保存するユースケースが多いかと思います。

本記事では、ローカル環境にAWS環境をエミュレートするLocalStackを活用して、実際のAWSリソースへのデータをやり取りを行わずGlue Jobの動作検証・テストを行う方法を書きます。
Overview | Docs

Glue version 4.0のdocker imageであるamazon/aws-glue-libs:glue_libs_4.0.0_image_01 を使用します。

Glue versionごとにdocker imageが異なるので、ご注意ください。

本文中コード github.com

ディレクトリ構成

/
├─ src
|  └─ glue_job.py
├─ tests
│  └─ test_glue_job.py
└─ compose.yaml

Glue Jobの実行スクリプト

import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

S3_ENDPOINT_URL = "http://s3.dev:4566"
AWS_REGION = "ap-northeast-1"
S3_BUCKET = "test-job-bucket"

def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "quoteChar": '"',
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
            "recurse": True,
        },
    )
    return dyf

def write_dynamic_frame_to_s3(glue_context: GlueContext, dyf: DynamicFrame, destination_s3_path: str) -> None:
    glue_context.write_dynamic_frame.from_options(
        frame=dyf,
        connection_type="s3",
        connection_options={"path": destination_s3_path},
        format="parquet",
        format_options={"writeHeader": True},
    )

def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    if args["JOB_NAME"] == "test":
        sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
        sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
        sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    glue_context = GlueContext(sc)

    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)

    dyf = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=f"s3://{S3_BUCKET}/test_data.csv")
    write_dynamic_frame_to_s3(glue_context=glue_context, dyf=dyf, destination_s3_path=f"s3://{S3_BUCKET}/output")

    job.commit()

if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    main(args)

以下の処理を実行するGlue Jobのスクリプトを用意しました

  • S3からcsvデータを取得
  • S3にparquet形式のデータを保存

SparkContextにLocalStackでエミュレートしたS3にアクセスする設定を追加しています。

本番環境のGlueJobでは実際のAWSリソースにアクセスするため、以下の設定はローカル開発時のみ追加する必要があります。

実行引数のJOB_NAMEがtestの場合は、LocalStackへアクセスする設定を追加することでリソースの使い分けを行っています。

sc = SparkContext()
if args["JOB_NAME"] == "test":
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
    sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
    sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
glue_context = GlueContext(sc)

docker composeの設定

services:
  glue.dev.s3.local:
    container_name: s3.dev
    image: localstack/localstack:3.8.0
    environment:
      - SERVICES=s3
      - AWS_DEFAULT_REGION=ap-northeast-1
      - AWS_DEFAULT_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    networks:
      - glue.dev.network
  glue.dev:
    container_name: glue.dev
    image: amazon/aws-glue-libs:glue_libs_4.0.0_image_01
    volumes:
      - ./:/home/glue_user/workspace/
    environment:
      - DISABLE_SSL=true
      - AWS_REGION=ap-northeast-1
      - AWS_OUTPUT=json
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
    networks:
      - glue.dev.network
    tty: true
    stdin_open: true
networks:
  glue.dev.network:
    name: glue.dev.network

LocalStackでエミュレートしたAWS環境にGlue Jobのコンテナがアクセスできるように、Glue Jobのコンテナの環境変数に、LocalStackの起動設定で指定したAWS_ACCESS_KEY_IDとAWS_SECRET_ACCESS_KEYを追加します。

compose.yamlはこちらの実装を参考にさせてもらいました。
GitHub - n-yokota/aws_glue_test_concept

docker containerを起動します。

$ docker compose up -d

Glue Jobをローカル環境で実行する

LocalStackのS3 bucket準備

Glue Jobのコンテナ環境に入ります。

$ docker compose exec glue.dev bash 

LocalStackのS3に test-job-bucket Bucketを作成します。

$ aws s3 mb s3://test-job-bucket --endpoint-url http://s3.dev:4566

テスト用ファイルをLocalStackのS3 Bucketに追加します。

$ aws s3 mv ./data/test_data.csv s3://test-job-bucket/test_data.csv --endpoint-url http://s3.dev:4566

S3 Bucketにテスト用ファイルが保存されていることを確認できます。

$ aws s3api list-objects-v2 --bucket test-job-bucket  --endpoint-url http://s3.dev:4566
{
    "Contents": [
        {
            "LastModified": "2024-10-08T14:31:52.000Z",
            "ETag": "\"19ee3f2027cea3841e74c3aa3520b5ed\"",
            "StorageClass": "STANDARD",
            "Key": "test_data.csv",
            "Size": 100
        }
    ]
}

コンテナ環境でGlue Job実行

Glue Jobのスクリプトを通常のpyhonスクリプトとして実行します。

$ python3 src/glue_job.py --JOB_NAME test

対象のS3 Bucketにparquet形式でファイルが保存されていることを確認できます。

$ aws s3api list-objects-v2 --bucket test-job-bucket  --endpoint-url http://s3.dev:4566
{
    "Contents": [
        {
            "LastModified": "2024-10-08T14:32:23.000Z",
            "ETag": "\"fa768a3a4c9659604c161e45a17ec02f\"",
            "StorageClass": "STANDARD",
            "Key": "output/part-00000-3479d3db-5a89-4bd7-856c-fd714291c2f3-c000.snappy.parquet",
            "Size": 981
        },
        {
            "LastModified": "2024-10-08T14:31:52.000Z",
            "ETag": "\"19ee3f2027cea3841e74c3aa3520b5ed\"",
            "StorageClass": "STANDARD",
            "Key": "test_data.csv",
            "Size": 100
        }
    ]
}

LocalStackのS3を使用したGlue Jobのテスト実行方法

テスト用GlueContextのfixture作成

@pytest.fixture(scope="session")
def glue_context() -> GlueContext:
    spark = (
        SparkSession.builder.master("local[1]")
        # Configure for testing fast
        # https://kakehashi-dev.hatenablog.com/entry/2023/07/13/110000
        .config("spark.sql.shuffle.partitions", "1")
        .config("spark.ui.showConsoleProgress", "false")
        .config("spark.ui.enabled", "false")
        .config("spark.ui.dagGraph.retainedRootRDD", "1")
        .config("spark.ui.retainedJobs", "1")
        .config("spark.ui.retainedStages", "1")
        .config("spark.ui.retainedTasks", "1")
        .config("spark.sql. ui.retainedExecutions", "1")
        .config("spark.worker.ui.retainedExecutors", "1")
        .config("spark.worker.ui.retainedDrivers", "1")
        .getOrCreate()
    )
    # Configuration for localstack
    # https://future-architect.github.io/articles/20220428a
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint", S3_ENDPOINT_URL)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.endpoint.region", AWS_REGION)
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.mode", "None")
    spark.sparkContext._jsc.hadoopConfiguration().set("fs.s3a.change.detection.version.required", "false")

    yield GlueContext(spark.sparkContext)
    spark.stop()

テスト実行速度を高速化するためにspark設定はこちらの記事を参考にしてます.
AWS GlueのCI/CD環境を作ってみた - KAKEHASHI Tech Blog

pytest実行時にLocalStackのS3へアクセスするための設定はこちらの記事を参考にしてます.
AWS Glueの開発環境の構築(2022) | フューチャー技術ブログ

テスト用S3 Bucketのfixture作成

@pytest.fixture(scope="session")
def s3_client():
    return boto3.client(
        "s3",
        endpoint_url=S3_ENDPOINT_URL,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        region_name=AWS_REGION,
    )

@pytest.fixture(scope="session")
def s3_bucket(s3_client: boto3.client) -> str:
    bucket_name = "test-s3-bucket"

    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except Exception:
        s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={"LocationConstraint": AWS_REGION},
        )

    yield bucket_name

    try:
        s3_client.delete_bucket(Bucket=bucket_name)
    except Exception as e:
        print(f"Failed to clean up test bucket: {e}")

@pytest.fixture(scope="session")
def setup_s3_data(s3_client: boto3.client, s3_bucket: str) -> dict[str, str]:
    key = "test_data.csv"
    inputs = [
        {"col1": "val1", "col2": 1, "col3": "2000/01/01 01:00:00"},
        {"col1": "val2", "col2": 2, "col3": "2000/01/02 02:00:00"},
        {"col1": "val3", "col2": 3, "col3": "2000/01/03 03:00:00"},
    ]
    input_str = io.StringIO()
    w = csv.DictWriter(input_str, fieldnames=inputs[0].keys())
    w.writeheader()
    for input in inputs:
        w.writerow(input)

    body = input_str.getvalue()
    s3_client.put_object(Bucket=s3_bucket, Key=key, Body=body)

    yield {"bucket_name": s3_bucket, "key": key}

    try:
        s3_client.delete_object(Bucket=s3_bucket, Key=key)
    except Exception as e:
        print(f"Failed to clean up test data: {e}")

# https://docs.pytest.org/en/6.2.x/fixture.html#factories-as-fixtures
@pytest.fixture
def get_s3_objects(s3_client):
    def _get_s3_objects(s3_bucket: str, prefix: str) -> list[str] | None:
        try:
            response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=prefix)
            if "Contents" in response:
                return [obj["Key"] for obj in response["Contents"]]
        except Exception:
            return

    return _get_s3_objects
    

@pytest.fixture(scope="module")
def sample_dynamicframe(glue_context: GlueContext) -> DynamicFrame:
    spark = glue_context.spark_session
    df = spark.createDataFrame(
        [
            ("val1", 1, "2000/01/01 01:00:00"),
            ("val2", 2, "2000/01/02 02:00:00"),
            ("val3", 3, "2000/01/03 03:00:00"),
        ],
        ["col1", "col2", "col3"],
    )
    dyf = DynamicFrame.fromDF(df, glue_context, "dyf")

    return dyf

各関数の役割

  • s3_client

    LocalStackのS3環境にアクセスするboto3 clientのfixture

  • s3_bucket

    テスト用のS3 Bucketを作成・削除するfixture

  • setup_s3_data

    テスト用のS3 Bucketにデータを追加・削除するfixture

  • get_s3_objects

    テスト用のS3 Bucketに特定のキーに含まれるオブジェクト一覧を取得するヘルパー関数

  • sample_dynamicframe

    テスト用DynamicFrameデータを作成するfixture

S3アクセスを伴う関数のテスト

def test_get_dynamic_frame_from_s3(glue_context: GlueContext, setup_s3_data: dict[str, str]) -> None:
    source_s3_path = f"s3://{setup_s3_data['bucket_name']}/{setup_s3_data['key']}"
    result = get_dynamic_frame_from_s3(glue_context=glue_context, source_s3_path=source_s3_path)

    assert isinstance(result, DynamicFrame)
    assert result.count() == 3

    df = result.toDF()
    assert len(df.columns) == 3
    assert df.columns == ["col1", "col2", "col3"]

    rows = df.collect()
    assert rows == [
        Row(col1="val1", col2="1", col3="2000/01/01 01:00:00"),
        Row(col1="val2", col2="2", col3="2000/01/02 02:00:00"),
        Row(col1="val3", col2="3", col3="2000/01/03 03:00:00"),
    ]

def test_write_dynamic_frame_from_s3(
    glue_context: GlueContext,
    s3_bucket,
    sample_dynamicframe: DynamicFrame,
    get_s3_objects,
) -> None:
    file_key = "test_write_data"
    destination_s3_path = f"s3://{s3_bucket}/{file_key}"
    write_dynamic_frame_to_s3(
        glue_context=glue_context,
        dyf=sample_dynamicframe,
        destination_s3_path=destination_s3_path,
    )
    actual_s3_objects = get_s3_objects(s3_bucket=s3_bucket, prefix=file_key)

    assert len(actual_s3_objects) > 0
    assert any([object for object in actual_s3_objects if object.endswith(".parquet")])

LocalStackのS3 Bucketを使用して、S3とデータのやり取りを行う関数をテストします。

Glue Jobのコンテナ内でpytestコマンドを実行します。

$ pytest tests

参考

dotfiles管理をchezmoiに移行する

今回作成したdotfliesです。 github.com

chezmoiとは

Go製のクロスプラットフォームのdotfiles管理ツールです。dotfilesとは .zshrc や .gitconfig といった設定ファイルを指す言葉です。
chezmoiの名前はフランス語の chez-moi (シェモア)に由来し、意味は「自宅」を表します。

www.chezmoi.io

主な機能としては以下となってます

  • テンプレート機能
  • パスワードマネージャーをサポート
  • アーカイブからのファイル読み込み
  • ファイル暗号化
  • スクリプトの実行

dotfilesの管理をシンボリックリンクで行っていましたが、もっとシンプルにできないかと思って調べみると dotfiles管理ツールがあることを知りました。
chezmoiはGitHubスター数が多く、ドキュメントが整理され情報量が多かったので今回使ってみることにしました。
Redditでもdotfiles管理方法についての議論がされているので、他のdotfiles管理ツールも参考にしてみるといいかと思います。
| How do you guys manage your dotfiles ?

初期設定

インストール

Macの場合はbrewで入れられます。自分はbrewで入れました。

$ brew install chezmoi

プロジェクトの初期化

chezmoiを使用開始する場合、initコマンドを実行します。

$ chezmoi init

~/.local/share/chezmoi というフォルダが作成され、gitのローカルレポジトリが作成されています。
chezmoiではdotfilesをgit repositoryで管理する前提のAPI仕様となっています。

既にdotiflesがリモートレポジトリに存在する場合は、レポジトリを指定することで~/.local/share/chezmoi 以下に既存のdotiflesレポジトリがcloneされます。

$ chezmoi init [email protected]:<your-github-id>/dotfiles.git

~/.local/share/chezmoi 以下のファイルがchezmoiで管理対象となるdotfilesとなります。

次にchezmoiの管理対象とするファイルを指定します。

管理対象のdotfilesを追加

chezmoi add コマンドで管理対象のファイルを指定します。
例えば、 .zshrc を管理する場合は以下のコマンドを実行します。

$ chezmoi add ~/.zshrc

実行すると、 ~/.local/share/chezmoi のフォルダ以下に dot_zshrc というファイル名で .zshrc の内容がコピーされます。

dotfilesの編集

chezmoi edit コマンドで管理対象のファイルを編集できます。

$ chezmoi edit ~/.zshrc

上記のコマンドでは ~/.zshrc は変更されず、~/.local/share/chezmoi/dot_zshrc が変更されます。

dotfilesの変更を反映

chezmoi apply コマンドで元のdotfileに変更を反映できます。

$ chezmoi apply

上記の例だと~./local/share/chezmoi/dot_zshrc の変更が ~/.zshrc に反映されることになります。

git管理

~./local/share/chezmoi 以下にあるファイルをgit管理対象にします。

$ git add .
$ git commit -m "Initial commit"
$ git push origin main

dotfiles管理のワークフローまとめ

以上のコマンド実行のプロセスを整理すると以下のようになります。

https://www.chezmoi.io/quick-start/から引用

今回の例だと以下のようになります。

  • home directory: ~/.zshrc
  • working copy: ~./local/share/chezmoi/dot_zshrc
  • local repo : ローカルのgit repository
  • remote repo : リモートのgit repository

chezmoiはGitのように変更を追従するファイルを work spaceに追加することで管理を行います。
自前でdotfilesを管理する場合はシンボリックリンクを貼る方法が行われますが、chezmoiでは chezmoi add が代わりとなります。

今回行った設定

dotfilesの追加

# zsh
$ chezmoi add ~/.zshrc

# Git
$ chezmoi add ~/.gitconfig

# Tmux
$ chezmoi add ~/.tmux.conf

# JetBrain
$ chezmoi add ~/.ideavimrc

# Alacritty
$ chezmoi add ~/.config/alacritty

# Starship
$ chezmoi add ~/.config/starship.toml

# VSCode
$ chezmoi add ~/Library/Application Support/Code/User/keybindings.json
$ chezmoi add ~/Library/Application Support/Code/User/settings.json

~/.config/alacritty のようにルートフォルダより深い階層にあるファイルを追加する場合、自動でフォルダが ~./local/share/chezmoi 以下に作成されます。
この場合は.local/share/chezmoi/dot_config/alacritty という名前のフォルダが自動作成されます。

VSCodeの設定ファイルはMacに依存したフォルダ階層になっています。
chezmoiでは同一ファイルをシステムごとに別Pathで管理する方法が提供されています。
Manage machine-to-machine differences - chezmoi

chezmoiのGitHub RepositoryのDiscussionsでも、作者がsettings.jsonの管理方法を解説しているので、理想的にはこちらのやり方を行った方がいいです。
Handle configuration files that are externally modified and also in different locations on difference machines · twpayne chezmoi · Discussion #1312 · GitHub

brew install用のスクリプト作成

chezmoiではrun_ というprefixのついたスクリプトの実行を自動で行ってくれます。
Use scripts to perform actions - chezmoi
スクリプト名の命名規則によって挙動を制御でき、 run_once_ のprefixがつくスクリプトは chezmoi apply 初回実行時のみ実行されるので、PCの初期セットアップに便利です。

brewでインストールしたものを管理しようと思います。
まず brewで入れたものの一覧を取得します。

$ brew bundle dump 

Brewfileというファイル名で一覧が出力されます。
run_once_install_brew.sh というファイル名で 、brew installを実行するためのコマンドを保存します。

#!/usr/bin/env zsh

brew bundle --file="./Brewfile"

chezmoi appy を初回実行した際にbrew installが行われるようになりました。

外部パッケージ取得の管理

dotfiles管理対象外の外部レポジトリやパッケージを .chezmoiexternal.<json|jsonc|toml> というファイルで管理できます。
親ディレクトリに該当のフォルダ存在しない場合、chezmoiが自動でダウンロードしてディレクトリを作成してくれます。 https://www.chezmoi.io/reference/special-files-and-directories/chezmoiexternal-format/

tmuxプラグイン管理のtpmと、alairittyのテーマ設定のフォルダを管理するようにしました。

[".tmux/plugins/tpm"]
type = "archive"
url = "https://github.com/tmux-plugins/tpm/archive/master.tar.gz"
exact = true
stripComponents = 1
refreshPeriod = "168h"

[".config/alacritty/themes"]
type = "archive"
url = "https://github.com/alacritty/alacritty-theme/archive/master.tar.gz"
exact = true
stripComponents = 1
refreshPeriod = "168h"

[".config/alacritty/catppuccin"]
type = "archive"
url = "https://github.com/catppuccin/catppuccin/archive/master.tar.gz"
exact = true
stripComponents = 1
refreshPeriod = "168h"

作成したdotfiles

github.com

参考

「Machine Learning System Design Interview」読書メモ

本記事について

機械学習システム設計の面接対策本である「Machine Learning System Design Interview」を読んだ時の読書メモです。
本書についての紹介記事は以下となってます。 nsakki55.hatenablog.com

本書の公式HP
bytebytego.com

1. Introduction and Overview

MLシステムデザインを行うためのフレームワークを作成

  • Clarifying requirements
  • Framing the problem as an ML task
  • Data preparation
  • Model deployment
  • Evaluation
  • Deployment and serving
  • Monitoring and infrastructure

Clarifying Requirements

  • 問題の解像度を上げるために以下の質問を行う
    • Business objective
    • Features the system needs to support
    • Data
    • Constraints
    • Scale of the system
    • Performance

Frame the Problem as an ML Task

Data Preparation

  • MLモデルに高品質のデータを入力するための処理
  • Data Engineering
    • データエンジニアリングはデータを収集・保存・取得・加工するパイプラインを設計・構築すること
    • Data sources: MLシステムは異なる生成元からのデータを組み合わせて使用する
    • Data storage: データ集合を保持・管理する。Relational, Kye/Value, Column-based, Graph, Documentでデータベースが異なる。
    • ETL : データ取得・加工・保存プロセス
    • Data types : 構造化・非構造化データに分類でき、適応するモデルアルゴリズムが変わる
  • Feature Engineering
    • 特徴量エンジニアリングは2つのプロセスが含まれる
      • ドメイン知識を使って生データから有効な特徴を抽出する
      • 有効な特徴をモデルが使用可能なデータ型に変換する
    • 特徴量エンジニアリングの操作
      • Handling missing values : 欠損値の取扱い。削除と補完が一般的。
      • Feature Scaling : 正規化・標準化・ログスケーリングがある。
      • Discretization(Bucketing) : 連続値をカテゴリ特徴に変換するプロセス。
      • Encoding categorical features: カテゴリ特徴をモデルに入力できるデータ型に変換する。Integer encoding, One-hot encoding, Embedding learning がある。

Model Deployment

  • Model Deoloymentは適切なMLモデルを選択し学習するプロセス
  • Model selection
    • モデル選択の典型的なプロセス
      • 簡単なベースラインの作成
      • 簡単なモデルで実験
      • より複雑なモデルに切り替え
      • さらなる精度向上が必要な場合モデルアンサンブルを使用する
    • 典型的なモデル選択肢
      • ロジスティック回帰
      • 線形回帰
      • 決定木
      • 勾配ブースティング木
      • SVM
      • Naive Bayes
      • Factorization Machine(FM)
      • Neural Networks
    • モデル比較基準
      • 学習に必要なデータ量
      • 学習速度
      • 調整が必要なハイパーパラメータ数
      • 継続的学習の可能性
      • コンピューティングリソース要件
      • モデル解釈性
  • Model Training
    • データセットの作成 : 5ステップに分かれる。生データ収集→特徴とラベルの決定→サンプリング戦略の選定→データ分割→クラス不均衡の対応
      • サンプリング戦略の種類
        • convenience sampling, snowball sampling, stratified sampling, reservoir sampling, importance sampling
    • 損失関数の選定 : 既存の損失関数のリストから選ぶのが常だが、問題に応じて独自の変更を加える必要がある
    • scratch vs fine-tuning
    • 分散学習 : 時間が経過するにつれモデルとデータ規模が大きくなる場合に重要となる

Evaluation

  • Offline evaluatioin
    • ground truthと予測値の近さを測る指標を使う
  • Online evaluation
    • モデルインパクトを測るためにオフライン指標とは異なるビジネス指標を選ぶ
    • 例: Ad click predictionの場合はClick-through rate, revenue lift

Deployment and Serving

  • Cloud vs on-device deployment : trade offが存在
    • デプロイの簡易さ
    • コスト
    • ネットワークレイテンシ
    • 推論レイテンシ
    • ハードウェア制約
    • プライバシー
    • インターネット接続への依存
  • Model compression : モデルサイズを小さくする操作。3つの方法がよく使われる
    • 知識蒸留 : 大きいモデル(teacher)を模倣した小さなモデル(student)を学習
    • プルーニング : 不必要なパラメータを0にすることでモデルをスパースにする
    • 量子化 : パラメータのデータ型を小さくする
  • Test in production : 本番リクエストを用いてモデルをテストする。シャドウデプロイ、ABテスト、カナリアリリース、インターリービング、バンディットなど。
  • Prediction pipeline
    • Batch prediction
      • 事前に予測値を計算
      • 2つの欠点
        • ユーザー嗜好の変化に対応ができない
        • 事前に計算する必要がある予測が分かってる場合でないと活用できない
    • Online prediction
      • 推論リクエストが来るたびに予測値を計算
      • 予測値取得に時間がかかる可能性がある

Monitoring

  • Why a system fails in production
    • データ分布シフトが最も頻繁に起きる理由
    • データ分布シフトへの対応方法
      • 大規模データセットでの学習
      • 新しいデータ分布でのモデル学習
  • What to monitor
    • システム運用関連指標
      • CPU/GPU利用率、リクエスト数、平均レスポンス時間
    • ML関連指標
      • モデルの入力・出力、ドリフト、モデル精度、モデルバージョン

Infrastructure

  • 学習・デプロイ・MLシステム運用の基盤
  • ML Interviewでは聞かれることは少ないが、DevOps・MLOpsのロールでは必要な知識

2. Visual Search System

Pinterstのような画像をもとに類似画像を検索するシステムの構築。

Clarifying Requirements

論点

  • ランク付けを類似順に行うのか
  • 動画もサポートするか
  • ユーザーごとパーソナライズした結果を表示するか
  • モデルは画像メタデータ・タグを使用できるか
  • Click以外のユーザーアクションは行われるか
  • コンテンツ監視を行うか
  • 学習データのラベリングはユーザー行動をもとに作成できるか
  • 検索速度はどれくらい早い必要があるか

要件整理

  • ユーザーから与えられた画像をクエリとして、類似画像を検索する
  • 類似度に基づいてランク付し、ユーザーに表示する
  • 画像のみをサポート
  • パーソナライズは不要

Frame the Problem as an ML Task

Defining the ML objective

  • ユーザーが探してる画像を正確に取得すること

Specifying the system’s input and output

  • 入力 : クエリ画像
  • 出力 : 類似度に基づいてランク付された画像一覧

Choosing the right ML category

  • visual search systemはランキング問題としてみなせる
  • Representation Learning (表現学習)
    • 入力データをEmbeddingと呼ばれる内部表現に変換できるように学習したモデル
    • 入力画像をN次元の埋め込み空間にマッピングするモデルと捉えられる
  • Representation Learningを使用して画像をランキングする
    • 入力画像をEmbeddingベクトルに変換
    • Embedding空間上でのクエリ画像と他の画像の距離を測定し、類似度スコアを計算する

Data Preparation

Data engineering

  • 利用可能データ
    • Images
    • Users
    • User-image interactions

Feature engineering

  • 典型的な画像処理テクニック
    • Resizing
    • Scaling
    • Z-score normalization
    • Consistent color mode

Model Deployment

Model selection

  • Neural Networksを選択
    • Neural Networksは画像やテキストのような非構造化データを扱いやすい
    • 表現学習で必要なEmbeddingを作成するのに、Neural Networksは古典的モデルより優れている
  • モデル候補
    • CNN-based : ResNet
    • Transformer-based : ViT

Model training

  • 学習過程でEmbeddingを取得できる必要がある
  • contrastive training
    • 類似・非類似画像の区別を行うモデルを学習
    • 類似画像はクエリ画像と内部表現が近くなるように学習される

Constructing the dataset

  • contrastive trainingのデータ
    • クエリ画像(1) + 類似画像(1) + 非類似画像(n-1)
  • 類似画像のラベリング方法
    • 人間による判定
    • Clickをプロキシ指標とした判定
    • クエリ画像から人工的に生成
  • ベストなアプローチ
    • trade-offを考慮して複数の選択肢を議論することが重要
    • 自己教師学習の手法を採用する
      • SimCLRのような手法が大規模データでの学習結果を担保しやすい
      • 数億の画像イメージにアクセスできる

Choosing the loss function

  • 学習の目的はEmbedding空間で類似画像データが近くなるように、モデルパラメータが学習されること
  • 生成されたEmbeddingの品質を測定できるよう損失関数を設計
  • contrastive loss
    • compute similarities
    • Softmax
    • Cross-entropy

Evaluation

  • offline metrics
    • 検索システムで一般的に使用される指標
      • Mean reciprocal rank (MRR)
      • Recall@k
      • Precision@k
      • Mean average precision (mAP)
      • Normalized discounted cumulative gain (nDCG)
    • nDCGをオフライン指標として使用する
  • online metrics
    • Click-through rate (CTR)
      • 検索・レコメンドシステムで一般的に使用される
    • 提案画像に使われた時間
      • 検索システムが正確であるほど、増加が期待される指標

Serving

  • Prediction pipeline
    • Embedding generation service
      • 入力クエリ画像のEmbeddingを計算するサービス
    • Nearest neighbor service
      • Embedding空間からクエリ画像との近傍画像を取得する
    • Re-ranking service
      • ビジネスロジックに関わる処理
      • プライベート画像の除外、重複画像の除外などの不適切な結果のフィルターを行う
  • Indexing pipeline
    • Indexing service
      • 検索パフォーマンス向上のために全ての画像のインデックスを作成
      • 新しい画像を追加された際にインデックスを生成
    • Performance of nearest neighbor (NN) algorithms
      • 最近傍検索は情報検索・レコメンデーションシステムの中心的要素
      • 効率面での僅かな改善が全体のパフォーマンス向上に大きく寄与する
      • Approximate nearest neighbor(ANN)
        • Tree-based ANN
        • Locality-sensitive hashing(LSH)-based ANN
        • Clustering-based ANN
      • 実装

Other Talking Points

  • 不適切コンテンツ除去の対応
  • ポジションバイアスのような別のバイアスの存在
  • 検索結果向上のために画像メタデータ・タグをどのように活用するか
  • 物体検知を利用した効率的な切り抜き
  • 良い内部表現を学習するためにグラフニューラルネットワークをどのように活用するか
  • Textを入力とした画像検索をサポートするには
  • データアノテーションのために active learning, human-in-the-loopをどのように活用するか

3. Google Street View Blurring System

Google Street Viewのようなボカシを入れるシステムの構築。

Clarifying Requirements

論点

  • ビジネス目標はユーザーのプライバシーを守ることか
  • 設計するシステムはStreet View画像から人間の顔と車ナンバープレートを見つけ、ボカシを入れること。適切にボカシが入っていない場合、ユーザーから報告できる。
  • アノテーション済み画像は手に入るか
  • 人種、年齢、性別といったバイアスを持つデータセットか
  • 厳しいレイテンシ要件が求められるか

要件整理

  • Street View画像から人間の顔と車ナンバープレートを検知し、ボカシをいれるシステムを設計する
  • アノテーション済みの100万画像が存在
  • システムのビジネス目標はユーザープライバシーを守ること

Frame the Problem as an ML Task

Defining the ML objective

  • 画像中から特定の物体を正確に検知すること

Specifying the system’s input and output

  • 入力 : 物体が0または複数ある画像
  • 出力 : 物体の位置

Choosing the right ML category

  • 物体検知システムは2つの機能を担う
    • 画像中の各物体の位置を予測. 回帰問題
    • 各バウンディングボックスのクラスを予測. 他クラス分類問題
  • Two-stage networks
    • 2つの分割されたモデルが使われる. R-CNN, Fast R-CNN, Faster-RCNN
      • Region proposal network(RPN)
        • 画像をスキャンし物体と思われる候補領域を提示する
      • Classifier
        • 候補領域の物体を分類
  • One-stage networks
    • 1つのモデルでバウンディングボックスとクラス分類を行う. YOLO, SSD
  • Two-stage networksを採用する
    • 100万データは一般的な物体検知のデータセットサイズと比較すると多いわけではない
    • Two-stageにしても学習コストが肥大化しない

Data Preparation

Data Engineering

Feature Engineering

  • データ拡張
    • Random crop
    • Random saturation
    • Vertical or horizontal flip
    • Rotation and/or translation
    • Affine transformations
    • Changing brightness, saturation, or contrast
  • データ拡張による生成タイミングは2通りの方法がある
    • オフライン : 学習前に生成
    • オンライン : 学習中に都度生成

Model Deployment

Model Selection

  • モデル要素
    • Convolutional layers
    • Region Proposal Network (RPN)
    • Classifier

Model training

  • 2つの損失関数を使用

Evaluation

  • バウンディングボックス予測値の評価指標
    • Inference Over Union (IOU)
  • offline metrics
    • 物体検知で便利な指標
      • Precision
      • Average Precision
      • Mean Average Precision
  • online metrics
    • ユーザーレポート数・不満数
    • 異なる人種や年齢グループにまたがって平等に人間の顔にボカシを入れられてるか

Serving

  • Overlapping bounding boxes
    • Non-maximum suppression (NMS)
    • ML system design
      • Batch prediction pipeline
        • Preprocessing
          • 生データを特徴量データに加工
        • Blurring service
          1. 画像中で検出した物体リストを作成
          2. NMSを用いて検出した物体のリストを調整
          3. 検出した物体にボカシをいれる
          4. オブジェクトストレージにボカシを入れた画像を保存
      • Data pipeline
        • Hard negative mining
          • 予測が誤った画像を学習データに追加

Other Taking Points

  • Transformer-basedの物体検知アーキテクチャはone-stage, two-stageモデルとどう異なるか、それぞれのメリット・デメリットは何か
  • 分散学習を用いた大規模データセットでの物体検知モデルの改善
  • GDPRのシステムへの影響
  • 顔検知システムのバイアスの評価
  • どのように継続的にモデルをファインチューニングするか
  • 学習のためのデータポイントを選択するために、active learningとhuman-in-the-loop-MLをどのように利用するか

4. YouTube Video Search

YouTubeのようなTextクエリから最も関連する動画を検索するシステムを構築する

Clarifying Requirements

論点

  • 入力クエリはTextのみか
  • プラットフォームのコンテンツは動画のみか
  • 動画は映像コンテンツとタイトルや説明書きのテキストデータで決定される
  • 利用可能な学習データはあるか
  • 英語以外の言語もサポートする必要があるか
  • プラットフォームに存在する動画の数は何個か
  • 結果をパーソナライズ化する必要があるか

要件整理

  • 動画の検索システムの構築
  • 入力はテキストクエリ、出力はテキストクエリに関連性のある動画のリスト
  • 動画の映像コンテンツとテキストデータを利用可能
  • 1000万個の動画とテキストのペアの学習データセットが利用可能

Frame the Problem as an ML Task

Defining the ML objective

  • Textクエリとの関連性に基づいた動画のランク付け

Specifying the system’s input and output

  • 入力 : text クエリ
  • 出力 : text クエリとの関連性に基づいて並び替えられた動画のリスト

Choosing the right ML category

  • Visual search
    • Textクエリと映像コンテンツの関連性に基づいて動画をランク付する
    • 表現学習が一般的に使用される
    • Video Encoder と Text Encoderを持ち、Video EmbeddingとText Embeddingの内積計算で類似性の計算を行う
  • Text search
    • Textクエリと動画のタイトル、説明、タグなどのTextデータとの類似性に基づいて動画をランク付する
    • 転置索引(Inverted Index )が一般的に使用される
    • 機械学習モデルを必要とせず、学習コストがかからない
    • Elastic Searchが有名な検索エンジン

Data Preparation

Feature Engineering

  • Preparing text data
    • Text normalization
    • Tokenization
    • Tokens to IDs
  • Preparing video data
    • decode frames
    • sample frames
    • resizing
    • scaling, normalization, correcting color mode

Model Deployment

  • Model selection
    • Text encoder
      • Textをベクトルへと変換し、意味の近い文章同士の距離が近くなるようにEmbeddingを生成する
      • Statistical methods
        • 統計的手法により文章を特徴ベクトルへと変換する
        • Bag of Words (BoW), Term Frequency Inverse Document Frequency (TF-IDF)
      • ML-based methods
        • MLモデルにより単語同士の意味の近さを反映したEmbedding空間を作成する
        • Embedding (lookup) layer, Word2vec, Transformer-based architectures
      • 最も効果的なEmbeddingを作成できるTransformer-besed architecturesを採用する
    • Video encoder
      • Video-level models
        • 動画全体でembeddingを作成する
        • モデルは動画全体を処理し、計算コストが高い
      • Frame-level models
        • 動画からフレーム画像をサンプリングしEmbeddingを作成する
        • モデルはフレーム画像を処理し、計算コストが低い
        • 動画の連続的な特徴を学習できない
      • 学習と推論足が早く計算コストが低い、ViT(Frame-level model)を採用する

Evaluation

  • Offline metrics
    • Precision@k, mAP
    • Recall@k
    • Mean Reciprocal Rank (MRR)
  • Online metrics
    • Click-through rate (CTR)
    • Video completion rate
    • Total watch time of search results

Serving

  • Prediction pipeline
    • Visual search
      • Textクエリのembeddingを取得し、最近傍探索で類似動画を検索する
    • Text search
      • Textクエリから動画タイトル・タグを検索する
    • Flushing layer
      • Visual searchとText searchの結果を組み合わせる
    • Re-ranking service
  • Video indexing pipeline
    • 学習済みのvideo encoderを用いて新しい動画のembeddingのindexを作成
  • Text indexing pipeline
    • 新しい動画のタイトル・タグのElasticSearchのindexを作成

Other Talking Points

  • マルチステージの設計(候補生成+ランキング)
  • 動画秒数・動画の人気度などの特徴の使用
  • ユーザー行動(click, like)でラベリングしたデータの活用
  • Textクエリと意味の近いタイトル・タグを見つけるMLモデルの使用
  • クエリ分類の要素追加
  • 検索結果向上ためにマルチモーダルモデルをどのように使用するか
  • 多言語サポートを行うためにどのように拡張すればよいか
  • 重複動画がユーザー体験に悪影響を及ぼすか
  • Textクエリを要素分解することの効果
  • 出力リストを生成する際に人気度や新規度を考慮するにはどうすれば良いか
  • 現実世界の検索システムがどう動いてるか

5. Harmful Content Detection

Facebook, LinkedIn, Twitterのような有害なコンテンツの検知システムを構築する

Clarifying Requirements

論点

  • 有害なコンテンツとアカウントの両方を検知するか
  • 投稿はテキスト・画像・動画が含まれるか
  • 英語のみをサポートするか
  • 有害なコンテンツはどのようなカテゴリを考慮するべきか
  • 投稿をラベリングする人間のアノテーターは存在するか
  • ユーザーからの有害コンテンツの報告を機能に含めるか
  • 有害コンテンツとなった理由を説明する必要があるか
  • システムのレイテンシー要件はあるか

要件整理

  • 新しい投稿がされた時に有害コンテンツの検知を行い、なぜ有害扱いされたかの説明をユーザーに知らせる
  • コンテンツはテキスト・画像・動画で構成され、さまざまな言語がある
  • ユーザーは有害コンテンツを報告できる

Frame the Problem as an ML Task

Defining the ML objective

  • 有害コンテンツを正確に検知する

Specifying the system’s input and output

  • 入力 : 投稿(画像・テキスト・動画・ユーザー情報)
    • Late fusion
      • 異なるデータ型ごとのMLモデルを作成し、それぞれの結果を合成する方法
      • 各モデルで学習・評価が可能
      • 複数モデル学習のコストがかかる。各データの組み合わせ情報が失われる。
    • Early fusion
      • 異なるデータ型を先に合成し1つのモデルを作成する
      • 1つのモデル学習だけ行えば良い。データの組み合わせ情報を使用できる
      • モーダルの関係性を学習するのが難しい
  • 出力 : 有害である確率値

Choosing the right ML category

  • MLカテゴリの候補
    • 単クラス分類
    • 有害クラスごとの単クラス分類
    • 他クラス分類
    • 他タスク分類 (採用)
      • Shared layers
        • 入力特徴を新しいデータに変換するための隠れ層
      • Task-specific layers
        • クラスごとの独自のML層
        • クラスごとに特徴を変換し分類のために最適化する

Data Preparation

Data engineering

  • 利用可能データ
    • ユーザー情報
    • 投稿
    • ユーザーの投稿に対する反応

Feature Engineering

  • Text content
    • Text preprocessing
      • normalization, tokenization
    • Vectorization
      • 事前学習済みのDistlmBERTの使用
  • Image or video
    • Preprocessing
      • decode, resize, normalize
    • Feature extraction
      • image, videoを特徴量ベクトルに変換する
      • image
        • CLIP, SimCLR
      • video
        • VideoMoCo
  • User reactions to the post
    • like, シェア、コメント、報告数
      • 数値特徴にしてscaling
    • 投稿に対するコメント
      • コメントごとのembeddingを作成
      • コメントごとのembeddingを集約
  • Author features
    • 投稿者の過去記録
      • 不適切な投稿数
      • ユーザーからの報告数
      • 不適切な単語割合
    • 投稿者のデモグラ情報
      • å¹´é½¢
      • 性別
      • 都市
    • アカウント情報
      • フォロワー・フォロー数
      • アカウント歴
  • Contextual infofmation
    • 日付
    • 端末

Model Deployment

  • Model selection
    • Neural Networks
  • Model training
    • Constructing the dataset
      • Hand labeling
        • 人手で後からラベリングしたデータ
        • 評価に使用する
      • Natural labeling
        • ユーザーからの報告でラベリングしたデータ
        • 学習に使用する
    • Choosing the loss function
      • タスクごとのcross entropyの和

Evaluation

  • Offline metrics
    • PR-curve
    • ROC-curve
  • Online metrics
    • Prevalence
      • 全投稿の内、防げなかった有害コンテンツの割合
    • Harmful impressions
    • Valid appeals
      • 有害と判定した内、誤って判定した割合
    • Positive rate
    • User reports per harmful class

Serving

  • Harmful content detection service
    • 新しい投稿があった際に有害コンテンツの確率を予測する
  • Violation enforcement service
    • 有害コンテンツの確率が高い投稿を削除する
  • Demoting service
    • 確率が低く有害コンテンツとして予測した投稿を一時的に降格する

Other Talking Points

  • 人間によるラベリングで発生したバイアスを扱う
  • 流行の有害クラスに対応する
  • ユーザーの一連の行動情報を有害コンテンツ予測に使用する
  • 人間のレビューのために効率的に投稿をサンプリングする
  • グレーゾーンのコンテンツへの対応
  • オンデバイスへのデプロイを行い、有害コンテンツ検知システムを効率化する
  • Transformer-basedのアーキテクチャを linear Transformerで代替して、効率的なシステムに変更する

6. Video Recommendation System

YouTubeのような動画レコメンデーションシステムを構築する

Clarifying Requirements

論点

  • 動画レコメンデーションのビジネス目標はユーザーエンゲージメントを増加させることか
  • ユーザーが現在視聴してる動画か、ユーザーのホームページの動画に関連した動画をおすすめするのか
  • ユーザーは全世界にいて、多言語をサポートしてるか
  • ユーザーからの反応に基づいてデータセットを構築するのか
  • プレイリスト機能は要件に含まれるか
  • プラットフォーム上で利用可能な動画は何個か
  • レイテンシー要件はあるか

要件整理

  • ホームページの動画レコメンデーションシステムの構築
  • ビジネス目標はユーザーエンゲージメントの増加
  • ユーザーがホームページをロードするたびに、システムが関連動画をレコメンドする
  • ユーザーは世界中にいて動画は多言語対応
  • 100億動画が存在し、瞬時にレコメンドする必要がある

Frame the Problem as an ML Task

Defining the ML objective

  • MLの目標の候補
    • ユーザーのクリック数の最大化
      • clickbaitと呼ばれる動画をレコメンドする危険性がある
    • 動画視聴完了数の最大化
      • モデルが秒数が短い動画をレコメンドする危険性がある
    • 合計視聴時間の最大化
    • 関連動画数の最大化 (採用)
      • エンジニアやプロダクトマネージャーが決めたルールで関連性を測定可能
      • Clickや動画の半分視聴などのユーザー行動でラベリング可能

Specifying the system’s input and output

  • 入力 : ユーザー情報
  • 出力 : 関連スコアに基づいてランク付された動画のリスト

Choosing the right ML category

  • 一般的なパーソナライズレコメンデーションシステムの種類
    • Content-based filtering
      • Pros
        • 新しい動画を推薦できる
        • ユーザー独自の思考を捉えられる
      • Cons
        • ユーザーの新しい興味を見つけにくい
        • ドメイン知識が必要
    • Collaborative filtering
      • Pros
        • ドメイン知識が不要
        • ユーザーの新しい分野の興味を見つけやすい
        • 効率的
      • Cons
        • コールドスタート問題
        • ニッチな興味を扱えない
    • Hybrid filtering (採用)
      • Parallel hybrid filtering
      • Sequential hybrid filtering

Data Preparation

Data Engineering

  • 利用可能データ
    • Videos
    • Users
    • User-video interactions

Feature Engineering

  • Video features
    • VideoID
    • 動画秒数
    • 言語
    • タイトル・タグ
  • User features
    • ユーザーデモグラ情報
    • コンテキスト情報
    • ユーザーの行動ログ

Model Deployment

  • model
    • Matrix factorization
    • Two-tower neural network
  • Constructing the dataset
    • user特徴とvideo特徴のペアとラベルデータ
  • Choosing the loss function
    • cross-entropy

Evaluation

  • Offline metrics
    • Precision@k
    • mAP
    • Diversity
  • Online metrics
    • Click-through rate (CTR)
    • The number of completed videos
    • Total watch time
    • Explicit user feedback

Serving

  • prediction pipeline
    • Candidate generation
      • 数十億ある動画候補から、数千に候補を減らす
      • ユーザー特徴と動画embeddingから最近傍法で生成する
      • 流行、人気に基づく異なるcandidate generationを組み合わせる
    • Scoring
    • Re-ranking
  • Challenges of video recommendation systems
    • Serving speed
      • 軽量なモデルをcandidate generationに利用するのが有効
    • Precision
    • Diversity
      • 複数のcandidate generationを導入するのが有効
    • Cold-start problem
      • 新規ユーザー
        • two-tower neural networkを用いることでユーザー特徴からレコメンドが可能
      • 新規動画
    • Training scalability

Other Talking Points

  • レコメンドシステムのexploration-exploitationトレードオフについて
  • レコメンドシステムに発生する異なるタイプのバイアスについて
  • 倫理に関する重要な考慮事項
  • 季節性の考慮
  • 複数目的に対する最適化
  • dislikeのようなネガティブフィードバックの活用
  • ユーザーの検索履歴・視聴履歴のような動画の時系列情報の活用

7. Event Recommendation System

Eventbriteのようなパーソナライズしたイベントレコメンドを行うシステムの構築

Clarifying Requirements

論点

  • ビジネス目標はチケット売り上げを増加させること
  • イベントに加えてユーザーはホテルやレストランの予約もできるか
  • イベントは一時的に発生し期限のある事象
  • イベントの説明、金額幅、場所、日付、時刻情報を活用できる
  • アノテーション済みデータが利用可能か
  • ユーザーの現在位置を取得可能か
  • フレンド機能はあるか
  • ユーザーは他のユーザーを招待可能か
  • ユーザーはイベントへの招待を行えるか
  • イベントは有料 or ç„¡æ–™
  • ユーザー数とイベント数はどれくらいか
  • 1日あたり何人のアクティブユーザーがweb・app siteに訪れるか
  • Google Map APIのような外部APIを利用可能か

要件整理

  • ユーザーにパーソナライズ化されたイベントレコメンドを行うシステムの構築
  • イベントが終了するとユーザーは登録が不可能になる
  • ユーザーはフレンド追加とイベント招待が可能
  • 学習データはユーザーの行動履歴に基づいて作られる
  • 主目的はチケット売り上げを増加させること

Frame the Problem as an ML Task

Defining the ML objective

  • イベント登録数を最大化すること

Specifying the system’s input and output

  • 入力 : ユーザー情報
  • 出力 : ユーザーに関連する上位k個のイベントのリスト

Choosing the right ML category

  • レコメンド問題への対応一般的な方法
    • 人気イベントをレコメンドするシンプルなルールベース
      • baselineとして適切
    • content-based, collaborative filteringのようなEmbeddingモデル
    • ランキング問題に問題を置き換える(採用)
      • Learning to Rank (LTR)
      • クエリが与えられた時に、クエリに最も関連する最新のアイテムリストを並び替える
  • LTR
    • Pointwise LTR (採用)
      • 1つitemとQueryを入力としてスコアを出力
    • Pairwise LTR
      • 2つのitemとQueryを入力として、itemの並び替えを出力
    • Listwise LTR
      • 複数のitemとQueryを入力として、itemの並び替えを出力

Data Preparation

Data engineering

  • 利用可能データ
    • Users
    • Events
    • Friendship
    • Interactions

Feature engineering

  • Event-base recommendation
    • 従来のレコメンドよりイベントベースのレコメンドの方が困難
    • イベントの存在期間が短いため、過去のデータを多く取得できない
    • コールドスタート問題が課題
  • Location-related features
    • イベントにアクセス可能か
    • イベントがユーザーと同じ国・都市か
    • イベントまでの距離がユーザーの都合にあうか
  • Time-related features
    • イベントまでの時間はどれ位あるか
    • 日付と時刻がユーザーの都合にあうか
  • Social-related features
    • イベントの参加人数
    • フレンドの参加に関する特徴
    • 他の人に招待されたイベントかどうか
    • フレンドがホストのイベントかどうか
    • 同一ホストのイベントに過去に参加したことがあるか
  • User-related features
    • 年齢・性別
  • Event-related features
    • 参加費用
    • イベント説明が過去の参加イベントの説明と似ているか
      • イベント説明をTF-IDでベクトル化し、類似度を計算
      • 人間のホストが作成する文章のため、ノイズとなる可能性がある
  • potential talking points
    • バッチ・ストリーミング特徴
    • 特徴計算の効率化
    • Decay factorの利用
    • Embedding Learningの利用
    • ユーザー情報から作成した特徴に含まれるバイアス

Model Deployment

  • Model selection
    • Logistic regression
      • Pros
        • 推論が高速
        • 効率的な学習
        • データが線形分離可能な場合に有効
        • モデル解釈が容易
      • Cons
        • 非線形問題が解けない
        • 多重共線性の影響を受ける
    • Decision tree
      • Pros
        • 学習が高速
        • 推論が高速
        • データ準備が容易
        • モデル解釈が容易
      • Cons
        • 決定境界が最適ではない
        • 過学習しやすい
      • 決定木のロバスト性を向上させる方法
        • Bagging
        • Boosting
    • Gradient-boosted decision tree (GBDT)
      • Pros
        • データ準備が容易
        • バリアンスを減らせる
        • バイアスを減らせる
        • 構造化データに利用可能
      • Cons
        • 調整するハイパーパラメータが多い
        • 非構造化データに使用できない
        • 継続的学習には合わない
    • Neural network (採用)
      • Pros
        • 継続的学習が可能
        • 非構造化データに利用可能
        • 表現力が高い
      • Cons
        • 学習コストが高い
        • 入力データ品質が出力に大きな影響を及ぼす
        • 大規模な学習データが必要
        • モデルがブラックボックス
  • Constructing the dataset
    • ユーザー特徴とイベント特徴のペアに、過去に登録したかどうかでラベル付したデータを作成
    • 不均衡データとなるため、Focal lossã‚„undersamplingを行う
  • Choosing the loss function
    • binary cross-entropy

Evaluation

  • Offlines metrics
    • Recall@k, Precision@k
    • MRR, nDCG, mAP
  • Online metrics
    • Click-through rate(CTR)
    • Conversion rate
    • Bookmark rate
    • Revenue lift

Serving

  • Online learning pipeline
    • イベントレコメンデーションはコールドスタート問題が起きるため継続的学習が必要
  • Prediction pipeline
    • Event filtering
      • 100万個のイベントから候補となるイベントをフィルターする
      • 位置情報やユーザー指定のフィルタを使用する
    • Ranking service
      • Raw Dataから動的に生成する特徴と、Feature Storeから取得した事前計算した特徴を組み合わせる

Other Talking Points

  • 生じうるバイアス
  • より表現力を高めるための特徴量の活用
  • レコメンドイベントの多様性と新規性を高めるにはどうすればよいか
  • プライバシーとセキュリティーの観点から考慮すべきこと
  • イベントホスト側とユーザー側の双方のニーズを満たすにはどうすればよいか
  • データセットを作成する際のdata leakageを防ぐ
  • モデル更新の最適な頻度を決める

8. Ad Click Prediction on Social Platforms

Google, Facebook, Instagramのような広告クリック予測システムを構築する

Clarifying Requirements

論点

  • ビジネス目標は売り上げを最大化すること
  • ユーザータイムラインに表示される広告のみを考慮し、各Clickが同じ売り上げを出す
  • 同じ広告を同一ユーザーに複数回表示するか
  • 広告を隠す、特定広告主をブロックする機能をサポートするか
  • 学習データは広告とユーザー行動に基づいて作成されるか
  • ユーザーのClickを正例としてラベル付するか
  • Clickが発生しなかった場合を負例としてラベル付するか
  • 継続的学習が必要か

要件整理

  • クリック予測システムを構築する
  • ビジネス目標は売り上げを最大化すること
  • 広告はユーザータイムラインのみに表示され、各Clickは同じ額の売り上げを発生させる
  • モデルを継続学習させる必要がある
  • 広告とユーザー行動に基づいて学習データが作成される

Frame the Problem as an ML Task

Defining the ML objective

  • 広告がクリックされるか予測する

Specifying the system’s input and output

  • 入力 : ユーザー情報
  • 出力 : クリック予測確率に基づいてランク付された広告リスト

Choosing the right ML category

  • pointwise Learning to Rank (LTR)
  • 単クラス分類

Data Preparation

Data engineering

  • 利用可能データ
    • Ads
    • Users
    • User-ad interactions

Feature engineering

  • Ad feature
    • IDs
    • image/video
    • Category, sub category
    • impression, click numbers
  • User feature
    • デモグラ情報
    • コンテキスト情報
    • ユーザーの反応情報

Model Deployment

  • Model selection
    • 広告クリック予測システムで一般的に使われる方法
      • Logistic Regression
      • Feature crossing + logistic regression
      • Gradient boosted decision trees
      • Gradient boosted decision trees + logistic regression
      • Neural networks
      • Deep & Cross networks
      • Factorization Machines
      • Deep Factorization Machines
  • Constructing the dataset
    • Positive label
      • 広告表示からt秒後にclickされた場合
      • 広告表示からt秒後にclickされなかった場合
  • Choosing the loss function
    • cross-entropy

Evaluation

  • Offline metrics
    • cross-entropy
    • normalized cross-entropy
  • Online metrics
    • CTR
    • Conversion rate
    • Revenue lift
    • Hide rate

Serving

  • Data preparation pipeline
    • Batch feature computation
      • 静的な特徴を集計しFeature Storeに格納
    • Online feature computation
      • 動的に変化する特徴をリアルタイムで計算
  • Continual learning pipeline
    • 新しい学習データでfine tuningを行う
  • Prediction pipeline

Other Talking Points

  • data leakageを防ぐことの重要性
  • model calibrationの実施
  • FFMとFMの違い
  • XDeepFMとDeepFMの違い
  • 破滅的忘却とは何か、防ぐための一般的な手段とは

9. Similar Listing on Vacation Rental Platforms

Airbnbのような類似リスティングを提示するシステムの構築

リスティング : 掲載されている家や船などの宿泊施設全般

Clarifying Requirements

論点

  • ビジネス目標は予約数を増加させること
  • 類似性の定義
  • レコメンドリストはユーザーごとにパーソナライズするか
  • プラットフォーム上で利用可能なリスティングの数
  • 学習データをどのように作成するか
  • 新しい候補が類似リスティングに現れるまでの時間

要件整理

  • vacation rental platformsでの類似リスティング作成の設計
  • 入力はユーザーが現在見ている特定のリスティング、出力はユーザーが次にクリックしそうな類似リスティング
  • レコメンドリスティングはログイン・非ログインユーザーに対して同じものをだす
  • 500万リスティング存在し、新しいリスティングが1日後にレコメンドに含まれるようにする
  • ビジネス目標は予約数を増加させること

Frame the Problem as as ML Task

Defining the ML objective

  • ユーザーが現在見ているリスティングを元に次にユーザーがクリックしそうなリスティングを予測する

Specifying the system’s input and output

  • 入力 : ユーザーが現在見ているリスティング
  • 出力 : ユーザーがクリックする確率に元づいて並び替えられたリスティング

Choosing the right ML category

  • session-based recommendation systems
    • ユーザーが現在ブラウズしてるアイテムに基づいてレコメンドを行うこと
    • 良いレコメンドはユーザーの一般的な嗜好ではなく、現在の嗜好に基づいてる
    • 従来のレコメンドと比較して、ユーザーの嗜好が頻繁に変わる
    • リスティングのEmbedding Vectorを作成するモデルを学習する
    • Embedding空間の距離を類似度として計算

Data Preparation

Data engineering

  • 利用可能データ
    • Users
    • Listings
    • User-listing interaction

Feature engineering

  • search session
    • ブラウジング履歴
    • クリックされた一連のリスティングID
    • 最終的に予約されたリスティングIDを保持

Model Deployment

  • Model selection
    • shallow neural netowork
      • リスティングのEmbedding学習
    • Model training
      • リスティングを入力としてコンテクストに含まれるリスティング一覧を予測する
  • Constructing the dataset
    • negative sampling
      • Positive pairs
        • 近いembeddingを持つリスティングの組み
      • Negative pairs
        • 遠いembeddingを持つリスティングの組み
  • Choosing the loss function
    • cross-entropy

Evaluation

  • Offline metrics
    • The average rank of the eventually-booked listing
      • ランク付されたリスティングリストの内、最終的に予約されたリスティングの位置
  • Online metrics
    • CTR
    • Session book rate

Serving

  • Training pipeline
    • 新しいリスティングデータ・ユーザーの反応データでモデルをfine tuning
  • Indexing pipeline
    • 全てのリスティングのEmbeddingのIndexを作成
  • Prediction pipeline
    • Embedding fetcher service
      • モデル学習時に入力リスティングが含まれる場合
      • モデル学習時に入力リスティングが含まれない場合
    • Nearest neighbor service
    • Re-ranking service

Other Talking Point

  • 潜在的なバイアスは何か
  • session-based アプローチとrandom walkの比較
  • ユーザーの長期的な嗜好を活用したsession-basedレコメンドシステムの改善
  • 季節性をどのようにリスティングシステムに導入するか

10. Personalized News Feed

Facebook, Twitter, LinkedInのようなニュースフィードのパーソナライズシステムの構築

Clarifying Requirements

論点

  • ニュースフィードのパーソナライズの目的はユーザーのプラットフォームへの定着
  • ユーザーがタイムラインを読み込んだ時に、新しいポストを表示する
  • 投稿はテキスト・画像・動画の組み合わせ
  • ユーザーエンゲージメントを維持するには、最もエンゲージメントの高いコンテンツをタイムラインのトップに表示する
  • click, like, shareのような特定のエンゲージメントに最適化した方がいいか
  • プラットフォーム上での主要なユーザーリアクションは何か
  • システムのレイテンシ要件
  • 1日のアクティブユーザー数、1にのタイムライン更新回数

要件整理

  • パーソナライズしたユーザーフィードシステムの構築
  • ユーザーエンゲージメントに基づいて、見られていない投稿、見られていないコメントがある投稿のランク付を行う
  • 200ms以内
  • システムの目的はユーザーエンゲージメントの増加

Frame the problem as an ML task

Defining the ML objective

  • 候補
    • 滞在時間やクリックなどの暗黙的なユーザーリアクション数の最大化
    • likeã‚„shareなどの明示的なユーザーリアクション数の最大化
    • 暗黙的と明示的なリアクションの重み付スコアの最大化(採用)

Specifying the system’s input and output

  • 入力 : ユーザー情報
  • 出力 : エンゲージメントスコアに基づいてランク付された見られていない or 見られていないコメントがある投稿のリスト

Choosing the right ML category

  • Pointwise Learning to Rank (LTR)
  • ユーザーと投稿の特徴を元に、複数の単クラス分類モデルを作成

Data Preparation

Data engineering

  • 利用可能データ
    • User
    • Posts
    • User-post interactions
    • Friendship

Feature engineering

  • Post features
    • Text content
    • Images or videos
    • Reactions
    • Hashtags
    • Post’s age
  • User features
    • デモグラ情報
    • コンテキスト情報
    • ユーザー投稿履歴
    • 投稿へのメンション
  • User-author affinities
    • like, click, comment, share rate
    • 投稿主とユーザーのフレンド期間
    • 近しい友達・家族か

Model Deployment

  • Model selection
    • neural network
      • 非構造化データを扱える
      • カテゴリカル特徴を表現するEmbedding layerを使える
      • 事前学習済みのモデルでfine tuningを行える
    • architecture
      • N independent DNNs
      • A multi-task DNN
  • Model training
    • Constructing the dataset
      • likeのクラス分類用のデータセットでは、likeのリアクションがあった投稿のラベルを正例として、likeのリアクションがない投稿を負例として扱う
    • Choosing the loss function
      • combine task-specific loss
        • binary cross-entropy (classification task)
        • MAE, MSE, Huber loss (regression task)

Evaluation

  • Offline metrics
  • Online metrics
    • Click-through rate (CTR)
    • Reaction rate
    • Total time spent
    • User satisfaction rate found in a user survey

Serving

  • Data preparation pipeline
  • Prediction pipeline
    • Retrieval service
    • Ranking service
    • Re-ranking service

Other Talking Points

  • 口コミが広がるだろう投稿の扱い
  • 新規ユーザーに対するパーソナライズ
  • 潜在的なバイアスを緩和させる方法
  • 適切な再学習頻度

11. People You May Know

Facebook, LinkedIn, Twitterのような、共通の学校・友達・職場などの繋がりを持ちたいと思えるユーザーリスト(PYMK)を作成するシステムの構築

Clarifying Requirements

論点

  • PYMKの目的はユーザーに見込みのあるつながりを見つけてもらってネットワークを広げてもらうこと
  • つながりを考慮する上で最も重要な要素は学歴、職歴、ユーザーの社会的コンテキスト
  • フレンドであるとはお互いが友達申請状態にあることか
  • プラットフォーム上のユーザー数の合計、1日のアクティブユーザー数
  • ユーザーあたりの平均つながり数
  • ほとんどのユーザーのソーシャルグラフは安定していて、短期間では大きく変わらない

要件整理

  • LinkedInのようなPYMKの構築
  • 入力はユーザー情報、出力は見込みのあるつながりのリスト
  • システムのモチベーションはユーザーに新しいつながりを見つけてもらい、ネットワークを拡大してもらうこと
  • 10億ユーザーが存在し、ユーザーは平均1000のつながりを持つ

Frame the problem as an ML Task

Defining the ML objective

  • ユーザー間のつながり数を最大化する

Specifying the system’s input and output

  • 入力 : ユーザー情報
  • 出力 : ユーザーに関連するつながりのリスト

Choosing the right ML category

  • Pointwise LTR
    • PYMKをランキング問題と捉える
    • Graph構造の予測タスク
      • Graph-level prediction
      • Node-level prediction
      • Edge-level prediction
  • Edge prediction
    • グラフ情報を扱うモデル
    • 2つのNode間にEdgeが存在する確率を予測

Data Preparation

Data engineering

  • 使用可能データ
    • User
    • Connections
    • Interactions

Feature engineering

  • User features
    • デモグラ情報
    • つながり数、フォロワー数、フォロー数、つながりリクエスト数
    • アカウント年齢
    • 受け取った反応数
  • User-user affinities
    • 学歴・職歴の親和性
    • 社会的な親和性

Model Deployment

  • Model selection
    • Graph Neural Network (GNN)
      • グラフを入力として扱える
      • Node embeddingを作成しNodeを数値表現
      • 2つのNode間のつながりは内積計算による類似性の測定によって表現
      • GNN-based architectures
        • GCN, GraphSAGEm GAT, GIT
  • Model training
    • Constructing the dataset
      1. 時刻tのグラフのスナップショットを作成
      2. グラフの初期ノード特徴、エッジ特徴を作成
      3. ラベリング

Evaluation

  • Offline metrics
  • Online metrics
    • 過去X日で送ったつながりリクエスト数
    • 過去X日で許可したつながりリクエスト数

Serving

  • Efficiency
    • 10億ユーザー全てを候補とすると計算量が膨大となるため、候補を絞る必要がある
    • Utilizing FoF
      • 推薦候補をフレンドのフレンドに限定する
    • Pre-compute PYMK
      • Online prediction
        • ホームページを更新した際にリアルタイムで見込みのあるつながりを計算する
      • Batch prediction
        • 事前に予測値を計算しDBに格納しておく
  • ML system design
    • PYMK generation pipeline
      • 全ユーザーのPYMKを生成し、DBに格納しておく
    • Prediction pipeline

Other Talking Points

  • Personalized random walkがレコメンドシステムの別候補として有効
  • 頻繁にログインするユーザーがレコメンドに現れやすいバイアスが存在する
  • ユーザーがレコメンドを無視した際のフィードバックをシステムに導入する
  • 遅れフィードバックの扱い