Azure Database for PostgreSQLでCDCを試してみる

最近開発しているサービスがだんだん成長してきて、先々を考えるといくつかのサービスに分離したいなーと思いChange Data Capture (CDC)について色々と調べていました。

MySQLでの構築については、この記事DebeziumでCDCを構築してみたがとても丁寧に解説されているのでお薦めです。この記事の解説を参考にしてMySQL+Kafka+Debeziumで動作してお試しできる環境ができたので、色々と挙動を確認できました。

PostgreSQLでCDC

MySQLでの実験環境は簡単に構築できたのですが、今回導入を検討しているサービスではPostgreSQLを使用しています。 ということで、まずは手元でPostgreSQL + Kafka + DebeziumでCDC環境を構築してみます。

Kafkaの構築

こちらは前出のブログの記載とほぼ同じで、Docker hubにある公式イメージから構築します。

version: "3.8"
services:
  pg-debezium-zookeeper:
    container_name: pg-debezium-zookeeper
    image: "bitnami/zookeeper:latest"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  pg-debezium-kafka:
    container_name: pg-debezium-kafka
    image: "bitnami/kafka:latest"
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=pg-debezium-zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://pg-debezium-kafka:9092,PLAINTEXT_HOST://127.0.0.1:29092
    depends_on:
      - pg-debezium-zookeeper

PostgreSQLの構築

MySQLでの構築と同様に、PostgreSQLを利用する場合にも設定を変更する必要があります。 デフォルトでは、wal_level = REPLICATION になっているのですが、これを wal_level = LOGICAL に変更しより詳細なログを出力する必要があります。LOGICALに設定変更するとログのサイズが増えるようなので注意が必要かもしれません。

今回は設定ファイルを変更せずにコンテナ起動時のコマンドで設定を変更することにします。 docker-compose.ymlはこんな感じ

version: "3.8"
services:
... 省略
  pg-debezium-postgres:
    container_name: pg-debezium-postgres
    image: postgres:14.7-alpine
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - pg-debezium-postgres-data:/var/lib/postgresql/data:cached
    environment:
      POSTGRES_PASSWORD: "Passw0rd"
      POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --locale=ja_JP.UTF-8"
      POSTGRES_USER: "test"
      POSTGRES_DB: test
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]

テーブルの作成

テストに使用するテーブルを作っておきます

$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "CREATE TABLE test (id SERIAL PRIMARY KEY, subject text NOT NULL, created timestamptz NOT NULL);"

Debeziumの構築

Debeziumも、公式のイメージがあるのでそれを利用して、上記のKafkaと接続するように設定します。

version: "3.8"
services:
... 省略
  pg-debezium:
    container_name: pg-debezium
    image: "debezium/connect:2.0"
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=pg-debezium-kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=_kafka_connect_configs
      - OFFSET_STORAGE_TOPIC=_kafka_connect_offsets
      - STATUS_STORAGE_TOPIC=_kafka_connect_statuses
    depends_on:
      - pg-debezium-zookeeper
      - pg-debezium-kafka
      - pg-debezium-postgres

最終的なdocker-compose.yml

これまでの解説分を全て含んだdocker-compose.ymlはこんな感じです

version: "3.8"
services:
  pg-debezium-zookeeper:
    container_name: pg-debezium-zookeeper
    image: "bitnami/zookeeper:latest"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  pg-debezium-kafka:
    container_name: pg-debezium-kafka
    image: "bitnami/kafka:latest"
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=pg-debezium-zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,PLAINTEXT_HOST://:29092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://pg-debezium-kafka:9092,PLAINTEXT_HOST://127.0.0.1:29092
    depends_on:
      - pg-debezium-zookeeper

  pg-debezium-postgres:
    container_name: pg-debezium-postgres
    image: postgres:14.7-alpine
    command: [ "postgres", "-c", "wal_level=logical" ]
    volumes:
      - pg-debezium-postgres-data:/var/lib/postgresql/data:cached
    environment:
      POSTGRES_PASSWORD: "Passw0rd"
      POSTGRES_INITDB_ARGS: "--encoding=UTF-8 --locale=ja_JP.UTF-8"
      POSTGRES_USER: "test"
      POSTGRES_DB: test
    healthcheck:
      test: [ "CMD-SHELL", "pg_isready" ]
      
  pg-debezium:
    container_name: pg-debezium
    image: "debezium/connect:2.0"
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=pg-debezium-kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=_kafka_connect_configs
      - OFFSET_STORAGE_TOPIC=_kafka_connect_offsets
      - STATUS_STORAGE_TOPIC=_kafka_connect_statuses
    depends_on:
      - pg-debezium-zookeeper
      - pg-debezium-kafka
      - pg-debezium-postgres

