3535import reactor .core .publisher .BaseSubscriber ;
3636import reactor .core .publisher .Flux ;
3737import reactor .core .publisher .Mono ;
38- import reactor .core .publisher .UnicastProcessor ;
38+ import reactor .core .publisher .Sinks ;
3939import reactor .util .concurrent .Queues ;
4040
4141/**
4848public class TaskProcessingWithServerSideNotificationsExample {
4949
5050 public static void main (String [] args ) throws InterruptedException {
51- UnicastProcessor <Task > tasksProcessor =
52- UnicastProcessor . create (Queues .<Task >unboundedMultiproducer ().get ());
51+ Sinks . Many <Task > tasksProcessor =
52+ Sinks . many (). unicast (). onBackpressureBuffer (Queues .<Task >unboundedMultiproducer ().get ());
5353 ConcurrentMap <String , BlockingQueue <Task >> idToCompletedTasksMap = new ConcurrentHashMap <>();
5454 ConcurrentMap <String , RSocket > idToRSocketMap = new ConcurrentHashMap <>();
5555 BackgroundWorker backgroundWorker =
56- new BackgroundWorker (tasksProcessor , idToCompletedTasksMap , idToRSocketMap );
56+ new BackgroundWorker (tasksProcessor . asFlux () , idToCompletedTasksMap , idToRSocketMap );
5757
5858 RSocketServer .create (new TasksAcceptor (tasksProcessor , idToCompletedTasksMap , idToRSocketMap ))
5959 .bindNow (TcpServerTransport .create (9991 ));
@@ -132,12 +132,12 @@ static class TasksAcceptor implements SocketAcceptor {
132132
133133 static final Logger logger = LoggerFactory .getLogger (TasksAcceptor .class );
134134
135- final UnicastProcessor <Task > tasksToProcess ;
135+ final Sinks . Many <Task > tasksToProcess ;
136136 final ConcurrentMap <String , BlockingQueue <Task >> idToCompletedTasksMap ;
137137 final ConcurrentMap <String , RSocket > idToRSocketMap ;
138138
139139 TasksAcceptor (
140- UnicastProcessor <Task > tasksToProcess ,
140+ Sinks . Many <Task > tasksToProcess ,
141141 ConcurrentMap <String , BlockingQueue <Task >> idToCompletedTasksMap ,
142142 ConcurrentMap <String , RSocket > idToRSocketMap ) {
143143 this .tasksToProcess = tasksToProcess ;
@@ -197,11 +197,11 @@ private static class RSocketTaskHandler implements RSocket {
197197 private final String id ;
198198 private final RSocket sendingSocket ;
199199 private ConcurrentMap <String , RSocket > idToRSocketMap ;
200- private UnicastProcessor <Task > tasksToProcess ;
200+ private Sinks . Many <Task > tasksToProcess ;
201201
202202 public RSocketTaskHandler (
203203 ConcurrentMap <String , RSocket > idToRSocketMap ,
204- UnicastProcessor <Task > tasksToProcess ,
204+ Sinks . Many <Task > tasksToProcess ,
205205 String id ,
206206 RSocket sendingSocket ) {
207207 this .id = id ;
@@ -213,9 +213,11 @@ public RSocketTaskHandler(
213213 @ Override
214214 public Mono <Void > fireAndForget (Payload payload ) {
215215 logger .info ("Received a Task[{}] from Client.ID[{}]" , payload .getDataUtf8 (), id );
216- tasksToProcess .onNext (new Task (id , payload .getDataUtf8 ()));
216+ Sinks . Emission emission = tasksToProcess .tryEmitNext (new Task (id , payload .getDataUtf8 ()));
217217 payload .release ();
218- return Mono .empty ();
218+ return emission .hasFailed ()
219+ ? Mono .error (new Sinks .EmissionException (emission ))
220+ : Mono .empty ();
219221 }
220222
221223 @ Override
0 commit comments