Skip to content

Commit 29638da

Browse files
JWood48jakobs
andauthored
migrates from Reactor UnicastProcessor to new Sinks API (rsocket#931)
Co-authored-by: jakobs <[email protected]>
1 parent e2d7fda commit 29638da

File tree

1 file changed

+12
-10
lines changed

1 file changed

+12
-10
lines changed

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/fnf/TaskProcessingWithServerSideNotificationsExample.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import reactor.core.publisher.BaseSubscriber;
3636
import reactor.core.publisher.Flux;
3737
import reactor.core.publisher.Mono;
38-
import reactor.core.publisher.UnicastProcessor;
38+
import reactor.core.publisher.Sinks;
3939
import reactor.util.concurrent.Queues;
4040

4141
/**
@@ -48,12 +48,12 @@
4848
public class TaskProcessingWithServerSideNotificationsExample {
4949

5050
public static void main(String[] args) throws InterruptedException {
51-
UnicastProcessor<Task> tasksProcessor =
52-
UnicastProcessor.create(Queues.<Task>unboundedMultiproducer().get());
51+
Sinks.Many<Task> tasksProcessor =
52+
Sinks.many().unicast().onBackpressureBuffer(Queues.<Task>unboundedMultiproducer().get());
5353
ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap = new ConcurrentHashMap<>();
5454
ConcurrentMap<String, RSocket> idToRSocketMap = new ConcurrentHashMap<>();
5555
BackgroundWorker backgroundWorker =
56-
new BackgroundWorker(tasksProcessor, idToCompletedTasksMap, idToRSocketMap);
56+
new BackgroundWorker(tasksProcessor.asFlux(), idToCompletedTasksMap, idToRSocketMap);
5757

5858
RSocketServer.create(new TasksAcceptor(tasksProcessor, idToCompletedTasksMap, idToRSocketMap))
5959
.bindNow(TcpServerTransport.create(9991));
@@ -132,12 +132,12 @@ static class TasksAcceptor implements SocketAcceptor {
132132

133133
static final Logger logger = LoggerFactory.getLogger(TasksAcceptor.class);
134134

135-
final UnicastProcessor<Task> tasksToProcess;
135+
final Sinks.Many<Task> tasksToProcess;
136136
final ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap;
137137
final ConcurrentMap<String, RSocket> idToRSocketMap;
138138

139139
TasksAcceptor(
140-
UnicastProcessor<Task> tasksToProcess,
140+
Sinks.Many<Task> tasksToProcess,
141141
ConcurrentMap<String, BlockingQueue<Task>> idToCompletedTasksMap,
142142
ConcurrentMap<String, RSocket> idToRSocketMap) {
143143
this.tasksToProcess = tasksToProcess;
@@ -197,11 +197,11 @@ private static class RSocketTaskHandler implements RSocket {
197197
private final String id;
198198
private final RSocket sendingSocket;
199199
private ConcurrentMap<String, RSocket> idToRSocketMap;
200-
private UnicastProcessor<Task> tasksToProcess;
200+
private Sinks.Many<Task> tasksToProcess;
201201

202202
public RSocketTaskHandler(
203203
ConcurrentMap<String, RSocket> idToRSocketMap,
204-
UnicastProcessor<Task> tasksToProcess,
204+
Sinks.Many<Task> tasksToProcess,
205205
String id,
206206
RSocket sendingSocket) {
207207
this.id = id;
@@ -213,9 +213,11 @@ public RSocketTaskHandler(
213213
@Override
214214
public Mono<Void> fireAndForget(Payload payload) {
215215
logger.info("Received a Task[{}] from Client.ID[{}]", payload.getDataUtf8(), id);
216-
tasksToProcess.onNext(new Task(id, payload.getDataUtf8()));
216+
Sinks.Emission emission = tasksToProcess.tryEmitNext(new Task(id, payload.getDataUtf8()));
217217
payload.release();
218-
return Mono.empty();
218+
return emission.hasFailed()
219+
? Mono.error(new Sinks.EmissionException(emission))
220+
: Mono.empty();
219221
}
220222

221223
@Override

0 commit comments

Comments
 (0)