networks:
  internal:
    driver: bridge
    internal: true
  external:
    driver: bridge
    internal: false
    name: pg_debezium_external_network

volumes:
  pg-debezium-postgres-data:

こんな感じで起動してみます

$ docker compose up -d
[+] Running 4/4
 ⠿ Container pg-debezium-postgres   Started          0.4s
 ⠿ Container pg-debezium-zookeeper  Started          0.4s
 ⠿ Container pg-debezium-kafka      Started          0.6s
 ⠿ Container pg-debezium            Started          0.9s

これで、PostgreSQL + Kafka + Debezium でのCDCの基盤が動作している状態になります。

CDCの構築

基盤の構築が終わったので、Debeziumコンテナ内にコネクタを作成します。

以下が今回の調査で作成した設定内容です。 sample.json という名称で保存しておきます。

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "pg-debezium-postgres",
    "database.port": "5432",
    "database.user": "test",
    "database.password": "Passw0rd",
    "database.dbname" : "test",
    "database.server.name": "pg-debezium-postgres",
    "table.whitelist": "public.test",
    "plugin.name": "pgoutput",
    "topic.prefix": "test_topic",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,

    "database.connectionTimeZone": "UTC",
    "include.schema.changes": "false"
  }
}

PostgreSQLコネクタの設定内容は、公式ドキュメントを参照してください。

設定ファイルができたので以下のコマンドでコネクタを作成します。(出力されているJSON文字列は整形しています)

$ curl -i -X POST -H "Accept:application/json" -H \
    "Content-Type:application/json" \
    http://localhost:8083/connectors/ \
    -d @./sample.json
    
HTTP/1.1 201 Created
Date: Sat, 06 May 2023 05:36:54 GMT
Location: http://localhost:8083/connectors/postgres-connector
Content-Type: application/json
Content-Length: 733
Server: Jetty(9.4.48.v20220622)

{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "pg-debezium-postgres",
    "database.port": "5432",
    "database.user": "test",
    "database.password": "Passw0rd",
    "database.dbname": "test",
    "database.server.name": "pg-debezium-postgres",
    "table.whitelist": "public.test",
    "plugin.name": "pgoutput",
    "topic.prefix": "test_topic",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "database.connectionTimeZone": "UTC",
    "include.schema.changes": "false",
    "name": "postgres-connector"
  },
  "tasks": [],
  "type": "source"
}

トピックの確認

以下のコマンドで、トピックの一覧を表示することができます。まだレコードが存在しないので、先ほど作成したコネクタで定義したトピックはまだ存在しません。

$ docker-compose exec pg-debezium-kafka kafka-topics.sh --list --bootstrap-server pg-debezium-kafka:9092
__consumer_offsets
_kafka_connect_configs
_kafka_connect_offsets
_kafka_connect_statuses

レコードを追加してみます。レコードを追加するとトピックが作成されていることが確認できます。

$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "INSERT INTO test (subject, created) values ( 'Test 1', NOW());"
INSERT 0 1

$ docker-compose exec pg-debezium-kafka kafka-topics.sh --list --bootstrap-server pg-debezium-kafka:9092
__consumer_offsets
_kafka_connect_configs
_kafka_connect_offsets
_kafka_connect_statuses
test_topic.public.test

では、kafkaコンテナに用意されている、kafka-console-consumer.sh を使用して購読してみます。

$ docker-compose exec pg-debezium-kafka kafka-console-consumer.sh \
        --bootstrap-server 127.0.0.1:9092 \
        --from-beginning --topic test_topic.public.test

