kikumotoのメモ帳

インフラ・ミドル周りを中心に、興味をもったことを適当な感じで。twitter : @takakiku

Dify Community版で会話ログへのアクセスを制限する

本記事は Qiita Dify Advent Calendar 2024 の 15日目の記事です。

前置き

会社で Dify Communitiy版(セルフホスト環境)を AWS で動かしています。
構成は Dify on AWS with CDK に限りなく近いですが

  • Terraform で構築
  • ElastiCache Valkey を利用(Redis から移行しました)

が大きな違いでしょうか。
あとは、社員限定アクセスとするため ALB で Google 認証いれてます。

と、こんな感じの環境で社内で活発に利用され始めました。
みなさんにどんどんアプリを作ってもらいたいので基本的に エディター 権限を付与しています。

そうすると、一部の部署から(若干)機密性のある資料をナレッジにいれて利用したいという要望がありました。
Dify上で、ナレッジへのアクセスはナレッジ単位で制御できる設定はあるので良いのですが、会話ログも他の人に見られたくないとのこと。
現状の Dify では エディター 権限あれば、他の人が作ったアプリの会話ログも見えてしまいます。

この記事では、この会話ログへのアクセスを

  • Difyのソースは変更せずに、オーナー・管理者・アプリ作成者に制限する仕組み

について書きます。
なお、一旦画面を使ってのアクセスのみについて対処するものです。

事前注意事項

本記事での方法は v0.13.2 の時点で動作できているものですが、将来にわたりその動作が保証されるものではありません。
また、本記事を参考に同等なものを作成し、それによって生じるいかなる結果も当方では責任はとれません。

仕組み

おおまかな仕組みは以下の通りです。

  • API の前段にリーバースプロクシとして Nginx(実際は OpenResty)を配置
  • /console/api/apps/<uuid>/chat-conversations へのアクセス時に Lua を動かす
  • Lua でDBにアクセスし、閲覧して良い人かどうかを判断する
  • OKなら API コンテナにリバースプロクシする
  • NGなら 403 Forbidden にする

構成をイメージ化すると以下のような感じです

以下で、もう少し詳しく書いていきます。

リバースプロクシ

今回、Lua を利用するのでOpenRestyコンテナを利用しています。 そこで、以下のような設定をしてあります。

    location ~ /console/api/apps/(?<app_id>[^/]+)/chat-conversations {
      access_by_lua_file '/usr/local/openresty/lua-scripts/auth.lua';

      proxy_pass http://localhost:5001;
      include proxy.conf;
    }

localhost:5001 が API コンテナのアクセス先になります。
ECSにおいて API コンテナと OpenResty コンテナを同一タスク内で定義しているので、このようなアクセス方法となっています。

また、ログ&アナウンス は

  • /app/<uuid>/logs

というパスですが、その中の処理で実際の会話ログが取得されるようになっており、その際のアクセス先が

  • /console/api/apps/<uuid>/chat-conversations

というようなものなので、そこへのアクセスをきっかけに Lua が実行されるようになっています

Lua

Lua でやっていることはおおよそ以下の通りです(ソース公開は控えさせていただきますが以下を愚直に実装しているだけです)

  • Authorization ヘッダーをデコードして、user_id を得る。
  • リクエストURIに含まれるアプリケーションIDをキーにDBに問い合わせ、アプリケーションの作者の id を得る。
    • アプリケーション作者の場合は 許可 となる
  • user_id をキーにしてDBに問い合わせ、そのユーザーのロールを取得する。
    • ロールが owner or admin であれば、許可 となる
  • 上記に該当しない場合は 403 Forbidden となる

これらのポイントとなる情報を後は説明して終わりにします

user_id の取得

エンドポイントへのアクセス時に、

Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNDRmNTVlOTUtMDA3OC00YTJhLWIzMjMtYWQzNDZmNDYzMjkyIiwiZXhwIjoxNzMwMzQ2MzkzLCJpc3MiOiJTRUxGX0hPU1RFRCIsInN1YiI6IkNvbnNvbGUgQVBJIFBhc3Nwb3J0In0.ymnvkzc96HnWX26CZ7i3vCAl9h8knLZNmTB6lMwGro0

のようなリクエストヘッダーが付与されます。

このトークンは https://github.com/langgenius/dify/blob/0.13.2/api/extensions/ext_login.py#L23-L37 にあるように

    decoded = PassportService().verify(auth_token)
    user_id = decoded.get("user_id")

にてデコードされて user_id を取得できるものです

そしてデコード自体は https://github.com/langgenius/dify/blob/0.13.2/api/libs/passport.py#L16 にあるように

jwt.decode(token, self.sk, algorithms=["HS256"])

JWTライブラリを使ってデコードされています

self.sk は環境変数で与えている SECRET_KEY なので、同等なデコードの処理を Lua で行うことにより user-id を取得できるということです

アプリケーション作成者

アプリケーションの情報は apps テーブルにあります

                                            Table "public.apps"
         Column          |            Type             | Collation | Nullable |           Default
