Deploy a highly-available Kafka cluster on GKE


Kafka is an open source, distributed publish-subscribe messaging system for handling high-volume, high-throughput, and real-time streaming data. You can use Kafka to build streaming data pipelines that move data reliably across different systems and applications for processing and analysis.

This tutorial is intended for platform administrators, cloud architects, and operations professionals interested in deploying highly-available Kafka clusters on Google Kubernetes Engine (GKE).

Objectives

In this tutorial, you will learn how to:
  • Use Terraform to create a regional GKE cluster.
  • Deploy a highly-available Kafka cluster.
  • Upgrade Kafka binaries.
  • Back up and restore the Kafka cluster.
  • Simulate GKE node disruption and Kafka broker failover.

Architecture

This section describes the architecture of the solution you'll build in this tutorial.

A Kafka cluster is a group of one or more servers (called brokers) that work together to handle incoming data streams and publish-subscribe messaging for Kafka clients (producers and consumers).

Each data partition in a Kafka cluster has one leader broker and can have one or more follower brokers. The leader broker handles all reads and writes to the partition. Each follower broker passively replicates the leader broker.

In a typical Kafka setup, you also use an open source service called ZooKeeper to coordinate your Kafka clusters. This service helps in electing a leader among the brokers and triggering failover in case of failures.

In this tutorial, you deploy the Kafka clusters on GKE by configuring the Kafka brokers and Zookeeper service as individual StatefulSets. To provision highly-available Kafka clusters and prepare for disaster recovery, you'll configure your Kafka and Zookeeper StatefulSets to use separate node pools and zones.

The following diagram shows how your Kafka StatefulSet runs on multiple nodes and zones in your GKE cluster.

Figure 1: Deploying your Kafka StatefulSet on GKE nodes across three different zones.

The following diagram shows how your Zookeeper StatefulSet runs on multiple nodes and zones in your GKE cluster.

Figure 2: Deploying your Zookeeper StatefulSet on GKE nodes across three different zones.

Node provisioning and Pod scheduling

If you are using Autopilot clusters, Autopilot handles provisioning nodes and scheduling the Pods for your workloads. You'll use Pod anti-affinity to ensure that no two Pods of the same StatefulSet are scheduled on the same node or in the same zone.

If you are using Standard clusters, you'll need to configure the Pod toleration and node affinity. To learn more, see Isolate your workloads in dedicated node pools.

Costs

In this document, you use the following billable components of Google Cloud: GKE, Backup for GKE, Artifact Registry, and Compute Engine.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

Set up your project

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, click Create project to begin creating a new Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Google Kubernetes Engine, Backup for GKE, Artifact Registry, Compute Engine, and IAM APIs.

    Enable the APIs

Set up roles

  1. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/storage.objectViewer, roles/logging.logWriter, roles/artifactregistry.admin, roles/container.clusterAdmin, roles/container.serviceAgent, roles/iam.serviceAccountAdmin, roles/serviceusage.serviceUsageAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the email address of your user account, for example my-user@example.com. The command already adds the user: prefix.
    • Replace ROLE with each individual role.
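
Because the command has to be repeated once per role, a small loop can save some typing. The following is a minimal sketch, not part of the sample repository: USER_EMAIL and the DRY_RUN switch are hypothetical names introduced here, the default values are placeholders, and the role IDs are written with the roles/ prefix that IAM expects. With DRY_RUN=1 (the default in this sketch) the commands are only printed so that you can review them first.

```shell
# Sketch: grant every role from the step above in one loop.
# PROJECT_ID and USER_EMAIL default to placeholder values; DRY_RUN is a
# hypothetical switch, not a gcloud feature.
PROJECT_ID="${PROJECT_ID:-my-project}"
USER_EMAIL="${USER_EMAIL:-my-user@example.com}"
DRY_RUN="${DRY_RUN:-1}"

grant_roles() {
  for role in \
    roles/storage.objectViewer \
    roles/logging.logWriter \
    roles/artifactregistry.admin \
    roles/container.clusterAdmin \
    roles/container.serviceAgent \
    roles/iam.serviceAccountAdmin \
    roles/serviceusage.serviceUsageAdmin
  do
    if [ "$DRY_RUN" = "1" ]; then
      # Print the command instead of running it.
      echo "gcloud projects add-iam-policy-binding $PROJECT_ID --member=user:$USER_EMAIL --role=$role"
    else
      gcloud projects add-iam-policy-binding "$PROJECT_ID" \
        --member="user:$USER_EMAIL" --role="$role"
    fi
  done
}

grant_roles
```

To apply the bindings for real, set PROJECT_ID and USER_EMAIL to your own values and run the script with DRY_RUN=0.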

Set up your environment

In this tutorial, you use Cloud Shell to manage resources hosted on Google Cloud. Cloud Shell comes preinstalled with the software you'll need for this tutorial, including Docker, kubectl, the gcloud CLI, Helm, and Terraform.

To set up your environment with Cloud Shell, follow these steps:

  1. Launch a Cloud Shell session by clicking Activate Cloud Shell in the Google Cloud console. This launches a session in the bottom pane of the Google Cloud console.

  2. Set environment variables.

    export PROJECT_ID=PROJECT_ID
    export REGION=us-central1
    

    Replace PROJECT_ID with your Google Cloud project ID.

  3. Set the default project.

    gcloud config set project $PROJECT_ID
    
  4. Clone the code repository.

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  5. Change to the working directory.

    cd kubernetes-engine-samples/streaming/gke-stateful-kafka
    

Create your cluster infrastructure

In this section, you'll run a Terraform script to create two regional GKE clusters. The primary cluster will be deployed in us-central1, and the backup cluster in us-west1.

To create the cluster, follow these steps:

Autopilot

In Cloud Shell, run the following commands:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

When prompted, type yes.

Standard

In Cloud Shell, run the following commands:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID 

When prompted, type yes.

The Terraform configuration files create the following resources to deploy your infrastructure:

  • An Artifact Registry repository to store the Docker images.
  • The VPC network and subnet for the VM's network interface.
  • Two GKE clusters.

Terraform creates a private cluster in each of the two regions, and enables Backup for GKE for disaster recovery.

Deploy Kafka on your cluster

In this section, you'll deploy Kafka on GKE using a Helm chart. The operation creates the following resources:

  • The Kafka and Zookeeper StatefulSets.
  • A Kafka exporter deployment. The exporter gathers Kafka metrics for Prometheus consumption.
  • A Pod Disruption Budget (PDB) that limits the number of offline Pods during a voluntary disruption.

To use the Helm chart to deploy Kafka, follow these steps:

  1. Configure Docker access.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Populate Artifact Registry with the Kafka and Zookeeper images.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. Configure kubectl command line access to the primary cluster.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --region=${REGION} \
        --project=${PROJECT_ID}
    
  4. Create a namespace.

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Install Kafka using the Helm chart version 20.0.6.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    The output is similar to the following:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Verify that your Kafka replicas are running (this might take a few minutes).

    kubectl get all -n kafka
    

    The output is similar to the following:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
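
To check that the Pods of each StatefulSet really landed on different nodes, you can list Pod-to-node assignments and look for duplicates. The following is a minimal sketch: colocated_pods is a hypothetical helper, and the node names in the sample input are made up. It assumes input lines of the form "POD NODE", which the kubectl command in the comment is expected to produce.

```shell
# Reads "POD NODE" pairs on stdin and prints every StatefulSet/node pair
# that hosts more than one Pod. The StatefulSet name is taken to be the
# Pod name without its ordinal suffix (kafka-0 -> kafka).
colocated_pods() {
  awk '{
    sts = $1
    sub(/-[0-9]+$/, "", sts)
    count[sts " " $2]++
  }
  END {
    for (key in count) if (count[key] > 1) print key, count[key]
  }' | sort
}

# Against a live cluster, the input could come from:
#   kubectl get pods -n kafka --no-headers \
#     -o custom-columns=POD:.metadata.name,NODE:.spec.nodeName | colocated_pods
# Demo with made-up node names, where kafka-1 and kafka-2 share a node:
colocated_pods <<'EOF'
kafka-0 node-a
kafka-1 node-b
kafka-2 node-b
kafka-zookeeper-0 node-a
kafka-zookeeper-1 node-b
kafka-zookeeper-2 node-c
EOF
```

An empty result means that no node hosts two Pods of the same StatefulSet. In the demo, the helper reports kafka on node-b with a count of 2.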
    

Create test data

In this section, you will test the Kafka application and generate messages.

  1. Create a consumer client Pod for interacting with the Kafka application.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. Create a topic named topic1 with three partitions and a replication factor of three.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Verify that the topic partitions are replicated across all three brokers.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    In the example output, notice that topic1 has three partitions, each with a different leader and replica order. Kafka partitions a topic to distribute its data across multiple brokers, which improves scalability and fault tolerance. With a replication factor of three, each partition has three replicas, so data remains available even if one or two brokers fail.

  4. Run the following command to generate numbered messages in bulk and write them to topic1.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. Run the following command to consume topic1 from all partitions.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    Type CTRL+C to stop the consumer process.
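
The describe output from step 3 can also be summarized per partition, which makes it easier to spot a partition whose in-sync replica set (Isr) is smaller than its replica list. The following is a minimal sketch: partition_health is a hypothetical helper, it assumes the Leader:, Replicas:, and Isr: labels shown in the example output, and the second sample line is altered by hand to simulate a broker that has fallen out of sync.

```shell
# Reads `kafka-topics.sh --describe` output on stdin and prints one summary
# line per partition, flagging partitions with fewer in-sync replicas than
# assigned replicas.
partition_health() {
  awk '$3 == "Partition:" {
    leader = ""; replicas = ""; isr = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Leader:")   leader = $(i + 1)
      if ($i == "Replicas:") replicas = $(i + 1)
      if ($i == "Isr:")      isr = $(i + 1)
    }
    n_rep = split(replicas, r, ",")
    n_isr = split(isr, s, ",")
    status = (n_isr < n_rep) ? "UNDER_REPLICATED" : "OK"
    print "partition=" $4, "leader=" leader, "isr=" n_isr "/" n_rep, status
  }'
}

# Demo input: partition 1 is edited to show only two in-sync replicas.
partition_health <<'EOF'
Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3
       Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
       Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2
       Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
EOF
```

Inside the client Pod, you could pipe the real describe command into the helper. The kafka-topics.sh tool also has an --under-replicated-partitions option for --describe, which may be simpler if you only need the unhealthy partitions.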

Benchmark Kafka

To accurately model a use case, you can run a simulation of the expected load on the cluster. To test performance, you will use the tools included in the Kafka package, namely the kafka-producer-perf-test.sh and kafka-consumer-perf-test.sh scripts in the bin folder.

  1. Create a topic for benchmarking.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Create load on the Kafka cluster.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    The producer will generate 10,000,000 records on topic-benchmark. The output is similar to the following:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    Once all records have been sent, you should see additional metrics in the output, similar to the following:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    If you want to stop the test before it finishes, type CTRL+C.

  3. Exit the Pod shell.

    exit
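
The MB/sec figures in the producer output follow directly from the record rate and the --record-size value. The following is a small worked example, assuming that the tool counts 1 MB as 1024 × 1024 bytes, which is what the sample numbers imply. throughput_mb is a hypothetical helper name.

```shell
# Convert a rate in records/sec and a record size in bytes to MB/sec,
# assuming 1 MB = 1024 * 1024 bytes.
throughput_mb() {
  awk -v rate="$1" -v size="$2" \
    'BEGIN { printf "%.2f\n", rate * size / (1024 * 1024) }'
}

throughput_mb 124316.7 100   # first progress line: prints 11.86
throughput_mb 463242.7 100   # fourth progress line: prints 44.18
```

The same arithmetic lets you estimate the record rate that you need for a target bandwidth before you change --record-size or the batching settings.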
    

Manage upgrades

Version updates for Kafka and Kubernetes are released on a regular schedule. Follow operational best practices to upgrade your software environment regularly.

Plan for Kafka binary upgrades

In this section, you will update the Kafka image using Helm and verify that your topics are still available.

To upgrade from the Kafka version that you installed with the Helm chart in Deploy Kafka on your cluster, follow these steps:

  1. Populate Artifact Registry with the following images:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. Perform these steps to deploy a Helm chart with the upgraded Kafka and Zookeeper images. For version-specific guidance, refer to the Kafka instructions for version upgrades.

    1. Update the Chart.yaml dependency version:

      ../scripts/chart.sh kafka 20.1.0

    2. Deploy the Helm chart with the new Kafka and Zookeeper images, as shown in the following example:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
      

    Watch the Kafka Pods get upgraded:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    To exit the watch, type CTRL + C.

  3. Connect to the Kafka cluster using a client Pod.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. Verify you can access messages from topic1.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output should show the messages that you generated in Create test data. Type CTRL+C to exit the process.

  5. Exit the client Pod.

    exit
    

Prepare for disaster recovery

To ensure that your production workloads remain available in the event of a service-interrupting event, you should prepare a disaster recovery (DR) plan. To learn more about DR planning, see the Disaster recovery planning guide.

To back up and restore your workloads on GKE clusters, you can use Backup for GKE.

Example Kafka backup and restore scenario

In this section, you will take a backup of your cluster from gke-kafka-us-central1 and restore the backup into gke-kafka-us-west1. You will perform the backup and restore operation at the application scope, using the ProtectedApplication Custom Resource.

The following diagram illustrates the components of the disaster recovery solution, and how they relate to each other.

Figure 3: Example backup-and-recovery solution for a highly-available Kafka cluster.

To prepare to back up and restore your Kafka cluster, follow these steps:

  1. Set up the environment variables.

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. Verify the cluster is in a RUNNING state.

    gcloud container clusters describe $CLUSTER_NAME --region us-central1 --format='value(status)'
    
  3. Create a Backup Plan.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. Manually create a Backup. While scheduled backups are typically governed by the cron-schedule in the backup plan, the following example shows how you can initiate a one-time backup operation.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. Create a Restore Plan.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. Manually restore from a Backup.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. Watch the restored application come up in the backup cluster. It may take a few minutes for all the Pods to be running and ready.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --region us-west1
    kubectl get pod -n kafka --watch
    

    Type CTRL+C to exit the watch when all Pods are up and running.

  8. Validate that previous topics can be fetched by a consumer.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    The output is similar to the following:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    Type CTRL+C to exit the process.

  9. Exit the Pod.

    exit
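
Because the test data used keys 0 through 200, you can check a restored topic for gaps instead of reading the consumer output by eye. The following is a minimal sketch: missing_keys is a hypothetical helper that assumes each message line starts with its numeric key, as in the example output. The demo input is generated locally and deliberately drops key 42.

```shell
# Reads consumer output on stdin and prints every key between FIRST and LAST
# that never appears. Lines that do not start with a numeric key, such as
# the "Processed a total of ..." summary, are ignored.
missing_keys() {
  awk -v first="$1" -v last="$2" '
    $1 ~ /^[0-9]+$/ { seen[$1 + 0] = 1 }
    END {
      for (k = first + 0; k <= last + 0; k++)
        if (!(k in seen)) print k
    }
  '
}

# Demo: build lines shaped like the consumer output, remove key 42,
# and confirm that the helper reports it. Prints 42.
seq 0 200 | sed 's/.*/& :  Message number &/' | grep -v '^42 ' | missing_keys 0 200
```

An empty result means that every key in the range was seen. To use the helper on real data, save the output of kafka-console-consumer.sh to a file and pass that file on standard input.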
    

Simulate a Kafka service disruption

In this section, you will simulate a node failure by replacing a Kubernetes node hosting the broker. This section applies to Standard only. Autopilot manages your nodes for you, so node failure cannot be simulated.

  1. Create a client Pod to connect to the Kafka application.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  2. Create a topic named topic-failover-test with one partition and a replication factor of three.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Determine which broker is the leader for the topic-failover-test topic.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    In the output above, Leader: 1 means that the leader for topic-failover-test is broker 1. This corresponds to Pod kafka-1.

  4. Open a new terminal and connect to the same cluster.

    gcloud container clusters get-credentials gke-kafka-us-west1 --region us-west1 --project PROJECT_ID
    
  5. Find which node Pod kafka-1 is running on.

    kubectl get pod -n kafka kafka-1 -o wide
    

    The output is similar to the following:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    In the output above, you see Pod kafka-1 is running on node gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

  6. Drain the node to evict the Pods.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    Replace NODE with the name of the node that Pod kafka-1 is running on. In this example, the node is gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

    The output is similar to the following:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Find which node Pod kafka-1 is running on.

    kubectl get pod -n kafka kafka-1 -o wide
    

    The output should look similar to the following:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    From the output above, you see the application is running on a new node.

  8. In the terminal connected to the kafka-client Pod, determine which broker is the leader for topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output should look similar to the following:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    In the example output, the leader is still 1. However, it's now running on a new node.

Test Kafka leader failure

  1. In Cloud Shell, connect to the Kafka client, and use describe to view the elected leader for each partition in topic1.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. In the Cloud Shell session that is not connected to the Kafka client, delete the kafka-0 leader broker to force a new leader election. Delete the Pod whose index matches one of the leaders in the previous output.

    kubectl delete pod -n kafka kafka-0 --force
    

    The output is similar to the following:

    pod "kafka-0" force deleted
    
  3. In the Cloud Shell session connected to the Kafka client, use describe to view the elected leader.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    The output is similar to the following:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    In the output, every partition that was led by the interrupted broker (kafka-0) has a new leader. This indicates that Kafka elected a replacement leader when the Pod was deleted and re-created.
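
If you save the describe output before and after deleting the Pod, you can list the leader changes directly instead of comparing the two outputs by eye. The following is a minimal sketch: leader_changes is a hypothetical helper, and it assumes the column layout shown in the example outputs, where the partition number is the fourth field and the leader is the sixth. The demo files contain shortened copies of those examples.

```shell
# Compares two saved `kafka-topics.sh --describe` outputs and prints the
# partitions whose leader differs between the first and the second file.
leader_changes() {
  awk '$3 == "Partition:" {
    if (FNR == NR) {
      before[$4] = $6            # first file: remember the old leader
    } else if (before[$4] != $6) {
      print "partition " $4 ": leader " before[$4] " -> " $6
    }
  }' "$1" "$2"
}

# Demo with the leaders from the example outputs in this section.
before_file="$(mktemp)"
after_file="$(mktemp)"
cat > "$before_file" <<'EOF'
    Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
    Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
    Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
EOF
cat > "$after_file" <<'EOF'
    Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
    Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
    Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
EOF
leader_changes "$before_file" "$after_file"
rm -f "$before_file" "$after_file"
```

For this demo input, the helper reports that all three partitions moved from leader 0 to leader 2. Partitions whose leader did not change are not printed.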

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to avoid billing is to delete the project you created for the tutorial.

Delete a Google Cloud project:

gcloud projects delete PROJECT_ID

What's next