Description
When sending tombstones to a topic that is backed up via the S3 sink connector (with envelope, JSON format), the tombstone is backed up as:
{"headers":{},"metadata":{"partition":5,"offset":0,"topic":"test-topic-backup","timestamp":1726213691895},"keyIsArray":true,"value":null,"key":"aWQx"}
(note that valueIsArray is not set), and the source connector then fails with:
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRING
at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:61)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)
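For context, ByteArrayConverter only accepts a BYTES schema (or no schema at all), so any record that comes back from the envelope with a STRING schema fails exactly like this. A minimal sketch that reproduces the same exception outside of Connect (assuming the Kafka Connect jars are on the classpath; the class name and topic name are just placeholders):

import org.apache.kafka.connect.converters.ByteArrayConverter;
import org.apache.kafka.connect.data.Schema;

import java.util.Map;

public class ByteArrayConverterRepro {
    public static void main(String[] args) {
        ByteArrayConverter converter = new ByteArrayConverter();
        converter.configure(Map.of(), false); // configured as a value converter

        // A BYTES (or null) schema is fine, even for a null value (tombstone):
        converter.fromConnectData("test-topic-restore", Schema.OPTIONAL_BYTES_SCHEMA, null);

        // A STRING schema throws the same DataException seen in the connector log:
        // "Invalid schema type for ByteArrayConverter: STRING"
        converter.fromConnectData("test-topic-restore", Schema.STRING_SCHEMA, "some value");
    }
}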
I suppose this is because the valueIsArray attribute is not written when the value is null?
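If the envelope mirrored the key handling, I would expect a tombstone to carry the value flag as well, something like this (an illustrative guess, not actual connector output):

{"headers":{},"metadata":{"partition":5,"offset":0,"topic":"test-topic-backup","timestamp":1726213691895},"keyIsArray":true,"valueIsArray":true,"value":null,"key":"aWQx"}

so that the source side could restore the value as a null byte array instead of falling back to a STRING schema.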
What version of the Stream Reactor are you reporting this issue for?
kafka-connect-aws-s3-assembly-7.4.5.jar
Are you running the correct version of Kafka/Confluent for the Stream reactor release?
Kafka version 3.8.0
Do you have a supported version of the data source/sink, i.e. Cassandra 3.0.9?
source/sink = S3
Have you read the docs?
yes
What is the expected behaviour?
Probably that the null value is correctly written back to Kafka.
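(Once the restore works, a quick way to verify would be to consume test-topic-restore and check that the restored record really has a null value. A minimal sketch with the plain Kafka Java consumer; the bootstrap server and group id are assumptions:)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class TombstoneCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: broker address
        props.put("group.id", "tombstone-check");         // assumption: throwaway group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("test-topic-restore"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // A restored tombstone should keep its key and have a null value.
                System.out.printf("key=%s valueIsNull=%b%n",
                        new String(record.key()), record.value() == null);
            }
        }
    }
}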
What was observed?
exception as shown above
What is your Connect cluster configuration (connect-avro-distributed.properties)?
Using Strimzi, so the format is slightly different:
config:
  group.id: connect-test-connect
  ....
  key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

What is your connector properties configuration (my-connector.properties)?
class: io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
tasksMax: 1
config:
  topics: test-topic-backup
  # 10 second flush.interval for testing
  connect.s3.kcql: >-
    INSERT INTO backup-bucket
    SELECT * FROM test-topic-backup
    STOREAS `JSON`
    PROPERTIES ('store.envelope'=true, 'flush.interval'=10)
  key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  connect.s3.aws.auth.mode: Credentials
  connect.s3.aws.access.key: ...
  connect.s3.aws.secret.key: ...
  connect.s3.aws.region: eu-central-1

class: io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
tasksMax: 1
config:
  connect.s3.kcql: >-
    INSERT INTO test-topic-restore
    SELECT * FROM backup-bucket
    STOREAS `JSON`
    PROPERTIES ('store.envelope'=true)
  key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
  connect.s3.aws.auth.mode: Credentials
  connect.s3.aws.access.key: ...
  connect.s3.aws.secret.key: ...
  connect.s3.aws.region: eu-central-1
  connect.s3.source.partition.extractor.type: hierarchical
  connect.partition.search.continuous: true
  connect.s3.source.partition.search.interval: 1000 # check every second for tests

Please provide full log files (redact any sensitive information)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRING
at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:61)
at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)