-------------------------+-----------------------------+-----------+----------+-----------------------------
 id                      | uuid                        |           | not null | uuid_generate_v4()
 tenant_id               | uuid                        |           | not null |
 name                    | character varying(255)      |           | not null |
 mode                    | character varying(255)      |           | not null |
 icon                    | character varying(255)      |           |          |
 icon_background         | character varying(255)      |           |          |
 app_model_config_id     | uuid                        |           |          |
 status                  | character varying(255)      |           | not null | 'normal'::character varying
 enable_site             | boolean                     |           | not null |
 enable_api              | boolean                     |           | not null |
 api_rpm                 | integer                     |           | not null | 0
 api_rph                 | integer                     |           | not null | 0
 is_demo                 | boolean                     |           | not null | false
 is_public               | boolean                     |           | not null | false
 created_at              | timestamp without time zone |           | not null | CURRENT_TIMESTAMP(0)
 updated_at              | timestamp without time zone |           | not null | CURRENT_TIMESTAMP(0)
 is_universal            | boolean                     |           | not null | false
 workflow_id             | uuid                        |           |          |
 description             | text                        |           | not null | ''::character varying
 tracing                 | text                        |           |          |
 max_active_requests     | integer                     |           |          |
 icon_type               | character varying(255)      |           |          |
 created_by              | uuid                        |           |          |
 updated_by              | uuid                        |           |          |
 use_icon_as_answer_icon | boolean                     |           | not null | false
Indexes:
    "app_pkey" PRIMARY KEY, btree (id)
    "app_tenant_id_idx" btree (tenant_id)
Referenced by:
    TABLE "tool_published_apps" CONSTRAINT "tool_published_apps_app_id_fkey" FOREIGN KEY (app_id) REFERENCES apps(id)

URLのpathに含まれる UUID がこのテーブルの id の値なので、そこから create_by カラムを見れば作者の user_id がわかります。 そして、認証情報に含まれる user_id と比較すれば、アクセスしている人がアプリケーションの作者かどうか判断できるということです。

オーナー、管理者

ロールは tenant_account_joins で設定されているようです

                              Table "public.tenant_account_joins"
   Column   |            Type             | Collation | Nullable |           Default
------------+-----------------------------+-----------+----------+-----------------------------
 id         | uuid                        |           | not null | uuid_generate_v4()
 tenant_id  | uuid                        |           | not null |
 account_id | uuid                        |           | not null |
 role       | character varying(16)       |           | not null | 'normal'::character varying
 invited_by | uuid                        |           |          |
 created_at | timestamp without time zone |           | not null | CURRENT_TIMESTAMP(0)
 updated_at | timestamp without time zone |           | not null | CURRENT_TIMESTAMP(0)
 current    | boolean                     |           | not null | false
Indexes:
    "tenant_account_join_pkey" PRIMARY KEY, btree (id)
    "tenant_account_join_account_id_idx" btree (account_id)
    "tenant_account_join_tenant_id_idx" btree (tenant_id)
    "unique_tenant_account_join" UNIQUE CONSTRAINT, btree (tenant_id, account_id)

account_id が user_id なので、これからアクセスしている人のロールを取得できます

まとめ

Dify Community版において、Dify本体を修正せずに、会話ログの閲覧を制限する仕組みを作ってみました

  • OpenResty + Lua を採用
  • リクエストに含まれる認証情報をもとに Dify DB に問い合わせて、閲覧権限の許可を判断する

という仕組みでした

以上、最後までご覧いただきありがとうございました

AWS Fargateで稼働するAtlantisから、GCPリソースを構築する - 修正版

google/internal/externalaccount: Adding metadata verification · golang/oauth2@ec4a9b2 · GitHub

の変更により、以下の仕組みは hashicorp/google バージョン v4.59.0 移行では動作しなくなりました。


AWS FargateでAtlantisを動かして、AWSリソースに対するTerraformのPlanのレビューやApplyをPR上で行なっていたのですが、GCPリソースに対しても必要になってきたので、その時に行った作業のメモ。

なお、AWS Fargateで稼働するAtlantisから、GCPリソースを構築する - 失敗編 - kikumotoのメモ帳 で一度環境を作ったのですが、やり方がまずかったので改めて取り組んだメモとなります。
こちらの記事だけで話しが完結するように、一部失敗編と同じ内容の記載はあります。

前提

前提としては、GCPの鍵ファイルを生成せずにやりたいというのがあります。
鍵ファイルの管理・ローテーションは煩わしいし、穴になる可能性もあるので。

タスクロール

Fargate上のタスクとしてAtlantis を動かしており、すでにタスクロールがあるのでこのタスクロールを利用します。

問題点と解決方法の概要

問題点

以下のGCP側の手順の最後で、構成ファイルをダウンロードするのですが、このファイルに関連する問題があります。

なお、構成ファイルは以下のようなものです。

{
  "type": "external_account",
  "audience": "//iam.googleapis.com/projects/123456789012/locations/global/workloadIdentityPools/atlantis/providers/xxxxxxxx",
  "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
  "service_account_impersonation_url": "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken",
  "token_url": "https://sts.googleapis.com/v1/token",
  "credential_source": {
    "environment_id": "aws1",
    "region_url": "http://169.254.169.254/latest/meta-data/placement/availability-zone",
    "url": "http://169.254.169.254/latest/meta-data/iam/security-credentials",
    "regional_cred_verification_url": "https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
  }
}
問題1:シークレットをファイルとして扱えない

