Skip to content

Commit 2ef3514

Browse files
tomekl007pivovarit
authored andcommitted
Bael 1421 (eugenp#3222)
* BAEL-1421 live test of kafka streams * BAEL-1421 Removed not-needed dependency * BAEL-1421 rearannge * BAEL-1421 rearannge * fix pom
1 parent 85f12cd commit 2ef3514

2 files changed

Lines changed: 100 additions & 10 deletions

File tree

libraries/pom.xml

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -612,16 +612,16 @@
612612
<artifactId>caffeine</artifactId>
613613
<version>${caffeine.version}</version>
614614
</dependency>
615-
<dependency>
616-
<groupId>org.bouncycastle</groupId>
617-
<artifactId>bcprov-jdk15on</artifactId>
618-
<version>1.58</version>
619-
</dependency>
620-
<dependency>
621-
<groupId>org.bouncycastle</groupId>
622-
<artifactId>bcpkix-jdk15on</artifactId>
623-
<version>1.58</version>
624-
</dependency>
615+
<dependency>
616+
<groupId>org.bouncycastle</groupId>
617+
<artifactId>bcprov-jdk15on</artifactId>
618+
<version>1.58</version>
619+
</dependency>
620+
<dependency>
621+
<groupId>org.bouncycastle</groupId>
622+
<artifactId>bcpkix-jdk15on</artifactId>
623+
<version>1.58</version>
624+
</dependency>
625625
<dependency>
626626
<groupId>com.google.http-client</groupId>
627627
<artifactId>google-http-client</artifactId>
@@ -654,6 +654,29 @@
654654
<artifactId>google-api-services-sheets</artifactId>
655655
<version>${google-sheets.version}</version>
656656
</dependency>
657+
<dependency>
658+
<groupId>org.apache.kafka</groupId>
659+
<artifactId>kafka-streams</artifactId>
660+
<version>${kafka.version}</version>
661+
</dependency>
662+
<dependency>
663+
<groupId>org.apache.kafka</groupId>
664+
<artifactId>kafka-clients</artifactId>
665+
<version>${kafka.version}</version>
666+
<exclusions>
667+
<exclusion>
668+
<groupId>org.slf4j</groupId>
669+
<artifactId>slf4j-log4j12</artifactId>
670+
</exclusion>
671+
</exclusions>
672+
</dependency>
673+
<dependency>
674+
<groupId>org.apache.kafka</groupId>
675+
<artifactId>kafka-clients</artifactId>
676+
<version>${kafka.version}</version>
677+
<classifier>test</classifier>
678+
<scope>test</scope>
679+
</dependency>
657680
</dependencies>
658681
<repositories>
659682
<repository>
@@ -669,6 +692,10 @@
669692
<name>bintray</name>
670693
<url>http://dl.bintray.com/cuba-platform/main</url>
671694
</repository>
695+
<repository>
696+
<id>Apache Staging</id>
697+
<url>https://repository.apache.org/content/groups/staging</url>
698+
</repository>
672699
</repositories>
673700
<properties>
674701
<googleclient.version>1.23.0</googleclient.version>
@@ -729,5 +756,6 @@
729756
<caffeine.version>2.5.5</caffeine.version>
730757
<google-api.version>1.23.0</google-api.version>
731758
<google-sheets.version>v4-rev493-1.21.0</google-sheets.version>
759+
<kafka.version>1.0.0</kafka.version>
732760
</properties>
733761
</project>
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.baeldung.kafkastreams;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.common.serialization.Serde;
5+
import org.apache.kafka.common.serialization.Serdes;
6+
import org.apache.kafka.streams.KafkaStreams;
7+
import org.apache.kafka.streams.StreamsConfig;
8+
import org.apache.kafka.streams.kstream.KStream;
9+
import org.apache.kafka.streams.kstream.KStreamBuilder;
10+
import org.apache.kafka.streams.kstream.KTable;
11+
import org.apache.kafka.test.TestUtils;
12+
import org.junit.Ignore;
13+
import org.junit.Test;
14+
15+
import java.util.Arrays;
16+
import java.util.Properties;
17+
import java.util.regex.Pattern;
18+
19+
public class KafkaStreamsLiveTest {
20+
private String bootstrapServers = "localhost:9092";
21+
22+
@Test
23+
@Ignore("it needs to have kafka broker running on local")
24+
public void shouldTestKafkaStreams() throws InterruptedException {
25+
//given
26+
String inputTopic = "inputTopic";
27+
28+
Properties streamsConfiguration = new Properties();
29+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
30+
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
31+
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
32+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
33+
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
34+
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
35+
// Use a temporary directory for storing state, which will be automatically removed after the test.
36+
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
37+
38+
//when
39+
KStreamBuilder builder = new KStreamBuilder();
40+
KStream<String, String> textLines = builder.stream(inputTopic);
41+
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
42+
43+
KTable<String, Long> wordCounts = textLines
44+
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
45+
.groupBy((key, word) -> word)
46+
.count();
47+
48+
wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
49+
50+
String outputTopic = "outputTopic";
51+
final Serde<String> stringSerde = Serdes.String();
52+
final Serde<Long> longSerde = Serdes.Long();
53+
wordCounts.to(stringSerde, longSerde, outputTopic);
54+
55+
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
56+
streams.start();
57+
58+
//then
59+
Thread.sleep(30000);
60+
streams.close();
61+
}
62+
}

0 commit comments

Comments
 (0)