pg2arrowを並列化する

今回は皆さんが大好きな便利ツール「pg2arrow」のお話です。

PostgreSQLでポータブルな列指向データ形式 Apache Arrow を読み出すには、Arrow_Fdwを利用する事ができます。
PG-StromではGPU-Direct SQLにも対応していますし、列指向データという事もあって、被参照列しかI/Oが発生しない、同じ列のデータが近傍に固まっているという大量データ処理に適した特性を持ってもいます。

また、Apache Arrow形式のファイルを作成するにはPyArrowやPandasなど様々なツールがありますが、我々DB屋としてはPostgreSQLに格納されたトランザクショナルなデータを、分析用にApache Arrow形式として吐き出せるととても嬉しい。そんな時に使えるツールがpg2arrowなのです。

pg2arrowは、PostgreSQLにクエリを投げ、その問合せ結果をApache Arrow形式のファイルとして保存するためのツールで、PG-Stromと同梱して配布されています(ソースコードのかなりの部分を Arrow_Fdw と共用しているためです)。問い合わせに使用するSQLコマンドは、テーブルを単純にダンプするだけではなく、例えばWHERE句で抽出条件を指定したり、複数のテーブルをJOINした結果を含めるといった事も可能です。
GitHubのログを確認したところ、最初のコミットが2019年4月ですので、およそ4年ほど前に設計・開発したツールという事ですね。

一方、pg2arrowには並列動作をサポートしていないという弱点がありました。
そのため、テーブルをダンプするPostgreSQL側のCPUも、データを受け取って書き込むpg2arrow側のCPUも、またその間のネットワークもリソースに余裕があるにも関わらず結構な遊び状態となっており、『例えば1TBくらいのテーブルをArrowに変換してベンチマークを取るぜ!』といった場合でも、夜2:00頃*1にポチっとpg2arrowを走らせて、翌朝結果を確認するという事が常でした。

普通に考えて、並列化すれば大きく高速化するはずです。

データの重複を防ぐために

DBから読み出したデータを別の形式にして保存するだけですので、原理的には、複数のセッションを作成して、それぞれが並列に処理を行えば済む話です。
例えば、あるテーブルをフルダンプしてArrowに変換するとして、各DBクライアントが互いに重なり合わず、しかも漏れの無いような条件で問い合わせを実行すれば良いわけです。典型的には何かのフィールドにハッシュ関数を与え、クライアント数で割った時の剰余がクライアント番号に一致するものだけを取り出せば事足ります。
(これは PostgreSQL のパラレルクエリでも使われているアイデアです)

しかし、考慮すべき点はもう一つあります。
例えば、クライアントAがDBに接続した後、別の誰かが対象テーブルを100行更新し、その後にクライアントB、クライアントCがDBに接続したとします。
この時、更新された100行のうち30行はオレンジ色の、40行は水色の、30行はウグイス色の行だとします。そうすると、結果として生成されたArrowファイルは「部分的に更新された」不整合な状態となってしまいます。

ここでは古典的なテクニックであるスナップショット同期関数を使います。
最初のクライアントAがDBに接続し、トランザクションを開始した後、pg_export_snapshot()を呼び出します。この関数はトランザクションのスナップショットに紐づいたユニークな識別子を返しますが、これを他のセッションでインポートすると、異なるセッションであっても全く同じビューを再現する事ができます。
これはお馴染みpg_dumpの並列ダンプでも用いられており、実際の挙動は以下のようなイメージです。

クライアントA(最初に接続する)

ssbm=# BEGIN READ ONLY;
BEGIN
ssbm=*# SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET
ssbm=*# SELECT pg_catalog.pg_export_snapshot();
 pg_export_snapshot
---------------------
 0000000B-000000B0-1
(1 row)

クライアントB、C、...(ワーカー)

ssbm=# BEGIN READ ONLY;
BEGIN
ssbm=*# SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET
ssbm=*# SET TRANSACTION SNAPSHOT '0000000B-000000B0-1';
SET

pg2arrowを並列に起動してみる。

pg2arrowの並列モードをサポートするために追加されたのは、-n|--num-workers=N_WORKERSオプションと、-k|--parallel-keys=PARALLEL_KEYSオプションの2つです。
これらは互いに排他的で、同じテーブルをスキャンする際に読み出すデータが重ならないよう検索条件を調整するための方法が若干異なってきます。
しかし、内部的な並列処理(ワーカースレッドの挙動)は変わりませんので、対象となるテーブルの設計やデータの特性によって使い分けてください。

$ ./pg2arrow --help
Usage:
  pg2arrow [OPTION] [database] [username]

