1919import io .rsocket .Payload ;
2020import io .rsocket .RSocket ;
2121import io .rsocket .client .filter .RSocketSupplier ;
22- import io .rsocket .util .EmptyPayload ;
23- import java .net .InetSocketAddress ;
24- import java .net .SocketAddress ;
2522import java .util .Arrays ;
23+ import java .util .Collections ;
2624import java .util .List ;
27- import java .util .concurrent .CountDownLatch ;
25+ import java .util .concurrent .CompletableFuture ;
2826import java .util .function .Function ;
2927import org .junit .Assert ;
3028import org .junit .Test ;
3129import org .mockito .Mockito ;
3230import org .reactivestreams .Publisher ;
33- import org .reactivestreams .Subscriber ;
34- import org .reactivestreams .Subscription ;
3531import reactor .core .publisher .Flux ;
3632import reactor .core .publisher .Mono ;
3733
3834public class LoadBalancedRSocketMonoTest {
3935
4036 @ Test (timeout = 10_000L )
4137 public void testNeverSelectFailingFactories () throws InterruptedException {
42- InetSocketAddress local0 = InetSocketAddress .createUnresolved ("localhost" , 7000 );
43- InetSocketAddress local1 = InetSocketAddress .createUnresolved ("localhost" , 7001 );
44-
4538 TestingRSocket socket = new TestingRSocket (Function .identity ());
46- RSocketSupplier failing = failingClient (local0 );
39+ RSocketSupplier failing = failingClient ();
4740 RSocketSupplier succeeding = succeedingFactory (socket );
4841 List <RSocketSupplier > factories = Arrays .asList (failing , succeeding );
4942
@@ -52,9 +45,6 @@ public void testNeverSelectFailingFactories() throws InterruptedException {
5245
5346 @ Test (timeout = 10_000L )
5447 public void testNeverSelectFailingSocket () throws InterruptedException {
55- InetSocketAddress local0 = InetSocketAddress .createUnresolved ("localhost" , 7000 );
56- InetSocketAddress local1 = InetSocketAddress .createUnresolved ("localhost" , 7001 );
57-
5848 TestingRSocket socket = new TestingRSocket (Function .identity ());
5949 TestingRSocket failingSocket =
6050 new TestingRSocket (Function .identity ()) {
@@ -76,6 +66,33 @@ public double availability() {
7666 testBalancer (clients );
7767 }
7868
69+ @ Test (timeout = 10_000L )
70+ public void testRefreshesSocketsOnSelectBeforeReturningFailedAfterNewFactoriesDelivered () {
71+ TestingRSocket socket = new TestingRSocket (Function .identity ());
72+
73+ CompletableFuture <RSocketSupplier > laterSupplier = new CompletableFuture <>();
74+ Flux <List <RSocketSupplier >> factories =
75+ Flux .create (
76+ s -> {
77+ s .next (Collections .emptyList ());
78+
79+ laterSupplier .handle (
80+ (RSocketSupplier result , Throwable t ) -> {
81+ s .next (Collections .singletonList (result ));
82+ return null ;
83+ });
84+ });
85+
86+ LoadBalancedRSocketMono balancer = LoadBalancedRSocketMono .create (factories );
87+
88+ Assert .assertEquals (0.0 , balancer .availability (), 0 );
89+
90+ laterSupplier .complete (succeedingFactory (socket ));
91+ balancer .rSocketMono .block ();
92+
93+ Assert .assertEquals (1.0 , balancer .availability (), 0 );
94+ }
95+
7996 private void testBalancer (List <RSocketSupplier > factories ) throws InterruptedException {
8097 Publisher <List <RSocketSupplier >> src =
8198 s -> {
@@ -92,39 +109,6 @@ private void testBalancer(List<RSocketSupplier> factories) throws InterruptedExc
92109 Flux .range (0 , 100 ).flatMap (i -> balancer ).blockLast ();
93110 }
94111
95- private void makeAcall (RSocket balancer ) throws InterruptedException {
96- CountDownLatch latch = new CountDownLatch (1 );
97-
98- balancer
99- .requestResponse (EmptyPayload .INSTANCE )
100- .subscribe (
101- new Subscriber <Payload >() {
102- @ Override
103- public void onSubscribe (Subscription s ) {
104- s .request (1L );
105- }
106-
107- @ Override
108- public void onNext (Payload payload ) {
109- System .out .println ("Successfully receiving a response" );
110- }
111-
112- @ Override
113- public void onError (Throwable t ) {
114- t .printStackTrace ();
115- Assert .assertTrue (false );
116- latch .countDown ();
117- }
118-
119- @ Override
120- public void onComplete () {
121- latch .countDown ();
122- }
123- });
124-
125- latch .await ();
126- }
127-
128112 private static RSocketSupplier succeedingFactory (RSocket socket ) {
129113 RSocketSupplier mock = Mockito .mock (RSocketSupplier .class );
130114
@@ -135,7 +119,7 @@ private static RSocketSupplier succeedingFactory(RSocket socket) {
135119 return mock ;
136120 }
137121
138- private static RSocketSupplier failingClient (SocketAddress sa ) {
122+ private static RSocketSupplier failingClient () {
139123 RSocketSupplier mock = Mockito .mock (RSocketSupplier .class );
140124
141125 Mockito .when (mock .availability ()).thenReturn (0.0 );
0 commit comments