Discussion: Trying to understand the motivations for the Serializer API #853
Comments
Discussions around zio-kafka usually take place on Discord: https://discord.com/channels/629491597070827530/629497941719121960 Would you mind placing your question there?

Re. 'Serialization should not fail': serialization might depend on external systems. For example, you may have to fetch an id from a database.

IMHO, letting the Kafka library do the serializing/deserializing has always felt like a kludge to me. Therefore, I always use the byte array serializer/deserializer and do the conversion in my own code. This gives me the freedom to handle failures the way I want to. Also, it makes testing easier because you don't have to mess around with someone else's serde interfaces.

Your points about using Option instead of a nullable value are something to be looked at indeed, though it might be hard to change the API in a backward compatible way.
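To make that byte-array approach concrete, here is a minimal sketch of producing with `Serde.byteArray` while doing the encoding in application code. It assumes zio-kafka 2.x, where `Producer.produce(topic, key, value, keySerializer, valueSerializer)` and `Serde.byteArray` are available; the `User` type and `encodeUser` function are made up for illustration.

```scala
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde

// Hypothetical domain type and encoder; zio-kafka never sees them.
final case class User(id: Long, name: String)

def encodeUser(user: User): Array[Byte] =
  s"${user.id}:${user.name}".getBytes("UTF-8")

// Encoding happens in our own code, where failures can be handled however we like;
// the producer itself only ever sees bytes, so its serializer cannot fail.
def publishUser(user: User): RIO[Producer, Unit] =
  Producer
    .produce("users", user.id.toString.getBytes("UTF-8"), encodeUser(user), Serde.byteArray, Serde.byteArray)
    .unit
```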
Hey @lukestephenson, I agree with your observations. I'm not a fan of the current design, either. I'd personally love to see what you can come up with.
@erikvanoosten Thanks for the comments. I'll also kick off the Discord discussion soon.
I completely disagree with this statement. That is not serialization in my opinion; that is a program. By the time we hand off the ProducerRecord to Kafka, the only reason it should fail is because of issues with Kafka.
Agree. What I am proposing will not be backwards compatible, nor do I think that should be a design goal. We can keep the existing API if we want, but backwards compatibility is not a design goal of the API I'm proposing.

@guizmaii Thanks as well for the feedback. I had a stab at what it could look like for the producing side. Here is a rough idea of the API (it's not polished, just for demonstrating the idea). What is exposed in zio-kafka could be something like:

```scala
package zio.kafka.producer
import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.serialization.{LongSerializer, StringSerializer}
import zio._
case class ZKafkaHeader(key: String, value: Array[Byte])
case class ZProducerRecord[K,V](topic: String, key: K, value: V, headers: List[ZKafkaHeader] = List.empty, partition: Option[Integer] = None, timestamp: Option[Long] = None)
object ZProducerRecord {
// Fairly common to publish a message without a key, so a convenience method for that.
def apply[V](topic: String, value: V): ZProducerRecord[Option[Array[Byte]], V] = new ZProducerRecord[Option[Array[Byte]], V](topic, Option.empty[Array[Byte]], value)
}
trait ByteArrayEncoder[A] {
def apply(a: A): Option[Array[Byte]]
}
object Extensions {
implicit class ZProducerRecordExtensions[K,V](zProducerRecord: ZProducerRecord[K,V]) {
def encode(implicit keyEncoder: ByteArrayEncoder[K], valueEncoder: ByteArrayEncoder[V]): ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]] = {
zProducerRecord.copy(key = keyEncoder(zProducerRecord.key), value = valueEncoder(zProducerRecord.value))
}
}
}
object Encoders {
// Provided encoders
implicit val stringEncoder: ByteArrayEncoder[String] = new ByteArrayEncoder[String] {
private val kafkaSerializer = new StringSerializer()
override def apply(a: String): Option[Array[Byte]] = Some(kafkaSerializer.serialize(null, a))
}
implicit val longEncoder: ByteArrayEncoder[Long] = new ByteArrayEncoder[Long] {
val kafkaSerializer = new LongSerializer()
override def apply(a: Long): Option[Array[Byte]] = Some(kafkaSerializer.serialize(null, a))
}
implicit val byteArrayEncoder: ByteArrayEncoder[Array[Byte]] = (a: Array[Byte]) => Some(a)
implicit def optionEncoder[A](implicit encoder: ByteArrayEncoder[A]): ByteArrayEncoder[Option[A]] = (a: Option[A]) => a.flatMap(encoder.apply)
}
trait ProducerProposal {
def produceAsync(record: ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]]): Task[Task[RecordMetadata]] = ???
// IMO this makes a lot more sense than the current implementation, which doesn't allow different records
// in the Chunk to have different serialisation strategies, even though the Chunk could contain records for many different topics
def produceAsyncChunk(records: Chunk[ZProducerRecord[Option[Array[Byte]], Option[Array[Byte]]]]): Task[Task[Chunk[RecordMetadata]]] = ???
}
```

And for an end user, a sample application might look like:

```scala
package zio.kafka.producer
import Encoders._
import Extensions._
case class ExampleModel(value: String)
object EndUserExample {
implicit val exampleModelEncoder: ByteArrayEncoder[ExampleModel] = (a: ExampleModel) => stringEncoder.apply(a.value)
val producer: ProducerProposal = ???
// I don't care about encoding, I've already provided the raw bytes. No call to `encode` required.
producer.produceAsync(ZProducerRecord(topic = "my.topic", key = None, value = Some("hello".getBytes)))
// Message with a value only
producer.produceAsync(ZProducerRecord(topic = "my.topic", value = "hello").encode)
// Message with a key and value
producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = "hello world").encode)
// compacted topic with value provided
val maybeValueIsPresent: Option[ExampleModel] = Some(ExampleModel("hello"))
producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = maybeValueIsPresent).encode)
// compacted topic with tombstone
val maybeValueIsATombstone: Option[ExampleModel] = None
producer.produceAsync(ZProducerRecord(topic = "my.topic", key = 1234L, value = maybeValueIsATombstone).encode)
}
```

Feel free to share feedback.
Further comments from me will go to Discord.
It's too bad that you do this. I was interested in reading the rest of this discussion and it's super cumbersome to do so: first I need an account on another platform (OK, I have that), but then I have to search through thousands of messages to find the ones that correspond to this topic, which is impossible after some time has passed.
If you ask me (you shouldn't 😉): you probably know by now what I think about how we support serialization/deserialization in zio-kafka (or how it is done in the Java Kafka library, for that matter): I think it should not even be there, because it lives at the wrong abstraction level. Because of that we get discussions like this issue. Hence my recommendation to always work with byte arrays directly.

BTW, we did improve the documentation a lot recently: https://zio.dev/zio-kafka/serialization-and-deserialization
BTW, @svroonland please stop writing from the future! 😄
Did you guys reach any resolution on this issue? |
Probably repeating a lot of stuff that has been said already, but here's my 2c.

Agreed that serialization should not fail, especially in the Scala / type-safe world. Our

As a side note, I wonder how efficient the Confluent Avro serializer is, serializing records one by one and checking the schema each time, and whether we could improve on that by having a custom implementation that operates on chunks (a rough sketch of that idea follows this comment).

Regarding deserializing in the stream: in essence it's a convenience that we provide the Deserializer abstraction and a stream of typed values of some type (actually a

The same convenience argument applies to the possibility of all values coming from Kafka being null. At some point you have to specify that your values are of some type

Also:
So in summary, I would propose exploring:
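Here is a rough, hypothetical sketch of the chunk-wise idea mentioned above: consume raw byte arrays and deserialize per chunk rather than per record, using ZStream's `mapChunksZIO`. It assumes zio-kafka 2.x (`Consumer.plainStream`, `Serde.byteArray`); `decodeBatch` is a made-up stand-in for e.g. a batched Avro decoder, and offsets/commits are ignored for brevity. This is not an existing zio-kafka feature.

```scala
import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.stream.ZStream

// Hypothetical batched decoder: decodes a whole chunk of payloads at once,
// so schema lookups and other per-batch work can happen once per chunk.
def decodeBatch(payloads: Chunk[Array[Byte]]): Task[Chunk[String]] =
  ZIO.attempt(payloads.map(bytes => new String(bytes, "UTF-8")))

// Consume raw bytes and deserialize chunk-by-chunk instead of record-by-record.
val decodedValues: ZStream[Consumer, Throwable, String] =
  Consumer
    .plainStream(Subscription.topics("my.topic"), Serde.byteArray, Serde.byteArray)
    .mapChunksZIO(records => decodeBatch(records.map(_.record.value())))
```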
Below I'm listing a couple of things that don't feel right to me about the Serializer API. I'm not expecting anyone to go and change the API for me; I'm happy to do that. This is more to see whether other contributors / users agree with these points before I take the discussion further, rather than submitting a PR that isn't welcome and finding out there are good reasons for the current API design. Thanks.
Suggestion 1: `Serializer` trait rework

Currently we have:
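For reference, the current zio-kafka serde signatures look roughly like this (reproduced from memory, so details may differ slightly from the actual source):

```scala
import org.apache.kafka.common.header.Headers
import zio.RIO

// Current zio-kafka serde shape (approximate): both sides are effectful and can fail.
trait Serializer[-R, -T] {
  def serialize(topic: String, headers: Headers, value: T): RIO[R, Array[Byte]]
}

trait Deserializer[-R, +T] {
  def deserialize(topic: String, headers: Headers, data: Array[Byte]): RIO[R, T]
}
```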
And note that `RIO` is an alias for `ZIO[R, Throwable, A]`.
Serialization should not fail. Look at libraries like Circe and zio-json (https://zio.dev/zio-json/) - encoding to json always works, we have compiler guarantees that all data can be encoded. Obviously the same can't be said for deserialization where the inbound data is outside of our control.
Suggestion 2 - Serializers / Deserializers should be safe by default
I'm doing Scala / FP because I like avoiding bugs by taking advantage of a powerful type system. A building block for me in this regard is making use of `Option` to avoid any nasty NPEs; give me this first, before an effect system. Anyway, the Serializers / Deserializers invoked by zio-kafka can currently pass in null, because the base representation of ConsumerRecord / ProducerRecord is the standard Java implementation. It's not safe by default. For example, if I want to handle that a key / value may be absent, I have to remember to add `.asOption` to my Deserializer. I feel like the safe behaviour, which highlights that Kafka doesn't guarantee the key / value are present, should be the default (i.e. `Option[V]`), and if I want to acknowledge that I'm happy to discard that type safety, then I tell the Deserializer it is ok by calling `.valueWillAlwaysExist` (or some better method name).

Even on my `Serializer` I have to call `asOption`. And this is in my nice Scala code, where I represent all my models with `Option` where appropriate, but throw that away at the last minute on the boundary to working with Java (i.e. when creating the Kafka ProducerRecord). Then I get a callback which can potentially fail, because the type safety has been given up too early.
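To illustrate the point, here is a minimal sketch of what the current opt-in looks like on the consumer side. It assumes `Consumer.plainStream` and `Serde.string.asOption` behave as in zio-kafka 2.x, where `asOption` maps a null payload to `None`.

```scala
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.stream.ZStream

// Today, null-safety is opt-in: without `.asOption` the value type is `String`,
// and a tombstone record surfaces as a null String at runtime.
val unsafeValues: ZStream[Consumer, Throwable, String] =
  Consumer
    .plainStream(Subscription.topics("my.topic"), Serde.string, Serde.string)
    .map(_.record.value())

// With `.asOption`, tombstones become `None` and the types tell the truth,
// but you have to remember to ask for it.
val safeValues: ZStream[Consumer, Throwable, Option[String]] =
  Consumer
    .plainStream(Subscription.topics("my.topic"), Serde.string, Serde.string.asOption)
    .map(_.record.value())
```

The suggestion above is essentially to flip this default: `Option[...]` out of the box, with an explicit opt-out for topics where the key / value are known to always be present.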