環境変数 GOOGLE_APPLICATION_CREDENTIALS に構成ファイルのパスを指定して利用します。
Google Cloud 上では、Google Cloud Secret Manager に登録したシークレットを、Cloud Run などでファイルパスとしてマウントできるのですが、AWSの Secrets Manager や Systems Manager Parameter Store に登録したシークレットをECS タスクにファイルパスとして提供する方法がありません。

構成ファイルは鍵ファイルほど秘匿性が高くないとは思っているのですが、それでもあまり漏らしたくもないので、シークレットとして扱いたいです。
なので、シークレットとして扱いつつ、ファイルとして扱う方法を考える必要があります。

問題2;認証構成ファイルのメタデータURLがEC2用

構成ファイルの credential_source -> url ですが、これは EC2 用のメターデータURLです。 一方でECSの場合は、

にあるように

なものとなります。
また、戻ってくる値も異なります。
このECS用のメタデータURLを、Terraformが(Terraform が利用しているGCPライブラリが)正しく処理できないという点が大きな問題です。

解決方針

問題1

これは、 qiita.com

を参考にさせていただきました。

  • シークレットにファイルの内容を保存
  • 該当シークレットを環境変数に指定(コンテナ定義)
  • 環境変数の内容をファイルに書き出す。

という感じです。

このために、カスタムな Atlantis イメージを作成します。

問題2

