Skip to content

Commit 265a83b

Browse files
Lease support (rsocket#648)
* lease support * rsocket requester: remove lifecycle, make interactions single subscriber only * wrap requester rsockets in multi subscription rsocket, except lease case * pluggable stats * single subscriber Requester: metadata-push, fnf, request-response requests are lazy; make sure above can be subscribed at most once, and send at most 1 request frame Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent fb5aa1d commit 265a83b

30 files changed

+1833
-368
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.0.0-RC1-SNAPSHOT
14+
version=1.0.0-RC1-LEASE-SNAPSHOT

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 111 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.rsocket.internal.ClientSetup;
3232
import io.rsocket.internal.ServerSetup;
3333
import io.rsocket.keepalive.KeepAliveHandler;
34+
import io.rsocket.lease.*;
3435
import io.rsocket.plugins.DuplexConnectionInterceptor;
3536
import io.rsocket.plugins.PluginRegistry;
3637
import io.rsocket.plugins.Plugins;
@@ -40,12 +41,10 @@
4041
import io.rsocket.transport.ServerTransport;
4142
import io.rsocket.util.ConnectionUtils;
4243
import io.rsocket.util.EmptyPayload;
44+
import io.rsocket.util.MultiSubscriberRSocket;
4345
import java.time.Duration;
4446
import java.util.Objects;
45-
import java.util.function.BiFunction;
46-
import java.util.function.Consumer;
47-
import java.util.function.Function;
48-
import java.util.function.Supplier;
47+
import java.util.function.*;
4948
import reactor.core.publisher.Mono;
5049

5150
/** Factory for creating RSocket clients and servers. */
@@ -92,6 +91,8 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
9291
}
9392

