全IT関係者が知っておくべき「1-copy-snapshot isolation」

snapshot isolationを分散環境に適用する場合の「基本」の内容のまとめになります。(基本自分用のメモなので、間違っていたらすみません)

まずワーディングの整理

・snapshot isolation
 TXの分離レベルとしてのsnapshot isolation(以下SI)は、現在のRDBMSのTX管理では、ほぼ実装的にはデファクトと見ていいと思います。ただしANSIの規定のISOLATION_LEVELには定義がないので、どのあたりに位置づけるのかは、DB実装のそれぞれの取り扱いにより異なります。とはいえ、どのDBでもほぼSERIALIZABLEに近い位置づけにしているところが多いですね、というか、SI(特にSerializable SI)ぐらいでないとserializableに現実的には近づけないというのが実態かと思います。(勿論理論上はS2PLで実装は可能ですが、まぁパフォーマンスがアレすぎるので)

この上で・・・

・snapshot isolationと分散環境の親和性
 ちょっと考えればすぐにわかりますが、めちゃめちゃ高いわけです。基本的にsnapshotの実行先=別ノード(別プロセス)と見ればよいわけです。read-onlyなTXであれば、そのまま別プロセスで実行することで、単ノードであろうと、分散ノードであろうと、ほぼ透過的な扱いにすることが可能です。要するに、snapshotをとることとreplicationの実行することが、まぁ基本的に同じ(厳密には全然違いますが)と見なすことで、なんとなく同じようなことができそうだ、という匂いはするわけです。んで、まぁ実際、この分散環境下でのsnapshot-isolationは、割と応用範囲が広い感じで受け止められているので、一応、ほぼ今後の分散トランザクションのデファクトとして見てよいでしょう。基本として押さえておくべきです。・・当然、更新が問題になるわけですが。

ここで、1-copy-snapshot isolationに入る前にですね・・・

・1-copy-hogehoge
 一般に1-copy-hogehogeという場合は、「単ノードで実現できていたhogeghogeが分散環境でも透過的に達成できる」ことを意味します。たとえば 1-copy-serializableということであれば、単ノードでの処理がserializableな実行を、そのまま“同じように”分散環境で実行できる、ということを意味します。それでよくあるのが、単ノード環境でのACID特性をそのまま分散環境でも実行できます、という概念で、1-copy-atomicityとか1-copy-isolationという言い方をします。1-copy-durabilityというのもありますね。例えば、1-copy-okachimachiというと単ノードのokachimachi属性が分散環境でも透過的に実現できているという感じです。
 よってここでいう 1-copy-snapshot isolationということであれば、これは単ノードで実行できていたsnapshot isolationを分散ノードでも実行できるということを意味します。そもそも「同じ」ってのはどういうことかっていう議論はあるのですが、まぁ端(client)から見ていて同じに見えるという意味でここでは逃げましょう。尚、Txの世界では、「同じ」っていうことは、より厳密にいうとTxを構成する一連の手続きの意味論が同じある種の系に属する、という事になります。

・1-copy-snapshot isolation詳説
 というわけで、個人的に、ほぼ世界の常識として「全人類が知っておくべき1-copy-snapshot isolation」を順を追って解説しておきます。というか、割と簡単なんですけど、transaction系のお話は、忘れる時は忘れるので、メモっておきます。(以下はある程度replicationの常識がわかっている前提もありますが、面倒な人は下の方のバリエーションを読めばわかると思います。)

1) eagerかlazyか
 基本eagerです。というかlazyはそもそも本来的に一貫性が取りにくいです。lazyだとupdateのconflictがcommit後に検出されるので、そもそもserializableとかそういう議論にはなり難いです。一回commitしたやつをひっくり返すとかアレ過ぎますからね。
 propagationは、基本的に非対称(要するにSQLコピー丸投げじゃなくて、差分binary的なアレ)でwrite-setを取得して、取得各replicaへ転送します。commitのタイミングは通常、write-setを取得した段階で行われます。・・・そしてここが話題になります。