上記のEC2メタデータURLにアクセスすると、ロール名が返ってきます。続いて、メタデータURLに返ってきたロール名を付け加えて(例;http://169.254.169.254/latest/meta-data/iam/security-credentials/sampleEC2Role)アクセスすると、以下のような構造のJSONが得られます。

{
    "Code": "Success",
    "LastUpdated" : "<LAST_UPDATED_DATE>",
    "Type" : "AWS-HMAC",
    "AccessKeyId": "<ACCESS_KEY_ID>",
    "SecretAccessKey": "<SECRET_ACCESS_KEY>",
    "Token": "<SECURITY_TOKEN_STRING>",
    "Expiration": "<EXPIRATION_DATE>"
}

一方で、ECSメタデータURLにアクセスすると、その時点で以下のような構造のJSONが得られます。

{
    "AccessKeyId": "<ACCESS_KEY_ID>",
    "Expiration": "<EXPIRATION_DATE>",
    "RoleArn": "<TASK_ROLE_ARN>",
    "SecretAccessKey": "<SECRET_ACCESS_KEY>",
    "Token": "<SECURITY_TOKEN_STRING>"
}

ECSメタデータURLの情報からEC2メタデータURLが返すデータは生成ができそうなので、この変換をするWebアプリを、Atlantisと同じタスク内に別コンテナとして用意することにしました。
このWebアプリのエンドポイントを構成ファイルの credential_source -> url に指定するようにします。

対応作業

GCP(Google Cloud)側

GCP側では、サービスアカウント、Workload Identity Pool の作成を行なっていきます。 ドキュメントとしては「Workload Identity 連携の構成」のところ。

必要なリソースをTerraformで書くと以下のような感じに。 aws_account_id、aws_task_role_name は適宜、自分の環境のものを設定する感じです。

locals {
  aws_account_id     = "000000000000"
  aws_task_role_name = "ecs_task_role"
}

####
# Atlantis が利用するサービスアカウント

resource "google_service_account" "atlantis" {
  account_id  = "atlantis"
  description = "Service Account for Atlantis (Managed by Terraform)"
}

resource "google_project_iam_member" "atlantis" {
  project = var.gcp_project
  role    = "roles/editor"
  member  = "serviceAccount:${google_service_account.atlantis.email}"
}

####
# AWS 連携のための Workload Identity

resource "google_iam_workload_identity_pool" "atlantis" {
  workload_identity_pool_id = "atlantis"
  display_name              = "atlantis"
  description               = "Pool for Atlantis (Managed by Terraform)"
}

resource "google_iam_workload_identity_pool_provider" "atlantis" {
  workload_identity_pool_provider_id = "aws-atlantis"
  workload_identity_pool_id          = google_iam_workload_identity_pool.atlantis.workload_identity_pool_id

  aws {
    account_id = local.aws_account_id
  }

  attribute_mapping = {
    "google.subject"     = "assertion.arn"
    "attribute.aws_role" = "assertion.arn.contains('assumed-role') ? assertion.arn.extract('{account_arn}assumed-role/') + 'assumed-role/' + assertion.arn.extract('assumed-role/{role_name}/') : assertion.arn"
  }

  attribute_condition = "attribute.aws_role=='arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}'"
}

####
# Workload Identity とサービスアカウントの関連付け

resource "google_service_account_iam_binding" "atlantis" {
  service_account_id = google_service_account.atlantis.id
  role               = "roles/iam.workloadIdentityUser"

  members = [
    "principalSet://iam.googleapis.com/${google_iam_workload_identity_pool.atlantis.name}/attribute.aws_role/arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}"
  ]
}

これをapplyしたら、構成ファイルを取得します。 プール詳細画面の右側にあるペインで、接続済みサービスアカウント に行けば、ダウンロードできます。

AWS側

パラメータストアの準備

問題1の解決の流れと、問題2の解決に関連したファイル内容の修正となります。

私の場合は、パラメータストアに構成ファイルのデータを保存し、それを GOOGLE_APPLICATION_CREDENTIALS_DATA 環境変数で取得できるようにして、 GOOGLE_APPLICATION_CREDENTIALS 環境変数の指すファイルに保存しています。

保存するデータについては、

  • credential_source -> region_url は削除
  • credential_source -> url は、http://127.0.0.1:8080/latest/meta-data/iam/security-credentials というようにしています。同居コンテナのWebアプリはローカルホストとして参照できるので、合わせてそのWebアプリがListenしているポートを指定している感じです。
カスタム Atlantis イメージの作成

構成ファイルの環境変数からのファイル化をするために、カスタム Atlantis Dockerイメージを作成します。

こんな wrapper.sh

#!/usr/bin/dumb-init /bin/sh
set -e

echo $GOOGLE_APPLICATION_CREDENTIALS_DATA | jq . > $GOOGLE_APPLICATION_CREDENTIALS

を用意して、Docerfile が以下のような感じにしています。

FROM ghcr.io/runatlantis/atlantis:latest

RUN apk update && apk add jq

COPY wrapper.sh /usr/local/bin/wrapper.sh

ENTRYPOINT ["wrapper.sh"]
CMD ["server"]

なお、wrapper.sh のところで、jq を通して $GOOGLE_APPLICATION_CREDENTIALS に書き出していますが、これは必須ではなくて ECS Exec で入って調査するような時に読みやすいようにしておきたかった、ぐらいな感じです。

そしてDockerビルドしたイメージをECRレポジトリに登録しておきます。

メタデータ変換Webアプリ

github.com

にWebアプリの実装を置いています。

これをDockerビルドしイメージをECRレポジトリに登録しておきます。

タスク定義・コンテナ定義

もとのタスク定義・コンテナ定義を修正します。

{
    "containerDefinitions": [
        {
            "name": "atlantis",
            "image": "<作成したカスタムAtlantisイメージのレポジトリURL>",
            "environment": [
-- snip --
                {
                    "name": "GOOGLE_APPLICATION_CREDENTIALS",
                    "value": "/path/to/gcp_credentials.json"
                }
            ],
            "secrets": [
-- snip --
                {
                    "name": "GOOGLE_APPLICATION_CREDENTIALS_DATA",
                    "valueFrom": "<パラメータストアのパラメータ名>"
                }
            ],
-- snip --
        },
        {
            "name": "imitate-ec2-metadata-url",
            "image": "<作成したメタデータ変換WebアプリのレポジトリURL>",
            "environment": [
                {
                    "name": "PORT",
                    "value": "8080"
                }
            ],
-- snip --
        }
    ],
-- snip --
}

これでデプロイします。

以上で、PR上でAtlantisからGCPにapplyできるようになります。

1つのAtlantisで複数クラウドを管理したい方の参考になれば!

補足:複数のGCPプロジェクトを対象とする場合

複数のGCPプロジェクトを対象とする場合は、上記で作成したサービスアカウント(のメールアドレス)に対して、各プロジェクトのIAMで権限付与すればOKです。

AWS Fargateで稼働するAtlantisから、GCPリソースを構築する - 失敗編

AWS FargateでAtlantisを動かして、AWSリソースに対するTerraformのPlanのレビューやApplyをPR上で行なっていたのですが、GCPリソースに対しても必要になってきたので、その時に行った作業のメモ。

2022.12.23: 下記の方法は、起動直後は動くが、AWS_SESSION_TOKENの有効期限が切れると動かなくなる。 完全にうっかりしてました。 現在、他の方法について検証中。 リベンジ編は果たしてあるか!?

前提

前提としては、GCPの鍵ファイルを生成せずにやりたいというのがあります。 鍵ファイルの管理・ローテーションは煩わしいし、穴になる可能性もあるので。

タスクロール

Fargate上のタスクとしてAtlantis を動かしていおり、すでにタスクロールがあるのでこのタスクロールを利用します。

GCP(Google Cloud)側

GCP側では、サービスアカウント、Workload Identity Pool の作成を行なっていきます。 ドキュメントとしては「Workload Identity 連携の構成」のところ。

必要なリソースをTerraformで書くと以下のような感じに。 aws_account_id、aws_task_role_name は適宜、自分の環境のものを設定する感じです。

locals {
  aws_account_id     = "000000000000"
  aws_task_role_name = "ecs_task_role"
}

####
# Atlantis が利用するサービスアカウント

resource "google_service_account" "atlantis" {
  account_id  = "atlantis"
  description = "Service Account for Atlantis (Managed by Terraform)"
}

resource "google_project_iam_member" "atlantis" {
  project = var.gcp_project
  role    = "roles/editor"
  member  = "serviceAccount:${google_service_account.atlantis.email}"
}

####
# AWS 連携のための Workload Identity

resource "google_iam_workload_identity_pool" "atlantis" {
  workload_identity_pool_id = "atlantis"
  display_name              = "atlantis"
  description               = "Pool for Atlantis (Managed by Terraform)"
}

resource "google_iam_workload_identity_pool_provider" "atlantis" {
  workload_identity_pool_provider_id = "aws-atlantis"
  workload_identity_pool_id          = google_iam_workload_identity_pool.atlantis.workload_identity_pool_id

  aws {
    account_id = local.aws_account_id
  }

  attribute_mapping = {
    "google.subject"     = "assertion.arn"
    "attribute.aws_role" = "assertion.arn.contains('assumed-role') ? assertion.arn.extract('{account_arn}assumed-role/') + 'assumed-role/' + assertion.arn.extract('assumed-role/{role_name}/') : assertion.arn"
  }

  attribute_condition = "attribute.aws_role=='arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}'"
}

####
# Workload Identity とサービスアカウントの関連付け

resource "google_service_account_iam_binding" "atlantis" {
  service_account_id = google_service_account.atlantis.id
  role               = "roles/iam.workloadIdentityUser"

  members = [
    "principalSet://iam.googleapis.com/${google_iam_workload_identity_pool.atlantis.name}/attribute.aws_role/arn:aws:sts::${local.aws_account_id}:assumed-role/${local.aws_task_role_name}"
  ]
}

これをapplyしたら、構成ファイルを取得します。 プール詳細画面の右側にあるペインで、接続済みサービスアカウント に行けば、ダウンロードできます。

AWS 側

パラメータストアの準備

構成ファイルの場所は、環境変数 GOOGLE_APPLICATION_CREDENTIALS で指定するのですが、GCPのシークレットのようにファイルとしてみせるような設定ができないので、

qiita.com

のやり方を真似る感じにします。

私の場合は、パラメータストアに構成ファイルのデータを保存し、それを GOOGLE_APPLICATION_CREDENTIALS_DATA 環境変数で取得できるようにして、 GOOGLE_APPLICATION_CREDENTIALS 環境変数の指すファイルに保存しています。

なお、保存するデータですが、

blog.studysapuri.jp

にあるように、メタデータURLがECSタスクの場合は異なるので、credential_source の url, region_url を消して登録しています。

カスタム Atlantis イメージの作成

構成ファイルの環境変数からのファイル化と、ECSタスクのメタデータURLからAWSクレデンシャルを取得するために、カスタム Atlantis Dockerイメージを作成します

こんな wrapper.sh

#!/usr/bin/dumb-init /bin/sh
set -e

echo $GOOGLE_APPLICATION_CREDENTIALS_DATA | jq . > $GOOGLE_APPLICATION_CREDENTIALS

token_file=$(mktemp)
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
curl -sS "http://169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI" > "$token_file"
AWS_ACCESS_KEY_ID=$(jq -r ".AccessKeyId" "$token_file")
AWS_SECRET_ACCESS_KEY=$(jq -r ".SecretAccessKey" "$token_file")
AWS_SESSION_TOKEN=$(jq -r ".Token" "$token_file")
rm "$token_file"

export AWS_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY
export AWS_SESSION_TOKEN

exec docker-entrypoint.sh "$@"

を用意して、Docerfile が以下のような感じにしています。

FROM ghcr.io/runatlantis/atlantis:latest

RUN apk update && apk add jq

COPY wrapper.sh /usr/local/bin/wrapper.sh

ENTRYPOINT ["wrapper.sh"]
CMD ["server"]

なお、wrapper.sh のところで、jq を通して $GOOGLE_APPLICATION_CREDENTIALS に書き出していますが、これは必須ではなくて、パラメータストアに保存したデータが改行情報を失っているので、どうせ jq 入れていることもあり綺麗にしておこう、くらいな感じです。

DockerビルドしたイメージをECRレポジトリに登録しておきます。

タスク定義の更新

あとは、タスク定義において * GOOGLE_APPLICATION_CREDENTIALS * GOOGLE_APPLICATION_CREDENTIALS_DATA

の環境変数を設定し(以下のような感じ)

イメージURLを登録したECRリポジトリのものに変更した上で、デプロイすればOK。

以上で、PR上でAtlantisからGCPにapplyできるようになります。

1つのAtlantisで複数クラウドを管理したい方の参考になれば!

補足:複数のGCPプロジェクトを対象とする場合

複数のGCPプロジェクトを対象とする場合は、上記で作成したサービスアカウント(のメールアドレス)に対して、各プロジェクトのIAMで権限付与すればOKです。

Terraformを使ってLooker Studio 用に Google Cloud サービス アカウントを設定する

基本的に、

support.google.com

の内容を Terraform で記述するとどうなるか、という記事です。

Looker Studio 用サービスアカウント

サービス アカウントを使ってデータにアクセスできるようにする、という点では以下のような Terraform になる。

resource "google_service_account" "looker-studio" {
  account_id  = "demo-lookerstudio"
  description = "Use for Looker Studio access to BigQuery (Managed by Terraform)"
}

resource "google_service_account_iam_binding" "looker-studio" {
  service_account_id = google_service_account.looker-studio.id
  role               = "roles/iam.serviceAccountTokenCreator"

  members = [
    "serviceAccount:service-XXX-888888888888@gcp-sa-datastudio.iam.gserviceaccount.com"
  ]
}

resource "google_project_iam_member" "looker-studio-bigquery-jobuser" {
  project = var.gcp_project
  role    = "roles/bigquery.jobUser"
  member  = "serviceAccount:${google_service_account.looker-studio.email}"
}

resource "google_bigquery_dataset_iam_member" "looker-studio" {
  dataset_id = "ZZZZZZ"
  role       = "roles/bigquery.dataViewer"
  member     = "serviceAccount:${google_service_account.looker-studio.email}"
}

serviceAccount:service-XXX-888888888888@gcp-sa-datastudio.iam.gserviceaccount.com は、Looker Studio サービスエージェントのメールアドレスとなります。 これは、本家ヘルプにもあるように Looker Studio サービス エージェントのヘルプページ で表示されるサービスエージェントのメールアドレスからコピーしてくるもの。

また、ここでは BigQuery のデータセット ZZZZZZ にアクセスできるように設定しています。最後の google_bigquery_dataset_iam_member リソースのところ。 ここは適宜、テーブルへのアクセスに絞るならそれに該当するように変更することになる。

ここまでの設定で、データソースのオーナーがそもそも強い権限を持っていれば、データの認証情報 を作成したサービスアカウントに変更可能である。 変更後は、そのデータソースを利用するレポートを開くと、BigQueryへのアクセスはサービスアカウントによって行われる。

ユーザロールを付与する

データソースのオーナーが一般ユーザ的な権限であれば、データの認証情報 をサービスアカウントに変更できない。

例えば

locals {
  test_user_email = "[email protected]"
}

resource "google_project_iam_member" "test-test-user" {
  project = var.gcp_project
  role    = "roles/bigquery.jobUser"
  member  = "user:${local.test_user_email}"
}

resource "google_bigquery_dataset_iam_member" "test-test-user" {
  dataset_id = "ZZZZZZ"
  role       = "roles/bigquery.dataViewer"
  member     = "user:${local.test_user_email}"
}

のようなユーザは、該当BigQueryのデータセット ZZZZZZ にアクセスできてクエリも実行できるので、これをデータソースとして設定できる。 データソースの画面の データの認証情報 でユーザ名(下記画面参照) をクリックして、表示されるダイアログで、サービスアカウント認証情報 に上記で作成したサービスアカウントのメールアドレス入力して更新を実行しようとしても、以下のようにエラーとなる。

該当ユーザがサービスアカウントを利用できるように以下の設定も追加する。

resource "google_service_account_iam_member" "test-test-user" {
  service_account_id = google_service_account.looker-studio.id
  role               = "roles/iam.serviceAccountUser"
  member             = "user:${local.test_user_email}"
}

この設定の追加後は、サービスアカウントのメールアドレスを入力して更新 すると成功する。 これで、これ以降はこのデータソースへのアクセスはサービスアカウントによって行われる。

めでたしめでたし!

Slack Bolt for Python でワークフローステップ実行でLazyリスナー関数を利用する方法

Slack Bolt for Pythonを利用して、ワークフローステップを提供するカスタムアプリを AWS Lambda 上で動かす時にはこう書くと良いよ、という記事です。

試していったこと

Step0:ローカルでSocket Modeで試す。

全体のコードは割愛しますが、Socket Modeの場合は以下のようなコードで特に問題は起きません。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)


