概要
前回のRedisを使った分散ロックでは、正確なロックを取るためにはZookeeperやetcdを使うと良い、とまとめていました。
なので今回はetcdを用いて分散ロックを実現します。
環境
- etcd v3.4.15
- pkg.go.dev/go.etcd.io/etcd v3.5.0
- go 1.16.0
事前知識
分散ロックに必要なもの
分散ロックマネージャには以下の機能が必要です。
- 自動リース機能
- CAS
- フェンシングトークンを発行する機能
1つ1つ説明していきます。
自動リース機能
ロックを取得したクライアントが何らかの要因で落ちた時、そのロックが解放されないままだと他のクライアントは永遠にロックを取得することができません。
したがって、ロックを取得したとしても、自動的に解放されるようTTLを付ける必要があります。
またこのTTLの管理はクライアント側でなく分散ロックを提供する側の機能である必要があります。
というのもクライアント側で生成したtimestampを使ったりすると、クライアント間で内部時計が揃っていないと
- クライアントAではロックが有効な期間中
- クライアントBでは内部時計が↑より未来になっており、ロックの有効期限を過ぎた
といった状態が発生するからです。
etcdの場合
v2では直接TTLを、v3ではLeaseというオブジェクトをセットすることでTTL管理するようになっています。
# grant a lease with 60 second TTL $ etcdctl lease grant 60 lease 32695410dcc0ca06 granted with TTL(60s) # attach key foo to lease 32695410dcc0ca06 $ etcdctl put --lease=32695410dcc0ca06 foo bar OK
CAS
CAS(Compare And Swap/Compare And Set)は、既存の値が期待する値と一致している時、アトミックに新しい値を書き込む処理です。値が一致しない時はエラーやfalseが返ります。
なので
- ロックの値が無ければ(
==null
)ロック(="1"
)する - ロック中(
=="1"
)であればアンロック(="0"
)する - アンロック中(
=="0"
)であればロック(="1"
)する
といった処理をアトミックに処理できます。
これにより例えば「他のクライアントがロック中なのにロックしようとすると、a, cのcompareを満たさないのでfalseになる」という処理が可能になります。
ここで重要なのはアトミックという点で、単純にプログラムでread-modify-writeしようとすると
- readする
- 条件比較する
- 条件を満たしていたのでデータを修正
- 並行で走っている別の処理が先にwriteしてしまう
- 修正したデータをwrite
といった問題が起きます。いわゆる更新のロスト問題(4での更新が消える)です。
etcdの場合
Lock()関数を使いますが、内部の実装は
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) // reuse key in case this session already holds the lock get := v3.OpGet(m.myKey) // fetch current holder to complete uncontended path with only one RPC getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return nil, err }
ref: etcd/mutex.go at f4001630d9ee47d54613a34ab5238feb88c89f6d · etcd-io/etcd · GitHub
のようにCASになっています。
フェンシングトークンを発行する機能
先にロックを取得したクライアントがGCなどでポーズし、TTL後に処理を続行してしまうケースの対応です。
ref: How to do distributed locking — Martin Kleppmann’s blog
対策としてこのように単調増加なフェンシングトークンもストレージ側に保存するようにしておきます。 そしてストレージにwriteする際にこの値が以前の値より大きいかどうかの条件文を入れるようにします。
etcdの場合
keyをセットした際にレスポンスで返るrevisionが単調増加なのでそれを使います。
実装
実際の分散ロックのコードは以下のようになります。
func Lock(ctx context.Context, cli *clientv3.Client, key string) (int64, func(context.Context) error, error) { ss, err := concurrency.NewSession(cli, concurrency.WithTTL(lockTTL)) if err != nil { return 0, nil, err } m := concurrency.NewMutex(ss, key) // Orphan ends the refresh for the session lease. ss.Orphan() // TryLock returns immediately if lock is held by another session. err = m.TryLock(ctx) if err != nil { return 0, nil, err } return m.Header().Revision, func(ctx context.Context) error { return m.Unlock(ctx) }, nil }
実装上ポイント
ポイントは以下です。
- SessionにTTLをセットする
- Sessionはロックするたびに生成する
- 同じSessionだと排他的にならない。revisionも同一
- Orphan()でTTL(リース期限)が自動延長されないようにする
- これをしない場合、TTLが過ぎてもkeepaliveが続く間ロックが取得できない
- TryLock()はロック取得できなければ即時エラーが返る
- ロック取得できるまでブロックしたければLock()を使う
- Header().Revisionでフェンシングトークンを取得
使い方
使う際は以下のようにします。
func main() { cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { log.Fatal(err) } defer cli.Close() rev, unlocker, err := Lock(context.Background(), cli, lockResource) if err != nil { log.Fatal(err) } // unlock defer func() { if err := unlocker(context.Background()); err != nil { log.Fatal(err) } log.Println("unlocked") }() log.Println("acquired lock rev:", rev) // フェンシングトークン(rev)を使ったwrite処理 }
動作検証
同じリソースに対して2つの処理がロックをかけようとしてみます。
正常系(ロック取得を待たない)
他のトランザクションによってロックが取得されてた場合即時エラーを返すパターンです。
func Lock(ctx context.Context, cli *clientv3.Client, key string) (int64, func(context.Context) error, error) { ss, err := concurrency.NewSession(cli, concurrency.WithTTL(lockTTL)) if err != nil { return 0, nil, err } m := concurrency.NewMutex(ss, key) // Orphan ends the refresh for the session lease. ss.Orphan() // TryLock returns immediately if lock is held by another session. err = m.TryLock(ctx) // here if err != nil { return 0, nil, err } return m.Header().Revision, func(ctx context.Context) error { return m.Unlock(ctx) }, nil }
transaction1
まだロックがかかっていないのでロック取得できます。
$ go run main.go 2021/03/06 09:32:59 acquired lock rev: 5 2021/03/06 09:33:04 unlocked
transaction2
transaction1でロックが取得されているので、こちらはコケてしまいます。
$ go run main.go 2021/03/06 09:33:02 mutex: Locked by another session exit status 1
正常系(ロック取得を待つ)
先程のコードではTryLock()
でしたが、待ちたい場合はLock()
を使います。
func Lock(ctx context.Context, cli *clientv3.Client, key string) (int64, func(context.Context) error, error) { ss, err := concurrency.NewSession(cli, concurrency.WithTTL(lockTTL)) if err != nil { return 0, nil, err } m := concurrency.NewMutex(ss, key) // Orphan ends the refresh for the session lease. ss.Orphan() // acquire lock for ss err = m.Lock(ctx) // here if err != nil { return 0, nil, err } return m.Header().Revision, func(ctx context.Context) error { return m.Unlock(ctx) }, nil }
transaction1
まだロックがかかっていないのでロック取得できます。
$ go run main.go 2021/03/06 09:33:50 acquired lock rev: 9 2021/03/06 09:33:55 unlocked
transaction2
実行後はしばらく待機していますが、transaction1の処理が終了後こちらもロックが取得できました。
$ go run main.go 2021/03/06 09:33:55 acquired lock rev: 11 2021/03/06 09:34:00 unlocked
またrevisionが常に増加していることが確認できます。
先にロックを取った方がアンロック前に死んだケース
自動リースの検証です。
ロックを取得したクライアントが何らかの要因で落ちたケースを想定します。
transaction1
ロックを取得後プロセスを落としてみます。TTLは10sec
です。
$ go run main.go 2021/03/06 09:37:23 acquired lock rev: 19 ^Csignal: interrupt
09:37:23
にロック取得しています。
transaction2
TTLでロックがリースされるとロック取得できるようになります。
$ go run main.go 2021/03/06 09:37:33 acquired lock rev: 21 2021/03/06 09:37:38 unlocked
TTLである10秒後09:37:33
にロック取得できています。
成果物
今回使用したコードはこちら↓
docker-composeのyamlも入っているので簡単に検証できます。
まとめ
分散ロックマネージャにとって必要な機能の説明と、それを満たしているetcdを用いた実際の分散ロックのコードを紹介しました。
参考
分散システムを考える上で以下の書籍がとても勉強になります。