{"before":null,"after":{"id":1,"subject":"Test 1","created":"2023-05-06T05:44:15.096521Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"test_topic","ts_ms":1683351855101,"snapshot":"false","db":"test","sequence":"[null,\"24280176\"]","schema":"public","table":"test","txId":738,"lsn":24280176,"xmin":null},"op":"c","ts_ms":1683351855591,"transaction":null}

無事、先ほど追加したレコードの内容は取得できています。では、別のターミナルを開いてデータベースを更新してみます。

$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "INSERT INTO test (subject, created) values ( 'Test 2', NOW());"
$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "UPDATE test SET subject = 'Test 2 updated' WHERE id = 2;"

コンシューマには以下のように表示されました。

{
  "before": null,
  "after": {
    "id": 2,
    "subject": "Test 2",
    "created": "2023-05-06T05:46:17.579336Z"
  },
  "source": {
    "version": "2.0.0.Final",
    "connector": "postgresql",
    "name": "test_topic",
    "ts_ms": 1683351977580,
    "snapshot": "false",
    "db": "test",
    "sequence": "[\"24280464\",\"24280856\"]",
    "schema": "public",
    "table": "test",
    "txId": 739,
    "lsn": 24280856,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1683351977937,
  "transaction": null
}
{
  "before": null,
  "after": {
    "id": 2,
    "subject": "Test 2 updated",
    "created": "2023-05-06T05:46:17.579336Z"
  },
  "source": {
    "version": "2.0.0.Final",
    "connector": "postgresql",
    "name": "test_topic",
    "ts_ms": 1683352381043,
    "snapshot": "false",
    "db": "test",
    "sequence": "[\"24282144\",\"24282200\"]",
    "schema": "public",
    "table": "test",
    "txId": 741,
    "lsn": 24282200,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1683352381077,
  "transaction": null
}

データを更新したのになぜか beforenull になってしまっています。 色々調べた結果、Debeziumのユーザガイド内に記載がありました。 7.3.2. Debezium PostgreSQL 変更イベントの値 こちらの解説を参考に以下のように設定を変更します。

$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "ALTER TABLE public.test REPLICA IDENTITY FULL;"
ALTER TABLE

設定の変更ができたので、再度データを更新してみます。

$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "UPDATE test SET subject = 'Test 2 updated 2' WHERE id = 2;"
UPDATE 1

無事、before に変更前のデータが入るようになりました。

{
  "before": {
    "id": 2,
    "subject": "Test 2 updated",
    "created": "2023-05-06T05:46:17.579336Z"
  },
  "after": {
    "id": 2,
    "subject": "Test 2 updated 2",
    "created": "2023-05-06T05:46:17.579336Z"
  },
  "source": {
    "version": "2.0.0.Final",
    "connector": "postgresql",
    "name": "test_topic",
    "ts_ms": 1683352906329,
    "snapshot": "false",
    "db": "test",
    "sequence": "[\"24284944\",\"24285000\"]",
    "schema": "public",
    "table": "test",
    "txId": 743,
    "lsn": 24285000,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1683352906769,
  "transaction": null
}

では最後に削除を試してみます。

$ docker exec -it pg-debezium-postgres psql -U test test \
    -c "DELETE FROM test WHERE id = 2;"    

無事、afternull になりました

{
  "before": {
    "id": 2,
    "subject": "Test 2 updated 2",
    "created": "2023-05-06T05:46:17.579336Z"
  },
  "after": null,
  "source": {
    "version": "2.0.0.Final",
    "connector": "postgresql",
    "name": "test_topic",
    "ts_ms": 1683353081106,
    "snapshot": "false",
    "db": "test",
    "sequence": "[\"24285496\",\"24285552\"]",
    "schema": "public",
    "table": "test",
    "txId": 744,
    "lsn": 24285552,
    "xmin": null
  },
  "op": "d",
  "ts_ms": 1683353081505,
  "transaction": null
}

これで、コンシューマを作成すれば、変更データをもとにコピーを作成したりデータを集計したりなど色々と処理ができますね!前出のブログでも紹介されている、KafkaJSを使ってコンシューマを作るのが良さそうです。

Azure Database for PostgreSQLでの設定

ここまで、手元環境のdoker上のPostgreSQLでDebeziumを試してみましたが、Azure Database for PostgreSQLで動作するか検証することにしてみます。

Azureの公式ドキュメントには、変更データ キャプチャ用に Azure Event Hubs の Apache Kafka Connect のサポートを Debezium と統合するという記事があり、Azure Event Hubsを利用した構築方法が解説されています。

このページにも 警告 として記載がありますが、Event Hubを利用する場合イベントの保存期間が制限されるという制約があるようです。これが実運用時に問題になるかはまだ把握できていないのですが、まずは自前でCDCを構築してみようと思います。

Azure Database for PostgreSQLの作成

今回は試験用なので、以下のように小さめ(お安い)で作成しています。 (フレキシブル サーバーでないと14などの新しいバージョンが利用できないので、フレキシブル サーバーを使っています)

設定の変更

まずは、 wal_levelを変更します。サーバーパラメータ編集ページで以下のようにwal_levelLOGICAL に変更して、設定を保存します。

次に、今回は手元環境から直接PostgreSQLに接続をする必要があるので、ネットワーク編集ページで 現在のクライアントIPを追加する を選択して、接続元IPアドレスを追加して、設定を保存します。

今回は実験用なのでこのように外部からの接続を許可する設定をしましたが、実運用時の設定は各環境に合わせて適切に設定してください

これで、手元環境から接続できるようになったので、以下のコマンドで接続確認をします。

$ docker exec -it pg-debezium-postgres psql -h ホスト名.postgres.database.azure.com -U test test
Password for user test: 
psql (14.7)
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.

test=> 

無事接続できましたので、先ほどと同様にテーブルを作成します。また、先ほど追加で設定したレプリケーションの設定なども更新しておきます。

$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "CREATE TABLE test (id SERIAL PRIMARY KEY, subject text NOT NULL, created timestamptz NOT NULL");
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "ALTER TABLE public.test REPLICA IDENTITY FULL;"
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "ALTER ROLE test WITH REPLICATION;"

CDCの構築

SSL接続設定の準備

Azure Database for PostgreSQLへの接続は、TLSを使用して接続する必要があるため、Debeziumからの接続に少し準備が必要です。 公式ドキュメントのAzure Database for PostgreSQL - フレキシブル サーバーでのトランスポート層セキュリティを使用した暗号化された接続の解説を参考にして、ルート証明書をダウンロードします。

docker exec -it pg-debezium curl -O https://dl.cacerts.digicert.com/DigiCertGlobalRootCA.crt.pem

コネクタの追加

準備ができたので、以下のような設定でコネクタを追加します。SSLでの接続が必要なため、ローカル用の設定に database.sslmode / database.sslrootcert の2つを追加しています。 今回は、azure~sample.jsonという名前で保存します。

{
    "name": "azure-postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "データベースのホスト名",
        "database.port": "5432",
        "database.user": "test",
        "database.password": "データベースのパスワード",
        "database.dbname" : "test",
        "database.server.name": "データベースのホスト名",
        "database.sslmode": "verify-full",
        "database.sslrootcert": "/kafka/DigiCertGlobalRootCA.crt.pem",
        "table.whitelist": "public.test",
        "plugin.name": "pgoutput",
        "topic.prefix": "azure_pgsql_topic",

        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,

        "database.connectionTimeZone": "UTC",
        "include.schema.changes": "false"
    }
  }

先ほどと同様に以下のコマンドでコネクタを追加します。

$ curl -i -X POST -H "Accept:application/json" -H \
    "Content-Type:application/json" \
    http://localhost:8083/connectors/ \
    -d @./azure-sample.json


HTTP/1.1 201 Created
Date: Sat, 06 May 2023 06:49:08 GMT
Location: http://localhost:8083/connectors/azure-postgres-connector
Content-Type: application/json
Content-Length: 924
Server: Jetty(9.4.48.v20220622)

{
  "name": "azure-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "ホスト名",
    "database.port": "5432",
    "database.user": "test",
    "database.password": "データベースのパスワード",
    "database.dbname": "webapp",
    "database.server.name": "ホスト名",
    "database.sslmode": "verify-full",
    "database.sslrootcert": "/kafka/DigiCertGlobalRootCA.crt.pem",
    "table.whitelist": "public.test",
    "plugin.name": "pgoutput",
    "topic.prefix": "azure_pgsql_topic",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "database.connectionTimeZone": "UTC",
    "include.schema.changes": "false",
    "name": "azure-postgres-connector"
  },
  "tasks": [],
  "type": "source"
}

以下のコマンドで、コネクタが追加されているか確認します

$ curl -i -X GET -H "Accept:application/json" \
    -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/

HTTP/1.1 200 OK
Date: Sat, 06 May 2023 06:51:04 GMT
Content-Type: application/json
Content-Length: 49
Server: Jetty(9.4.48.v20220622)

["postgres-connector","azure-postgres-connector"]

問題なく追加されているようです。

トピックの確認

準備ができたので、先ほどと同様に kafka-console-consumer.sh を使用して購読してみます。

$ curl -i -X POST -H "Accept:application/json" -H \
    "Content-Type:application/json" \
    http://localhost:8083/connectors/ \
    -d @./azure-sample.json

HTTP/1.1 201 Created
Date: Sat, 06 May 2023 06:49:08 GMT
Location: http://localhost:8083/connectors/azure-postgres-connector
Content-Type: application/json
Content-Length: 924
Server: Jetty(9.4.48.v20220622)

{
  "name": "azure-postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "ホスト名",
    "database.port": "5432",
    "database.user": "test",
    "database.password": "パスワード",
    "database.dbname": "webapp",
    "database.server.name": "ホスト名",
    "database.sslmode": "verify-full",
    "database.sslrootcert": "/kafka/DigiCertGlobalRootCA.crt.pem",
    "table.whitelist": "public.test",
    "plugin.name": "pgoutput",
    "topic.prefix": "azure_pgsql_topic",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "database.connectionTimeZone": "UTC",
    "include.schema.changes": "false",
    "name": "azure-postgres-connector"
  },
  "tasks": [],
  "type": "source"
}