# WorkflowStep 定義

def edit(ack, step, configure, logger):
    ack()
    logger.info(step)

    blocks = []
    configure(blocks=blocks)

def save(ack, body, view, update, logger):
    ack()
    logger.info(body)

    update(inputs={}, outputs=[])

def execute(body, client, step, complete, fail, logger):
    logger.info(body)
    complete(outputs={})


# WorkflowStep 登録

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=execute,
)
app.step(ws)

Step1: AWS Lambda で動かす - process_before_response=True

ローカルで動いたので、では Lambda で動かしましょうということで、process_before_response は True にしてデプロイします。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=True,
)

もちろん、slack_bolt.adapter.aws_lambda.SlackRequestHandler を使うようにしていますが、上記以外の箇所のコードに変更はありません。

実行すると(executeが呼ばれる)、complete の呼出結果として

{
    "ok": false,
    "error": "trigger_exchanged"
}

が返って来ているログが出ていました。

エラーの内容は https://api.slack.com/methods/workflows.stepCompleted にあるように

Error returned when the provided workflow_step_execute_id has already been used.

ということとで、workflow_step_execute_id はもう使用済み、という感じのようです。

よくよくログ見ると、

INFO:slack_bolt.workflows.step.step:execute
INFO:slack_bolt.workflows.step.step:execute
ERROR:slack_bolt.App:Failed

