Skip to content

Commit da18ea1

Browse files
Reconnectable and Shared Mono<RSocket> feature (rsocket#759)
* provides a reconnectable Mono<RSocket> feature Signed-off-by: Oleh Dokuka <[email protected]> * fixes interface naming Signed-off-by: Oleh Dokuka <[email protected]> * makes usage of retry only in case whenFactory is set up Signed-off-by: Oleh Dokuka <[email protected]> * refactors to convenient Retry Signed-off-by: Oleh Dokuka <[email protected]> * removes noargs reconnect() method Signed-off-by: Oleh Dokuka <[email protected]> * fixes tests Signed-off-by: Oleh Dokuka <[email protected]> * Update rsocket-core/src/main/java/io/rsocket/RSocketFactory.java Co-Authored-By: Rossen Stoyanchev <[email protected]> * fixes docs Signed-off-by: Oleh Dokuka <[email protected]> * provide test events logging Signed-off-by: Oleh Dokuka <[email protected]> * fixes test Signed-off-by: Oleh Dokuka <[email protected]> Co-authored-by: Rossen Stoyanchev <[email protected]>
1 parent d1c36ba commit da18ea1

File tree

5 files changed

+1617
-4
lines changed

5 files changed

+1617
-4
lines changed

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ subprojects {
119119
test {
120120
useJUnitPlatform()
121121

122+
testLogging {
123+
events "started", "passed", "skipped", "failed"
124+
}
125+
122126
systemProperty "io.netty.leakDetection.level", "ADVANCED"
123127
}
124128

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 103 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@
4444
import io.rsocket.util.MultiSubscriberRSocket;
4545
import java.time.Duration;
4646
import java.util.Objects;
47+
import java.util.function.BiConsumer;
4748
import java.util.function.Consumer;
4849
import java.util.function.Function;
4950
import java.util.function.Supplier;
51+
import reactor.core.Disposable;
5052
import reactor.core.publisher.Mono;
53+
import reactor.util.retry.Retry;
5154

5255
/** Factory for creating RSocket clients and servers. */
5356
public class RSocketFactory {
@@ -95,6 +98,9 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
9598
public static class ClientRSocketFactory implements ClientTransportAcceptor {
9699
private static final String CLIENT_TAG = "client";
97100

101+
private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
102+
(r, i) -> r.onClose().subscribe(null, null, i::invalidate);
103+
98104
private SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {});
99105

100106
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
@@ -125,6 +131,8 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
125131
private boolean multiSubscriberRequester = true;
126132
private boolean leaseEnabled;
127133
private Supplier<Leases<?>> leasesSupplier = Leases::new;
134+
private boolean reconnectEnabled;
135+
private Retry retrySpec;
128136

129137
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
130138

@@ -230,6 +238,86 @@ public ClientRSocketFactory singleSubscriberRequester() {
230238
return this;
231239
}
232240

241+
/**
242+
* Enables a reconnectable, shared instance of {@code Mono<RSocket>} so every subscriber will
243+
* observe the same RSocket instance up on connection establishment. <br>
244+
* For example:
245+
*
246+
* <pre>{@code
247+
* Mono<RSocket> sharedRSocketMono =
248+
* RSocketFactory
249+
* .connect()
250+
* .singleSubscriberRequester()
251+
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
252+
* .transport(transport)
253+
* .start();
254+
*
255+
* RSocket r1 = sharedRSocketMono.block();
256+
* RSocket r2 = sharedRSocketMono.block();
257+
*
258+
* assert r1 == r2;
259+
*
260+
* }</pre>
261+
*
262+
* Apart of the shared behavior, if the connection is lost, the same {@code Mono<RSocket>}
263+
* instance will transparently re-establish the connection for subsequent subscribers.<br>
264+
* For example:
265+
*
266+
* <pre>{@code
267+
* Mono<RSocket> sharedRSocketMono =
268+
* RSocketFactory
269+
* .connect()
270+
* .singleSubscriberRequester()
271+
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
272+
* .transport(transport)
273+
* .start();
274+
*
275+
* RSocket r1 = sharedRSocketMono.block();
276+
* RSocket r2 = sharedRSocketMono.block();
277+
*
278+
* assert r1 == r2;
279+
*
280+
* r1.dispose()
281+
*
282+
* assert r2.isDisposed()
283+
*
284+
* RSocket r3 = sharedRSocketMono.block();
285+
* RSocket r4 = sharedRSocketMono.block();
286+
*
287+
*
288+
* assert r1 != r3;
289+
* assert r4 == r3;
290+
*
291+
* }</pre>
292+
*
293+
* <b>Note,</b> having reconnect() enabled does not eliminate the need to accompany each
294+
* individual request with the corresponding retry logic. <br>
295+
* For example:
296+
*
297+
* <pre>{@code
298+
* Mono<RSocket> sharedRSocketMono =
299+
* RSocketFactory
300+
* .connect()
301+
* .singleSubscriberRequester()
302+
* .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
303+
* .transport(transport)
304+
* .start();
305+
*
306+
* sharedRSocket.flatMap(rSocket -> rSocket.requestResponse(...))
307+
* .retryWhen(ownRetry)
308+
* .subscribe()
309+
*
310+
* }</pre>
311+
*
312+
* @param retrySpec a retry factory applied for {@link Mono#retryWhen(Retry)}
313+
* @return a shared instance of {@code Mono<RSocket>}.
314+
*/
315+
public ClientRSocketFactory reconnect(Retry retrySpec) {
316+
this.retrySpec = Objects.requireNonNull(retrySpec);
317+
this.reconnectEnabled = true;
318+
return this;
319+
}
320+
233321
public ClientRSocketFactory resume() {
234322
this.resumeEnabled = true;
235323
return this;
@@ -392,6 +480,15 @@ public Mono<RSocket> start() {
392480
.sendOne(setupFrame)
393481
.thenReturn(wrappedRSocketRequester);
394482
});
483+
})
484+
.as(
485+
source -> {
486+
if (reconnectEnabled) {
487+
return new ReconnectMono<>(
488+
source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION);
489+
} else {
490+
return source;
491+
}
395492
});
396493
}
397494

@@ -422,7 +519,7 @@ private ClientSetup clientSetup(DuplexConnection startConnection) {
422519
}
423520

424521
private Mono<DuplexConnection> newConnection() {
425-
return transportClient.get().connect(mtu);
522+
return Mono.fromSupplier(transportClient).flatMap(t -> t.connect(mtu));
426523
}
427524
}
428525
}
@@ -698,9 +795,11 @@ public Mono<T> start() {
698795

699796
@Override
700797
public Mono<T> get() {
701-
return transportServer
702-
.get()
703-
.start(duplexConnection -> acceptor(serverSetup, duplexConnection), mtu)
798+
return Mono.fromSupplier(transportServer)
799+
.flatMap(
800+
transport ->
801+
transport.start(
802+
duplexConnection -> acceptor(serverSetup, duplexConnection), mtu))
704803
.doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe());
705804
}
706805
});

0 commit comments

Comments
 (0)