準備ができたので、別ターミナルからデータを追加・更新・削除してみます。

$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "INSERT INTO test (subject, created) values ( 'Test 1', NOW());"    
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "INSERT INTO test (subject, created) values ( 'Test 2', NOW());"    
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "UPDATE test SET subject = 'Test 2 updated' WHERE id = 2;"    
$ docker exec -it pg-debezium-postgres psql -h ホスト名 -U test test \
    -c "DELETE FROM test WHERE id = 2;"    

コンシューマでは、問題なく以下のように変更を取得できました。

{"before":null,"after":{"id":1,"subject":"Test 1","created":"2023-05-06T02:28:46.075454Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683355748922,"snapshot":"first","db":"webapp","sequence":"[null,\"20568873528\"]","schema":"public","table":"test","txId":48642,"lsn":20568873528,"xmin":null},"op":"r","ts_ms":1683355749255,"transaction":null}
{"before":null,"after":{"id":2,"subject":"Test 2","created":"2023-05-06T02:28:46.084254Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683355748922,"snapshot":"true","db":"webapp","sequence":"[null,\"20568873528\"]","schema":"public","table":"test","txId":48642,"lsn":20568873528,"xmin":null},"op":"r","ts_ms":1683355749257,"transaction":null}
{"before":{"id":2,"subject":"Test 2","created":"2023-05-06T02:28:46.084254Z"},"after":{"id":2,"subject":"Test 2 updated","created":"2023-05-06T02:28:46.084254Z"},"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683356320172,"snapshot":"false","db":"webapp","sequence":"[\"20602423752\",\"20602427936\"]","schema":"public","table":"test","txId":48718,"lsn":20602427936,"xmin":null},"op":"u","ts_ms":1683356320309,"transaction":null}
{"before":{"id":2,"subject":"Test 2 updated","created":"2023-05-06T02:28:46.084254Z"},"after":null,"source":{"version":"2.0.0.Final","connector":"postgresql","name":"azure_pgsql_topic","ts_ms":1683356329235,"snapshot":"false","db":"webapp","sequence":"[\"20602428392\",\"20602428648\"]","schema":"public","table":"test","txId":48720,"lsn":20602428648,"xmin":null},"op":"d","ts_ms":1683356329507,"transaction":null}

これで、Azure Database for PostgreSQLでもDebeziumを使用したCDCが構築できることが確認できました。

まとめ

今回は、Azure Database for PostgreSQL + DebeziumでCDCを構築するための準備として、ローカル環境 + Azure Database for PostgreSQLでCDCを構築するところまでを解説しました。 実際に利用するためには、まだまだ調査しないといけないことは多いですが、ひとまず以降の実験・調査をする土台までは検証できました。

さらっと書いていますが、概念を把握するのに色々実験したり、PostgreSQL・Azure Database for PostgreSQLで動作させるために何度も構築し直したり、色々と試行錯誤が必要でした。

この後は、以下のような残った課題を順次調査を進めていこうと思います

  • Kafka + zookeeperをどこに構築するか検討して構築
  • コネクタのパラメータの調整
  • ログの管理や監視
  • Azure Event Hubsを利用する形での実験と検討

また知見が溜まったら、ブログにまとめようと思います。

関連リンク