[Bug]: MalformedInputException in MQTT Source connector in Azure K8s #939

Open
@mukkchir

Description

Hi,

I have deployed Strimzi Kafka clusters on Azure Kubernetes (k8s). After deploying the MQTT source connector, I get the following error when I run
kubectl describe kctr -n kafka

Error Trace

java.nio.charset.MalformedInputException: Input length = 1
              at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:274)
              at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
              at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188)
              at java.base/java.io.InputStreamReader.read(InputStreamReader.java:177)
              at java.base/java.io.BufferedReader.fill(BufferedReader.java:162)
              at java.base/java.io.BufferedReader.read(BufferedReader.java:183)
              at scala.io.BufferedSource.$anonfun$iter$2(BufferedSource.scala:41)
              at scala.io.Codec.wrap(Codec.scala:74)
              at scala.io.BufferedSource.$anonfun$iter$1(BufferedSource.scala:41)
              at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
              at scala.collection.Iterator$$anon$27.next(Iterator.scala:1135)
              at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:637)
              at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
              at scala.io.Source.hasNext(Source.scala:253)
              at scala.collection.Iterator.isEmpty(Iterator.scala:466)
              at scala.collection.Iterator.isEmpty$(Iterator.scala:466)
              at scala.io.Source.isEmpty(Source.scala:205)
              at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1165)
              at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1164)
              at scala.io.Source.mkString(Source.scala:205)
              at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1179)
              at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1190)
              at scala.io.Source.mkString(Source.scala:205)
              at com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceTask.start(MqttSourceTask.scala:48)
              at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
              at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
              at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
              at java.base/java.lang.Thread.run(Thread.java:833)
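
The trace shows the exception being raised while scala.io.Source.mkString reads a character stream inside MqttSourceTask.start, so the failure is a decoding error rather than a broker or TLS error. One possible cause (an assumption, not confirmed anywhere in this issue) is that the bytes being read are not valid in the charset the reader uses, for example when the JVM default charset in the Connect container is not UTF-8. The sketch below is a minimal standalone Java illustration of that mechanism; the class name CharsetMismatchDemo and the helper decodeStrict are hypothetical and are not part of Stream Reactor or Kafka Connect.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: shows how a strict decoder rejects bytes that
// are not valid in the charset it was created for.
public class CharsetMismatchDemo {

    // Decodes the bytes strictly. A decoder obtained from Charset.newDecoder()
    // reports malformed input instead of silently replacing it.
    public static String decodeStrict(byte[] bytes, Charset charset) throws IOException {
        StringBuilder out = new StringBuilder();
        try (Reader reader = new InputStreamReader(
                new ByteArrayInputStream(bytes), charset.newDecoder())) {
            int c;
            while ((c = reader.read()) != -1) {
                out.append((char) c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        // "café" encoded as UTF-8 contains a two-byte, non-ASCII sequence.
        byte[] utf8Bytes = "caf\u00e9".getBytes(StandardCharsets.UTF_8);

        // Worth checking inside the Connect pod: which charset the JVM defaults to.
        System.out.println("JVM default charset: " + Charset.defaultCharset());

        // Decoding with the matching charset succeeds.
        System.out.println("UTF-8 decode: " + decodeStrict(utf8Bytes, StandardCharsets.UTF_8));

        // Decoding the same bytes as US-ASCII fails on the first non-ASCII byte.
        try {
            decodeStrict(utf8Bytes, StandardCharsets.US_ASCII);
        } catch (MalformedInputException e) {
            System.out.println("Strict US-ASCII decode failed: " + e);
        }
    }
}
```

If the default charset printed inside the Connect container turns out not to be UTF-8, that would be consistent with the trace above, but it would still need to be verified against what MqttSourceTask actually reads at startup.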

MQTT source connector config

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mqtt-hm-source-connector
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
  tasksMax: 1
  config:
    topics: test
    connect.mqtt.clean: false
    connect.mqtt.kcql: INSERT INTO test SELECT * FROM `MY_TOPIC` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
    connect.mqtt.converter.throw.on.error: true
    connect.mqtt.client.id: "CLIENT_ID"
    connect.mqtt.hosts: "ssl://mqtt.broker.com:8883"  # this is a third-party connection
    connect.mqtt.ssl.ca.cert: "/mycerts/ca_file.pem"
    connect.mqtt.ssl.cert: "/mycerts/crt_file.pem.crt"
    connect.mqtt.ssl.key: "/mycerts/key_file.pem.key"
    connect.mqtt.service.quality: 1
    connect.progress.enabled: true
    connect.mqtt.log.message: true
    errors.log.include.messages: true
    errors.log.enable: true
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false 

Strimzi Kafka config

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: messaging
spec:
  kafka:
    version: 3.4.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Steps to reproduce on Azure k8s

  1. kubectl create namespace kafka
  2. kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
  3. kubectl create -f ABOVE_KAFKA_CONFIG.yaml -n kafka
  4. kubectl create -f ABOVE_MQTT_SOURCE_CONFIG.yaml -n kafka
  5. kubectl get kctr -n kafka (the connector status shows as not ready)
  6. kubectl describe kctr -n kafka (the error trace is visible here)
  7. kubectl delete kctr MQTT_CONNECTOR_NAME -n kafka (to re-apply the connector with changes, it's better to delete the existing connector and repeat from step 4)

Previously used versions

Kafka: 3.2.0
Strimzi cluster operator: 0.29.0
Stream Reactor Kafka connector: 4.0.0

Currently used versions

Kafka: 3.4.0
Strimzi cluster operator: 0.34.0
Stream Reactor Kafka connector: 4.2.0

The same error occurs with both sets of versions. I think I'm missing some configuration, but I'm not sure what. Any guidance would be greatly appreciated.
