|
15 | 15 | */ |
16 | 16 | package io.rsocket.client; |
17 | 17 |
|
18 | | -import static io.rsocket.util.ExceptionUtil.noStacktrace; |
19 | | - |
20 | 18 | import io.rsocket.Availability; |
21 | 19 | import io.rsocket.Closeable; |
22 | 20 | import io.rsocket.Payload; |
|
31 | 29 | import io.rsocket.stat.Quantile; |
32 | 30 | import io.rsocket.util.Clock; |
33 | 31 | import io.rsocket.util.RSocketProxy; |
34 | | -import java.nio.channels.ClosedChannelException; |
35 | | -import java.util.ArrayList; |
36 | | -import java.util.Collection; |
37 | | -import java.util.HashSet; |
38 | | -import java.util.Iterator; |
39 | | -import java.util.Random; |
40 | | -import java.util.Set; |
41 | | -import java.util.concurrent.ThreadLocalRandom; |
42 | | -import java.util.concurrent.TimeUnit; |
43 | | -import java.util.concurrent.atomic.AtomicBoolean; |
44 | | -import java.util.concurrent.atomic.AtomicInteger; |
45 | | -import java.util.concurrent.atomic.AtomicLong; |
46 | 32 | import org.reactivestreams.Publisher; |
47 | 33 | import org.reactivestreams.Subscriber; |
48 | 34 | import org.reactivestreams.Subscription; |
49 | 35 | import org.slf4j.Logger; |
50 | 36 | import org.slf4j.LoggerFactory; |
| 37 | +import reactor.core.CoreSubscriber; |
51 | 38 | import reactor.core.publisher.Flux; |
52 | | -import reactor.core.publisher.FluxSource; |
53 | 39 | import reactor.core.publisher.Mono; |
54 | 40 | import reactor.core.publisher.MonoProcessor; |
55 | | -import reactor.core.publisher.MonoSource; |
56 | 41 | import reactor.core.publisher.Operators; |
57 | 42 |
|
| 43 | +import java.nio.channels.ClosedChannelException; |
| 44 | +import java.util.*; |
| 45 | +import java.util.concurrent.ThreadLocalRandom; |
| 46 | +import java.util.concurrent.TimeUnit; |
| 47 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 48 | +import java.util.concurrent.atomic.AtomicInteger; |
| 49 | +import java.util.concurrent.atomic.AtomicLong; |
| 50 | + |
| 51 | +import static io.rsocket.util.ExceptionUtil.noStacktrace; |
| 52 | + |
58 | 53 | /** |
59 | 54 | * An implementation of {@link Mono} that load balances across a pool of RSockets and emits one when |
60 | 55 | * it is subscribed to |
@@ -203,7 +198,7 @@ public static LoadBalancedRSocketMono create( |
203 | 198 | maxAperture, |
204 | 199 | maxRefreshPeriodMs) { |
205 | 200 | @Override |
206 | | - public void subscribe(Subscriber<? super RSocket> s) { |
| 201 | + public void subscribe(CoreSubscriber<? super RSocket> s) { |
207 | 202 | started.thenMany(rSocketMono).subscribe(s); |
208 | 203 | } |
209 | 204 | }; |
@@ -493,7 +488,7 @@ public Mono<Void> onClose() { |
493 | 488 |
|
494 | 489 | @Override |
495 | 490 | public Mono<Void> close() { |
496 | | - return MonoSource.wrap( |
| 491 | + return Mono.from( |
497 | 492 | subscriber -> { |
498 | 493 | subscriber.onSubscribe(Operators.emptySubscription()); |
499 | 494 |
|
@@ -778,35 +773,35 @@ private class WeightedSocket extends RSocketProxy implements LoadBalancerSocketM |
778 | 773 |
|
779 | 774 | @Override |
780 | 775 | public Mono<Payload> requestResponse(Payload payload) { |
781 | | - return MonoSource.wrap( |
| 776 | + return Mono.from( |
782 | 777 | subscriber -> |
783 | 778 | source.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber, this))); |
784 | 779 | } |
785 | 780 |
|
786 | 781 | @Override |
787 | 782 | public Flux<Payload> requestStream(Payload payload) { |
788 | | - return FluxSource.wrap( |
| 783 | + return Flux.from( |
789 | 784 | subscriber -> |
790 | 785 | source.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber, this))); |
791 | 786 | } |
792 | 787 |
|
793 | 788 | @Override |
794 | 789 | public Mono<Void> fireAndForget(Payload payload) { |
795 | | - return MonoSource.wrap( |
| 790 | + return Mono.from( |
796 | 791 | subscriber -> |
797 | 792 | source.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber, this))); |
798 | 793 | } |
799 | 794 |
|
800 | 795 | @Override |
801 | 796 | public Mono<Void> metadataPush(Payload payload) { |
802 | | - return MonoSource.wrap( |
| 797 | + return Mono.from( |
803 | 798 | subscriber -> |
804 | 799 | source.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber, this))); |
805 | 800 | } |
806 | 801 |
|
807 | 802 | @Override |
808 | 803 | public Flux<Payload> requestChannel(Publisher<Payload> payloads) { |
809 | | - return FluxSource.wrap( |
| 804 | + return Flux.from( |
810 | 805 | subscriber -> |
811 | 806 | source |
812 | 807 | .requestChannel(payloads) |
|
0 commit comments