豪鬼メモ

MT車練習中

C++/Java/Ruby/Pythonの並列I/O用ファイルインターフェイス

ファイルの読み書きをマルチスレッドで並列に行うための仕組みを、各種プログラミング言語(C++、C、Java、Ruby、Python)で利用できるようにライブラリを整備した。メモリマップI/O、通常I/O、ダイレクトI/Oを同じインターフェイスで利用できる。Tkrzw-0.9.38からこの機能が利用できる。メモリマップI/Oと通常I/Oは全ての処理系で、ダイレクトI/OはLinuxとMacとWindowsで利用できる。


データベースライブラリTkrzwはマルチスレッド環境での並列処理性能を重視して開発しているが、そこで利用しているファイル層の機能を直接利用できるようにAPIを整備した。データベースとしてではなく、単なるフラットなファイルとして利用するためのAPIである。テキストにもバイナリにも利用できる。

大前提として、マルチスレッドで並列に読み書きをするには、ファイルオブジェクトから現在位置という概念を捨てる必要がある。読み書きする位置を、ファイル先頭からのオフセットで指定して、読み書きを行う必要がある。例えば、ファイルの101バイト目(先頭からのオフセットは100)から "hello" という5バイトを書き込み、またそれを読み出すには、C++では以下のようにする。

file.Write(100, "hello", 5);
char buf[5];
file.Read(100, buf, 5);

そのスレッドがどの位置を読み書きしたいのかは、そのスレッドが管理すべき問題だ。次のレコードを読むには、次のレコードの位置を知る必要があるが、それをどうやってやるかはファイル自体は知らない。レコードのサイズを一定にして、IDとサイズの掛け算で場所を特定する固定長レコード方式でも良いし、個々のレコードに自分の長さの情報を埋め込む可変長レコード方式でも良い。末尾に改行をつけたテキストは、各行がレコードである可変長レコードのリストであるとみなせる。

特定の位置というわけではなく、その時点でのファイルの末尾に書き込むならば、Appendというメソッドを用いる。適当な文字列と改行文字を連続して書き込むには、以下のようにする。

file.Append("hello\n");
file.Append("thank you\n");
file.Append("good bye\n");

Appendの呼び出しは並列に行えるが、その際には追記するデータが混ざらないように排他制御がなされる。よって、メタデータや番兵で区切りを示す可変長レコードの書き込みを並列に行うことができる。各レコードが末尾に改行文字を伴う文字列である場合、結果的に改行区切りのテキストを並列に構築することができる。

hello
thank you
good bye

改行区切りテキストからgrep的な検索を行うユーティリティも提供されている。文字列の中間一致、前方一致、後方一致、正規表現、最短編集距離を条件にすることができる。ファイルの先頭から末尾までスキャンする愚直な処理だが、ライブラリ内のネイティブコードで実行されるので、Java/Ruby/Pythonで該当の処理を実装した場合よりはかなり高速に処理ができる。例えば、"tokyo" に最も編集距離が近い10行を抜き出すには、以下のようにする。

std::string pattern = "tokyo";
std::vector<std::string> lines;
file.Search("edit", pattern, &lines, 10);


APIの全ての機能を説明するのは面倒なので、クックブック的に、面白い使い方に絞って紹介していこう。以後、コード例はRubyで記述するが、C++でもCでもJavaでもPythonでも同じような書き方ができるようになっている。

まずは、改行区切りで文字列をファイルに書き込んでみる。openの第1引数はファイル名で、第2引数は書き込み可能かどうかで、第3引数以降はオプションフラグだ。truncateにtrueを指定すると、ファイルに既存の内容があった場合に空に初期化してくれる。openやその他多くのメソッドはStatusオブジェクトを返すのだが、そのor_dieメソッドを呼ぶと、処理が失敗であった場合に例外を投げてくれる。

require 'tkrzw'
file = Tkrzw::File.new

file.open("data.txt", true, truncate: true).or_die
file.append("melon\n")
file.append("apple\n")
file.append("international\n")
file.close.or_die

