Last active
December 16, 2022 15:29
-
-
Save kdrakon/618f1312f2b96d469492568f8d56e036 to your computer and use it in GitHub Desktop.
Using Avro4s with Confluent Kafka Avro Serializer + Schema Registry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util | |
import com.sksamuel.avro4s.RecordFormat | |
import org.apache.avro.generic.GenericRecord | |
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer} | |
/** Helpers for using avro4s case-class mappings on top of a `Serde[GenericRecord]`. */
object Avro4s {

  /**
    * Enriches a `Serde[GenericRecord]` with `forCaseClass`.
    *
    * @param inner the serde that performs the actual `GenericRecord` <-> bytes conversion
    */
  implicit class CaseClassSerde(inner: Serde[GenericRecord]) {

    /**
      * Builds a `Serde[T]` that converts `T` to/from `GenericRecord` with the implicit
      * `RecordFormat` and hands the record to `inner` for the byte-level work.
      *
      * A null value (and a null record coming back from `inner`) is passed through as null
      * instead of being given to the `RecordFormat`. `configure` and `close` are forwarded
      * to `inner`, so the wrapper can be managed like any other serde.
      *
      * @tparam T the case class type to (de)serialize
      * @param recordFormat avro4s conversion between `T` and `GenericRecord`
      * @return a serde for `T` backed by `inner`
      */
    def forCaseClass[T](implicit recordFormat: RecordFormat[T]): Serde[T] = {
      val caseClassSerializer: Serializer[T] = new Serializer[T] {
        override def serialize(topic: String, data: T): Array[Byte] = {
          // Do not run the RecordFormat on null; let the inner serializer decide how to encode it.
          val record: GenericRecord = if (data == null) null else recordFormat.to(data)
          inner.serializer().serialize(topic, record)
        }

        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
          inner.serializer().configure(configs, isKey)

        override def close(): Unit = inner.serializer().close()
      }

      val caseClassDeserializer: Deserializer[T] = new Deserializer[T] {
        override def deserialize(topic: String, data: Array[Byte]): T = {
          val record = inner.deserializer().deserialize(topic, data)
          // The Deserializer interface signals "no value" with null, so the cast is
          // the only way to return it for an unconstrained T.
          if (record == null) null.asInstanceOf[T] else recordFormat.from(record)
        }

        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
          inner.deserializer().configure(configs, isKey)

        override def close(): Unit = inner.deserializer().close()
      }

      new Serde[T] {
        override def serializer(): Serializer[T] = caseClassSerializer

        override def deserializer(): Deserializer[T] = caseClassDeserializer

        override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
          inner.configure(configs, isKey)

        override def close(): Unit = inner.close()
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Usage sketch: configure a `GenericAvroSerde`, wrap it with `forCaseClass`,
  * and serialize one `Foo` value for the topic "foo".
  */
object Example {
  import scala.collection.JavaConverters._

  import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

  // Brings the `forCaseClass` extension method into scope.
  import Avro4s.CaseClassSerde

  /** Sample payload type used by this example. */
  case class Foo(i: Int, s: String)

  // Settings handed to the serde's `configure` call below.
  val config: java.util.Map[String, _] =
    Map("schema.registry.url" -> "https://localhost:8081").asJava

  val confluentGenericSerde: GenericAvroSerde = new GenericAvroSerde()
  // The second argument is `isKey`: false means the serde is configured for record values.
  confluentGenericSerde.configure(config, false)

  // Case-class view over the already configured generic serde.
  val confluentGenericSerdeWrapped = confluentGenericSerde.forCaseClass[Foo]

  // The resulting bytes are discarded; the call only demonstrates the API.
  confluentGenericSerdeWrapped.serializer().serialize("foo", Foo(1, "Baz"))
}
Interesting.
One thing you could point out: instead of relying on an implicit RecordFormat, you can build one explicitly with `RecordFormat.format(generatedCaseClass.getSchema)` rather than delegating that job to `avro4s`, which might do things differently from Confluent. The `schema` is accessible if you run the `avroScalaGenerateSpecific` task in sbt-avro-hugger.
Which dependency provides this class? io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In Kafka 2.0.0,
Avro4s.scala
can become as simple as this: