Skip to content

Commit 5fe5dec

Browse files
committed
added Single.defer to fan out messages over multiple threads
1 parent 3fb8099 commit 5fe5dec

File tree

5 files changed

+120
-95
lines changed

5 files changed

+120
-95
lines changed

aws/aws-hello-world/gradlew

100644100755
File mode changed.

aws/aws-hello-world/gradlew.bat

100644100755
File mode changed.
Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,36 @@
11
package io.reflectoring.reactive.batch;
22

33
import io.reactivex.Single;
4+
45
import java.util.concurrent.atomic.AtomicInteger;
56

67
public class MessageHandler {
78

8-
private final AtomicInteger processedMessages = new AtomicInteger();
9+
private final AtomicInteger processedMessages = new AtomicInteger();
910

10-
private Logger logger = new Logger();
11+
private Logger logger = new Logger();
1112

12-
enum Result {
13-
SUCCESS,
14-
FAILURE
15-
}
13+
enum Result {
14+
SUCCESS,
15+
FAILURE
16+
}
1617

17-
public Single<Result> handleMessage(Message message){
18-
sleep(500);
19-
logger.log(String.format("processed message %s", message));
20-
return Single.just(Result.SUCCESS);
21-
}
18+
public Result handleMessage(Message message) {
19+
logger.log(String.format("handling message %s", message));
20+
sleep(500);
21+
this.processedMessages.getAndAdd(1);
22+
return Result.SUCCESS;
23+
}
2224

23-
private void sleep(long millis) {
24-
try {
25-
Thread.sleep(millis);
26-
} catch (InterruptedException e) {
27-
throw new RuntimeException(e);
25+
private void sleep(long millis) {
26+
try {
27+
Thread.sleep(millis);
28+
} catch (InterruptedException e) {
29+
throw new RuntimeException(e);
30+
}
2831
}
29-
}
3032

31-
public AtomicInteger getProcessedMessages() {
32-
return processedMessages;
33-
}
33+
public Integer getProcessedMessages() {
34+
return processedMessages.get();
35+
}
3436
}

reactive/src/main/java/io/reflectoring/reactive/batch/ReactiveBatchProcessor.java

Lines changed: 80 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,90 +5,95 @@
55
import io.reactivex.Single;
66
import io.reactivex.schedulers.Schedulers;
77
import io.reflectoring.reactive.batch.MessageHandler.Result;
8+
import org.reactivestreams.Subscriber;
9+
import org.reactivestreams.Subscription;
10+
811
import java.util.ArrayList;
912
import java.util.List;
1013
import java.util.concurrent.LinkedBlockingDeque;
1114
import java.util.concurrent.ThreadPoolExecutor;
1215
import java.util.concurrent.TimeUnit;
13-
import java.util.concurrent.atomic.AtomicInteger;
14-
import org.reactivestreams.Subscriber;
15-
import org.reactivestreams.Subscription;
1616

1717
public class ReactiveBatchProcessor {
1818

19-
private final static Logger logger = new Logger();
20-
21-
private final int MESSAGE_BATCHES = 10;
22-
23-
private final int BATCH_SIZE = 3;
24-
25-
private final int THREADS = 4;
26-
27-
private final MessageHandler messageHandler = new MessageHandler();
28-
29-
public void start() {
30-
retrieveMessageBatches()
31-
.doOnNext(batch -> logger.log(batch.toString()))
32-
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
33-
.flatMapSingle(message -> messageHandler.handleMessage(message)
34-
.subscribeOn(threadPoolScheduler(THREADS, 10)))
35-
.subscribeWith(subscriber());
36-
}
37-
38-
private Subscriber<MessageHandler.Result> subscriber() {
39-
return new Subscriber<>() {
40-
private Subscription subscription;
41-
42-
@Override
43-
public void onSubscribe(Subscription subscription) {
44-
this.subscription = subscription;
45-
subscription.request(THREADS);
46-
logger.log("subscribed");
47-
}
48-
49-
@Override
50-
public void onNext(Result message) {
51-
subscription.request(THREADS);
52-
}
53-
54-
@Override
55-
public void onError(Throwable t) {
56-
logger.log("error");
57-
}
58-
59-
@Override
60-
public void onComplete() {
61-
logger.log("completed");
62-
}
63-
};
64-
}
65-
66-
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
67-
return Schedulers.from(new ThreadPoolExecutor(
68-
poolSize,
69-
poolSize,
70-
0L,
71-
TimeUnit.SECONDS,
72-
new LinkedBlockingDeque<>(queueSize)
73-
));
74-
}
75-
76-
public boolean allMessagesProcessed() {
77-
return this.messageHandler.getProcessedMessages().get() == MESSAGE_BATCHES * BATCH_SIZE;
78-
}
79-
80-
private Flowable<MessageBatch> retrieveMessageBatches() {
81-
return Flowable.range(1, MESSAGE_BATCHES)
82-
.map(this::messageBatch);
83-
}
84-
85-
private MessageBatch messageBatch(int batchNumber) {
86-
List<Message> messages = new ArrayList<>();
87-
for (int i = 1; i <= BATCH_SIZE; i++) {
88-
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
19+
private final static Logger logger = new Logger();
20+
21+
private final int MESSAGE_BATCHES = 10;
22+
23+
private final int BATCH_SIZE = 3;
24+
25+
private final int THREADS = 4;
26+
27+
private final MessageHandler messageHandler = new MessageHandler();
28+
29+
public void start() {
30+
31+
Scheduler threadPoolScheduler = threadPoolScheduler(THREADS, 10);
32+
33+
retrieveMessageBatches()
34+
.doOnNext(batch -> logger.log(batch.toString()))
35+
.flatMap(batch -> Flowable.fromIterable(batch.getMessages()))
36+
.flatMapSingle(message -> Single.defer(() -> Single.just(messageHandler.handleMessage(message)))
37+
.doOnSuccess(result -> logger.log("message handled"))
38+
.subscribeOn(threadPoolScheduler))
39+
.subscribeWith(subscriber());
40+
}
41+
42+
private Subscriber<MessageHandler.Result> subscriber() {
43+
return new Subscriber<>() {
44+
private Subscription subscription;
45+
46+
@Override
47+
public void onSubscribe(Subscription subscription) {
48+
this.subscription = subscription;
49+
subscription.request(THREADS);
50+
logger.log("subscribed");
51+
}
52+
53+
@Override
54+
public void onNext(Result message) {
55+
subscription.request(THREADS);
56+
}
57+
58+
@Override
59+
public void onError(Throwable t) {
60+
logger.log("error");
61+
}
62+
63+
@Override
64+
public void onComplete() {
65+
logger.log("completed");
66+
}
67+
};
68+
}
69+
70+
private Scheduler threadPoolScheduler(int poolSize, int queueSize) {
71+
return Schedulers.from(new ThreadPoolExecutor(
72+
poolSize,
73+
poolSize,
74+
0L,
75+
TimeUnit.SECONDS,
76+
new LinkedBlockingDeque<>(queueSize),
77+
new RetryRejectedExecutionHandler()
78+
));
79+
}
80+
81+
public boolean allMessagesProcessed() {
82+
return this.messageHandler.getProcessedMessages() == MESSAGE_BATCHES * BATCH_SIZE;
83+
}
84+
85+
private Flowable<MessageBatch> retrieveMessageBatches() {
86+
return Flowable.range(1, MESSAGE_BATCHES)
87+
.map(this::messageBatch);
88+
}
89+
90+
private MessageBatch messageBatch(int batchNumber) {
91+
List<Message> messages = new ArrayList<>();
92+
for (int i = 1; i <= BATCH_SIZE; i++) {
93+
messages.add(new Message(String.format("%d-%d", batchNumber, i)));
94+
}
95+
return new MessageBatch(messages);
8996
}
90-
return new MessageBatch(messages);
91-
}
9297

9398

9499
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.reflectoring.reactive.batch;
2+
3+
import java.util.concurrent.RejectedExecutionException;
4+
import java.util.concurrent.RejectedExecutionHandler;
5+
import java.util.concurrent.ThreadPoolExecutor;
6+
7+
public class RetryRejectedExecutionHandler implements RejectedExecutionHandler {
8+
9+
@Override
10+
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
11+
try {
12+
threadPoolExecutor.getQueue().put(runnable);
13+
} catch (InterruptedException e) {
14+
throw new RejectedExecutionException(e);
15+
}
16+
}
17+
18+
}

0 commit comments

Comments
 (0)