メモリマップI/Oでテキストファイルのデータを読み込んで、正規表現にマッチする100行を抜き出すには以下のようにする。grepのmmapモードをAPIで使えるようなものだ。

file.open("data.txt", false, concurrent: true).or_die
p file.search("regex", "national$", 100)
file.close.or_die

なお、concurrent: true というオプションをopenメソッドに付けることで、Rubyインタープリタのグローバルロックを解除した状態で入出力処理が行われるようになる。マルチスレッドで大きなファイルを扱う場合、このオプションを指定するとスループットが向上する。Pythonも同様にグローバルロックを使うタイプの処理系なので、Openメソッドにconcurret=Trueを付けるとスループットが向上する場合がある。C++やJavaでは処理系のロックについて気にする必要はない。

編集距離が近いものを取り出したいなら、以下のようにする。これはagrep(多分ambiguous grepの略)の機能をAPIで利用するのと同じだ。

file.open("data.txt", false, concurrent: true).or_die
p file.search("edit", "tokyo", 100)
file.close.or_die

デフォルトではmmapを使ったメモリマップI/Oを行うが、read/writeを使った通常I/Oに切り替えることもできる。ファイルがかなり大きい場合には、こちらを使うのも良い。

file.open("data.txt", false, concurrent: true, file: "pos-para").or_die
p file.search("edit", "tokyo", 100)
file.close.or_die

実メモリの搭載量とは比較にならないほど大きいファイルの読み書きを想定しよう。例えば、128バイトの固定長のレコードを10億個書き込んだ120GBくらいのファイルがあるとしよう。このようなファイルを読み書きする場合、ダイレクトI/Oを使う方がよい。入出力のデータをファイルシステムのキャッシュに乗せると、システム全体のメモリが枯渇するからだ。

ダイレクトI/Oを利用するには、ファイルクラス(file)がPositionalParallelFile(pos-para)かPositionalAtomicFile(pos-atom)であり、ブロックサイズ(block_size)が512の倍数である必要がある。そして、アクセスオプション(access_options)に "direct:padding:pagecache" を指定する。directはダイレクトI/Oを有効化し、paddingはファイルサイズがブロックサイズの倍数になるように終了時にパディングを入れてくれ、pagecacheはプロセス内のページキャッシュを有効化して処理効率を向上させる。

require 'tkrzw'

# 固定長レコードの幅
RECORD_WIDTH = 128

# レコードIDに該当するデータを印字する
def print_record(file, id)
  offset = id * RECORD_WIDTH
  data = file.read(offset, RECORD_WIDTH)
  if data == nil
    raise "not found"
  end
  printf("%d: %s\n", id, data.strip)
end

# IDを指定してレコードを書き込む
def set_record(file, id, data)
  if data.size > RECORD_WIDTH
    raise "too large record"
  end
  data += " " * (RECORD_WIDTH - data.size)
  offset = id * RECORD_WIDTH
  file.write(offset, data).or_die
end

# ファイルを書き込みモードで開く
# 並列モードでダイレクトI/Oができるようにオプションを指定
file = Tkrzw::File.new
file.open("fix-128.dat", true,
          concurrent: true, truncate: true,
          file: "pos-para", block_size: 512,
          access_options: "direct:padding:pagecache").or_die()

# レコードを書き込む
set_record(file, 1, "hydrogen")
set_record(file, 3, "lithium")
set_record(file, 11, "natrium")

# レコードを印字する
print_record(file, 1)
print_record(file, 3)
print_record(file, 11)

# ファイルを閉じる
file.close.or_die

上述の例では固定長レコードのランダムアクセスを行っているが、自分でデータ構造を定義すれば、可変長レコードのシーケンシャルアクセスをするラッパーも比較的簡単に実装できる。世間的にRecord I/Oとか呼ばれるやつだ。今回は、先頭4バイトに自身のサイズを持ち、その後に任意のバイナリをつなげた構造を定義する。それをファイルの先頭から埋めていって、さらにそれを読み出すプログラムを書いてみる。

require 'tkrzw'

