Apache Kafka と Rest クライアントの組み合わせ

Red Hat で Solution Architect として Quarkus を担当している伊藤ちひろ(@chiroito)です。

この記事は、Quarkus.io のブログ記事、Combining Apache Kafka and the Rest client の翻訳記事です。


今週もまた面白い質問がありました。今週は、Kafka と Rest Client の組み合わせについて誰かに聞かれました。それは繰り返しになりますが、ほとんどの場合、次のようなプロセスを目標にしています。

https://quarkus.io/assets/images/posts/kafka-rest-client/kafka-rest-architecture.png

つまり、受信した Kafka メッセージごとにリモートサービスを呼び出したいのです。そのため、読み取りしているデータを含む最初のトピック(「in」)、例えば「トランザクション」があります。次に、アーキテクチャの中心的な部分があります。それは処理コンポーネント(図のProcessing Compornent)です。これは、着信したトランザクションを読み取り、それらのそれぞれについて、リモートサービスを呼び出します。また、それは(リモートサービスによって生成された)レスポンスを別の Kafka トピック "out" に書き込みます。

Quarkus でこれを実装するのは簡単です。この記事ではそれについて説明します。Reactive Messaging と Rest Client のおかげで、これには 20 行以上のコードを必要としません。

リモートサービス

まずはリモートサービスから始めてみましょう。Quarkus には、リモート HTTP サービスを呼び出すための複数の方法があります。ですが、HTTP の低レベルの詳細を処理することなく HTTP サービスと対話する優れた方法を提供しているので、Rest Client を使用してみましょう。

みなさんは HTTP API を使うこともできます。ですが、シンプルに言うと、簡単なリモートサービスを考えてみましょう。次のようになります。

@RegisterRestClient(configKey = "transaction-service") 
@Produces(MediaType.APPLICATION\_JSON) 
@Consumes(MediaType.APPLICATION\_JSON) 
public interface TransactionService {

    @Path("/transactions")
    @POST
    TransactionResult postSync(Transaction transaction);

    @Path("/transactions")
    @POST
    Uni<TransactionResult> postAsync(Transaction transaction);

} 

このサービスには、同じ HTTP エンドポイントを呼び出す 2 つのメソッドが含まれています。最初のものは同期的なものです。そのため、応答を受け取るまで呼び出し元のスレッドをブロックします。2つ目は非同期です。返された Uni は受信時にレスポンスを取得します。この場合、呼び出し元のスレッドはブロックされません。そのため、他のことができます。これらのメソッドの使い方は後ほど見ていきます。まずは、その前に少し設定を見ましょう。 application.properties に追加します。

# Configure the transaction-service (rest client)
transaction-service/mp-rest/url=http://localhost:8080 

もちろん、URLを更新します。https://quarkus.io/guides/rest-client では、Rest Client の使用方法や設定について詳しく説明しています。

着信トランザクションごとにサービスを呼び出す

OK、私たちはサービスを呼び出すことはできます。ですが、覚えておいて下さい。私たちはそれを着信トランザクションごとに呼び出したいのです。そしてこれらのトランザクションは Kafka のトピックから来ます。Reactive Messaging を使えば、今すぐに Kafka に対処する必要はありません。私たちはロジックに集中できます。私たちはチャネル(データのストリーム)を持っているとしましょう。これはトランザクションを転送するための物です。これを最初のチャンネル in と呼びます。

また、私たちはリモートサービスからのレスポンスを別の Kafka トピックに書き込みたいと思います。再度言いますが、Kafka に対処する必要はありません。レスポンスを out という名前のチャンネルに書き込んだとします。

そこで、以下のような(不完全な)コードがあります:

@ApplicationScoped 
public class TransactionProcessor {

    @Incoming("in") // 最初のチャネル - 私たちはそこから読み込む
    @Outgoing("out") // 2 つ目のチャネル - 私たちはそこへ書込む
    public TransactionResult sendToTransactionService(Transaction transaction) {
       // ここで私たちのサービスを読み出す必要がある
    }

} 

@Incoming は読み込みチャンネルを設定します。@Outgoing は書き込みチャンネルを設定します。しかし、何かが足りないです...。 私たちはリモートサービスを呼び出す必要があります

@ApplicationScoped 
public class TransactionProcessor {

    private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");

