Skip to content

Commit bb1035c

Browse files
committed
provides implementation of the new loadbalance API
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 30dae87 commit bb1035c

38 files changed

+2937
-971
lines changed
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.loadbalance;
17+
18+
import io.netty.util.ReferenceCountUtil;
19+
import io.rsocket.Payload;
20+
import io.rsocket.frame.FrameType;
21+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
22+
import java.util.function.BiConsumer;
23+
import org.reactivestreams.Subscription;
24+
import reactor.core.CoreSubscriber;
25+
import reactor.core.Scannable;
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Operators;
28+
import reactor.util.annotation.Nullable;
29+
import reactor.util.context.Context;
30+
31+
abstract class FluxDeferredResolution<INPUT, R> extends Flux<Payload>
32+
implements CoreSubscriber<Payload>, Subscription, BiConsumer<R, Throwable>, Scannable {
33+
34+
final ResolvingOperator<R> parent;
35+
final INPUT fluxOrPayload;
36+
final FrameType requestType;
37+
38+
volatile long requested;
39+
40+
@SuppressWarnings("rawtypes")
41+
static final AtomicLongFieldUpdater<FluxDeferredResolution> REQUESTED =
42+
AtomicLongFieldUpdater.newUpdater(FluxDeferredResolution.class, "requested");
43+
44+
static final long STATE_UNSUBSCRIBED = -1;
45+
static final long STATE_SUBSCRIBER_SET = 0;
46+
static final long STATE_SUBSCRIBED = -2;
47+
static final long STATE_TERMINATED = Long.MIN_VALUE;
48+
49+
Subscription s;
50+
CoreSubscriber<? super Payload> actual;
51+
boolean done;
52+
53+
FluxDeferredResolution(ResolvingOperator<R> parent, INPUT fluxOrPayload, FrameType requestType) {
54+
this.parent = parent;
55+
this.fluxOrPayload = fluxOrPayload;
56+
this.requestType = requestType;
57+
58+
REQUESTED.lazySet(this, STATE_UNSUBSCRIBED);
59+
}
60+
61+
@Override
62+
public final void subscribe(CoreSubscriber<? super Payload> actual) {
63+
if (this.requested == STATE_UNSUBSCRIBED
64+
&& REQUESTED.compareAndSet(this, STATE_UNSUBSCRIBED, STATE_SUBSCRIBER_SET)) {
65+
66+
actual.onSubscribe(this);
67+
68+
if (this.requested == STATE_TERMINATED) {
69+
return;
70+
}
71+
72+
this.actual = actual;
73+
this.parent.observe(this);
74+
} else {
75+
Operators.error(actual, new IllegalStateException("Only a single Subscriber allowed"));
76+
}
77+
}
78+
79+
@Override
80+
public final Context currentContext() {
81+
return this.actual.currentContext();
82+
}
83+
84+
@Nullable
85+
@Override
86+
public Object scanUnsafe(Attr key) {
87+
long state = this.requested;
88+
89+
if (key == Attr.PARENT) {
90+
return this.s;
91+
}
92+
if (key == Attr.ACTUAL) {
93+
return this.parent;
94+
}
95+
if (key == Attr.TERMINATED) {
96+
return this.done;
97+
}
98+
if (key == Attr.CANCELLED) {
99+
return state == STATE_TERMINATED;
100+
}
101+
102+
return null;
103+
}
104+
105+
@Override
106+
public final void onSubscribe(Subscription s) {
107+
final long state = this.requested;
108+
Subscription a = this.s;
109+
if (state == STATE_TERMINATED) {
110+
s.cancel();
111+
return;
112+
}
113+
if (a != null) {
114+
s.cancel();
115+
return;
116+
}
117+
118+
long r;
119+
long accumulated = 0;
120+
for (; ; ) {
121+
r = this.requested;
122+
123+
if (r == STATE_TERMINATED || r == STATE_SUBSCRIBED) {
124+
s.cancel();
125+
return;
126+
}
127+
128+
this.s = s;
129+
130+
long toRequest = r - accumulated;
131+
if (toRequest > 0) { // if there is something,
132+
s.request(toRequest); // then we do a request on the given subscription
133+
}
134+
accumulated = r;
135+
136+
if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) {
137+
return;
138+
}
139+
}
140+
}
141+
142+
@Override
143+
public final void onNext(Payload payload) {
144+
this.actual.onNext(payload);
145+
}
146+
147+
@Override
148+
public void onError(Throwable t) {
149+
if (this.done) {
150+
Operators.onErrorDropped(t, this.actual.currentContext());
151+
return;
152+
}
153+
154+
this.done = true;
155+
this.actual.onError(t);
156+
}
157+
158+
@Override
159+
public void onComplete() {
160+
if (this.done) {
161+
return;
162+
}
163+
164+
this.done = true;
165+
this.actual.onComplete();
166+
}
167+
168+
@Override
169+
public final void request(long n) {
170+
if (Operators.validate(n)) {
171+
long r = this.requested; // volatile read beforehand
172+
173+
if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened
174+
long u;
175+
for (; ; ) { // normal CAS loop with overflow protection
176+
if (r == Long.MAX_VALUE) {
177+
// if r == Long.MAX_VALUE then we dont care and we can loose this
178+
// request just in case of racing
179+
return;
180+
}
181+
u = Operators.addCap(r, n);
182+
if (REQUESTED.compareAndSet(this, r, u)) {
183+
// Means increment happened before onSubscribe
184+
return;
185+
} else {
186+
// Means increment happened after onSubscribe
187+
188+
// update new state to see what exactly happened (onSubscribe |cancel | requestN)
189+
r = this.requested;
190+
191+
// check state (expect -1 | -2 to exit, otherwise repeat)
192+
if (r < 0) {
193+
break;
194+
}
195+
}
196+
}
197+
}
198+
199+
if (r == STATE_TERMINATED) { // if canceled, just exit
200+
return;
201+
}
202+
203+
// if onSubscribe -> subscription exists (and we sure of that because volatile read
204+
// after volatile write) so we can execute requestN on the subscription
205+
this.s.request(n);
206+
}
207+
}
208+
209+
public void cancel() {
210+
long state = REQUESTED.getAndSet(this, STATE_TERMINATED);
211+
if (state == STATE_TERMINATED) {
212+
return;
213+
}
214+
215+
if (state == STATE_SUBSCRIBED) {
216+
this.s.cancel();
217+
} else {
218+
this.parent.remove(this);
219+
if (requestType == FrameType.REQUEST_STREAM) {
220+
ReferenceCountUtil.safeRelease(this.fluxOrPayload);
221+
}
222+
}
223+
}
224+
225+
boolean isTerminated() {
226+
return this.requested == STATE_TERMINATED;
227+
}
228+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.loadbalance;
17+
18+
import io.rsocket.Payload;
19+
import io.rsocket.RSocket;
20+
import io.rsocket.RSocketClient;
21+
import java.util.List;
22+
import org.reactivestreams.Publisher;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
26+
public class LoadBalancedRSocketClient implements RSocketClient {
27+
28+
private final RSocketPool rSocketPool;
29+
30+
LoadBalancedRSocketClient(RSocketPool rSocketPool) {
31+
this.rSocketPool = rSocketPool;
32+
}
33+
34+
@Override
35+
public Mono<RSocket> source() {
36+
return Mono.fromSupplier(rSocketPool::select);
37+
}
38+
39+
@Override
40+
public Mono<Void> fireAndForget(Mono<Payload> payloadMono) {
41+
return payloadMono.flatMap(p -> rSocketPool.select().fireAndForget(p));
42+
}
43+
44+
@Override
45+
public Mono<Payload> requestResponse(Mono<Payload> payloadMono) {
46+
return payloadMono.flatMap(p -> rSocketPool.select().requestResponse(p));
47+
}
48+
49+
@Override
50+
public Flux<Payload> requestStream(Mono<Payload> payloadMono) {
51+
return payloadMono.flatMapMany(p -> rSocketPool.select().requestStream(p));
52+
}
53+
54+
@Override
55+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
56+
return rSocketPool.select().requestChannel(payloads);
57+
}
58+
59+
@Override
60+
public Mono<Void> metadataPush(Mono<Payload> payloadMono) {
61+
return payloadMono.flatMap(p -> rSocketPool.select().metadataPush(p));
62+
}
63+
64+
@Override
65+
public void dispose() {
66+
rSocketPool.dispose();
67+
}
68+
69+
public static LoadBalancedRSocketClient create(
70+
LoadbalanceStrategy loadbalanceStrategy,
71+
Publisher<List<LoadbalanceTarget>> rSocketSuppliersPublisher) {
72+
return new LoadBalancedRSocketClient(
73+
new RSocketPool(rSocketSuppliersPublisher, loadbalanceStrategy));
74+
}
75+
76+
public static RSocketClient create(Publisher<List<LoadbalanceTarget>> rSocketSuppliersPublisher) {
77+
return create(new RoundRobinLoadbalanceStrategy(), rSocketSuppliersPublisher);
78+
}
79+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.loadbalance;
17+
18+
public interface LoadbalanceStrategy {
19+
20+
PooledRSocket select(PooledRSocket[] availableRSockets);
21+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.loadbalance;
18+
19+
import io.rsocket.RSocket;
20+
import reactor.core.publisher.Mono;
21+
22+
public class LoadbalanceTarget {
23+
24+
final String serverKey;
25+
final Mono<RSocket> source;
26+
27+
public LoadbalanceTarget(String serverKey, Mono<RSocket> source) {
28+
this.serverKey = serverKey;
29+
this.source = source;
30+
}
31+
32+
public Mono<RSocket> source() {
33+
return source;
34+
}
35+
36+
@Override
37+
public boolean equals(Object o) {
38+
if (this == o) return true;
39+
if (o == null || getClass() != o.getClass()) return false;
40+
41+
LoadbalanceTarget that = (LoadbalanceTarget) o;
42+
43+
return serverKey.equals(that.serverKey);
44+
}
45+
46+
@Override
47+
public int hashCode() {
48+
return serverKey.hashCode();
49+
}
50+
}

0 commit comments

Comments
 (0)