Skip to content

Commit 0f12f84

Browse files
Add a custom serializer and deserializer, including the message DTO based on Lombok
1 parent cb62fec commit 0f12f84

File tree

5 files changed

+183
-0
lines changed

5 files changed

+183
-0
lines changed

apache-kafka/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,12 @@
161161
<artifactId>spark-cassandra-connector-java_2.11</artifactId>
162162
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
163163
</dependency>
164+
<dependency>
165+
<groupId>org.projectlombok</groupId>
166+
<artifactId>lombok</artifactId>
167+
<version>${lombok.version}</version>
168+
<scope>provided</scope>
169+
</dependency>
164170
</dependencies>
165171

166172
<properties>
@@ -175,6 +181,7 @@
175181
<graphframes.version>0.8.1-spark3.0-s_2.12</graphframes.version>
176182
<com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
177183
<com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
184+
<lombok.version>1.18.20</lombok.version>
178185
</properties>
179186

180187
</project>
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.baeldung.kafka.dto;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Data;
6+
import lombok.NoArgsConstructor;
7+
8+
@Data
9+
@AllArgsConstructor
10+
@NoArgsConstructor
11+
@Builder
12+
public class MessageDto {
13+
private String message;
14+
private String version;
15+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.baeldung.kafka.serdes;

import com.baeldung.kafka.dto.MessageDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Kafka {@link Deserializer} that converts a JSON byte payload into a {@link MessageDto}
 * using Jackson.
 */
public class CustomDeserializer implements Deserializer<MessageDto> {

    // Jackson's ObjectMapper is thread-safe once configured; a single final instance suffices.
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    /**
     * Deserializes the given JSON bytes into a {@link MessageDto}.
     *
     * @param topic the topic the record came from (unused)
     * @param data  raw JSON payload; may be {@code null} (e.g. a tombstone record)
     * @return the deserialized message, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the payload cannot be parsed
     */
    @Override
    public MessageDto deserialize(String topic, byte[] data) {
        if (data == null) {
            System.out.println("Null received at deserializing");
            return null;
        }
        System.out.println("Deserializing...");
        try {
            // Jackson reads the byte[] directly (JSON is UTF-8 by spec), so the
            // intermediate `new String(data, "UTF-8")` copy is unnecessary.
            return objectMapper.readValue(data, MessageDto.class);
        } catch (Exception e) {
            // Preserve the cause so the underlying parse error is not lost.
            throw new SerializationException("Error when deserializing byte[] to MessageDto", e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.baeldung.kafka.serdes;

import com.baeldung.kafka.dto.MessageDto;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Kafka {@link Serializer} that converts a {@link MessageDto} into its JSON byte
 * representation using Jackson.
 */
public class CustomSerializer implements Serializer<MessageDto> {

    // Jackson's ObjectMapper is thread-safe once configured; a single final instance suffices.
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No configuration required.
    }

    /**
     * Serializes the given {@link MessageDto} to JSON bytes.
     *
     * @param topic the destination topic (unused)
     * @param data  the message to serialize; may be {@code null}
     * @return the JSON bytes, or {@code null} when {@code data} is {@code null}
     * @throws SerializationException if the object cannot be written as JSON
     */
    @Override
    public byte[] serialize(String topic, MessageDto data) {
        if (data == null) {
            System.out.println("Null received at serializing");
            return null;
        }
        System.out.println("Serializing...");
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (Exception e) {
            // Preserve the cause so the underlying Jackson error is not lost.
            throw new SerializationException("Error when serializing MessageDto to byte[]", e);
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.baeldung.kafka.serdes;

import com.baeldung.kafka.dto.MessageDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;

/**
 * Live round-trip test for {@link CustomSerializer}/{@link CustomDeserializer}
 * against a Kafka broker started via Testcontainers.
 */
public class KafkaSerDesLiveTest {
    private static final String CONSUMER_APP_ID = "consumer_id";
    private static final String CONSUMER_GROUP_ID = "group_id";

    @ClassRule
    public static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
    private final String TOPIC = "mytopic";

    /** Builds a consumer wired to the container broker and the custom value deserializer. */
    private static KafkaConsumer<String, MessageDto> createKafkaConsumer() {

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        // "earliest" so the consumer sees the record produced before it subscribed.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomDeserializer");

        return new KafkaConsumer<>(props);
    }

    /** Builds a producer wired to the container broker and the custom value serializer. */
    private static KafkaProducer<String, MessageDto> createKafkaProducer() {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, CONSUMER_APP_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "com.baeldung.kafka.serdes.CustomSerializer");

        // Diamond operator fixes the raw-type construction (`new KafkaProducer(props)`),
        // which compiled with an unchecked warning and no key/value type safety.
        return new KafkaProducer<>(props);
    }

    @Before
    public void setUp() {
    }

    /**
     * Produces one message with the custom serializer and asserts the consumer,
     * using the custom deserializer, reads back an equal {@link MessageDto}.
     */
    @Test
    public void givenKafkaClientShouldSerializeAndDeserialize() throws InterruptedException {

        MessageDto msgProd = MessageDto.builder().message("test").version("1.0").build();

        KafkaProducer<String, MessageDto> producer = createKafkaProducer();
        producer.send(new ProducerRecord<String, MessageDto>(TOPIC, "1", msgProd));
        System.out.println("Message sent " + msgProd);
        // close() flushes pending records, so the message is on the broker afterwards.
        producer.close();

        AtomicReference<MessageDto> msgCons = new AtomicReference<>();

        KafkaConsumer<String, MessageDto> consumer = createKafkaConsumer();
        consumer.subscribe(Arrays.asList(TOPIC));

        // Poll in a bounded loop instead of a fixed Thread.sleep(2000) followed by a
        // single 1-second poll: less flaky on slow CI, and returns as soon as the
        // record arrives. Gives up after ~10 seconds.
        long deadline = System.currentTimeMillis() + 10_000L;
        while (msgCons.get() == null && System.currentTimeMillis() < deadline) {
            ConsumerRecords<String, MessageDto> records = consumer.poll(Duration.ofSeconds(1));
            records.forEach(record -> {
                msgCons.set(record.value());
                System.out.println("Message received " + record.value());
            });
        }

        consumer.close();

        assertEquals(msgProd, msgCons.get());
    }

}
91+
92+

0 commit comments

Comments
 (0)