# 可変長レコードを追加する
def add_record(file, data)
  packed = [data.size].pack("N") + data
  file.append(packed)
end

# 可変長レコードを読み込み、そのデータと、次のレコードのオフセットを返す
# もうレコードがなければ、nilを返す
def read_record(file, off)
  if off >= file.get_size
    return [nil, nil]
  end
  size = file.read(off, 4).unpack("N")[0]
  data = file.read(off + 4, size)
  [data, off + 4 + size]
end

# ファイルを書き込みモードで開く
# 並列モードでダイレクトI/Oができるようにオプションを指定
file = Tkrzw::File.new
file.open("data.rio", true,
          concurrent: true, truncate: true,
          file: "pos-para", block_size: 512,
          access_options: "direct:padding:pagecache").or_die()

# レコードを追加する
["hop", "step", "jump"].each { |record|
  add_record(file, record)
}

# レコードを先頭から末尾まで読み込んで印字する
off = 0
while true do
  data, off = read_record(file, off)
  break unless off
  p data
end

# ファイルを閉じる
file.close().or_die()

可変長レコードのランダムアクセスをしたい場合、別途インデックスを作る必要がある。appendメソッドは、その時に保存したレコードの先頭のオフセットを返すので、そのオフセットを別ファイルに追記すれば、それがインデックスとして機能するようになる。オフセットを固定長で表現すればIDとの掛け算で検索できるし、キーとオフセットのペアを文字列として保存すれば、searchメソッドで検索できるようになる。とはいえ、そこまで複雑化したなら、もうDBMを使ったほうがいい。


ところで、 マルチスレッドによる並列のAppendの呼び出しでは排他制御がなされると書いたが、それはどうやって実現されているのか。 Appendが呼び出されると、書き込むデータのサイズの分だけ、ファイルサイズのメタデータがアトミックに更新される。 Appendを処理中のスレッドだけが、更新前のファイルサイズを得ることになるので、それをオフセットとして書き込みを行えば、異なるスレッドが同じ領域に書き込みを行うことはないし、一連のデータが連続した領域に書き込まれることが保証される。しかも、ロックフリーな並列書き込みが実現できている。

メタデータの更新はアトミックに行われるが、データの読み書きがアトミックに行われるかどうかは、ファイルクラスに依存する。デフォルトのファイルクラス( MemoryMapParallelFile ) は、読み書き自体にはロックをかけない。したがって、WriteやAppendをしている最中の領域をReadすることができる。その際には、更新中のデータが読み込まれる。つまり、前半は更新前で後半は更新後という状態のデータが読み込まれる可能性がある。それが嫌な場合は、呼び出し側が自前で排他制御を行うことになる(DBM層ではそれを行なっている)。PositionalParallelFileクラスは、通常I/OまたはダイレクトI/Oで、同様の並列性の読み書きを行う。

一方で、MemoryMapAtomicFileとPositionalAtomicFileクラスは、Readにはファイル全体のリーダロックを、WriteとAppendにはファイル全体のライタロックをかける。よって、どこかの領域に書き込みを行っているスレッドがいる状態では、他のスレッドによる書き込みと読み込みはブロックされる。読み込み同士は並列に行われる。したがって、更新途中のデータが読み込まれるということはなくなるが、書き込み処理が多い場合には並列性が著しく下がる。よって、アプリケーションの並列設計によって、適切なクラスを選択することが肝要だ。マルチスレッドを使わない場合にはロックをかける意味はないので、 MemoryMapParallelFileかPositionalParallelFileを使うべきだ。


まとめ。メモリマップI/OやダイレクトI/Oを、移植可能な方法で行うのは大変だ。しかし、Tkrzwが処理系の違いを吸収してくれるので、JavaやPythonやRubyからでも簡単にそれらの機能を利用することができる。マルチスレッドを使う場合には排他制御もかなり面倒な要素になってくるのだが、そのあたりもライブラリ内でうまいこと隠蔽してくれるので、プログラミングが気楽になる。並列に読み書きできるファイルがあって初めて、並列に読み書きできるデータベースやリポジトリ的な機能が実装できるようになる。