2626import io .reactivesocket .exceptions .TransportException ;
2727import io .reactivesocket .internal .DisabledEventPublisher ;
2828import io .reactivesocket .internal .EventPublisher ;
29- import io .reactivesocket .reactivestreams .extensions .Px ;
30- import io .reactivesocket .reactivestreams .extensions .internal .EmptySubject ;
31- import io .reactivesocket .reactivestreams .extensions .internal .ValidatingSubscription ;
32- import io .reactivesocket .reactivestreams .extensions .internal .subscribers .Subscribers ;
29+ import io .reactivesocket .internal .ValidatingSubscription ;
3330import io .reactivesocket .stat .Ewma ;
3431import io .reactivesocket .stat .FrugalQuantile ;
3532import io .reactivesocket .stat .Median ;
4138import org .reactivestreams .Subscription ;
4239import org .slf4j .Logger ;
4340import org .slf4j .LoggerFactory ;
41+ import reactor .core .publisher .*;
4442
4543import java .nio .channels .ClosedChannelException ;
4644import java .util .ArrayList ;
@@ -93,13 +91,14 @@ public class LoadBalancer implements ReactiveSocket {
9391 private final ActiveList <WeightedSocket > activeSockets ;
9492 private final ActiveList <ReactiveSocketClient > activeFactories ;
9593 private final FactoriesRefresher factoryRefresher ;
94+ private final Mono <ReactiveSocket > selectSocket ;
9695
9796 private final Ewma pendings ;
9897 private volatile int targetAperture ;
9998 private long lastApertureRefresh ;
10099 private long refreshPeriod ;
101100 private volatile long lastRefresh ;
102- private final EmptySubject closeSubject = new EmptySubject ();
101+ private final MonoProcessor < Void > closeSubject = MonoProcessor . create ();
103102
104103 private final LoadBalancingClientListener eventListener ;
105104 private final EventPublisher <ClientEventListener > eventPublisher ;
@@ -152,6 +151,7 @@ public LoadBalancer(
152151 this .activeFactories = new ActiveList <>(eventListener , true );
153152 this .pendingSockets = 0 ;
154153 this .factoryRefresher = new FactoriesRefresher ();
154+ this .selectSocket = Mono .fromCallable (this ::select );
155155
156156 this .minPendings = minPendings ;
157157 this .maxPendings = maxPendings ;
@@ -193,28 +193,28 @@ public LoadBalancer(Publisher<? extends Collection<ReactiveSocketClient>> factor
193193 }
194194
195195 @ Override
196- public Publisher <Void > fireAndForget (Payload payload ) {
197- return subscriber -> select () .fireAndForget (payload ). subscribe ( subscriber );
196+ public Mono <Void > fireAndForget (Payload payload ) {
197+ return selectSocket . then ( socket -> socket .fireAndForget (payload ));
198198 }
199199
200200 @ Override
201- public Publisher <Payload > requestResponse (Payload payload ) {
202- return subscriber -> select () .requestResponse (payload ). subscribe ( subscriber );
201+ public Mono <Payload > requestResponse (Payload payload ) {
202+ return selectSocket . then ( socket -> socket .requestResponse (payload ));
203203 }
204204
205205 @ Override
206- public Publisher <Payload > requestStream (Payload payload ) {
207- return subscriber -> select () .requestStream (payload ). subscribe ( subscriber );
206+ public Flux <Payload > requestStream (Payload payload ) {
207+ return selectSocket . flatMap ( socket -> socket .requestStream (payload ));
208208 }
209209
210210 @ Override
211- public Publisher <Void > metadataPush (Payload payload ) {
212- return subscriber -> select () .metadataPush (payload ). subscribe ( subscriber );
211+ public Mono <Void > metadataPush (Payload payload ) {
212+ return selectSocket . then ( socket -> socket .metadataPush (payload ));
213213 }
214214
215215 @ Override
216- public Publisher <Payload > requestChannel (Publisher <Payload > payloads ) {
217- return subscriber -> select () .requestChannel (payloads ). subscribe ( subscriber );
216+ public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
217+ return selectSocket . flatMap ( socket -> socket .requestChannel (payloads ));
218218 }
219219
220220 private synchronized void addSockets (int numberOfNewSocket ) {
@@ -393,7 +393,7 @@ private synchronized void removeSocket(WeightedSocket socket, boolean refresh) {
393393 logger .debug ("Removing socket: -> " + socket );
394394 activeSockets .remove (socket );
395395 activeFactories .add (socket .getFactory ());
396- socket .close ().subscribe (Subscribers . empty () );
396+ socket .close ().subscribe ();
397397 if (refresh ) {
398398 refreshSockets ();
399399 }
@@ -492,8 +492,8 @@ public synchronized String toString() {
492492 }
493493
494494 @ Override
495- public Publisher <Void > close () {
496- return subscriber -> {
495+ public Mono <Void > close () {
496+ return MonoSource . wrap ( subscriber -> {
497497 subscriber .onSubscribe (ValidatingSubscription .empty (subscriber ));
498498
499499 synchronized (this ) {
@@ -527,11 +527,11 @@ public void onComplete() {
527527 });
528528 });
529529 }
530- };
530+ }) ;
531531 }
532532
533533 @ Override
534- public Publisher <Void > onClose () {
534+ public Mono <Void > onClose () {
535535 return closeSubject ;
536536 }
537537
@@ -691,31 +691,31 @@ private static class FailingReactiveSocket implements ReactiveSocket {
691691 private static final NoAvailableReactiveSocketException NO_AVAILABLE_RS_EXCEPTION =
692692 new NoAvailableReactiveSocketException ();
693693
694- private static final Publisher <Void > errorVoid = Px .error (NO_AVAILABLE_RS_EXCEPTION );
695- private static final Publisher <Payload > errorPayload = Px .error (NO_AVAILABLE_RS_EXCEPTION );
694+ private static final Mono <Void > errorVoid = Mono .error (NO_AVAILABLE_RS_EXCEPTION );
695+ private static final Mono <Payload > errorPayload = Mono .error (NO_AVAILABLE_RS_EXCEPTION );
696696
697697 @ Override
698- public Publisher <Void > fireAndForget (Payload payload ) {
698+ public Mono <Void > fireAndForget (Payload payload ) {
699699 return errorVoid ;
700700 }
701701
702702 @ Override
703- public Publisher <Payload > requestResponse (Payload payload ) {
703+ public Mono <Payload > requestResponse (Payload payload ) {
704704 return errorPayload ;
705705 }
706706
707707 @ Override
708- public Publisher <Payload > requestStream (Payload payload ) {
709- return errorPayload ;
708+ public Flux <Payload > requestStream (Payload payload ) {
709+ return errorPayload . flux () ;
710710 }
711711
712712 @ Override
713- public Publisher <Payload > requestChannel (Publisher <Payload > payloads ) {
714- return errorPayload ;
713+ public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
714+ return errorPayload . flux () ;
715715 }
716716
717717 @ Override
718- public Publisher <Void > metadataPush (Payload payload ) {
718+ public Mono <Void > metadataPush (Payload payload ) {
719719 return errorVoid ;
720720 }
721721
@@ -725,13 +725,13 @@ public double availability() {
725725 }
726726
727727 @ Override
728- public Publisher <Void > close () {
729- return Px .empty ();
728+ public Mono <Void > close () {
729+ return Mono .empty ();
730730 }
731731
732732 @ Override
733- public Publisher <Void > onClose () {
734- return Px .empty ();
733+ public Mono <Void > onClose () {
734+ return Mono .empty ();
735735 }
736736 }
737737
@@ -743,7 +743,6 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
743743
744744 private static final double STARTUP_PENALTY = Long .MAX_VALUE >> 12 ;
745745
746- private final ReactiveSocket child ;
747746 private ReactiveSocketClient factory ;
748747 private final Quantile lowerQuantile ;
749748 private final Quantile higherQuantile ;
@@ -767,7 +766,6 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
767766 int inactivityFactor
768767 ) {
769768 super (child );
770- this .child = child ;
771769 this .factory = factory ;
772770 this .lowerQuantile = lowerQuantile ;
773771 this .higherQuantile = higherQuantile ;
@@ -780,7 +778,9 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
780778 this .median = new Median ();
781779 this .interArrivalTime = new Ewma (1 , TimeUnit .MINUTES , DEFAULT_INITIAL_INTER_ARRIVAL_TIME );
782780 this .pendingStreams = new AtomicLong ();
783- child .onClose ().subscribe (Subscribers .doOnTerminate (() -> removeSocket (this , true )));
781+ child .onClose ()
782+ .doFinally (signalType -> removeSocket (this , true ))
783+ .subscribe ();
784784 }
785785
786786 WeightedSocket (
@@ -793,33 +793,33 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
793793 }
794794
795795 @ Override
796- public Publisher <Payload > requestResponse (Payload payload ) {
797- return subscriber ->
798- child .requestResponse (payload ).subscribe (new LatencySubscriber <>(subscriber , this ));
796+ public Mono <Payload > requestResponse (Payload payload ) {
797+ return MonoSource . wrap ( subscriber ->
798+ source .requestResponse (payload ).subscribe (new LatencySubscriber <>(subscriber , this ) ));
799799 }
800800
801801 @ Override
802- public Publisher <Payload > requestStream (Payload payload ) {
803- return subscriber ->
804- child .requestStream (payload ).subscribe (new CountingSubscriber <>(subscriber , this ));
802+ public Flux <Payload > requestStream (Payload payload ) {
803+ return FluxSource . wrap ( subscriber ->
804+ source .requestStream (payload ).subscribe (new CountingSubscriber <>(subscriber , this ) ));
805805 }
806806
807807 @ Override
808- public Publisher <Void > fireAndForget (Payload payload ) {
809- return subscriber ->
810- child .fireAndForget (payload ).subscribe (new CountingSubscriber <>(subscriber , this ));
808+ public Mono <Void > fireAndForget (Payload payload ) {
809+ return MonoSource . wrap ( subscriber ->
810+ source .fireAndForget (payload ).subscribe (new CountingSubscriber <>(subscriber , this ) ));
811811 }
812812
813813 @ Override
814- public Publisher <Void > metadataPush (Payload payload ) {
815- return subscriber ->
816- child .metadataPush (payload ).subscribe (new CountingSubscriber <>(subscriber , this ));
814+ public Mono <Void > metadataPush (Payload payload ) {
815+ return MonoSource . wrap ( subscriber ->
816+ source .metadataPush (payload ).subscribe (new CountingSubscriber <>(subscriber , this ) ));
817817 }
818818
819819 @ Override
820- public Publisher <Payload > requestChannel (Publisher <Payload > payloads ) {
821- return subscriber ->
822- child .requestChannel (payloads ).subscribe (new CountingSubscriber <>(subscriber , this ));
820+ public Flux <Payload > requestChannel (Publisher <Payload > payloads ) {
821+ return FluxSource . wrap ( subscriber ->
822+ source .requestChannel (payloads ).subscribe (new CountingSubscriber <>(subscriber , this ) ));
823823 }
824824
825825 ReactiveSocketClient getFactory () {
@@ -893,8 +893,8 @@ private synchronized void observe(double rtt) {
893893 }
894894
895895 @ Override
896- public Publisher <Void > close () {
897- return child .close ();
896+ public Mono <Void > close () {
897+ return source .close ();
898898 }
899899
900900 @ Override
@@ -907,7 +907,7 @@ public String toString() {
907907 + " duration/pending=" + (pending == 0 ? 0 : (double )duration / pending )
908908 + " pending=" + pending
909909 + " availability= " + availability ()
910- + ")->" + child ;
910+ + ")->" + source ;
911911 }
912912
913913 @ Override
0 commit comments