みたいな流れで、最初の execute では complete は成功しているようですが("ok": true になっている)、再度リクエストが来たためにこの状況になっているようです。 なお、ERRORになっているので、何度か同じリクエストがやってきます。

Step2:AWS Lambda で動かす - process_before_response=False

ちょっと、よくわからないけれど、Socket Mode の時には process_before_response=False であったので、これで試してみます。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)

def execute(body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")

    complete(outputs={})

sleep とログで、なんらかの処理を模倣しておきます。

これをデプロイして実行すると、

logger.info("sleeped")

に該当するログは出ません。

一方で

DEBUG:slack_bolt.App:Responding with status: 200 body: ""

というログは出ているので、ack は行われている模様。

結局、https://slack.dev/bolt-python/ja-jp/concepts#lazy-listeners で書かれているように、HTTP レスポンスを返したあとにスレッドやプロセスの実行を続けることができない 状況になっている感じに見える。

明示的に ack していないから、このあたりでフレームワークが ack している雰囲気を感じる。

Step3: AWS Lambda で動かす - 明示的に ack してみる

じゃあ、ack を complete の後に呼び出して見ようと。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=False,
)

def execute(ack, body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")

    complete(outputs={})

    ack()

こんな感じですね。

が、結果は先ほど同じで、sleeped のログは出ない。

https://slack.dev/bolt-python/ja-jp/concepts#executing-steps のサンプル見ても、execute で ack してないから、フレームワーク側でよしなにしているのだろうなぁという想像もできる。

Lazyリスナー関数を利用するには??

ここまでの状況から、

  • ack ã‚’ complete 前に呼び出しておくと良さそう(Socket Modeの挙動と同じになるはず)
  • Lambda だと単純に ack を先に呼び出すと, 実際の処理ができない
  • なので、Lazyリスナー関数を利用したい(実態の遅延呼び出しをしたい)

という感じに。

が、ぱっと見、WorkflowStep の execute に渡す関数を、遅延呼び出しする方法がない。 ということで、ここにきてようやくフレームワークのソースを検索。。。

WorkflowStep は https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L302 で定義されている。
ここ見ていくと、execute に渡した値は https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L338-L344 のように

        self.execute = self.build_listener(
            callback_id=callback_id,
            app_name=app_name,
            listener_or_functions=execute,
            name="execute",
            base_logger=base_logger,
        )

build_listener 関数に渡されている。この時に listener_or_functions と複数になっているのに気づく。

さらに見ていく。 listener_or_functions が List の場合は、https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L372 の分岐に入っていく。
そこから、execute には関数の配列を渡せることがわかった。

さらに https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L392

            ack_function = functions.pop(0)

リストの先頭は、ack 関数になり、https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/workflows/step/step.py#L398

                lazy_functions=functions,

残りの関数が Lazyリスナー関数になるっぽく見えることがわかった。

ちなみに、その次の行

                auto_acknowledgement=name == "execute",

というのがあり、これから execute の場合は ack は勝手に実行されるんだなということも見えてきた。 この auto_acknowledgement については https://github.com/slackapi/bolt-python/blob/v1.15.3/slack_bolt/listener/thread_runner.py#L48 を見ていくと良さそう。

Step4:Lazyリスナー関数対応

ということで、以下のようなコードにする。

app = App(
    token=token,
    signing_secret=signing_secret,
    process_before_response=True,
)

def execute(body, client, step, complete, fail, logger):
    logger.info(body)

    time.sleep(1)
    logger.info("sleeped")
    complete(outputs={})

def _ack(ack, logger):
    logger.info("ack")
    ack()

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=[_ack, execute],
)