General options:
  -d, --dbname=DBNAME   Database name to connect to
  -c, --command=COMMAND SQL command to run
  -t, --table=TABLENAME Equivalent to '-c SELECT * FROM TABLENAME'
      (-c and -t are exclusive, either of them must be given)
  -n, --num-workers=N_WORKERS    Enables parallel dump mode.
                        It requires the SQL command contains $(WORKER_ID)
                        and $(N_WORKERS), to be replaced by the numeric
                        worker-id and number of workers.
  -k, --parallel-keys=PARALLEL_KEYS Enables yet another parallel dump.
                        It requires the SQL command contains $(PARALLEL_KEY)
                        to be replaced by the comma separated token in the
                        PARALLEL_KEYS.
      (-n and -k are exclusive, either of them can be give if parallel dump.
       It is user's responsibility to avoid data duplication.)
      --inner-join=SUB_COMMAND
      --outer-join=SUB_COMMAND
  -o, --output=FILENAME result file in Apache Arrow format
      --append=FILENAME result Apache Arrow file to be appended
      (--output and --append are exclusive. If neither of them
       are given, it creates a temporary file.)
  -S, --stat[=COLUMNS] embeds min/max statistics for each record batch
                       COLUMNS is a comma-separated list of the target
                       columns if partially enabled.

Arrow format options:
  -s, --segment-size=SIZE size of record batch for each

Connection options:
  -h, --host=HOSTNAME  database server host
  -p, --port=PORT      database server port
  -u, --user=USERNAME  database user name
  -w, --no-password    never prompt for password
  -W, --password       force password prompt

Other options:
      --dump=FILENAME  dump information of arrow file
      --progress       shows progress of the job
      --set=NAME:VALUE config option to set before SQL execution
      --help           shows this message

Report bugs to <[email protected]>.
-n|--num-workers=N_WORKERSオプション

-nオプションはシンプルにワーカー数を指定します。
この時、Arrowファイルの元となる問合せSQL文には$(WORKER_ID)と$(N_WORKERS)というマクロを埋め込む事ができ、この部分は0から始まるユニークなワーカーIDと、オプションで指定した並列度にそれぞれ置き換えられます。
つまり、-cオプションで指定するSQLをこのように変えれば良いわけです。

オリジナル:

SELECT * FROM lineorder

修正後:

SELECT * FROM lineorder WHERE lo_orderkey % $(N_WORKERS) = $(WORKER_ID)
-k|--parallel-keys=PARALLEL_KEYSオプション

-kオプションはPARALLEL_KEYSに指定したカンマ区切りのトークン毎にワーカースレッドを起動し、そのトークンをそれぞれSQLコマンド中の$(PARALLEL_KEY)に置き換えます。

例を使って説明しましょう。以下のようにパーティション化されたテーブルが存在し、パーティションの子テーブル毎にワーカースレッドを起動して並列にテーブルを読み出したい場合、-kオプションを次のように使います。

ssbm=# \d+
                                                List of relations
 Schema |       Name       |       Type        | Owner  | Persistence | Access method |    Size    | Description
--------+------------------+-------------------+--------+-------------+---------------+------------+-------------
 public | lineorder        | partitioned table | kaigai | permanent   |               | 0 bytes    |
 public | lineorder__p1992 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1993 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1994 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1995 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1996 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1997 | table             | kaigai | permanent   | heap          | 13 GB      |
 public | lineorder__p1998 | table             | kaigai | permanent   | heap          | 7894 MB    |
 public | lineorder__p1999 | table             | kaigai | permanent   | heap          | 8192 bytes |
 public | lineorder_unsort | table             | kaigai | permanent   | heap          | 87 GB      |
(10 rows)

以下の例では、パーティション子テーブルのサフィックスである年号部分を$(PARALLEL_KEY)によって置き換えます。
そうすると、-kオプションで指定したカンマ区切りのキー値ごとにそれぞれワーカーが生成され、結果として、それが各々パーティション子テーブルの条件なしスキャンを実行しています。

$ pg2arrow -d ssbm -c 'SELECT * FROM lineorder__p$(PARALLEL_KEY)' -o /opt/hoge/f_lineorder.arrow -k=1992,1993,1994,1995,1996,1997,1998 --progress
worker:1 SQL=[SELECT * FROM lineorder__p1993]
worker:3 SQL=[SELECT * FROM lineorder__p1995]
worker:2 SQL=[SELECT * FROM lineorder__p1994]
worker:4 SQL=[SELECT * FROM lineorder__p1996]
worker:5 SQL=[SELECT * FROM lineorder__p1997]
worker:6 SQL=[SELECT * FROM lineorder__p1998]
2024-03-31 20:33:58 RecordBatch[0]: offset=1648 length=268436376 (meta=920, body=268435456) nitems=1303083 by worker:0
2024-03-31 20:33:59 RecordBatch[1]: offset=268438024 length=268436376 (meta=920, body=268435456) nitems=1303083 by worker:3
          :
          :
worker:0 merged pending results by worker:4
2024-03-31 20:38:15 RecordBatch[460]: offset=123480734608 length=127664216 (meta=920, body=127663296) nitems=619722 by worker:0
Total elapsed time: 00:04:26

ワーカーの動作について

ここで、大雑把な処理の流れにも触れておく事にします。
Pg2Arrowが並列モードで起動すると、mainスレッドであるworker-0がスキーマ定義を作成するなどの初期設定を行い、その後、他のワーカーを順次起動していきます。
各ワーカーはそれぞれPostgreSQLに接続し、それぞれ約256MBのバッファ((-sオプションで変更可))が埋まるたびに出力先のArrowファイルへと書き込みを行います。
Arrowファイルは内部がRecord Batchと呼ばれるブロックに分割されており、ファイルポインタを進める部分さえアトミックに行ってしまえば、以降の書込み処理はマルチスレッドが複数の別個の領域に独立して書き込む事ができます。そのため、シーケンシャルに実行しなければならないクリティカルセクションは最小限に抑えられています。

また、クエリの最後まで読み出したにも関わらず256MBのバッファを埋められなかった場合は、隣接スレッドのバッファにマージされ、最終的には worker-0 のバッファにマージされますので、並列ワーカーの数を増やしたとしても Record Batch の大きさが極端に小さい(= PG-Stromでの実行効率が低下する)データが作られるわけではありません。


生成した Arrow ファイルをFDW経由で参照する

それでは、生成した Arrow ファイルを参照してみる事にします。
個々のテーブル定義(カラム名、データ型)を指定するのは面倒ですので、IMPORT FOREIGN SCHEMA文を使用するのがお勧めです。

ssbm=# import foreign schema f_lineorder from server arrow_fdw into public options (file '/opt/hoge/f_lineorder.arrow');
IMPORT FOREIGN SCHEMA
ssbm=# \d f_lineorder
                        Foreign table "public.f_lineorder"
       Column       |     Type      | Collation | Nullable | Default | FDW options
--------------------+---------------+-----------+----------+---------+-------------
 lo_orderkey        | numeric       |           |          |         |
 lo_linenumber      | integer       |           |          |         |
 lo_custkey         | numeric       |           |          |         |
 lo_partkey         | integer       |           |          |         |
 lo_suppkey         | numeric       |           |          |         |
 lo_orderdate       | integer       |           |          |         |
 lo_orderpriority   | character(15) |           |          |         |
 lo_shippriority    | character(1)  |           |          |         |
 lo_quantity        | numeric       |           |          |         |
 lo_extendedprice   | numeric       |           |          |         |
 lo_ordertotalprice | numeric       |           |          |         |
 lo_discount        | numeric       |           |          |         |
 lo_revenue         | numeric       |           |          |         |
 lo_supplycost      | numeric       |           |          |         |
 lo_tax             | numeric       |           |          |         |
 lo_commit_date     | character(8)  |           |          |         |
 lo_shipmode        | character(10) |           |          |         |
Server: arrow_fdw
FDW options: (file '/opt/hoge/f_lineorder.arrow')

試しに、SSBMのQ1_2を走らせてみます。

ssbm=# select sum(lo_extendedprice*lo_discount) as revenue
from f_lineorder, date1
where lo_orderdate = d_datekey
  and d_yearmonthnum = 199401
  and lo_discount between 4 and 6
  and lo_quantity between 26 and 35;
    revenue
---------------
 9624332170119
(1 row)

もちろん、Arrowファイルの元となったlineorderテーブルを使っても同じ値が返ってきます。

ssbm=# select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, date1
where lo_orderdate = d_datekey
  and d_yearmonthnum = 199401
  and lo_discount between 4 and 6
  and lo_quantity between 26 and 35;
    revenue
---------------
 9624332170119
(1 row)

並列Pg2Arrowのパフォーマンス

最後に、もちろん気になる並列Pg2Arrowのパフォーマンスを見てみる事にします。

測定環境のスペックは以下の通り。

以下のオプションで pg2arrow を起動し、87GBのlineorderテーブルを全てArrow形式に変換するまでの時間を計測しました。

$ pg2arrow -d ssbm -c 'SELECT * FROM lineorder_unsort WHERE lo_orderkey % $(N_WORKERS) = $(WORKER_ID)' -o /opt/hoge/f_lineorder.arrow -n N_WORKERS --progress

まず緑の縦棒が並列実行なしのpg2arrowで87GBのlineorderテーブルをArrowに変換した時のもので、1,520秒、おおむね25分ちょい要しています。
これを-nオプションで並列数を増やした場合、およそ400秒(6分40秒)を少し下回った辺りで頭打ちとなっているようです。

これは検索条件(lo_orderkey % $(N_WORKERS) = $(WORKER_ID))で重複行を排除しているため、クライアント数が増加するにしたがって、同時に lineorder テーブルを重複してスキャンしなければならなくなり、トータルのバッファからの読出し負荷が増えてしまったためと言えるでしょう。

一方、パーティション子テーブルの名前を一部を-kオプションで置き換え、それぞれのクライアントが完全に独立した領域をスキャンする事になったパターンでは、並列度が7である(しかもlineorder__p1998の容量は他の子テーブルの半分なので、実質6.5並列)にも関わらず、266秒と並列実行なしのパターンに比べて5.7倍の実行時間を記録しています。

この事から、並列Pg2Arrowのパフォーマンス向上を享受するには、以下の点に注意を払う必要があるでしょう。

  • 単純に並列度を増やすだけでも効果はあるが、同じ領域を重複してスキャンする処理が増えると、効果は限定的になりがち。
  • PostgreSQLがスキャンする量を減らすような検索条件の与え方が望ましい。パーティションなどで分割されていたら理想的。

*1:もっと早く寝ろw

PG-Strom v5.0

ずいぶんご無沙汰のブログ記事となりました。

今回は、設計を一新して速く、頑強になった PG-Strom v5.0 をご紹介します。

なぜ再設計が必要だったのか?

前バージョンの PG-Strom v3.x シリーズの基本的な設計は、2018年のPG-Strom v2.0の頃から大きく変わっていません。
当時の最新GPUモデルは Volta 世代(TESLA V100)で、CUDAのバージョンは9.2ですから、かなりの大昔という事はお分かり頂けると思います。

この頃、PG-Stromの開発において最優先すべき課題は、先ず実用となるバージョンをリリースする事でした。(※ HeteroDB社の創業は2017年7月です)
クエリの処理速度を高速化する事は当然なのですが、それ以上に、まだPG-Stromの内部インフラも十分に枯れていない中で、クラッシュせずに走り切る事や、バグがあったとしても容易に原因箇所を特定できる事が優先であったのです。また、GPU側でSQLを実行するデバイスコードにしても、様々な実装方式にトライしてその中で最良を選択するというよりも、先ずは出たとこ勝負で『動くモノ』を優先するという状況でした。

ただその後、数年を経て、明らかになってきた問題が複数あり、これらはどこかのタイミングで大規模なリファクタリングを行わざるを得ないと考えていました。例えば以下のような問題点です。

問題①:CUDA Contextが消費するリソース

ご存知のようにPostgreSQLはマルチプロセスで動作します。クライアントからの接続が発生するたびにバックエンドプロセスをfork(2)し、そのプロセスがSQL処理の大半を担います。また、サイズの大きなテーブルをスキャンする時など、一時的なワーカープロセスを起動してSQL処理を並列に実行する事もあります。
PG-Stromの追加した実行計画(GpuScan、GpuJoin、GpuPreAgg)が採用された場合、これらはGPUを使用するためにCUDA Contextというものを作成します。しかしCUDA Contextを作成すると、それ自体がGPUのリソース(デバイスメモリ数百MB~)を消費してしまうため、PostgreSQLへの同時接続数が増加すると、ワーキングに利用できるメモリがほとんど残らない事になります。

問題②:複雑すぎるGPUコード自動生成ロジック

2012年にPG-Stromの最初のプロトタイプを作成した時から、PG-StromはSQLとして与えられたScan条件式(WHERE句)やJoin結合条件(ON~句)から自動的にCUDA C++のソースコードを生成し、それを実行時コンパイルしてGPU用のネイティブバイナリを生成していました。
しかし、様々な状況に対応してSQLからCUDA C++用のソースコードを生成するロジックは非常に複雑で、例えば、GpuJoin用のソースコード生成を含むsrc/gpujoin.cは8500行近くの規模があり、ソースコードを保守する上でかなり悩ましい問題を抱えていました。(要はスパゲッティ)

問題③:最低限必要な300ms

PG-Strom v3.x以前は各バックエンドプロセスがCUDA Contextを作成し、GPUのメモリ割り当てやタスク(GPU Kernel)の投入を行っていました。
このCUDA Contextの作成には実は少し時間がかかり、およそ100~150ms程度の遅延が不可避でした。また、CUDA C++のソースコードを作成し、これをGPU向けのバイナリにコンパイルする際にも、最低で200ms程度の時間がかかっていました(もちろん処理の複雑さによります)。
何十秒もかかる処理ならともかく、数十ms程度の応答速度を要求されるクエリでこのペナルティは割と厳しいものがあります。

問題④:NVIDIA GPU以外への拡張性

これは現時点では可能性の話にすぎませんが、例えば、PG-Stromの仕組みをComputational Storage Drive (CSD)に実装する事ができれば、ストレージ側のプロセッサでScan、Join、GroupByといったSQL処理の一部を実行したり、Projection処理を行う事で被参照列のみをホストに返すという列指向データ構造に近い事ができるはずです。しかし、PG-StromがCUDA C++を前提としたソースコード生成に注力していた場合、CSDで実行可能な処理の自動生成部分を二重に持つ事となり、開発効率以上にソフトウェア品質的に悩ましい問題を抱える事となりそうです。

PG-Strom v5.0のアーキテクチャ

これらの問題を解決するため、PG-Strom v5.0ではまずPostgreSQLの各バックエンドプロセスがCUDA Contextを持つ構造を廃止しました。
代わりに、常駐プロセスであるPG-Strom GPU-Serviceだけが唯一GPUと相対してリソースの管理やタスクの投入を行います。
PostgreSQLのバックエンドプロセス(で動作するPG-StromのCustomScanハンドラ)は、プロセス間通信を通じてGPU-Serviceにリクエストを送出し、その処理結果を待つだけです。GPU-Serviceはマルチスレッドで動作し、pg_strom.max_async_tasksを上限として並列にタスクを処理する事ができます。

以下の模式図をご覧ください。
PostgreSQLバックエンドプロセスが個々にGPUを管理する場合と、PG-Strom GPU-ServiceだけがGPUを管理し、他のプロセスはGPU-Serviceにリクエストを送出するモデルでは、CUDAによって消費されるGPUデバイスメモリの量が段違い(特にクライアント数が多い場合)である事がよく分かると思います。

さらにこの構造は、CUDA Contextを初期化するとき(cuCtxCreate())の100ms~150ms遅延の問題も解決します。なぜなら、CUDA ContextはGPU-Serviceの起動時に既に作成済みで、それに比べればUNIXドメインソケットを通じてGPU-Serviceとの間にコネクションを確立する処理時間など微々たるものにしかすぎないからです。

CUDA C++ネイティブコードから、疑似命令コードへ

もう一つ。これまでSQLワークロードをGPUで実行するためにCUDA C++ソースコードを生成し、これを実行時コンパイラ(NVRTC:NVIDIA Run Time Compiler)が最適化、実行時バイナリの生成というステップを踏んでいましたが、GpuJoinなどでCUDA C++ソースコードを生成するためのロジックが複雑になりすぎた事から、PG-Strom v5.0では疑似命令コードを生成するようになりました。

以下の実行計画をご覧ください。
これは様々な条件で絞り込みを行ったdate1テーブルとlineorderテーブルをJOINし、lo_extendedprice*lo_discountの結果を集計するクエリの実行計画です。
VERBOSEオプションを付加すると、GpuPreAggプランの下の方に『xxx OpCode』というものが出力されています。(※ VERBOSEオプション抜きだとここまで賑やかなEXPLAIN出力にはなりません)
このOpCodeというのは、GPU上で実行する演算子や列参照の手順をバイナリ形式でパッキングしたもので、EXPLAINの出力では可読な形式に直したものを出力しています。

従来は、ここに生成したCUDA C++のソースコードのファイル名が出力されていました。これをNVRTCに渡して実行時コンパイルし、GPU用のバイナリを生成するわけです。

=# explain verbose
select sum(lo_extendedprice*lo_discount) as revenue
from lineorder,date1
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
                                                                    QUERY PLAN
-----------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=12713126.00..12713126.01 rows=1 width=32)
   Output: pgstrom.sum_fp_num((pgstrom.psum(((lineorder.lo_extendedprice * lineorder.lo_discount))::double precision)))
   ->  Custom Scan (GpuPreAgg) on public.lineorder  (cost=12713125.99..12713126.00 rows=1 width=32)
         Output: (pgstrom.psum(((lineorder.lo_extendedprice * lineorder.lo_discount))::double precision))
         GPU Projection: pgstrom.psum(((lineorder.lo_extendedprice * lineorder.lo_discount))::double precision)
         GPU Scan Quals: ((lineorder.lo_discount >= '1'::numeric) AND (lineorder.lo_discount <= '3'::numeric) AND (lineorder.lo_quantity < '25'::numeric)) [rows: 2400065000 -> 315657500]
         GPU Join Quals [1]: (date1.d_datekey = lineorder.lo_orderdate) ... [nrows: 315657500 -> 45076290]
         GPU Outer Hash [1]: lineorder.lo_orderdate
         GPU Inner Hash [1]: date1.d_datekey
         GPU-Direct SQL: enabled (GPU-0)
         KVars-Slot: <slot=0, type='numeric', expr='lineorder.lo_discount'>, <slot=1, type='numeric', expr='lineorder.lo_quantity'>, <slot=2, type='float8', expr='(lineorder.lo_extendedprice * lineorder.lo_discount)'>, <slot=3, type='numeric', expr='lineorder.lo_extendedprice'>, <slot=4, type='int4', expr='date1.d_datekey'>, <slot=5, type='int4', expr='lineorder.lo_orderdate'>
         KVecs-Buffer: nbytes: 192512, ndims: 3, items=[kvec0=<0x0000-dfff, type='numeric', expr='lo_discount'>, kvec1=<0xe000-1bfff, type='numeric', expr='lo_quantity'>, kvec2=<0x1c000-29fff, type='numeric', expr='lo_extendedprice'>, kvec3=<0x2a000-2c7ff, type='int4', expr='d_datekey'>, kvec4=<0x2c800-2efff, type='int4', expr='lo_orderdate'>]
         LoadVars OpCode: {Packed items[0]={LoadVars(depth=0): kvars=[<slot=5, type='int4' resno=6(lo_orderdate)>, <slot=1, type='numeric' resno=9(lo_quantity)>, <slot=3, type='numeric' resno=10(lo_extendedprice)>, <slot=0, type='numeric' resno=12(lo_discount)>]}, items[1]={LoadVars(depth=1): kvars=[<slot=4, type='int4' resno=1(d_datekey)>]}}
         MoveVars OpCode: {Packed items[0]={MoveVars(depth=0): items=[<slot=0, offset=0x0000-dfff, type='numeric', expr='lo_discount'>, <slot=3, offset=0x1c000-29fff, type='numeric', expr='lo_extendedprice'>, <slot=5, offset=0x2c800-2efff, type='int4', expr='lo_orderdate'>]}}, items[1]={MoveVars(depth=1): items=[<offset=0x0000-dfff, type='numeric', expr='lo_discount'>, <offset=0x1c000-29fff, type='numeric', expr='lo_extendedprice'>]}}}
         Scan Quals OpCode: {Bool::AND args=[{Func(bool)::numeric_ge args=[{Var(numeric): slot=0, expr='lo_discount'}, {Const(numeric): value='1'}]}, {Func(bool)::numeric_le args=[{Var(numeric): slot=0, expr='lo_discount'}, {Const(numeric): value='3'}]}, {Func(bool)::numeric_lt args=[{Var(numeric): slot=1, expr='lo_quantity'}, {Const(numeric): value='25'}]}]}
         Join Quals OpCode: {Packed items[1]={JoinQuals:  {Func(bool)::int4eq args=[{Var(int4): slot=4, expr='d_datekey'}, {Var(int4): kvec=0x2c800-2f000, expr='lo_orderdate'}]}}}
         Join HashValue OpCode: {Packed items[1]={HashValue arg={Var(int4): kvec=0x2c800-2f000, expr='lo_orderdate'}}}
         Partial Aggregation OpCode: {AggFuncs <psum::fp[slot=2, expr='(lo_extendedprice * lo_discount)']> arg={SaveExpr: <slot=2, type='float8'> arg={Func(float8)::float8 arg={Func(numeric)::numeric_mul args=[{Var(numeric): kvec=0x1c000-
2a000, expr='lo_extendedprice'}, {Var(numeric): kvec=0x0000-e000, expr='lo_discount'}]}}}}
         Partial Function BufSz: 16
         ->  Seq Scan on public.date1  (cost=0.00..78.95 rows=365 width=4)
               Output: date1.d_datekey
               Filter: (date1.d_year = 1993)
