Skip to content

Commit 3fb8099

Browse files
committed
Reactive Batch Processing Example
1 parent 3461549 commit 3fb8099

11 files changed

Lines changed: 491 additions & 0 deletions

File tree

reactive/build.gradle

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
buildscript {
2+
repositories {
3+
mavenCentral()
4+
}
5+
}
6+
7+
apply plugin: 'java'
8+
9+
version = '0.0.1-SNAPSHOT'
10+
sourceCompatibility = 11
11+
12+
repositories {
13+
mavenLocal()
14+
mavenCentral()
15+
}
16+
17+
dependencies {
18+
compile 'io.reactivex.rxjava2:rxjava:2.2.17'
19+
testCompile 'org.junit.jupiter:junit-jupiter-engine:5.0.1'
20+
testCompile 'org.awaitility:awaitility:3.0.0'
21+
}
22+
23+
test {
24+
useJUnitPlatform()
25+
}
53.4 KB
Binary file not shown.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
distributionBase=GRADLE_USER_HOME
2+
distributionPath=wrapper/dists
3+
zipStoreBase=GRADLE_USER_HOME
4+
zipStorePath=wrapper/dists
5+
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip

reactive/gradlew

Lines changed: 172 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

reactive/gradlew.bat

Lines changed: 84 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
public class Logger {
4+
5+
public void log(String string) {
6+
System.out.println(String.format("%s %s: %s", System.currentTimeMillis(), Thread.currentThread().getName(), string));
7+
}
8+
9+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
public class Message {
4+
5+
private final String content;
6+
7+
public Message(String content) {
8+
this.content = content;
9+
}
10+
11+
public String getContent() {
12+
return content;
13+
}
14+
15+
@Override
16+
public String toString() {
17+
return content;
18+
}
19+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import java.util.Collections;
4+
import java.util.List;
5+
6+
public class MessageBatch {
7+
8+
private final List<Message> messages;
9+
10+
public MessageBatch(List<Message> messages) {
11+
this.messages = messages;
12+
}
13+
14+
public List<Message> getMessages() {
15+
return Collections.unmodifiableList(messages);
16+
}
17+
18+
@Override
19+
public String toString() {
20+
return "MessageBatch{" +
21+
"messages=" + messages +
22+
'}';
23+
}
24+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import io.reactivex.Single;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class MessageHandler {
7+
8+
private final AtomicInteger processedMessages = new AtomicInteger();
9+
10+
private Logger logger = new Logger();
11+
12+
enum Result {
13+
SUCCESS,
14+
FAILURE
15+
}
16+
17+
public Single<Result> handleMessage(Message message){
18+
sleep(500);
19+
logger.log(String.format("processed message %s", message));
20+
return Single.just(Result.SUCCESS);
21+
}
22+
23+
private void sleep(long millis) {
24+
try {
25+
Thread.sleep(millis);
26+
} catch (InterruptedException e) {
27+
throw new RuntimeException(e);
28+
}
29+
}
30+
31+
public AtomicInteger getProcessedMessages() {
32+
return processedMessages;
33+
}
34+
}

0 commit comments

Comments
 (0)