これで、trigger_exchanged のエラーも発生せず、期待通りに動いた。

サンプルコード

上記の方法を取り入れたサンプルを以下に置きました。 github.com

サンプルでは、ベタに書いていくよりは整理がつくので、ワークフローステップを別ファイルにクラス化し分離しています。

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py

class CustomWorkflowStep:

    def register(self, app, callback_id):
        ws = WorkflowStep(
            callback_id=callback_id,
            edit=self.edit,
            save=self.save,
            execute=[self.ack, self.execute],
        )
        app.step(ws)

本筋ではないですが、サンプルコードについて少し解説

このワークフローステップは、設定したメッセージを postEphemeral で投稿するものです。
Slack の Workflow は、入力値を保持してくれるので、

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py#L32

"initial_value": step.get("inputs", {}).get("message", {}).get("value", ""),

な感じで書いておくと、最初の設定時はデフォルト値(ここでは空文字)、以降は入力データが表示されるようになります。

設定したメッセージを mrkdwn 表示するには

https://github.com/kikumoto/bolt-workflowstep-sample/blob/main/app/workflowstep.py#L103

"text": html.unescape(inputs["message"]["value"]),

のように html unescape する必要があります。

まとめ

Slack Bolt for Pythonを利用して、ワークフローステップを提供するカスタムアプリを AWS Lambda 上で動かす時には、execute では Lazy リスナー関数を利用する必要があります。

ws = WorkflowStep(
    callback_id="sample-step",
    edit=edit,
    save=save,
    execute=[_ack, execute],
)

のように、execute に ack を実行するための関数と、実処理をする関数を List にして渡せばOKです。

詳しくは https://github.com/kikumoto/bolt-workflowstep-sample を参考にしてもらえると良いです。

以上。

Ractorに入門してみた - Consumers/Producer はこんな感じ?編 -

以下での背景と失敗を踏まえて、元々やりたいことを書いてみた、という内容の記事になります。

kikumoto.hatenablog.com

やりたい事

  • ある処理をするためのデータが多数ある
  • それを並列に処理したい。
  • 並列数は固定でOK。
  • 結果は随時処理したいが、処理つの都合で1カ所でやりたい