(22 rows)

しかし、PG-Strom v3.xが自動生成するCUDA C++コードは、ライブラリ部分を予めビルドしておく方式に切り替えた事から、実質的にはSQL文が与えられるたびに変化する制御構造をソースコード自動生成という形で吸収していたとも言えます。
そこで、この制御構造自体をGPU側へ持ち込めば(+条件分岐に相当する部分は予め関数ポインタをセットするなどして実行コストを抑える)、実行時コンパイルの手間を省けると考えたわけです。

kaigai.hatenablog.com

実際に、比較的規模の小さなテーブル(800万件)の集計処理を PG-Strom v3.5 と PG-Strom v5.0 で比較してみます。

  • PG-Strom v3.5
ssbm=# select count(*) from lineorder_8m where lo_orderpriority = '2-HIGH';
  count
---------
 1604233
(1 row)

Time: 1132.471 ms (00:01.132)
  • PG-Strom v5.0
=# select count(*) from lineorder_8m where lo_orderpriority = '2-HIGH';
  count
---------
 1604233
(1 row)

Time: 114.781 ms

全体で数十秒~を要するクエリであれば初期セットアップの時間差は大きく影響しませんが、比較的小さなデータセットであれば、クエリの実行時間を頑張って速くしてもある一定の限度以上には高速化できないという問題がありました。しかし、v5.0ではGPU-Serviceが既にCUDA Contextを初期化している上、コードのコンパイル&最適化も不要であるため、比較的小さなテーブルであってもGPU処理の恩恵を得やすいというメリットがあります。