2) primary-backupとupdate-anywhere
両者あります。現行の運用レベルでみれば、妥当なものはprimaryだと思いますが、今後のマルチDCとかgeo-replicationとか考えると、ある程度update-anywhereも想定した方がよいと思われます。とはいえ基本的にROWAをベースに敷いています。まぁfull-replicationが基本ですね。部分replicationにすると、後述のvalidationのタイミングで結論が一致しないという可能性も出てきます。(なので、そのときどうするか、ということで対応策が別にあります。)

 尚、full-replicationは、write-intensiveになるとスケールアウトがしづらいので、モデルとしてのROWAはわかりやすく優秀だとは思いますが、現実路線を見たときに部分replicationを考える必要があると思っています。ただし、これはデータのパーティショニングがベースになるはずなので、むしろ業務的なデザインの問題になるはずです。(必要なデータのすべての一斉更新とか、ROWAだとほぼ最悪の処理に近いはずですが、しかし至極普通にあるわけですよ。)

3) centralizedとde-centralized
snapshot isolationは基本的にoptimistic concurrency controlです。従ってcommit validationをどこでやるか?ということが問題になります。centralizedは一カ所で、de-centralizedは複数箇所で行います。可用性を考えればたぶんde-centralizedだと思いますが、とはいえ、実装や品質を考えるとcentralizedも検討の範囲ではあると思います。ただし、これもマルチDCとかgeo-replicationとかを考えるとde-centralizedが必要になります。

ということを踏まえた上で・・・基本となるパターンを順番に列挙していきます。

■ベースプロトコル:
まず前提として、Txをパースした時点でread onlyかwriteを含むかを判断します。

read only Tx:Tx開始時点にアサインされた、各replicaローカルの最新のversionを読みます。readがwriteにブロックされません。

write を含むTx:書き込みはすべて一度ローカルに書き出します。ただしコミットされるまでは、書き出したTx以外は触れることはできません。

commit:optimistic concurrency controlです。write操作を含むTxについてはcommit時点でTxの成否を判断します。基本的にTx開始時点のtimestamp(tx_begin)とcommit時点の最新のversionのtimestamp(current_version)を比較して、後者が前者よりもあと(timestamp(tx_begin)<timestamp(current_version))の場合はabortする、いわゆるfirst-commit-winルールを適用します。
validationに成功した場合は、書きこまれたその値を最新versionにし、各replicaにばらまきます。尚、仕組みとしては、全順序が分かればいいというレベルの要求になります。順番がわからないとアウトです。

■バリエーション:
まず共通のケースモデル:
Tx1 w1(x)c1
Tx2 r2(y)w2(x)c2→a2
Tx3 r3(y)r3(x)c3
Tx4 r4(x)r4(y)c4

history:w1(x)r2(y)r3(y)c1r4(x)w2(x)c2→a2r3(x)c3r4(y)c4
Tx1からTx4まですべてconcurrentになりますがTx1とTx4はオーバーラップしません。
Tx2ではxを書きにいきますが、c1が先になっているので、first-commit-winで負けてabortになります。
Tx3/Tx4はread onlyなので、replicaで処理されます。

1) Primary-backup & Central
もっとも単純でわかりやすい。これ以降はこのバリエーションになります。
Txはすべてcentral middlewareで一回ハンドリングします。read-only Txと判断できるときは、指定したローカルreplicaに処理を飛ばします。ここではTx3→secondary1 Tx4→secondary2にアサインします。writeを含む、read-only Tx以外のTxについては、writeをprimaryで処理します。commitリクエストが来たタイミングで、primaryからwriteの内容(write set)を取得して、validationを行います。成功した場合はコミットをprimaryに書き込んで、write setを各replicaに配布します。各replicaはコミット順に配布されたwrite setを自分のローカルでupdateしていきます。・・・以下具体的に

