|
44 | 44 | import io.rsocket.util.MultiSubscriberRSocket; |
45 | 45 | import java.time.Duration; |
46 | 46 | import java.util.Objects; |
| 47 | +import java.util.function.BiConsumer; |
47 | 48 | import java.util.function.Consumer; |
48 | 49 | import java.util.function.Function; |
49 | 50 | import java.util.function.Supplier; |
| 51 | +import reactor.core.Disposable; |
50 | 52 | import reactor.core.publisher.Mono; |
| 53 | +import reactor.util.retry.Retry; |
51 | 54 |
|
52 | 55 | /** Factory for creating RSocket clients and servers. */ |
53 | 56 | public class RSocketFactory { |
@@ -95,6 +98,9 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) { |
95 | 98 | public static class ClientRSocketFactory implements ClientTransportAcceptor { |
96 | 99 | private static final String CLIENT_TAG = "client"; |
97 | 100 |
|
| 101 | + private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION = |
| 102 | + (r, i) -> r.onClose().subscribe(null, null, i::invalidate); |
| 103 | + |
98 | 104 | private SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {}); |
99 | 105 |
|
100 | 106 | private Consumer<Throwable> errorConsumer = Throwable::printStackTrace; |
@@ -125,6 +131,8 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor { |
125 | 131 | private boolean multiSubscriberRequester = true; |
126 | 132 | private boolean leaseEnabled; |
127 | 133 | private Supplier<Leases<?>> leasesSupplier = Leases::new; |
| 134 | + private boolean reconnectEnabled; |
| 135 | + private Retry retrySpec; |
128 | 136 |
|
129 | 137 | private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT; |
130 | 138 |
|
@@ -230,6 +238,86 @@ public ClientRSocketFactory singleSubscriberRequester() { |
230 | 238 | return this; |
231 | 239 | } |
232 | 240 |
|
| 241 | + /** |
| 242 | + * Enables a reconnectable, shared instance of {@code Mono<RSocket>} so every subscriber will |
| 243 | + * observe the same RSocket instance up on connection establishment. <br> |
| 244 | + * For example: |
| 245 | + * |
| 246 | + * <pre>{@code |
| 247 | + * Mono<RSocket> sharedRSocketMono = |
| 248 | + * RSocketFactory |
| 249 | + * .connect() |
| 250 | + * .singleSubscriberRequester() |
| 251 | + * .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1))) |
| 252 | + * .transport(transport) |
| 253 | + * .start(); |
| 254 | + * |
| 255 | + * RSocket r1 = sharedRSocketMono.block(); |
| 256 | + * RSocket r2 = sharedRSocketMono.block(); |
| 257 | + * |
| 258 | + * assert r1 == r2; |
| 259 | + * |
| 260 | + * }</pre> |
| 261 | + * |
| 262 | + * Apart of the shared behavior, if the connection is lost, the same {@code Mono<RSocket>} |
| 263 | + * instance will transparently re-establish the connection for subsequent subscribers.<br> |
| 264 | + * For example: |
| 265 | + * |
| 266 | + * <pre>{@code |
| 267 | + * Mono<RSocket> sharedRSocketMono = |
| 268 | + * RSocketFactory |
| 269 | + * .connect() |
| 270 | + * .singleSubscriberRequester() |
| 271 | + * .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1))) |
| 272 | + * .transport(transport) |
| 273 | + * .start(); |
| 274 | + * |
| 275 | + * RSocket r1 = sharedRSocketMono.block(); |
| 276 | + * RSocket r2 = sharedRSocketMono.block(); |
| 277 | + * |
| 278 | + * assert r1 == r2; |
| 279 | + * |
| 280 | + * r1.dispose() |
| 281 | + * |
| 282 | + * assert r2.isDisposed() |
| 283 | + * |
| 284 | + * RSocket r3 = sharedRSocketMono.block(); |
| 285 | + * RSocket r4 = sharedRSocketMono.block(); |
| 286 | + * |
| 287 | + * |
| 288 | + * assert r1 != r3; |
| 289 | + * assert r4 == r3; |
| 290 | + * |
| 291 | + * }</pre> |
| 292 | + * |
| 293 | + * <b>Note,</b> having reconnect() enabled does not eliminate the need to accompany each |
| 294 | + * individual request with the corresponding retry logic. <br> |
| 295 | + * For example: |
| 296 | + * |
| 297 | + * <pre>{@code |
| 298 | + * Mono<RSocket> sharedRSocketMono = |
| 299 | + * RSocketFactory |
| 300 | + * .connect() |
| 301 | + * .singleSubscriberRequester() |
| 302 | + * .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1))) |
| 303 | + * .transport(transport) |
| 304 | + * .start(); |
| 305 | + * |
| 306 | + * sharedRSocket.flatMap(rSocket -> rSocket.requestResponse(...)) |
| 307 | + * .retryWhen(ownRetry) |
| 308 | + * .subscribe() |
| 309 | + * |
| 310 | + * }</pre> |
| 311 | + * |
| 312 | + * @param retrySpec a retry factory applied for {@link Mono#retryWhen(Retry)} |
| 313 | + * @return a shared instance of {@code Mono<RSocket>}. |
| 314 | + */ |
| 315 | + public ClientRSocketFactory reconnect(Retry retrySpec) { |
| 316 | + this.retrySpec = Objects.requireNonNull(retrySpec); |
| 317 | + this.reconnectEnabled = true; |
| 318 | + return this; |
| 319 | + } |
| 320 | + |
233 | 321 | public ClientRSocketFactory resume() { |
234 | 322 | this.resumeEnabled = true; |
235 | 323 | return this; |
@@ -392,6 +480,15 @@ public Mono<RSocket> start() { |
392 | 480 | .sendOne(setupFrame) |
393 | 481 | .thenReturn(wrappedRSocketRequester); |
394 | 482 | }); |
| 483 | + }) |
| 484 | + .as( |
| 485 | + source -> { |
| 486 | + if (reconnectEnabled) { |
| 487 | + return new ReconnectMono<>( |
| 488 | + source.retryWhen(retrySpec), Disposable::dispose, INVALIDATE_FUNCTION); |
| 489 | + } else { |
| 490 | + return source; |
| 491 | + } |
395 | 492 | }); |
396 | 493 | } |
397 | 494 |
|
@@ -422,7 +519,7 @@ private ClientSetup clientSetup(DuplexConnection startConnection) { |
422 | 519 | } |
423 | 520 |
|
424 | 521 | private Mono<DuplexConnection> newConnection() { |
425 | | - return transportClient.get().connect(mtu); |
| 522 | + return Mono.fromSupplier(transportClient).flatMap(t -> t.connect(mtu)); |
426 | 523 | } |
427 | 524 | } |
428 | 525 | } |
@@ -698,9 +795,11 @@ public Mono<T> start() { |
698 | 795 |
|
699 | 796 | @Override |
700 | 797 | public Mono<T> get() { |
701 | | - return transportServer |
702 | | - .get() |
703 | | - .start(duplexConnection -> acceptor(serverSetup, duplexConnection), mtu) |
| 798 | + return Mono.fromSupplier(transportServer) |
| 799 | + .flatMap( |
| 800 | + transport -> |
| 801 | + transport.start( |
| 802 | + duplexConnection -> acceptor(serverSetup, duplexConnection), mtu)) |
704 | 803 | .doOnNext(c -> c.onClose().doFinally(v -> serverSetup.dispose()).subscribe()); |
705 | 804 | } |
706 | 805 | }); |
|
0 commit comments