GPU-Serviceのマルチスレッド化とパフォーマンス改善

v5.0での大規模なリファクタリングによって、GPUを管理するのはGPU-Serviceプロセス一個だけに絞られ、さらにGPU-ServiceはマルチスレッドによりPostgreSQLバックエンドプロセスからのリクエストを次々と捌いていきます。これはGPUやCUDAのレイヤから見ると大きな変化で、並列に動作するPostgreSQLバックエンドからの要求を処理するたびにCUDA Contextを切り替える必要がなくなり、GPU Kernelを起動する際のスループットが向上します。

これは処理速度がシビアな状況で引っかかってくる事があり、例えばこれは同じ構成のサーバ*1上でStar Schema Benchmark (SSBM)を実行した場合、このSSDのSeqRead速度は6500MB/sですので、理論上、4本束ねた場合は26,000MB/sまでの読出しスループットを発揮できるはずです。
しかし、v3.5の結果を見ると20GB/s程度で性能値が頭打ちになっている一方、v5.0では24GB/s程度まで処理性能が伸びている事が分かります。かなり多くの部分で修正が加えられているため、これだけが高速化の要因というわけではないでしょうが、v5.0になりGPUへタスクを放り込むスケジューリングがより洗練されるようになってきたという事が分かります。


まとめ

PG-Stromにおけるこれら内部アーキテクチャの一新は、安定性・保守性を大きく高めると共に、GPU-Direct SQLでよりハードウェア理論速度に近いパフォーマンスを発揮し、さらにCUDA Contextの生成やCUDA C++コードのコンパイルに要する時間の削減効果で、とりわけ比較的小さなデータセット(~20GB程度)であってもGPU利用の効果が実感できるようになりました。

これらPG-Strom v5.0の特徴、修正点については、明日(3/15)のセミナーでお話しさせていただきますので、ぜひこちらも併せてご参加いただければと思います。

bakusokudb.connpass.com

*1:CPU: AMD EPYC 7402P (24C, 2.8GHz)、RAM: 128GB、GPU: NVIDIA A100 [40GB, PCI-E]、SSD: Intel SSD D7-P5510 [U.2, 3.84TB]

Fluentd向けApache Arrowプラグインについて

構想は半年ほど前?ここ一ヶ月ほど集中して開発に取り組んでいた、Fluentd向けApache Arrowプラグインがようやく動くようになったので、今回はこちらのモジュールについてご紹介します。

そもそもPG-Stromは、IoT/M2M領域で大量に発生するデータを高速に処理できますというのがセールスポイントで、GPU-Direct SQLはじめ、各種の機能によってそれを実現しているワケですが、実際に運用する際には、発生したデータを『どうやってSQLで処理できるようDBにインポートするか?』という問題があります。
例えば、PostgreSQLに一行ずつINSERTするというのも一つの解です。ただし、単純なI/Oに比べると、DBへの書き込みはどうしても処理ボトルネックになりがちです。

そこで、大量に収集するログデータを、少ない時間ロスで(つまり一時ファイルに保存したデータを再度DBにインポートするなどの手間をかける事なく)検索や集計できる状態に持って行くために、以下のように Fluentd から Apache Arrow 形式ファイルを出力し、それを直接 PG-Strom から読み出すというスキームを作りました。

Fluentdとは Treasure Data の古橋貞之氏によって開発されたログ収集ツールで、SyslogのようなサーバログからIoT/M2M機器のデバイスログに至るまで、多種多様なログデータを集積・保存するために事実上のスタンダードとして利用されているソフトウェアです。
Ruby で記述されたプラグインを追加する事で、ログデータの入出力や加工を自在にカスタマイズすることができます。

arrow-file プラグイン

Fluentdのプラグインにはいくつかカテゴリがあり、外部からログを受け取るInputプラグイン、ログを成形するParserプラグイン、受信したログを一時的に蓄積するBufferプラグイン、ログを出力するOutputプラグイン、などの種類があります。

Fluentdがログを受け取ると、Input/Parserプラグインによってログは共通の内部形式へと変換されます。
これは、ログの振り分けに利用できる識別子のtag、ログのタイムスタンプtimeおよび、生ログを整形した連想配列であるrecordです。
Bufferプラグインは、ログを Output プラグインに渡して書き出すまでの間、一時的にこれを保持します。これにより、渡すまでの間、一時的にこれを保持します。これにより、複数レコードをまとめて書き込む事で出力のパフォーマンスが向上したり、障害時のリトライを単純化する事ができます。
最後に、OutputプラグインがBufferプラグインから渡されたログをそれぞれのプラグインに応じた出力先に書き出します。

今回、作成したfluent-plugin-arrow-fileモジュールは、この Output プラグインに相当するもので、出力先として指定されたファイルに Apache Arrow ファイル形式で書き込みます。

インストール

ここでは、Treasure Data社の提供する Fluentd の安定板 td-agent を利用します。
また、arrow-fileプラグインのインストールにはrake-compilerモジュールも必要ですので、予めインストールしておきます。

Fluentdのインストール詳細については、こちらを参照してください。

$ curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent4.sh | sh
$ sudo /opt/td-agent/bin/fluent-gem install rake-compiler

次に、PG-Stromのソースコードをダウンロードし、fluentd ディレクトリ以下の物件をビルドします。

$ git clone https://github.com/heterodb/pg-strom.git
$ cd pg-strom/fluentd
$ make TD_AGENT=1 gem
$ sudo make TD_AGENT=1 install

Fluentdのプラグインがインストールされている事を確認するため、以下のコマンドを実行します。
fluent-plugin-arrow-fileが表示されていれば、インストールは成功です。

動かしてみる

では実際に動かしてみる事にします。

簡単な例として、ローカルのApache Httpdサーバのログを監視し、それをフィールド毎にパースしてApache Arrow形式ファイルに書き込みます。
<source>で/var/log/httpd/access_logをデータソースとして指定しているほか、apache2のParseプラグインを用いて、host, user, time, method, path, code, size, referer, agentの各フィールドを切り出しています。
(これは公式サイトのExampleからのコピペです)

後半の<match>以下がarrow-fileプラグインの設定です。
pathで出力先を指定しています。ここでは/tmp/mytest%Y%m%d.%p.arrowと記述していますが、書き込み時に、%Y、%m、%dはそれぞれ年、月、日に、%pはプロセスのPIDに置き換えられます。
schema_defsでは、出力先 Apache Arrow ファイルのスキーマ構造を定義します。
tsがタイムスタンプ、host、method、path、referer、agentがそれぞれ文字列(Utf8)で、codeとsizeはInt32で設定しています。

また、バッファに関してはもう少し大きなサイズを指定すべきですが、ここでは動作確認のため比較的小さなサイズ(4MB、200行)で、かつ書き出しのインターバルを10sに指定しています。実際にはPG-StromがGPU-Direct SQLを発動するのに向いたサイズのバッファサイズを指定する事をお勧めします。(例えばデフォルト値の 256MB など)