9493
public static class ClientRSocketFactory implements ClientTransportAcceptor {
94+
private static final String CLIENT_TAG = "client";
95+
9596
private Supplier<Function<RSocket, RSocket>> acceptor =
9697
() -> rSocket -> new AbstractRSocket() {};
9798

@@ -115,13 +116,17 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
115116
private boolean resumeCleanupStoreOnKeepAlive;
116117
private Supplier<ByteBuf> resumeTokenSupplier = ResumeFrameFlyweight::generateResumeToken;
117118
private Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory =
118-
token -> new InMemoryResumableFramesStore("client", 100_000);
119+
token -> new InMemoryResumableFramesStore(CLIENT_TAG, 100_000);
119120
private Duration resumeSessionDuration = Duration.ofMinutes(2);
120121
private Duration resumeStreamTimeout = Duration.ofSeconds(10);
121122
private Supplier<ResumeStrategy> resumeStrategySupplier =
122123
() ->
123124
new ExponentialBackoffResumeStrategy(Duration.ofSeconds(1), Duration.ofSeconds(16), 2);
124125

126+
private boolean multiSubscriberRequester = true;
127+
private boolean leaseEnabled;
128+
private Supplier<Leases<?>> leasesSupplier = Leases::new;
129+
125130
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
126131

127132
public ClientRSocketFactory byteBufAllocator(ByteBufAllocator allocator) {
@@ -205,6 +210,22 @@ public ClientRSocketFactory metadataMimeType(String metadataMimeType) {
205210
return this;
206211
}
207212

213+
public ClientRSocketFactory lease(Supplier<Leases<? extends LeaseStats>> leasesSupplier) {
214+
this.leaseEnabled = true;
215+
this.leasesSupplier = Objects.requireNonNull(leasesSupplier);
216+
return this;
217+
}
218+
219+
public ClientRSocketFactory lease() {
220+
this.leaseEnabled = true;
221+
return this;
222+
}
223+
224+
public ClientRSocketFactory singleSubscriberRequester() {
225+
this.multiSubscriberRequester = false;
226+
return this;
227+
}
228+
208229
public ClientRSocketFactory resume() {
209230
this.resumeEnabled = true;
210231
return this;
@@ -302,7 +323,14 @@ public Mono<RSocket> start() {
302323
ClientServerInputMultiplexer multiplexer =
303324
new ClientServerInputMultiplexer(wrappedConnection, plugins, true);
304325

305-
RSocketRequester rSocketRequester =
326+
boolean isLeaseEnabled = leaseEnabled;
327+
Leases<?> leases = leasesSupplier.get();
328+
RequesterLeaseHandler requesterLeaseHandler =
329+
isLeaseEnabled
330+
? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver())
331+
: RequesterLeaseHandler.None;
332+
333+
RSocket rSocketRequester =
306334
new RSocketRequester(
307335
allocator,
308336
multiplexer.asClientConnection(),
@@ -311,12 +339,17 @@ public Mono<RSocket> start() {
311339
StreamIdSupplier.clientSupplier(),
312340
keepAliveTickPeriod(),
313341
keepAliveTimeout(),
314-
keepAliveHandler);
342+
keepAliveHandler,
343+
requesterLeaseHandler);
344+
345+
if (multiSubscriberRequester) {
346+
rSocketRequester = new MultiSubscriberRSocket(rSocketRequester);
347+
}
315348

316349
ByteBuf setupFrame =
317350
SetupFrameFlyweight.encode(
318351
allocator,
319-
false,
352+
isLeaseEnabled,
320353
keepAliveTickPeriod(),
321354
keepAliveTimeout(),
322355
resumeToken,
@@ -337,13 +370,20 @@ public Mono<RSocket> start() {
337370

338371
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
339372

340-
RSocketResponder rSocketResponder =
373+
ResponderLeaseHandler responderLeaseHandler =
374+
isLeaseEnabled
375+
? new ResponderLeaseHandler.Impl<>(
376+
CLIENT_TAG, allocator, leases.sender(), errorConsumer, leases.stats())
377+
: ResponderLeaseHandler.None;
378+
379+
RSocket rSocketResponder =
341380
new RSocketResponder(
342381
allocator,
343382
multiplexer.asServerConnection(),
344383
wrappedRSocketHandler,
345384
payloadDecoder,
346-
errorConsumer);
385+
errorConsumer,
386+
responderLeaseHandler);
347387

348388
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketRequester);
349389
});
@@ -382,16 +422,23 @@ private Mono<DuplexConnection> newConnection() {
382422
}
383423

384424
public static class ServerRSocketFactory {
425+
private static final String SERVER_TAG = "server";
426+
385427
private SocketAcceptor acceptor;
386428
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
387429
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
388430
private int mtu = 0;
389431
private PluginRegistry plugins = new PluginRegistry(Plugins.defaultPlugins());
432+
390433
private boolean resumeSupported;
391434
private Duration resumeSessionDuration = Duration.ofSeconds(120);
392435
private Duration resumeStreamTimeout = Duration.ofSeconds(10);
393436
private Function<? super ByteBuf, ? extends ResumableFramesStore> resumeStoreFactory =
394-
token -> new InMemoryResumableFramesStore("server", 100_000);
437+
token -> new InMemoryResumableFramesStore(SERVER_TAG, 100_000);
438+
439+
private boolean multiSubscriberRequester = true;
440+
private boolean leaseEnabled;
441+
private Supplier<Leases<?>> leasesSupplier = Leases::new;
395442

396443
private ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
397444
private boolean resumeCleanupStoreOnKeepAlive;
@@ -450,6 +497,22 @@ public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
450497
return this;
451498
}
452499

500+
public ServerRSocketFactory lease(Supplier<Leases<?>> leasesSupplier) {
501+
this.leaseEnabled = true;
502+
this.leasesSupplier = Objects.requireNonNull(leasesSupplier);
503+
return this;
504+
}
505+
506+
public ServerRSocketFactory lease() {
507+
this.leaseEnabled = true;
508+
return this;
509+
}
510+
511+
public ServerRSocketFactory singleSubscriberRequester() {
512+
this.multiSubscriberRequester = false;
513+
return this;
514+
}
515+
453516
public ServerRSocketFactory resume() {
454517
this.resumeSupported = true;
455518
return this;
@@ -541,13 +604,31 @@ private Mono<Void> acceptSetup(
541604
multiplexer.dispose();
542605
});
543606
}
607+
608+
boolean isLeaseEnabled = leaseEnabled;
609+
610+
if (SetupFrameFlyweight.honorLease(setupFrame) && !isLeaseEnabled) {
611+
return sendError(multiplexer, new InvalidSetupException("lease is not supported"))
612+
.doFinally(
613+
signalType -> {
614+
setupFrame.release();
615+
multiplexer.dispose();
616+
});
617+
}
618+
544619
return serverSetup.acceptRSocketSetup(
545620
setupFrame,
546621
multiplexer,
547622
(keepAliveHandler, wrappedMultiplexer) -> {
548623
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
549624

550-
RSocketRequester rSocketRequester =
625+
Leases<?> leases = leasesSupplier.get();
626+
RequesterLeaseHandler requesterLeaseHandler =
627+
isLeaseEnabled
628+
? new RequesterLeaseHandler.Impl(SERVER_TAG, leases.receiver())
629+
: RequesterLeaseHandler.None;
630+
631+
RSocket rSocketRequester =
551632
new RSocketRequester(
552633
allocator,
553634
wrappedMultiplexer.asServerConnection(),
@@ -556,8 +637,12 @@ private Mono<Void> acceptSetup(
556637
StreamIdSupplier.serverSupplier(),
557638
setupPayload.keepAliveInterval(),
558639
setupPayload.keepAliveMaxLifetime(),
559-
keepAliveHandler);
640+
keepAliveHandler,
641+
requesterLeaseHandler);
560642

643+
if (multiSubscriberRequester) {
644+
rSocketRequester = new MultiSubscriberRSocket(rSocketRequester);
645+
}
561646
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
562647

563648
return acceptor
@@ -568,13 +653,24 @@ private Mono<Void> acceptSetup(
568653
rSocketHandler -> {
569654
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
570655

571-
RSocketResponder rSocketResponder =
656+
ResponderLeaseHandler responderLeaseHandler =
657+
isLeaseEnabled
658+
? new ResponderLeaseHandler.Impl<>(
659+
SERVER_TAG,
660+
allocator,
661+
leases.sender(),
662+
errorConsumer,
663+
leases.stats())
664+
: ResponderLeaseHandler.None;
665+
666+
RSocket rSocketResponder =
572667
new RSocketResponder(
573668
allocator,
574669
wrappedMultiplexer.asClientConnection(),
575670
wrappedRSocketHandler,
576671
payloadDecoder,
577-
errorConsumer);
672+
errorConsumer,
673+
responderLeaseHandler);
578674
})
579675
.doFinally(signalType -> setupPayload.release())
580676
.then();

0 commit comments

Comments
 (0)