みたいな感じです。

github.com

の Worker pool のサンプルだと

(1..N).each{|i|
  pipe << i
}

で、全てのデータを渡してから、最後に結果をごそっと受け取るという感じで微妙にニーズと一致しませんでした。(結果を随時受け取っていきたい)

実装

そこで、実際のデータとか実処理の内容はのぞいて、並列処理部分はこんな感じ?というのを書いてみました。

def main()
  # 同時実行数
  c = 2

  producer = Ractor.new Ractor.current, c do |parent, c|
    puts "start producer"

    get_data.each do |d|
      Ractor.yield Ractor.make_shareable(d, copy: true)
      # sleep 1
    end

    # consumer に終了通知
    c.times do
      Ractor.yield :term
    end

    parent.send :producer_finished
  end

  consumers = (1..c).map do |i|
    Ractor.new producer, i do |producer, i|
      puts "start consumer #{i}"

      loop do
        d = producer.take
        break if d == :term

        puts "consumer_#{i}: #{d['id']}"
        Ractor.yield d['value']
      end

      # main Ractor への終了報告
      Ractor.yield :consumer_finished
    end
  end

  # consumer からの結果受け取り&終了待ち
  until consumers.empty?
    r, obj = Ractor.select(*consumers)
    if obj == :consumer_finished
      consumers.delete r
      next
    end
    puts "message: #{obj}"
  end

  puts "wait producer"
  Ractor.receive
end

def get_data
  [
    { 'id' => 1, 'value' => 'aaa'},
    { 'id' => 2, 'value' => 'bbb'},
    { 'id' => 3, 'value' => 'ccc'},
    { 'id' => 4, 'value' => 'ddd'},
  ]
end

main

一応、これで期待通りっぽく動いているのだけど、果たして。。。
詳しい人に教えて欲しい〜。

Ractorに入門してみた - 基本的なところでハマった編 -

背景

ruby で書き始めたちょっとしたツールで、処理する対象量の都合で並列実行したいな、となり、できればやはり複数 CPU 使いたいなということで、Ruby って Process 以外で今って何かできるんだっけと見たら Ractor というのがあるんですね。

Ractor 自体は

techlife.cookpad.com

などなど見てもらうのが良いと思います。なんせ、まだ私はわからないことだらけなので。

で experimental ということですが、作ろとしているツールは1回ぽっきり的なので、それなら勉強がてら(最近この手のもので新しいことを使う機会が少ないこともあり)使ってみようと思い手を付けてみました。

この記事は、やってみて、適当な理解なせいでしばらくハマった内容を書いたものです。 ちゃんと考えればわかるでしょ、的な自戒な意味を込めた記事。

なお、rubyのバージョンは以下。

$ ruby --version
ruby 3.1.0p0 (2021-12-25 revision fb4df44d16) [x86_64-darwin19]

間違い実装

Consumers - Producer 的な感じなものを実装したくて、プロタイピング的になんとなくで書いてみました。(dame.rb とします)

def main
  producer = Ractor.new do
    get_data.each do |d|
      Ractor.yield Ractor.make_shareable(d, copy: true)
    end
    Ractor.yield :term
  end

  consumer = Ractor.new producer do |producer|
    loop do
      d = producer.take
      break if d == :term
      puts "consumer: #{d['value']}"
    end
  end

  # producer 完了待ち
  producer.take

  # consumer 完了待ち
  consumer.take
end

def get_data
  [
    { 'id' => 1, 'value' => 'aaa'},
    { 'id' => 2, 'value' => 'bbb'},
    { 'id' => 3, 'value' => 'ccc'},
    { 'id' => 4, 'value' => 'ddd'},
  ]
end

main
  • producer からデータ流して、
  • consumer で受け取って何か処理
  • 後は終了待ち

みたいなやつです。

これ実行すると

$ ruby dame.rb
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
consumer: bbb
consumer: ccc
consumer: ddd

みたいな感じで、一個データが consumer に渡っていなんですよ。 これにしばらく悩みました。。。

原因は

  # producer 完了待ち
  producer.take

これ。

当然ですよね。。。
producer.take するってことは producer が Ractor.yield したやつを受け取るので、ここに1つ producer から流されたデータが渡ってきていた、というだけ。

終了待ちは take だからという適当な理解で書いてしまった、というオチです。

動くようにしたもの

動くようにするには、この実装だと最後の待ちの順番を入れ替えるだけ。

  # consumer 完了待ち
  consumer.take

  # producer 完了待ち
  producer.take

これだと期待通り。

$ ruby ok.rb
<internal:ractor>:267: warning: Ractor is experimental, and the behavior may change in future versions of Ruby! Also there are many implementation issues.
consumer: aaa
consumer: bbb
consumer: ccc
consumer: ddd

さて、動いていはいるけど、これで本当に良いのか全く自信なし。

:term みたいなメッセージ投げるのが正しいのだろうか、というのも。

make_shareable のところはこの記事的なところだと copy: true 必要はないけど、今後予定のことを考えてそうしてはいます。

まだまだ理解に乏しいけど、実装しながら学んでいくことにします!!
(初めから〇〇言語で書けば、というのはあるが...それは置いとく)