<source>
  @typetail
  path /var/log/httpd/access_log
  pos_file /var/log/td-agent/httpd_access.pos
  tag httpd
  format apache2
  <parse>
    @typeapache2
    expression /^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>(?:[^\"]|\\.)*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>(?:[^\"]|\\.)*)" "(?<agent>(?:[^\"]|\\.)*)")?$/
    time_format %d/%b/%Y:%H:%M:%S %z
  </parse>
</source>

<match httpd>
  @typearrow_file
  path /tmp/mytest%Y%m%d.%p.arrow
  schema_defs "ts=Timestamp[sec],host=Utf8,method=Utf8,path=Utf8,code=Int32,size=Int32,referer=Utf8,agent=Utf8"
  ts_column "ts"
  <buffer>
    flush_interval 10s
    chunk_limit_size 4MB
    chunk_limit_records 200
  </buffer>
</match>

さて、td-agentを起動します。

sudo systemctl start td-agent

以下のように、Apache Httpdのログが path で設定した /tmp/mytest%Y%m%d.%p.arrow が展開された先である /tmp/mytest20220124.3206341.arrow に書き出されています。

中身を見てみると、それっぽい感じになっているのが分かります。

$ arrow2csv /tmp/mytest20220124.3206341.arrow --head --offset 300 --limit 10
"ts","host","method","path","code","size","referer","agent"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/js/theme_extra.js",200,195,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/js/theme.js",200,4401,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/img/fluentd_overview.png",200,121459,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/search/main.js",200,3027,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/Lato/lato-regular.woff2",200,182708,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/fontawesome-webfont.woff2?v=4.7.0",200,77160,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/RobotoSlab/roboto-slab-v7-bold.woff2",200,67312,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:42","192.168.77.95","GET","/docs/ja/fonts/Lato/lato-bold.woff2",200,184912,"http://buri/docs/ja/css/theme.css","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:43","192.168.77.95","GET","/docs/ja/search/worker.js",200,3724,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
"2022-01-24 06:13:43","192.168.77.95","GET","/docs/ja/img/favicon.ico",200,1150,"http://buri/docs/ja/fluentd/","Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"

これを PG-Strom のArrow_Fdwを用いてPostgreSQLにマッピングしてみます。

postgres=# IMPORT FOREIGN SCHEMA mytest
           FROM SERVER arrow_fdw INTO public
           OPTIONS (file '/tmp/mytest20220124.3206341.arrow');
IMPORT FOREIGN SCHEMA

postgres=# SELECT ts, host, path FROM mytest WHERE code = 404;
         ts          |     host      |         path
---------------------+---------------+----------------------
 2022-01-24 12:02:06 | 192.168.77.73 | /~kaigai/ja/fluentd/
(1 row)

postgres=# EXPLAIN SELECT ts, host, path FROM mytest WHERE code = 404;
                                  QUERY PLAN
------------------------------------------------------------------------------
 Custom Scan (GpuScan) on mytest  (cost=4026.12..4026.12 rows=3 width=72)
   GPU Filter: (code = 404)
   referenced: ts, host, path, code
   files0: /tmp/mytest20220124.3206341.arrow (read: 128.00KB, size: 133.94KB)
(4 rows)

生成された Apache Arrow ファイルを外部テーブルとしてマッピングし、これをSQLから参照しています。

Fluentd側で成形されたログの各フィールドを参照する検索条件を与える事ができます。 上記の例では、HTTPステータスコード404のログを検索し、1件がヒットしています。

まとめ

以上のように、Fluentdで受け取ったログを Apache Arrow 形式ファイルとして書き出し、それをそのまま、つまり改めてデータをインポートする事なく PostgreSQL から参照する事ができる事が分かりました。
これは、ログ集積系のシステムから、検索・分析系のシステムへデータを移送するという手間なしにSQL処理を発行できる事を意味するほか、例えば、もう使わなくなった古いログデータをOS上でコピーして退避すれば、それだけでアーカイブ作業が終了します。(Apache Arrow形式の場合、ファイルにスキーマ構造も内包しているため、後になって『あれ?このテーブルのDDLは?』なんて事もありません)

加えて、Fluentdのarrow-fileプラグインはタイムスタンプに統計情報を付加する事もできるため、検索条件に日付時刻範囲の絞り込みを含むケースでは大幅な検索時間の高速化を見込むことができます。
kaigai.hatenablog.com

課題としては、現状、まだ「動くようになった」というレベルですので、実際に Fluentd のインスタンスを何台も立てて検証済みである、という訳ではありません。ですので、この辺はぜひ『一緒に検証しませんか?』という方がいらっしゃいましたら、お声がけいただければと思います。

半精度浮動小数点型(Float2)について

このエントリはPostgreSQL Advent Calendar 2021に参加しています。

実は、現在開発中の別の機能について書きたかったのですが、間に合いませんでした。反省。
そこで、急遽ネタを用意したのが、反省、はんせい、はんせいど…ふどうしょうすうてん(ピコーン!!

という事で、PostgreSQLで利用できる半精度浮動小数点型(float2)の事について書こうと思います。

半精度浮動小数点とは

Cで言えば32bitのfloatに対して64bitのdoubleを倍精度と呼ぶように、floatの半分である16bitの浮動小数点フォーマットが半精度浮動小数点形式です。
もちろん、データ量が少ない分、表現できる範囲や精度に制限はあるのですが、一方で必要なストレージ領域は小さく、またSIMDやGPUといったベクトル演算を行う場合にはメモリバスを有効活用できることから、機械学習の分野などで活用が進んでいます。

型名 ビット幅 指数部 仮数部
倍精度 64bit 11bit 52bit
単精度 32bit 8bit 23bit
半精度 16bit 5bit 10bit

f:id:kaigai:20211216091359p:plain

PostgreSQL で float2 型を定義する

さて、この半精度浮動小数点型ですが、PostgreSQL本体ではまだ対応していません。
そもそもがApache Arrowとのデータ交換に必要であったので PG-Strom 拡張モジュールの一部として作成したモノ・・・ではあるのですが、別にこれ自体はGPUやNVMEを必要とするものではありませんので、これ単体を切り出して利用する事も可能です。

github.com

x86_64のCPUでは今のところ半精度浮動小数点をそのまま計算する事はできませんので、内部的にはこれをfloat4やfloat8に変換した上で演算を行っています。GPU側であればfloat2のまま計算する事もできるのですが。

float2 -> float4/float8 への変換はそれほど難しいことではありません。指数部も仮数部もより幅が広くなる方に動くので、float2で表現できる値は確実にfloat4/float8へと変換する事ができます。

例えば、float2からfloat4への変換は、このような単純なビット操作だけで可能です。

static inline float
fp16_to_fp32(half_t fp16val)
{
    uint32_t    sign = ((uint32_t)(fp16val & 0x8000) << 16);
    int32_t     expo = ((fp16val & 0x7c00) >> 10);
    int32_t     frac = ((fp16val & 0x03ff));
    uint32_t    result;

    if (expo == 0x1f)
    {
        if (frac == 0)
            result = (sign | 0x7f800000);   /* +/-Infinity */
        else
            result = 0xffffffff;            /* NaN */
    }
    else if (expo == 0 && frac == 0)
        result = sign;                      /* +/-0.0 */
    else
    {
        if (expo == 0)
        {
            expo = FP16_EXPO_MIN;
            while ((frac & 0x400) == 0)
            {
                frac <<= 1;
                expo--;
            }
            frac &= 0x3ff;
        }
        else
            expo -= FP16_EXPO_BIAS;

        expo += FP32_EXPO_BIAS;

        result = (sign | (expo << FP32_FRAC_BITS) | (frac << 13));
    }
    return int_as_float(result);
}

一方で、float4/float8 -> float2 への変換は、表現可能な範囲を超えると+/-Infに発散して島唄め、注意が必要です。

postgres=# select 65000::float2;
 float2
--------
 64992
(1 row)

postgres=# select 66000::float2;
  float2
----------
 Infinity
(1 row)

テーブル定義で float2 を用いる

半精度浮動小数点データ型は PG-Strom に含まれているため、以下のようにCREATE EXTENSIONコマンドでインストールする事ができます。

postgres=# CREATE EXTENSION pg_strom ;
CREATE EXTENSION
postgres=# \dT float2
        List of data types
   Schema   |  Name  | Description
------------+--------+-------------
 pg_catalog | float2 |
(1 row)

早速、テーブルを定義して、データを流し込んでみます。

postgres=# CREATE TABLE fp16_test (
              id int,
              a  float2,
              b  float2,
              c  float2,
              d  float2,
              e  float2,
              f  float2,
              g  float2,
              h  float2
            );
postgres=# insert into fp16_test (select x, 1000*random(),
                                            1000*random(), 
                                            1000*random(),
                                            1000*random(),
                                            1000*random(),
                                            1000*random(),
                                            1000*random(),
                                            1000*random() from generate_series(1, 4000000) x);
INSERT 0 4000000

一方、比較のために倍精度浮動小数点で同じようにテーブルを定義してみます。

postgres=# CREATE TABLE fp64_test (
              id int,
              a  float8,
              b  float8,
              c  float8,
              d  float8,
              e  float8,
              f  float8,
              g  float8,
              h  float8
            );
CREATE TABLE
postgres=# insert into fp64_test (select x, 1000*random(),
                                            1000*random(), 
                                            1000*random(),
                                            1000*random(),
                                            1000*random(),
                                            1000*random(),
                                            1000*random(),
                                            1000*random() from generate_series(1, 4000000) x);
INSERT 0 4000000

あたり前の話ではありますが、大きくサイズが変わってきます。
(ただし、タプルのヘッダ 24バイト分は必ずくっつくので、単純に4倍違う、とはなりませんが)

postgres=# \d+
                                        List of relations
 Schema |       Name        |       Type        | Owner  | Persistence |    Size    | Description
--------+-------------------+-------------------+--------+-------------+------------+-------------
 public | fp16_test         | table             | kaigai | permanent   | 199 MB     |
 public | fp64_test         | table             | kaigai | permanent   | 386 MB     |

インデックスを張ることもできます。

postgres=# create index on fp16_test(b);
CREATE INDEX
postgres=# explain select * from fp16_test where b between 100 and 150;
                                       QUERY PLAN
-----------------------------------------------------------------------------------------
 Bitmap Heap Scan on fp16_test  (cost=21238.43..61716.43 rows=1000000 width=20)
   Recheck Cond: ((b >= '100'::double precision) AND (b <= '150'::double precision))
   ->  Bitmap Index Scan on fp16_test_b_idx  (cost=0.00..20988.43 rows=1000000 width=0)
         Index Cond: ((b >= '100'::double precision) AND (b <= '150'::double precision))
(4 rows)