    @Inject @RestClient TransactionService service;

    @Incoming("in")
    @Outgoing("out")
    @Blocking
    public TransactionResult sendToTransactionService(Transaction transaction) {
        LOGGER.infof("Sending %s transaction service", transaction);
        return service.postSync(transaction);
    }

} 

まず最初に、Rest Client を注入します。そして、私たちのメソッドの中でそれを呼び出すだけです。

@Blocking について疑問に思うことがあるかもしれません。リアクティブメッセージングでは、ブロッキングコードを使用している場合はその旨を示す必要があります。デフォルトではそれはイベントループアーキテクチャが使用されているためです。便利な反面、@Blocking を乱用すべきではありません。それは、スレッドプールに依存して同時実行性が制限されるためです。しかし、それはあなたのロジックを同期させます。

非同期操作の使用

TransactionService が実行する 2 番目のメソッド postAsync を使用することで、私たちは @Blocking アノテーションを取り除けます。

@ApplicationScoped 
public class TransactionProcessor {

    private static final Logger LOGGER = Logger.getLogger("TransactionProcessor");

    @Inject @RestClient TransactionService service;

    @Incoming("in")
    @Outgoing("out")
    public Uni<TransactionResult> sendToTransactionService(Transaction transaction) {
        LOGGER.infof("Sending %s transaction service", transaction);
        return service.postAsync(transaction);
    }

} 

post メソッドの async バリアントを使用すると、@Blocking を削除できます。 Uni を直接返します。その Uni はリモートサービスの応答を受信すると、その値を out チャネルに書き込みます。

チャネルを Kafka にマッピング

ここまでは順調です。私たちのコードを Kafka に接続する時が来ました。リアクティブメッセージングでは、チャネルをコネクタにマッピングします。ここでは Kafka です。そこで、in チャンネルと out チャンネルが Kafka のトピックであることを示すようにアプリケーションを設定する必要があります。もう一度、application.properties ファイルを編集し、追加します。

mp.messaging.incoming.in.connector=smallrye-kafka mp.messaging.incoming.in.topic=transactions 
mp.messaging.incoming.in.value.deserializer=org.acme.model.TransactionDeserializer 
mp.messaging.incoming.in.auto.offset.reset=earliest mp.messaging.incoming.in.enable.auto.commit=false

mp.messaging.outgoing.out.connector=smallrye-kafka mp.messaging.outgoing.out.topic=output 
mp.messaging.outgoing.out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer 

最初のブロックは in チャンネルについてです。 それは transactions Kafkaのトピックに接続します。データはカスタムデシリアライザでデシリアライズされます。最後の他のプロパティは、オートコミット(Reactive Messaging が代わりにコミットを処理してくれます)を無効にします。そして、最後にコミットされたオフセット以降のデータを読み込みます。

2 番目のブロックでは、out チャネルを設定します。 output Kafkaトピックと接続します。そして、値のシリアライザを設定します。この簡単な例では、データをJSONで書きます。

トランザクションが Kafka transaction トピックに書き込まれると、それは私たちの処理コンポーネントで受信されます。そして、リモートサービスに送信され、その結果が output Kafka トピックに書き込まれます。

2020-08-27 10:04:44,141 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='MacroHard', amount=20} transaction service 
2020-08-27 10:04:44,196 INFO [TransactionResource] (executor-thread-2) Handling transaction MacroHard / 20 
2020-08-27 10:04:44,240 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='BlueHat', amount=10} transaction service 
2020-08-27 10:04:44,245 INFO [TransactionResource] (executor-thread-2) Handling transaction BlueHat / 10 

output トピックの中を見てみると、TransactionResult が流れているのが確認できます。

https://quarkus.io/assets/images/posts/kafka-rest-client/output.png

終わりました!

数行のコードと少しの設定で、Kafka トピックからデータを読み取り、リモートサービスを呼び出し、その結果を別の Kafka トピックに書き込めます。単純な話です。

自分でやってみたいと思いますか?この GitHub リポジトリのコードをチェックアウトして、readme の指示に従ってください。

Reactive Messaging と Rest クライアントには他にも宝石が含まれています。関連するガイドやドキュメントを確認して詳細を確認してください。

* 各記事は著者の見解によるものでありその所属組織を代表する公式なものではありません。その内容については非公式見解を含みます。