Tombstone handling in s3 backup/recovery scenario #1365

@warmuuh

Description

When sending tombstones to a topic that is backed up via the S3 sink connector (with envelope, JSON format), the record is backed up as:

{"headers":{},"metadata":{"partition":5,"offset":0,"topic":"test-topic-backup","timestamp":1726213691895},"keyIsArray":true,"value":null,"key":"aWQx"}

(valueIsArray is not set)
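
For comparison, a record with a non-null value presumably carries both flags. The following envelope is an assumption based on the observed output (the base64 value is made up):

{"headers":{},"metadata":{"partition":5,"offset":1,"topic":"test-topic-backup","timestamp":1726213691895},"keyIsArray":true,"valueIsArray":true,"key":"aWQx","value":"c29tZS12YWx1ZQ=="}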

The source connector then chokes on it:

Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRING
    at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:61)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)

I suppose this is because the attribute is not written when the value is null?
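
For reference, here is a minimal sketch of the converter behaviour that would explain the exception. This is illustrative only, not connector code; it just shows that ByteArrayConverter rejects any non-BYTES schema, which is what would happen if the restored tombstone value comes back with a STRING schema:

    import org.apache.kafka.connect.converters.ByteArrayConverter;
    import org.apache.kafka.connect.data.Schema;

    public class TombstoneConverterRepro {
        public static void main(String[] args) {
            ByteArrayConverter converter = new ByteArrayConverter();

            // A BYTES (or null) schema is accepted, even for a null tombstone value:
            converter.fromConnectData("test-topic-restore", Schema.OPTIONAL_BYTES_SCHEMA, null);

            // Any other schema type throws the exception from the stack trace above:
            converter.fromConnectData("test-topic-restore", Schema.OPTIONAL_STRING_SCHEMA, null);
            // -> org.apache.kafka.connect.errors.DataException:
            //    Invalid schema type for ByteArrayConverter: STRING
        }
    }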

What version of the Stream Reactor are you reporting this issue for?

kafka-connect-aws-s3-assembly-7.4.5.jar

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

Kafka version 3.8.0

Do you have a supported version of the data source/sink .i.e Cassandra 3.0.9?

source/sink = S3

Have you read the docs?

yes

What is the expected behaviour?

Probably that the null value (tombstone) is written back to Kafka correctly.

What was observed?

The exception shown above.

What is your Connect cluster configuration (connect-avro-distributed.properties)?

Using Strimzi, so the format is slightly different:

    config:
      group.id: connect-test-connect
      ....
      key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
      value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

What is your connector properties configuration (my-connector.properties)?

  class: io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
  tasksMax: 1
  config:
    topics: test-topic-backup
    # 10 second flush.interval for testing
    connect.s3.kcql: >-
      INSERT INTO backup-bucket
      SELECT * FROM test-topic-backup 
      STOREAS `JSON` 
      PROPERTIES ('store.envelope'=true, 'flush.interval'=10)
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    connect.s3.aws.auth.mode: Credentials
    connect.s3.aws.access.key: ...
    connect.s3.aws.secret.key: ...
    connect.s3.aws.region: eu-central-1

  class: io.lenses.streamreactor.connect.aws.s3.source.S3SourceConnector
  tasksMax: 1
  config:
    connect.s3.kcql: >-
      INSERT INTO test-topic-restore
      SELECT * FROM backup-bucket
      STOREAS `JSON` 
      PROPERTIES ('store.envelope'=true)
    key.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    connect.s3.aws.auth.mode: Credentials
    connect.s3.aws.access.key: ...
    connect.s3.aws.secret.key: ...
    connect.s3.aws.region: eu-central-1
    connect.s3.source.partition.extractor.type: hierarchical
    connect.partition.search.continuous: true
    connect.s3.source.partition.search.interval: 1000 # check every second for tests

Please provide full log files (redact any sensitive information)

Caused by: org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRING
    at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:61)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)