なぜ半精度浮動小数点がGPUで好まれるのか?

最後に、なぜGPUアクセラレーションの文脈で半精度浮動小数点形式が使われるようになってきたのかを説明します。

GPUのように多数のコアが並列に動作するとき、とりわけNVIDIAのGPUではWarpと呼ばれる32スレッド単位でのスケジューリングが行われますが*1、隣接したコアが隣接したメモリからデータをロードする際、coalescingといって、一回のメモリトランザクションで複数スレッド分のデータをロードする事があります。
例えば、一回のメモリトランザクションで32byte(= 256bit)分のデータをL2キャッシュからロード*2できる場合、これが32bitの単精度浮動小数点なら、最大で8スレッドにデータを供給できる。一方、これが16bitの半精度浮動小数点なら、最大で16スレッドにデータを供給できるという計算になる。
通常、この手のワークロードであれば、メモリアクセスが最大の律速要因となってしまうので、そうすると、大量の計算をこなさねばならない機械学習のようなワークロードで、計算精度にある程度目をつぶれる(-1.0~1.0を十分に表現できればよい、など)場合には、単位時間あたりの計算量を増やすためにデータ量を削るという判断もアリとなる。

この辺については、CUDA C++ Programming GuideのMaximize Memory Throughputの章が詳しい。

言うまでもないが、これはデータが単純配列の形で並んでいるような場合の話で、例えばPostgreSQLの行データ(Heap形式)ではそれ以前の段階である。ただし、大抵のSQL条件式の検索というのは、特定の列を一回だけ参照してWHERE X BETWEEN 100 AND 200の条件を評価するものであるので、このためだけに行⇒列変換というのはリーズナブルではない。
(かつて一度実装したことがあり、やめたw)

PG-Stromでも対応している Apache Arrow 形式や、あるいはGPU Cache機能のように、データが単純配列のように並ぶことになっているデータ形式であれば、こういったcoalesced accessによるメモリ読み出しの高速化効果というものも期待できるかもしれない。
(ただし、RAM => GPUへの転送というのがそれよりずっと遅いので、あまり差分は見えてこないかも…。)

*1:しかしこれもVolta以降では条件分岐に絡めてズレる事もあるのでもはや正確な表現とも言い難いが…。

*2:Global Device Memoryからのロードは全てL2を介するとマニュアルには書いてある

PCI-E 4.0がやってきた!

突然ですが、サーバを新調しました。

昨年、先行して NVIDIA A100 を調達していたのですが、手持ちのサーバ自体はSkylake-SpでPCI-E 3.0世代なので、まだGPU自体の持つポテンシャルを評価できず・・・といったところでした。


調達したサーバは、Supermicro社のAS-2014CS-TRというモデルで、構成自体は以下の通りとなります。

  • 筐体: AS-2014CS-TR
  • CPU: AMD EPYC 7402P (24C; 2.85GHz) x1
  • RAM: 128GB [16GB DDR4-3200 (ECC) x8]
  • GPU: NVIDIA A100 (PCI-E; 40GB) x1
  • SSD: Intel D7-P5510 (U.2; 3.84TB) x4
  • HDD: Toshiba 3.5 1.0TB (7.2krpm; 6.0Gb/s) x2
  • N/W: AIOM 2-port 10Gbase-T x1

今回、はじめてCPUとSSDもPCI-E 4.0に対応した世代のものを調達して、これでCPU/GPU/SSDをPCI-E 4.0に揃えてのベンチマークが可能となります。
CPUにMillan世代ではなくRoma世代を選んだのは、この世代ではI/O周りの変化がない(らしい)とされている事と、昨今の半導体不足の影響でタダでさえ長い納期がさらに延びる懸念が・・・というワケです。(そもそもこのサーバを発注したのは6月だw)

さて、早速、いつものように SSBM のデータベースをSF=999で構築し、13本のクエリの応答速度を計測してみます。
最もサイズの大きなlineorderテーブルのサイズは 875GB となるので、ストレージ中心のベンチマークには十分なサイズです。

f:id:kaigai:20211020092809p:plain

まず結果はこの通り。
分かりやすさのためスループット表記でグラフにしていますが、これは単純に(875GB ÷ クエリ応答時間)ですので、縦軸の『Query Execution Throughput [MB/s]』の値が大きいほど処理性能が高い事を示しています。

Filesystem I/Oの場合、元々PCI-E 3.0世代でもSSD性能を遥かに下回る処理性能しか出せていませんでしたが、H/Wが新しくなっても同じような性能値であるという事は、ストレージ読み出しではなく、バッファコピーの繰り返しなど別のところにボトルネックがある事を示唆しています。

一方、GPU-Direct SQLの場合、クエリによっては19GB/sに迫る値を出しています。
PCI-E3.0のSkylake-Sp世代では8.5GB/s~9.0GB/s程度で頭打ちになっていた事を考えると、中々のスコアといえるでしょうか。

続いて、iostatで測定したnvme0~nvme3の各デバイスのクエリ実行中の読み出しスループットを計測すると、これはこれで中々ひどい。
同じサイズのデータを読み出すにも、短時間でガツん!と読み出すか、時間をかけてチンタラ読み出すかというのが可視化されていると思います。
f:id:kaigai:20211020094245p:plain

ここまでは同じデータ形式の場合。

では、この875GB/60億行のlineorderテーブルを Pg2Arrow でApache Arrow形式に変換し、列データとして読み出す場合であればどうか?
行データと列データの処理性能をスループットで測るのは適切とは言えませんので、今度は(60億 ÷ クエリ応答時間)を『1秒あたり処理した行数』としてプロットしてみます。すると、I/Oの効率が良い分、さらに差が広がる結果となりました。

f:id:kaigai:20211020100430p:plain

細かいところで言うと、『秒速で10億行』を達成しているQ1_1と、Q1_2およびQ1_3のデータ読み出しサイズは等しいので、処理時間もそれと同じ程度になっていてほしいのですが、オプティマイザがいま一つ、おバカな実行計画を作ってしまったようです。この辺は要改善といったところでしょうか。

Q1_1のEXPLAIN ANALYZE結果

postgres=# explain analyze
select sum(lo_extendedprice*lo_discount) as revenue
from flineorder,date1
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
                                                                      QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=9717634.19..9717634.20 rows=1 width=8) (actual time=4670.374..4781.510 rows=1 loops=1)
   ->  Gather  (cost=9717633.96..9717634.17 rows=2 width=8) (actual time=4387.503..4781.496 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Parallel Custom Scan (GpuPreAgg)  (cost=9716633.96..9716633.97 rows=1 width=8) (actual time=4359.970..4359.976 rows=1 loops=3)
               Reduction: NoGroup
               Combined GpuJoin: enabled
               GPU Preference: GPU0 (NVIDIA A100-PCIE-40GB)
               ->  Parallel Custom Scan (GpuJoin) on flineorder  (cost=17101.66..9716355.33 rows=594409 width=8) (never executed)
                     Outer Scan: flineorder  (cost=17060.26..9711145.81 rows=4162493 width=12) (actual time=156.771..564.225 rows=5993990673 loops=1)
                     Outer Scan Filter: ((lo_discount >= 1) AND (lo_discount <= 3) AND (lo_quantity < 25))
                     Rows Removed by Outer Scan Filter: 5209361385
                     Depth 1: GpuHashJoin(plan nrows: 4162493...1426582, actual nrows: 784629288...119025391)
                              HashSize: 20.50KB (estimated: 63.28KB)
                              HashKeys: flineorder.lo_orderdate
                              JoinQuals: (flineorder.lo_orderdate = date1.d_datekey)
                     GPU Preference: GPU0 (NVIDIA A100-PCIE-40GB) with GPUDirect SQL
                     referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                     files0: /opt/pgdata13/flineorder.arrow (read: 89.37GB, size: 681.05GB)
                     ->  Seq Scan on date1  (cost=0.00..78.95 rows=365 width=4) (actual time=0.060..0.317 rows=365 loops=1)
                           Filter: (d_year = 1993)
                           Rows Removed by Filter: 2191
 Planning Time: 2.334 ms
 Execution Time: 4910.442 ms
(24 rows)

上記のようにGpuPreAggの直下にGpuJoinが入っており、加えて「Combined GpuJoin: enabled」と出力されています。
このとき、GpuJoinの処理結果は一度CPUに戻される事なくGPU上で集約されるため、最も効率のよいパターンとなります。
また、Apache Arrowファイルは全体で681GBあり、そのうち89GBを読み出した事が分かります。つまり、20GB/s程度の読み出し速度があれば、5秒程度で処理を終えるのは不思議な事ではないという事になります。


Q1_2のEXPLAIN ANALYZE結果

postgres=# explain analyze
select sum(lo_extendedprice*lo_discount) as revenue
from flineorder, date1
where lo_orderdate = d_datekey
  and d_yearmonthnum = 199401
  and lo_discount between 4 and 6
  and lo_quantity between 26 and 35;
                                                                                  QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=10085531.30..10085531.31 rows=1 width=8) (actual time=13043.719..13155.506 rows=1 loops=1)
   ->  Gather  (cost=10085531.08..10085531.29 rows=2 width=8) (actual time=12720.924..13155.496 rows=3 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=10084531.08..10084531.09 rows=1 width=8) (actual time=12709.551..12709.556 rows=1 loops=3)
               ->  Hash Join  (cost=17686.41..10084527.30 rows=757 width=8) (actual time=4078.065..12645.401 rows=1404451 loops=3)
                     Hash Cond: (flineorder.lo_orderdate = date1.d_datekey)
                     ->  Parallel Custom Scan (GpuScan) on flineorder  (cost=17607.07..10084283.79 rows=62438 width=12) (actual time=313.555..6825.444 rows=108984171 loops=3)
                           GPU Filter: ((lo_discount >= 4) AND (lo_discount <= 6) AND (lo_quantity >= 26) AND (lo_quantity <= 35))
                           Rows Removed by GPU Filter: 5667038161
                           GPU Preference: GPU0 (NVIDIA A100-PCIE-40GB) with GPUDirect SQL
                           referenced: lo_orderdate, lo_quantity, lo_extendedprice, lo_discount
                           files0: /opt/pgdata13/flineorder.arrow (read: 89.37GB, size: 681.05GB)
                     ->  Hash  (cost=78.95..78.95 rows=31 width=4) (actual time=0.521..0.522 rows=31 loops=3)
                           Buckets: 1024  Batches: 1  Memory Usage: 10kB
                           ->  Seq Scan on date1  (cost=0.00..78.95 rows=31 width=4) (actual time=0.171..0.513 rows=31 loops=3)
                                 Filter: (d_yearmonthnum = '199401'::numeric)
                                 Rows Removed by Filter: 2525
 Planning Time: 2.308 ms
 Execution Time: 13268.534 ms
