Mutiny で Redis を使う - 非同期アクションを構成

Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。

この記事は、Quarkus.io のブログ記事、Using Redis with Mutiny - Composing asynchronous actions の翻訳記事です。

ユーザーから Redis と Mutiny について興味深い質問を受けました。この問題は特に Redis に関するものではなく、他の多くの API にも適用できるものでしたが、私はこの文脈が面白いと感じました。

ユーザーの Enrico は、こんなことをしたいと思っていました:

1. Redis からすべてのキーを取得
2. 各キーに対して -> 関連するオブジェクトを取得
3. このオブジェクトを JsonArray に追加
4. すべてのオブジェクトを含む JsonArray を生成

Enrico は Vert.x Redis クライアントの Mutiny 型を使用しています。

このクライアントは、私たちの問題を解決するために、いくつかのメソッドを提供しています。

  • RedisClient.keys(pattern) メソッドは Uni<JsonArray> を返します。この配列には、keys メソッドに渡されたパターンに一致するキーのリストが含まれます。この記事を簡単にするために、すべてのキーを返す:keys("*") を使用してみましょう。
  • RedisClient.hgetall(key) メソッドは Uni<JsonObject> を返します。このメソッドは、渡されたキーに関連付けられたオブジェクトを取得します。

どちらのメソッドも非同期(Uniを返す)です。なので、取得したキーごとに 2 番目のメソッドを呼び出す必要があります。言い換えれば、キーのセットを反復処理し、各キーに対して非同期アクションを呼び出す必要があります。最後に、これらの非同期アクションの結果を JsonArray に収集したいと思います。

まずは最初からやってみましょう;Redis クライアントのインスタンスが必要です。

RedisClient redis = RedisClient.create(vertx, new JsonObject()
     .put("port", 6379)
     .put("host", "localhost"));

Quarkus では、Redis 拡張モジュールを直接使用する必要があります。それは同様の API を公開しています。Enrico は Vert.x Redis クライアントを直接使いたいと思っていました。

クライアントを手に入れたので、キーのリストを取得してみましょう。

Uni<JsonArray> keys = redis.keys("*")

これは JsonArray を生成します。ですが、キーのストリームが必要です。繰り返しになりますが、これは非同期メソッドです。返された Uni は利用可能なときに配列を受け取ります。(onItem) を一度受け取ると、この配列からストリームを作成できます。

Multi<String> keys = redis.keys("*")
     .onItem().transformToMulti(array -> Multi.createFrom().iterable(array))
     .onItem().castTo(String.class);

このスニペット:

  1. キーを含む JsonArray を取得します。
  2. これらのキーをストリーミングする Multi を作成します。それは JsonArray が Iterable<Object> を拡張しているので Multi<Object> です。
  3. このMulti アイテムを String に対応付けします。

この時点では、(String)キーのストリームがあります。ということで、ステップ1はこれで終了です。

さて、ステップ2:各キーについて、関連するオブジェクトを取得したいと思います。

そこで、hgetall メソッドを使ってみましょう:

Multi<JsonObject> objects = keys
  .onItem().transformToUniAndMerge(key -> redis.hgetall(key));

このスニペットは少し説明が必要です。

キーのストリームの各アイテムについて、hgetall を呼び出して Uni<JsonObject> を生成します。

そこで、キーを Uni に変換したいと思います(transformToUni)。

アイテムのストリームがあり、各アイテムに対して非同期アクションを呼び出す必要がある場合、結果をどのようにマージするかを選択しなければなりません。Mutiny は2つの戦略を提供します。

  • マージ - Uni で生産されたアイテムを受信したらすぐにそれを下流に送ります。
  • 連結 - 入力ストリームの順序を保持して、下流に送られるアイテムが同じ順序で送られるようにします。

これを図解してみましょう。我々はキー1、2、3を持ち、それをストリーム{1、2、3}に持っていると想像してみてください。また、私たちの Redis データベースでは、キー1 が A、2 が B、3 が C に関連付けられていると考えてみましょう。

マージ戦略を使用した場合、関連するオブジェクトを不確定な順序で取得しています。最終的には{A, C, B}または{B, A, C}で終われます。それは、レイテンシ、スケジューリング、負荷など多くの要因に依存します。しかし、これは、関連するすべてのオブジェクトを並列に取得し、順序を気にせずに結果のストリームを生成できることを意味します。

連結戦略を使用すると、それは入力ストリームからの順序を保持します。なので、必ず{A,B,C}が出てきます。それが望ましいかもしれませんが、それは同時にオブジェクトを取得する能力を低下させる可能性があります。それは、Mutinyが前のオブジェクトのすべての検索を待たなければならないようなものです。例えば、Mutiny が先に C を受信した場合、C を下流に送る前に A と B を待つ必要があります。

この文脈では、順序を保持せずにマージ戦略を使用しましょう。そこで、transformToUniAndMerge を使用します。

コードを何度も実行すると、結果の配列の順序が変わるかもしれません。

ステップ2は完了しました。最後のステップに焦点を当ててみましょう:オブジェクトを JsonArray に蓄積し、すべてのオブジェクトを含む Uni<JsonArray> を生成します。Mutiny はストリームからアイテムをリスト、マップ、セットに収集するメソッドを提供しています。ですが、組み込みの JsonArray のサポートはありません。幸いなことに、Mutiny はあなたがどのような構造のアイテムを収集するためにも使用できるメソッドを提供しています。

Uni<JsonArray> result = objects
   .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));

collectItems().in は、独自の構造にアイテムを蓄積できます。これは2つのパラメータを取ります。一度だけ呼び出される構造の supplier と、構造と追加するアイテムを受け取る bi-consumer で、各アイテムごとに呼び出されます。

さあ、行きましょう。これで Enrico の疑問を解決できます。

オールインワンのコードは以下の通りです:

Uni<JsonArray> result =
  // ステップ1 - キーを取得
  redis.keys("*")
    .onItem().transformToMulti(keys -> Multi.createFrom().iterable(keys))
    .onItem().castTo(String.class)
  // ステップ2 - 各キーに関連付けられたオブジェクトを取得
    .onItem().transformToUniAndMerge(key -> redis.hgetall(key))
  // ステップ3と4 - 取得したオブジェクトをJsonArrayに蓄積
    .collectItems().in(() -> new JsonArray(), (arr, obj) -> arr.add(obj));

このスニペットでは、いくつかの面白いパターンがあります:

  • コレクションを持っていて、それを Mutiny で反復したい場合は、それを Multi に変換します。
  • ストリームの各アイテムに対して非同期アクションを実行する場合、マージと連結について考えてみてください。自分にとって意味のあるものを使いましょう。
  • アイテムを構造に蓄積するには、collectItems を使用します。それはあなたの選択の構造を生成するための多くのメソッドを提供しています。

このコードが実際に動作しているのを見たい場合は、この gist を確認してください。JBang で直接実行することもできます。

jbang https://gist.github.com/cescoffier/e8c8a18897f9e5ca15f1378876a1bd93

merge を concatenate に置き換えると違いがわかります。

楽しもう!

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。