・w1(x) Tx begin。primaryに書きだす。
・r2(y) Tx begin。writeもあるTx。まずはreadをsecondary2へ飛ばして、y0を取得。
・r3(y) Tx begin。read-onlyで、処理はsecondary1へ飛ばす。
・c1 Tx1 commit。書き込まれたwrite set(以下ws) を取得、xの最新versionはx0なので、validationはOK。新しいversionを生成(x1)。その直後に、wsを各secondaryに配布し、各secondaryはそのwsを更新し、versionを生成。
・r4(x) Tx begin。read-onlyで、処理はsecondary2へ飛ばす。ただし、既に最新versionは更新されているので、読む値はx1になる。
・w2(x) primaryに書きだす。
・c2→a2 Tx2 commit。書き込まれたwrite setを取得、xの最新versionはx1なので、すでに更新済み。Tx開始時点のversionはx0なので、validationはNG。よってabort
・r3(x) secondary1でread。ただし、Tx3のbeginがc1よりも前なので、取得するxはx0
・c3 Tx3 commit。
・r4(y) secondary2でread。y0を取得。
・c4 Tx4 commit。

2) Update-anywhere & Central
 Update-anywhereではprimaryが存在せず、すべてreplicaになります。従って、writeは一義的に、各replicaに書き込まれ、他のreplicaに伝播します。ただしvalidationの制御はcentral middlewareで行います。

Txのvalidation:
 Txの成功条件は、「そのTxが開始した時点でのデータセットについての、書き込みについてコミット競合が発生していない。」です。
 繰り返しになりますが、書き込みが競合するTx開始時点のtimestamp(tx_begin)とcommit時点の最新のversionのtimestamp(current_version)を比較して、timestamp(tx_begin)<timestamp(current_version)の場合はabortし、それ以外はcommitする、ということになります。んで、さらに「重要なのは順序であり、時間そのものではない」ということです。
 仕組みとしては、それぞれのコミットのカウンターを利用します。コミットが成功するタイミングで、カウンターをインクリメントして、timestampの比較の代わりに利用します。

central middlewareが保持するもの
・成功したTxのカウンター (lastValidated)
・各replica R(j)でコミットされた更新Txの数 (lastCommitted[j])
・直前に成功したTxのSet (validatedSet)(どこかでGCするという前提)

Txが開始する時:
各TxにstartTsとして、開始時点の更新するreplicaのlastCommitted[j]をセットする

Commitリクエスト時:
・concurrentなTxで、wsが競合し、startTs<commitTsであるものがある場合はabortし、それ以外はcommitする。(判定するTxはvalidatedSetを逆から見ていく)

・成功した場合、lastValidatedのカウンターをインクリメントする。
・ローカルのlastCommitted[j]をインクリメント。
・TxのcommitTsとして、lastValidatedの値をセットする。
・当該TxをvalidatedSetにappendする。
・wsをreplicaの更新queueに積む。

値のPropagation:
・各replicaに積まれたqueueから順序を保って、ローカルの値を更新する。
・lastCommitted[j]++

以下具体的に・・・尚Tx1/Tx3はreplica Aで、Tx2/Tx4はreplica Bでそれぞれ分散処理をします。トランザクション単位でconcurrentに分散処理を行い、かつ全体で一貫性を担保する形になります。分散snapshot isolationの本領発揮になります。

initialization:
・lastValidated=0
・lastCommitted[A]=0
・lastCommitted[B]=0
・validatedSet=φ

・w1(x) Tx begin。startTs:=0 その上で、replica Aに値を書きます。
・r2(y) Tx begin。startTs:=0 readをreplica Bへ飛ばして、y0を取得。
・r3(y) Tx begin。startTs:=0 read-onlyで、処理はreplica Aへ飛ばす。
・c1 Tx1 commit。書き込まれたws1をreplica Aから取得。validatedSet に何もないので、従ってvalidationはOK。よって、commit versionを生成。
lastValidated=1
Tx1.commitTs:= lastValidated
validatedSet.append(Tx1)
それから、該当するws1を各replicaに飛ばします。まずreplica Aではコミット処理。んで、その他の各replicaはそのws1を更新し、versionを生成。加えて、lastCommitted[]++を処理。(ここではlastCommitted[A]=1 lastCommitted[B]=1)
・r4(x) Tx begin。startTs:=1(lastCommitted[B]) read-onlyで、処理はreplica Bへ飛ばす。既にversionは更新されているので、読む値はx1になる。
・w2(x) replica Bに書きだす。
・c2→a2 Tx2 commit。書き込まれたws2をreplica Bから取得。ここでvalidationする。
ws2∪ws1 でかつ、Tx2.startTS=0 < validatedSet.Tx1.commitTs=1なので、abort
・r3(x) replica Aでread。ただし、Tx3のbeginがc1よりも前なので、取得するxはx0
・c3 Tx3 commit。
・r4(y) replica Bでread。y0を取得。
・c4 Tx4 commit。