(20 rows)

一方、Q1_2の結果を見てみると、GpuPreAggが選択されておらず、GpuScanによるフィルタを実行した後はCPUでHashJoinとAggregateを実行しています。GpuScanの返す行数の推定値が62438行足らずなので『そんな程度の処理にGPUを使うまでもない』というのは分かるのですが、実際には3億行ちょい(1億行×3ワーカー)を読み出している事になるので、読みが大ハズレだったというわけです。

ハードウェアの増強だけでは高速化を達成する事はできないというよい例ですが、この辺は、追って調べてみたいと思います。

なお、今回のこの測定結果を含む、大量データを処理するための PG-Strom の諸機能については、11月12日(金)のPostgreSQL Conference Japan 2021にて発表を行います。

このご時世、なかなか顔を突き合わせたディスカッションの機会が少ないのですが、こちらはオフラインでの開催予定となっております。
ぜひのご参加をお待ちしております。

HyperLogLogを使ったカーディナリティの推測(補足)

少し、こちらのフォローアップ記事となります。

kaigai.hatenablog.com

ブログ公開後、何件かコメントをいただきました。

なるほど確かに、GUCパラメータの値に応じてCOUNT(distinct KEY)を置き換える構造だと、そのつもりがないのに、HyperLogLogを使ったカーディナリティの推計を行ってしまう・・・という事故が発生してしまうかもしれぬ。

という事で、前回の記事で説明したpg_strom.enable_hll_countは廃止し、代わりに、ユーザが明示的にHyperLogLogを使用する事を教えるために、hll_count(KEY)という集約関数を追加しています。

使用法としてはこんな感じ。

=# select hll_count(lo_custkey) from lineorder ;
 hll_count
-----------
   2005437
(1 row)

さらにもう一点、hll_count(KEY)はHyperLogLogを使って作成したHLL Sketch(前回記事でHLL Registersと呼んでいたもの。用語を統一。)を元に推計値を出す関数ですが、推計値を出すのではなく、そのままHLL Sketchをbytea型で保存できるようにしました。
こちらは、hll_sketch(KEY)という集約関数になり、あとで保存しておいたHLL Sketchをhll_merge(SKETCH)に食わせて、改めて推計値を出力できるようになります。

使い方としては、例えば、予め週次や月次のデータで HLL Sketch を作成しておけば、あとで必要な範囲だけの HLL Sketch をマージしてカーディナリティの推計値を出力するといった使い方が考えられます。

使用法としてはこんな感じ。

--- 年単位で HLL Sketch を出力する
=# select lo_orderdate / 10000 as year, hll_sketch(lo_custkey) as sketch
     into pg_temp.annual
     from lineorder group by 1;
SELECT 7

--- HLL Sketchをヒストグラムにして出力する
=# select year, hll_sketch_histogram(sketch) from pg_temp.annual order by year;
 year |                 hll_sketch_histogram
------+-------------------------------------------------------
 1992 | {0,0,0,0,0,0,0,0,0,22,73,132,118,82,39,26,12,2,4,2}
 1993 | {0,0,0,0,0,0,0,0,0,9,59,118,125,96,50,30,15,2,6,2}
 1994 | {0,0,0,0,0,0,0,0,0,4,33,111,133,113,53,36,17,4,6,2}
 1995 | {0,0,0,0,0,0,0,0,0,2,21,99,131,121,62,42,18,5,7,3,1}
 1996 | {0,0,0,0,0,0,0,0,0,1,17,84,119,131,73,50,20,5,7,4,1}
 1997 | {0,0,0,0,0,0,0,0,0,0,14,71,118,128,82,53,23,10,7,4,2}
 1998 | {0,0,0,0,0,0,0,0,0,0,13,64,114,126,86,61,23,11,8,4,2}
(7 rows)

--- 累積値で lo_custkey のカーディナリティを推測
=# select max_y, (select hll_merge(sketch) from pg_temp.annual where year < max_y)
     from generate_series(1993,1999) max_y;
 max_y | hll_merge
-------+-----------
  1993 |    854093
  1994 |   1052429
  1995 |   1299916
  1996 |   1514915
  1997 |   1700274
  1998 |   1889527
  1999 |   2005437
(7 rows)

例えば、ユニークユーザ数の集計を日次・週次で集計する時など、毎回 COUNT(distinct KEY)でやっていては遅くてたまらない、みたいな状況であれば、利用価値のある手法かもしれません。

本日、PostgreSQL Unconference (online) にてこの辺のトピックについて話しますので、お時間ある方はぜひご覧ください。
pgunconf.connpass.com

HyperLogLogを使ったカーディナリティの推測

高校生の頃までは滋賀県に住んでいた事もあり、夜、勉強の合間に、KBS京都で放送されていた『日髙のり子のはいぱぁナイト』を聞いており、日々ネタを考えては、番組へハガキを投稿する常連だった*1のですが(←勉強はどうした)、今回は、PG-Stromに実装した『はいぱぁ』な機能を紹介したいと思います。

ja.wikipedia.org

SELECT COUNT(distinct KEY) は結構難しい

SELECT COUNT(KEY) FROM my_table;

が

SELECT COUNT(distinct KEY) FROM my_table;

になった瞬間、特にサイズの大きなテーブルをスキャンする場合には、非常に難しい問題になってしまいます。

最初の例は、KEYが非NULLである行数を全部カウントして返せば良いのですが、後者の場合はKEYが重複する場合にはカウントしないため、重複排除を行うための工夫が必要になります。これをカーディナリティを計算すると言います。

これをDBで実装するには2通りの方法が考えられます。

方法①
入力ストリームを予めKEY値でソートしておき、KEY値が変わるたびにカウンタをインクリメントする。
KEY値にインデックスが張られている場合などには有効な方法だが、そうでなければ、実行時にテーブル全体のソートが必要になる。しかも、領域分割による並列処理が不可能であるので、仮に入力レコードが数億行あったとすると、律儀にCOUNT(distinct KEY)関数を数億回実行せねばならない。

方法②
集約関数を実行する Agg ノードでハッシュ表を持っておき、KEY値がそれまでにスキャンしたレコードに含まれているかどうかを判定する。最終的なCOUNT(distinct KEY)の結果は、このハッシュ表のエントリ数となる。
ソートは必要ないが、メモリ消費量が事前に予測不可能で、ハッシュ表のサイズによっては並列処理も難しい。(通常、メモリ消費が問題になるような状況ではマージ処理も大変な負荷になる)

なので、大量のデータセットの中から正確なカーディナリティを出力しようとすると、そこそこ大変(= 処理時間がかかってしまう)という事になります。

「ざっくり」でもよくないですか?

ただこれは、厳密な重複排除を行った集計を行う上での制限事項で、世の中には「ざっくりとした数が知りたい」で十分なケースが存在します。例えば、アクセスログからアクティブなユーザ数を集計してグラフに出したい、といった場合など、多少の誤差は許容できるユースケースです。

これを比較的精度よく推定できる方法として、HyperLogLogという手法が知られており、いくつかのビッグデータ処理向けデータベースに実装されているものもあります。

en.wikipedia.org

今回は、GPU上でのGROUP BY処理を行うGpuPreAgg機能の拡張として、PG-StromにHyperLogLogを実装してみました。

HyperLogLogアルゴリズムの考え方

HyperLogLogアルゴリズムの考え方をざっくり説明します。

  • 前提①:COUNT(distinct KEY)のKEY値をハッシュ関数にかけると、ランダムなビット列が生成されるハズである。
  • 前提②:KEY値のカーディナリティが高ければ、...10100000のように0が連続するパターンも含まれるハズである。

