Open
Description
Hi,
I have deployed Strimzi kafka clusters in azure k8s. After deploying the MQTT source connector, I get the following error upon
kubectl describe kctr -n kafka
-
Error Trace
java.nio.charset.MalformedInputException: Input length = 1
at java.base/java.nio.charset.CoderResult.throwException(CoderResult.java:274)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:188)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:177)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:162)
at java.base/java.io.BufferedReader.read(BufferedReader.java:183)
at scala.io.BufferedSource.$anonfun$iter$2(BufferedSource.scala:41)
at scala.io.Codec.wrap(Codec.scala:74)
at scala.io.BufferedSource.$anonfun$iter$1(BufferedSource.scala:41)
at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
at scala.collection.Iterator$$anon$27.next(Iterator.scala:1135)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:637)
at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
at scala.io.Source.hasNext(Source.scala:253)
at scala.collection.Iterator.isEmpty(Iterator.scala:466)
at scala.collection.Iterator.isEmpty$(Iterator.scala:466)
at scala.io.Source.isEmpty(Source.scala:205)
at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1165)
at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1164)
at scala.io.Source.mkString(Source.scala:205)
at scala.collection.IterableOnceOps.mkString(IterableOnce.scala:1179)
at scala.collection.IterableOnceOps.mkString$(IterableOnce.scala:1190)
at scala.io.Source.mkString(Source.scala:205)
at com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceTask.start(MqttSourceTask.scala:48)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
MQTT source connector config
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: mqtt-hm-source-connector
labels:
strimzi.io/cluster: kafka-connect-cluster
spec:
class: com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
tasksMax: 1
config:
topics: test
connect.mqtt.clean: false
connect.mqtt.kcql: INSERT INTO test SELECT * FROM `MY_TOPIC` WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter`
connect.mqtt.converter.throw.on.error: true
connect.mqtt.client.id: "CLIENT_ID"
connect.mqtt.hosts: "ssl://mqtt.broker.com:8883" (THIS IS A THIRD PARTY CONNECTION)
connect.mqtt.ssl.ca.cert: "/mycerts/ca_file.pem"
connect.mqtt.ssl.cert: "/mycerts/crt_file.pem.crt"
connect.mqtt.ssl.key: "/mycerts/key_file.pem.key"
connect.mqtt.service.quality: 1
connect.progress.enabled: true
connect.mqtt.log.message: true
errors.log.include.messages: true
errors.log.enable: true
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
Strimzi kafka config
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: messaging
spec:
kafka:
version: 3.4.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: true
- name: external
port: 9094
type: nodeport
tls: false
config:
offsets.topic.replication.factor: 1
transaction.state.log.replication.factor: 1
transaction.state.log.min.isr: 1
default.replication.factor: 1
min.insync.replicas: 1
inter.broker.protocol.version: "3.4"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: false
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 10Gi
deleteClaim: false
entityOperator:
topicOperator: {}
userOperator: {}
Steps to reproduce in azure k8s
- kubectl create namespace kafka
- kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
- kubectl create -f ABOVE_KAFKA_CONFIG.yaml -n kafka
- kubectl create -f ABOVE_MQTT_SOURCE_CONFIG.yaml -n kafka
- kubectl get kctr -n kafka (connector status appears not ready)
- kubectl describe kctr -n kafka (here the error trace can be seen)
- kubectl delete kctr MQTT_CONNECTOR_NAME -n kafka (to re-attach the connector with changes its better to delete the existing connector and repeat step-6)
Previously used versions
kafka - 3.2.0
Strimzi cluster operator - 0.29.0
stream reactor kafka connector version- 4.0.0
Currently used versions
Kafka - 3.4.0
Strimzi operator- 0.34.0
stream reactor kafka connector version- 4.2.0
In both of these versions, the same error above persisted. I think I'm missing some config but not sure what it is. Any guidance will be greatly helpful.