3) Update-anywhere & De-Central
 2)と同様に、primaryが存在せず、すべてreplicaになります。同じくwriteは各replicaに書き込まれ、他のreplicaに伝播します。これに加えて論理的なvalidation制御のmiddlewareも分散させます。通常は、replicaとペアにして、制御ミドルをばら撒く形になるので、middle+replicaの形でノードをガシガシ足していく形のクラスターになります。文字通りの分散クラスターです。
 middleware間の仕組みは、基本的に2)と同じです。順序保証ができていれば、validationが可能になります。つまり、「順序保証を前提にできれば、同じ種類のノードを足すことでスケールアウト可能な分散クラスター上で、一貫性を保証したsnapshot isolationのトランザクション管理ができる」というアーキテクチャになります。要するに(本当にうまく行けば)無敵です。
 分散クラスター上でのトランザクション処理として、このアーキテクチャが有力視されていることは、なんとなくわかると思います。(ただし、write setを各replicaで飛ばしっぱなしになるので、障害発生時の復旧は、それなりに考えないといけません。ということでここが話題)

基本的な仕組みは2)と同じですが、各replicaがmiddleware機能を持っています。

各middlewareが保持するもの
・自身のreplicaでコミットされた更新Txの数 (lastCommitted)
・直前に成功したTxのSet (validatedSet)(どこかでGCするという前提)

Txが開始する時:
各TxにstartTsとして、開始時点の更新するreplicaのlastCommittedをセットする

Commitリクエスト時:
・wsを取得して、validationのためにマルチキャストする。ローカルな場合は手元で処理します。

・各replicaで、wsが競合し、startTs<commitTsであるものがある場合はabortし、それ以外はcommitする。(判定するTxはvalidatedSetを逆から見ていく)
・成功した場合、lastCommittedをインクリメント。
・replicaを更新
・TxのcommitTsとして、lastCommittedの値をセットする。
・当該TxをvalidatedSetにappendする。

以下具体的に・・・尚Tx1/Tx3はmiddleA-replica Aで、Tx2/Tx4はmiddelB-replica Bでそれぞれ分散処理をします。

initialization:各replicaのセットアップ
・replicaA.lastCommitted=0
・replicaA.validatedSet=φ
・replicaB.lastCommitted=0
・replicaB.validatedSet=φ

・replicaA w1(x) Tx begin。startTs:=0 その上で値を書きます。
・replicaB r2(y) Tx begin。startTs:=0 y0を取得。
・replicaA r3(y) Tx begin。startTs:=0 y0を取得。
・replicaA c1 Tx1 commit。書き込まれたws1をreplica Aから取得。ブロードキャスト。
ローカルでは、validatedSet に何もないので、従ってvalidationはOK。よって、commit version(replicaA.x1)を生成。
replicaA.lastCommitted=1
Tx1.commitTs:= replicaA.lastCommitted
replicaA .validatedSet.append(Tx1)
ブロードキャスト先(replicaB)でも、validatedSet に何もないので、従ってvalidationはOK。よって、commit version(replicaB.x1)を生成。
replicaB.lastCommitted=1
Tx1.commitTs:= replicaB.lastCommitted
replicaB .validatedSet.append(Tx1)

・replicaB r4(x) Tx begin。startTs:=1(replicaB.lastCommitted) readで読む値はx1になる。
・replicaB w2(x) replica Bに書きだす。
・replicaB c2→a2 Tx2 commit。書き込まれたws2をreplica Bから取得。ブロードキャスト。
ローカルでは、ws2∪ws1 でかつ、Tx2.startTS=0 < replicaB.validatedSet.Tx1.commitTs=1なので、abort
ブロードキャスト先(replicaA)でも、ws2∪ws1 でかつ、Tx2.startTS=0 < replicaA.validatedSet.Tx1.commitTs=1なので、abort