したがって、テーブルをスキャンしてKEY値のハッシュを計算し、その中で下位ビットから連続する0の個数の最大値を記録しておけば、その集合のカーディナリティは2^n程度であると推定する事ができます。
もちろん、このようにnの値に応じて2^nで増えていく推定値というのはあまりにも誤差が大きいですので、もう少し工夫を加えます。
ハッシュ値の下位bビット分をm互いに独立したカウンタであるHLLレジスタのインデックスと見なし、残りのビット列から連続する0の個数をカウントして、インデックスされたHLLレジスタにその最大値を記録します。
最後に、これの平均値を計算する事で、しばしば混じってしまう例外的なハッシュ値の影響を排除し、もう少し真の値に近いKEY値のカーディナリティを推定する...という流れになります。

この手法の良いところは、入力値をソートする必要がなく、また、テーブルを分割統治して互いに独立なHLLレジスタを作ったとしても、それほど大量のメモリを消費しないため、並列処理に向いているところです。
例えば、64bitのハッシュ値でレジスタのセレクタに10bitを使った場合、各レジスタは8bitあればカウンタとして十分に機能するため、HLLレジスタとして必要なのは僅か1.0kBだけという事になります。

PG-StromにおけるHyperLogLog

ここでは例として、Star Schema Benchmarkデータセットの lineorder テーブルから、lo_custkey*2のカーディナリティを調べてみる事にします。
scale factorは100なので、テーブルのサイズは概ね87GBとなります。

nvme=# \d+
                            List of relations
 Schema |   Name    | Type  | Owner  | Persistence |  Size  | Description
--------+-----------+-------+--------+-------------+--------+-------------
 public | customer  | table | kaigai | permanent   | 406 MB |
 public | date1     | table | kaigai | permanent   | 416 kB |
 public | lineorder | table | kaigai | permanent   | 87 GB  |
 public | part      | table | kaigai | permanent   | 160 MB |
 public | supplier  | table | kaigai | permanent   | 132 MB |
(5 rows)
nvme=# explain select count(distinct lo_custkey) from lineorder;
                                  QUERY PLAN
------------------------------------------------------------------------------
 Aggregate  (cost=18896094.80..18896094.81 rows=1 width=8)
   Output: count(DISTINCT lo_custkey)
   ->  Seq Scan on public.lineorder  (cost=0.00..17396057.84 rows=600014784 width=6)
         Output: lo_orderkey, ...(snip)..., lo_shipmode
(4 rows)

デフォルト設定では、このように count(distinct ...) を含むクエリをGPUで実行できません。
これは、HyperLogLogによる推定値で count(distinct ...) を代替する事で結果が変わってしまうため、デフォルトでは無効化されているためです。

nvme=# select count(distinct lo_custkey) from lineorder;
  count
---------
 2000000
(1 row)

Time: 409851.751 ms (06:49.852)

実行すると、厳密なcount(distinct lo_custkey)は 2,000,000 である一方、その実行には 409 秒を要している事が分かります。
(Sortの高速化を目的としたCPU並列クエリすら有効になっていないので、当然と言えば当然と言えます。)

次に、PG-StromのHyperLogLog機能による count(distinct ...) の置き換えを有効にします。

nvme=# set pg_strom.enable_hll_count = on;
SET

実行計画を見てみましょう。

nvme=# explain verbose select count(distinct lo_custkey) from lineorder;
                                                   QUERY PLAN
-----------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=7444397.37..7444397.38 rows=1 width=8)
   Output: pgstrom.hll_count((pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey))))
   ->  Gather  (cost=7444397.14..7444397.35 rows=2 width=32)
         Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
         Workers Planned: 2
         ->  Parallel Custom Scan (GpuPreAgg) on public.lineorder  (cost=7443397.14..7443397.15 rows=1 width=32)
               Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
               GPU Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
               GPU Setup: pgstrom.hll_hash(lo_custkey)
               Reduction: NoGroup
               Outer Scan: public.lineorder  (cost=2833.33..7365270.22 rows=250006160 width=6)
               GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
               Kernel Source: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_35128.1.gpu
               Kernel Binary: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_35128.2.ptx
(14 rows)

GPUを用いた集約関数であるGpuPreAggが選択されているほか、元々count(DISTINCT lo_custkey)を出力していた Aggregate ノードが、代わりにpgstrom.hll_count((pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))) の実行結果を出力するように書き換えられています。

内側から順に説明すると、pgstrom.hll_hash(lo_custkey)関数は、HyperLogLogに使用するハッシュ値を計算するための関数で、ここでは軽量かつ比較的ランダムな64bitのハッシュ値が得られるという事でSipHashアルゴリズムを使用しています。
次に、pgstrom.hll_pcount(HASH)関数は、HLLレジスタ配列をセットアップし、引数として与えられた64bitのハッシュ値を元にこれを次々と更新していきます。重要なのは、pgstrom.hll_pcount(HASH)関数はHLLレジスタ配列だけを出力するため、どれだけ巨大なテーブルをスキャンする事になったとしても、pgstrom.hll_pcount(HASH)関数より後の工程ではたった1行しか(GROUP BY句が指定されている場合はグループの数だけしか)返さないという事です。

したがって、各ワーカープロセスから返却されるものも含め、CPUでHLLレジスタ配列をマージする事になるpgstrom.hll_count()関数は、僅か1行 x 3プロセス分の結果を処理するだけで、HyperLogLogによるlo_custkey値のカーディナリティの推定が可能になるという事です。

このような性質により、GPU/CPUの並列処理の恩恵を最大限に受ける事ができるため、パフォーマンスも良好です。
厳密な集計値を導出するために409秒を要していた一方、実際の値と 0.3% 程度しかズレのない 2,005,437 という推定値を9.2秒で導出しています。

nvme=# select count(distinct lo_custkey) from lineorder;
  count
---------
 2005437
(1 row)

Time: 9212.712 ms (00:09.213)

実行計画の詳細を見てみましょう。

nvme=# explain (verbose, analyze) select count(distinct lo_custkey) from lineorder;
                                                                           QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=4992387.95..4992387.96 rows=1 width=8) (actual time=9045.729..9081.690 rows=1 loops=1)
   Output: pgstrom.hll_count((pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey))))
   ->  Gather  (cost=4992387.72..4992387.93 rows=2 width=32) (actual time=8892.195..9081.633 rows=3 loops=1)
         Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
         Workers Planned: 2
         Workers Launched: 2
         ->  Parallel Custom Scan (GpuPreAgg) on public.lineorder  (cost=4991387.72..4991387.73 rows=1 width=32) (actual time=8760.881..8760.885 rows=1 loops=3)
               Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
               GPU Output: (pgstrom.hll_pcount(pgstrom.hll_hash(lo_custkey)))
               GPU Setup: pgstrom.hll_hash(lo_custkey)
               Reduction: NoGroup
               Outer Scan: public.lineorder  (cost=2833.33..4913260.79 rows=250006160 width=6) (actual time=159.316..2800.578 rows=600037902 loops=1)
               GPU Preference: GPU0 (Tesla V100-PCIE-16GB)
               GPUDirect SQL: load=11395910
               Kernel Source: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_39266.2.gpu
               Kernel Binary: /var/lib/pgdata/pgsql_tmp/pgsql_tmp_strom_39266.3.ptx
               Worker 0:  actual time=8694.640..8694.644 rows=1 loops=1
               Worker 1:  actual time=8699.829..8699.833 rows=1 loops=1
 Planning Time: 0.129 ms
 Execution Time: 9194.200 ms
(20 rows)

このクエリには全体で9.2秒を要していますが、そのうち8.760秒が GpuPreAgg での実行に要しています。
ここでは GPU-Direct SQL を用いて、4台のNVME-SSDから 10GB/s 程度のスループットで合計6億行を読み出していますが、GpuPreAggが出力しているのはHLLレジスタ配列の1行だけであるので、非常に効率的なデータ転送が行われていると言えます。

下の図で言えば、テーブル(ディスク)からデータを読み出し、GPU上で実行される hll_pcount() 関数にロードするところまでが、スループット番長である PG-Strom の真骨頂で、クエリの書き換えとアルゴリズムの工夫により、厄介なCOUNT(distinct KEY)をこのような形態の処理に書き換えるところが HyperLogLog の恩恵と言えるでしょう。

結論

  • COUNT(distinct KEY)関数で「大まかな推定値」を得れば十分である場合、HyperLogLogを使って相応に精度の良い推定値を得る事ができる。
  • COUNT(distinct KEY)関数を、distinct句の付かない集約関数に書き換える事で、領域分割と並列処理が可能な形式に変換できる。このパターンに落とす事ができれば、GPU-Direct SQLでほぼほぼハードウェアの限界に近い速度で集計処理を回すことができる。

ひとまず、現状では論文に書かれている内容をそのまま何も考えた形なので、例えばカーディナリティが小さい時の推定値のズレや、より正確な推定値を得るための補正(関連研究でそういうのがあるらしい)については、全く何も入っていません。誰かそういうのに強い人がパッチを書いてくれたりすると助かります(ボソッ

*1:ちなみに、『リスナーと電話をつないでクイズに答える』というコーナーで、日髙さんと一度だけ15秒くらい喋った事がある。

*2:customer表に対するキー