Skip to content

Commit a537b08

Browse files
rdegnanrobertroeser
authored andcommitted
Reactor (rsocket#250)
* Initial commit * Remove reactivesocket-publishers * Update reactivesocket-client * Build fixes * Added reactivesocket-transport-netty
1 parent 098332c commit a537b08

File tree

151 files changed

+1572
-6113
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

151 files changed

+1572
-6113
lines changed

reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancer.java

Lines changed: 54 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@
2626
import io.reactivesocket.exceptions.TransportException;
2727
import io.reactivesocket.internal.DisabledEventPublisher;
2828
import io.reactivesocket.internal.EventPublisher;
29-
import io.reactivesocket.reactivestreams.extensions.Px;
30-
import io.reactivesocket.reactivestreams.extensions.internal.EmptySubject;
31-
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
32-
import io.reactivesocket.reactivestreams.extensions.internal.subscribers.Subscribers;
29+
import io.reactivesocket.internal.ValidatingSubscription;
3330
import io.reactivesocket.stat.Ewma;
3431
import io.reactivesocket.stat.FrugalQuantile;
3532
import io.reactivesocket.stat.Median;
@@ -41,6 +38,7 @@
4138
import org.reactivestreams.Subscription;
4239
import org.slf4j.Logger;
4340
import org.slf4j.LoggerFactory;
41+
import reactor.core.publisher.*;
4442

4543
import java.nio.channels.ClosedChannelException;
4644
import java.util.ArrayList;
@@ -93,13 +91,14 @@ public class LoadBalancer implements ReactiveSocket {
9391
private final ActiveList<WeightedSocket> activeSockets;
9492
private final ActiveList<ReactiveSocketClient> activeFactories;
9593
private final FactoriesRefresher factoryRefresher;
94+
private final Mono<ReactiveSocket> selectSocket;
9695

9796
private final Ewma pendings;
9897
private volatile int targetAperture;
9998
private long lastApertureRefresh;
10099
private long refreshPeriod;
101100
private volatile long lastRefresh;
102-
private final EmptySubject closeSubject = new EmptySubject();
101+
private final MonoProcessor<Void> closeSubject = MonoProcessor.create();
103102

104103
private final LoadBalancingClientListener eventListener;
105104
private final EventPublisher<ClientEventListener> eventPublisher;
@@ -152,6 +151,7 @@ public LoadBalancer(
152151
this.activeFactories = new ActiveList<>(eventListener, true);
153152
this.pendingSockets = 0;
154153
this.factoryRefresher = new FactoriesRefresher();
154+
this.selectSocket = Mono.fromCallable(this::select);
155155

156156
this.minPendings = minPendings;
157157
this.maxPendings = maxPendings;
@@ -193,28 +193,28 @@ public LoadBalancer(Publisher<? extends Collection<ReactiveSocketClient>> factor
193193
}
194194

195195
@Override
196-
public Publisher<Void> fireAndForget(Payload payload) {
197-
return subscriber -> select().fireAndForget(payload).subscribe(subscriber);
196+
public Mono<Void> fireAndForget(Payload payload) {
197+
return selectSocket.then(socket -> socket.fireAndForget(payload));
198198
}
199199

200200
@Override
201-
public Publisher<Payload> requestResponse(Payload payload) {
202-
return subscriber -> select().requestResponse(payload).subscribe(subscriber);
201+
public Mono<Payload> requestResponse(Payload payload) {
202+
return selectSocket.then(socket -> socket.requestResponse(payload));
203203
}
204204

205205
@Override
206-
public Publisher<Payload> requestStream(Payload payload) {
207-
return subscriber -> select().requestStream(payload).subscribe(subscriber);
206+
public Flux<Payload> requestStream(Payload payload) {
207+
return selectSocket.flatMap(socket -> socket.requestStream(payload));
208208
}
209209

210210
@Override
211-
public Publisher<Void> metadataPush(Payload payload) {
212-
return subscriber -> select().metadataPush(payload).subscribe(subscriber);
211+
public Mono<Void> metadataPush(Payload payload) {
212+
return selectSocket.then(socket -> socket.metadataPush(payload));
213213
}
214214

215215
@Override
216-
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
217-
return subscriber -> select().requestChannel(payloads).subscribe(subscriber);
216+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
217+
return selectSocket.flatMap(socket -> socket.requestChannel(payloads));
218218
}
219219

220220
private synchronized void addSockets(int numberOfNewSocket) {
@@ -393,7 +393,7 @@ private synchronized void removeSocket(WeightedSocket socket, boolean refresh) {
393393
logger.debug("Removing socket: -> " + socket);
394394
activeSockets.remove(socket);
395395
activeFactories.add(socket.getFactory());
396-
socket.close().subscribe(Subscribers.empty());
396+
socket.close().subscribe();
397397
if (refresh) {
398398
refreshSockets();
399399
}
@@ -492,8 +492,8 @@ public synchronized String toString() {
492492
}
493493

494494
@Override
495-
public Publisher<Void> close() {
496-
return subscriber -> {
495+
public Mono<Void> close() {
496+
return MonoSource.wrap(subscriber -> {
497497
subscriber.onSubscribe(ValidatingSubscription.empty(subscriber));
498498

499499
synchronized (this) {
@@ -527,11 +527,11 @@ public void onComplete() {
527527
});
528528
});
529529
}
530-
};
530+
});
531531
}
532532

533533
@Override
534-
public Publisher<Void> onClose() {
534+
public Mono<Void> onClose() {
535535
return closeSubject;
536536
}
537537

@@ -691,31 +691,31 @@ private static class FailingReactiveSocket implements ReactiveSocket {
691691
private static final NoAvailableReactiveSocketException NO_AVAILABLE_RS_EXCEPTION =
692692
new NoAvailableReactiveSocketException();
693693

694-
private static final Publisher<Void> errorVoid = Px.error(NO_AVAILABLE_RS_EXCEPTION);
695-
private static final Publisher<Payload> errorPayload = Px.error(NO_AVAILABLE_RS_EXCEPTION);
694+
private static final Mono<Void> errorVoid = Mono.error(NO_AVAILABLE_RS_EXCEPTION);
695+
private static final Mono<Payload> errorPayload = Mono.error(NO_AVAILABLE_RS_EXCEPTION);
696696

697697
@Override
698-
public Publisher<Void> fireAndForget(Payload payload) {
698+
public Mono<Void> fireAndForget(Payload payload) {
699699
return errorVoid;
700700
}
701701

702702
@Override
703-
public Publisher<Payload> requestResponse(Payload payload) {
703+
public Mono<Payload> requestResponse(Payload payload) {
704704
return errorPayload;
705705
}
706706

707707
@Override
708-
public Publisher<Payload> requestStream(Payload payload) {
709-
return errorPayload;
708+
public Flux<Payload> requestStream(Payload payload) {
709+
return errorPayload.flux();
710710
}
711711

712712
@Override
713-
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
714-
return errorPayload;
713+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
714+
return errorPayload.flux();
715715
}
716716

717717
@Override
718-
public Publisher<Void> metadataPush(Payload payload) {
718+
public Mono<Void> metadataPush(Payload payload) {
719719
return errorVoid;
720720
}
721721

@@ -725,13 +725,13 @@ public double availability() {
725725
}
726726

727727
@Override
728-
public Publisher<Void> close() {
729-
return Px.empty();
728+
public Mono<Void> close() {
729+
return Mono.empty();
730730
}
731731

732732
@Override
733-
public Publisher<Void> onClose() {
734-
return Px.empty();
733+
public Mono<Void> onClose() {
734+
return Mono.empty();
735735
}
736736
}
737737

@@ -743,7 +743,6 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
743743

744744
private static final double STARTUP_PENALTY = Long.MAX_VALUE >> 12;
745745

746-
private final ReactiveSocket child;
747746
private ReactiveSocketClient factory;
748747
private final Quantile lowerQuantile;
749748
private final Quantile higherQuantile;
@@ -767,7 +766,6 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
767766
int inactivityFactor
768767
) {
769768
super(child);
770-
this.child = child;
771769
this.factory = factory;
772770
this.lowerQuantile = lowerQuantile;
773771
this.higherQuantile = higherQuantile;
@@ -780,7 +778,9 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
780778
this.median = new Median();
781779
this.interArrivalTime = new Ewma(1, TimeUnit.MINUTES, DEFAULT_INITIAL_INTER_ARRIVAL_TIME);
782780
this.pendingStreams = new AtomicLong();
783-
child.onClose().subscribe(Subscribers.doOnTerminate(() -> removeSocket(this, true)));
781+
child.onClose()
782+
.doFinally(signalType -> removeSocket(this, true))
783+
.subscribe();
784784
}
785785

786786
WeightedSocket(
@@ -793,33 +793,33 @@ private class WeightedSocket extends ReactiveSocketProxy implements LoadBalancer
793793
}
794794

795795
@Override
796-
public Publisher<Payload> requestResponse(Payload payload) {
797-
return subscriber ->
798-
child.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber, this));
796+
public Mono<Payload> requestResponse(Payload payload) {
797+
return MonoSource.wrap(subscriber ->
798+
source.requestResponse(payload).subscribe(new LatencySubscriber<>(subscriber, this)));
799799
}
800800

801801
@Override
802-
public Publisher<Payload> requestStream(Payload payload) {
803-
return subscriber ->
804-
child.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber, this));
802+
public Flux<Payload> requestStream(Payload payload) {
803+
return FluxSource.wrap(subscriber ->
804+
source.requestStream(payload).subscribe(new CountingSubscriber<>(subscriber, this)));
805805
}
806806

807807
@Override
808-
public Publisher<Void> fireAndForget(Payload payload) {
809-
return subscriber ->
810-
child.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber, this));
808+
public Mono<Void> fireAndForget(Payload payload) {
809+
return MonoSource.wrap(subscriber ->
810+
source.fireAndForget(payload).subscribe(new CountingSubscriber<>(subscriber, this)));
811811
}
812812

813813
@Override
814-
public Publisher<Void> metadataPush(Payload payload) {
815-
return subscriber ->
816-
child.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber, this));
814+
public Mono<Void> metadataPush(Payload payload) {
815+
return MonoSource.wrap(subscriber ->
816+
source.metadataPush(payload).subscribe(new CountingSubscriber<>(subscriber, this)));
817817
}
818818

819819
@Override
820-
public Publisher<Payload> requestChannel(Publisher<Payload> payloads) {
821-
return subscriber ->
822-
child.requestChannel(payloads).subscribe(new CountingSubscriber<>(subscriber, this));
820+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
821+
return FluxSource.wrap(subscriber ->
822+
source.requestChannel(payloads).subscribe(new CountingSubscriber<>(subscriber, this)));
823823
}
824824

825825
ReactiveSocketClient getFactory() {
@@ -893,8 +893,8 @@ private synchronized void observe(double rtt) {
893893
}
894894

895895
@Override
896-
public Publisher<Void> close() {
897-
return child.close();
896+
public Mono<Void> close() {
897+
return source.close();
898898
}
899899

900900
@Override
@@ -907,7 +907,7 @@ public String toString() {
907907
+ " duration/pending=" + (pending == 0 ? 0 : (double)duration / pending)
908908
+ " pending=" + pending
909909
+ " availability= " + availability()
910-
+ ")->" + child;
910+
+ ")->" + source;
911911
}
912912

913913
@Override

reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancerInitializer.java

Lines changed: 8 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,80 +19,39 @@
1919
import io.reactivesocket.ReactiveSocket;
2020
import io.reactivesocket.events.AbstractEventSource;
2121
import io.reactivesocket.events.ClientEventListener;
22-
import io.reactivesocket.reactivestreams.extensions.Px;
23-
import io.reactivesocket.reactivestreams.extensions.internal.ValidatingSubscription;
2422
import org.reactivestreams.Publisher;
25-
import org.reactivestreams.Subscriber;
23+
import reactor.core.publisher.Mono;
24+
import reactor.core.publisher.MonoProcessor;
2625

27-
import java.util.ArrayList;
2826
import java.util.Collection;
29-
import java.util.List;
30-
import java.util.concurrent.CopyOnWriteArrayList;
3127

3228
/**
3329
* This is a temporary class to provide a {@link LoadBalancingClient#connect()} implementation when {@link LoadBalancer}
3430
* does not support it.
3531
*/
3632
final class LoadBalancerInitializer extends AbstractEventSource<ClientEventListener> implements Runnable {
3733

38-
private volatile LoadBalancer loadBalancer;
39-
private final Publisher<ReactiveSocket> emitSource;
40-
private boolean ready; // Guarded by this.
41-
private boolean created; // Guarded by this.
42-
private final List<Subscriber<? super ReactiveSocket>> earlySubscribers = new CopyOnWriteArrayList<>();
34+
private final LoadBalancer loadBalancer;
35+
private final MonoProcessor<ReactiveSocket> emitSource = MonoProcessor.create();
4336

4437
private LoadBalancerInitializer(Publisher<? extends Collection<ReactiveSocketClient>> factories) {
45-
emitSource = s -> {
46-
final boolean _emit;
47-
final boolean _create;
48-
synchronized (this) {
49-
_create = !created;
50-
_emit = ready;
51-
if (!_emit) {
52-
earlySubscribers.add(s);
53-
}
54-
if (!created) {
55-
created = true;
56-
}
57-
}
58-
if (_create) {
59-
loadBalancer = new LoadBalancer(factories, this, this);
60-
}
61-
if (_emit) {
62-
s.onSubscribe(ValidatingSubscription.empty(s));
63-
s.onNext(loadBalancer);
64-
s.onComplete();
65-
}
66-
};
38+
loadBalancer = new LoadBalancer(factories, this, this);
6739
}
6840

6941
static LoadBalancerInitializer create(Publisher<? extends Collection<ReactiveSocketClient>> factories) {
7042
return new LoadBalancerInitializer(factories);
7143
}
7244

73-
Publisher<ReactiveSocket> connect() {
45+
Mono<ReactiveSocket> connect() {
7446
return emitSource;
7547
}
7648

7749
@Override
7850
public void run() {
79-
List<Subscriber<? super ReactiveSocket>> earlySubs;
80-
synchronized (this) {
81-
if (!ready) {
82-
earlySubs = new ArrayList<>(earlySubscribers);
83-
earlySubscribers.clear();
84-
ready = true;
85-
} else {
86-
return;
87-
}
88-
}
89-
Px<LoadBalancer> source = Px.just(loadBalancer);
90-
for (Subscriber<? super ReactiveSocket> earlySub : earlySubs) {
91-
source.subscribe(earlySub);
92-
}
51+
emitSource.onNext(loadBalancer);
9352
}
9453

9554
synchronized double availability() {
96-
return ready? 1.0 : 0.0;
55+
return emitSource.isTerminated() ? 1.0 : 0.0;
9756
}
9857
}

reactivesocket-client/src/main/java/io/reactivesocket/client/LoadBalancingClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
package io.reactivesocket.client;
1818

1919
import io.reactivesocket.ReactiveSocket;
20-
import io.reactivesocket.reactivestreams.extensions.Px;
2120
import org.reactivestreams.Publisher;
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
2223

2324
import java.util.ArrayList;
2425
import java.util.Collection;
@@ -41,7 +42,7 @@ public LoadBalancingClient(LoadBalancerInitializer initializer) {
4142
}
4243

4344
@Override
44-
public Publisher<? extends ReactiveSocket> connect() {
45+
public Mono<? extends ReactiveSocket> connect() {
4546
return initializer.connect();
4647
}
4748

@@ -65,7 +66,7 @@ public double availability() {
6566
public static <T> LoadBalancingClient create(Publisher<? extends Collection<T>> servers,
6667
Function<T, ReactiveSocketClient> clientFactory) {
6768
SourceToClient<T> f = new SourceToClient<T>(clientFactory);
68-
return new LoadBalancingClient(LoadBalancerInitializer.create(Px.from(servers).map(f)));
69+
return new LoadBalancingClient(LoadBalancerInitializer.create(Flux.from(servers).map(f)));
6970
}
7071

7172
/**

0 commit comments

Comments
 (0)