・replicaA r3(x) read。ただし、Tx3のbeginがc1よりも前なので、取得するxはx0
・replicaA c3 Tx3 commit。
・replicaB r4(y) replica Bでread。y0を取得。
・replicaB c4 Tx4 commit。

以上になります。

一応、よくわからん人向けに3行で説明します。
・一応分散環境で特に制御の頭とかいない
・readは好き勝手どこでも読んでよい。writeも好き勝手どこに書いても良い。
・writeはどこかで同時に起きているTxで先にコミットされたら負け。
たったこれだけで、Txの一貫性が担保される仕組みです。

あとは補足的な感じで

・正常系は申し分ないが、障害対策はちゃんと考えないとまずい。
特にコミットの耐障害性をあげる必要があります。validationの結果の同期が障害でずれた時にはちょっとまずいことになります。今までの方針では2PCが有力でしたが、これもPaxosを利用する方針が広く検討されつつあります。このあたりのアレヤコレヤがWalterだったり、MDCCだったりします。この手のものは、勿論、どれもナイーブなSIではありません。が、達成すべきクライテリアとしては、明確に1-copy-SIが言われることが多いです。

・SSIに拡張できる。
とはいえ、この基本になるプロトコルはwrite skewは排除できません。なので、SSIを導入することになります。基本的にr-wのanti-dependencyのグラフ構造が抽出できればいいので、問題は、分散環境でこのグラフをどう作るか?ということになります。あとでまた、まとめますが、一応できてます。えっと、当然write-setの持ち回りにanti-dependencyの情報を持たせて、一斉に飛ばす形になるのですが、read-setも一緒に配ると崩壊するので、工夫します。えっと、さらに判定基準をちょっと工夫しないとまずいので、replicated serializable snapshot isolation(RSSI)という形でちょっと拡張してます。・・・ぶっちゃけ、この手の論文は、デフォルトで分散SIとSSIがわかっていないとまじめにダルマさん状態になります。

・書き込みのフルバッチには弱い、とはいえある程度のwriteの負荷には強い。
読む方はほぼ無敵でしょう。分散クエリーの実行基盤としては申し分ないでしょう。ただし一般的にこの手の分散トランザクション処理は、特に書き込みフルバッチには、極端に弱いという宿命を持っているので、まー実務上はわけて考えるべきでしょうね。とはいえ、ベンチマークでもwriteに強いのはそこそこ立証されていますので、普通の負荷の読み書き程度であればほぼ問題ないでしょう。

・結局Transactionは要るのか?
そもそもNoSQLの世界でTXとかいるのか?という議論ですが、今のところ要りますね、という結論のようですね。たとえばよく言われるのは、wallへの書き込みの順序性の確保ですね。順序がばらばらになると会話が成立しません。んで、これは一種のconstraintになるので、TX制御の話になります。(今はなんか手書き実装とかしているという話もあるようですね。)まぁ、某石割り先生のように「No ACID is No Interest」と言い切るのアレすぎるとは思いますが、似たような人は多いかと。

・個人的見解
 いずれにしろ、分散であろうと単ノードであろうcorrectのコンセプトは維持していかないとそもそも議論の土台にものりません。特にconstraintについては追加的にいろいろ議論は発生すると個人的には思っています。SSIでカタがついた、という見解もあるとは思いますが、果たしてそうなのかという気もしています。そもそも、SSIにしても制限的にしているので、serializable全体の集合をカバーしているわけではないですからね。我々の知らない抜け道がまだある、と考える方が妥当でしょう。・・・なんかこう数学的な話にはなりますが、そういうものかと。いずれにしても基本は変わらないでしょう。
 それはさておき、もう普通にハイパフォーマンスな分散Txがserializableに近い形で利用できる状況になりつつあります。基本になっていた全順序問題は、Googleのspannerが地獄の釜を開けてしまった感もあり、たぶん怒濤の開発競争になっていると思います。某東洋の島国の立ち後れ感は、ぱない感じですが、皆さん頑張りましょう。