Skip to content

Commit 8969401

Browse files
rstoyanchevrobertroeser
authored andcommitted
Add SocketAcceptorInterceptor (rsocket#676)
* SocketAcceptor applied symmetrically This commit deprecates the client side acceptor(BiFunction<ConnectionSetupPayload, RSocket>) and adds support for SocketAcceptor instead. This makes it possible for a client side acceptor to be asynchronous too, and allows applying the same acceptor both client and server side. Signed-off-by: Rossen Stoyanchev <[email protected]> * Add SocketAcceptorInteceptor This commit adds an interceptor for SocketAcceptor. This provides access to connection setup information and also allows applying requester and responder interceptors from one place. Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 2ac4f47 commit 8969401

5 files changed

Lines changed: 155 additions & 64 deletions

File tree

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 67 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@
3131
import io.rsocket.internal.ClientSetup;
3232
import io.rsocket.internal.ServerSetup;
3333
import io.rsocket.keepalive.KeepAliveHandler;
34-
import io.rsocket.lease.*;
35-
import io.rsocket.plugins.DuplexConnectionInterceptor;
36-
import io.rsocket.plugins.PluginRegistry;
37-
import io.rsocket.plugins.Plugins;
38-
import io.rsocket.plugins.RSocketInterceptor;
34+
import io.rsocket.lease.LeaseStats;
35+
import io.rsocket.lease.Leases;
36+
import io.rsocket.lease.RequesterLeaseHandler;
37+
import io.rsocket.lease.ResponderLeaseHandler;
38+
import io.rsocket.plugins.*;
3939
import io.rsocket.resume.*;
4040
import io.rsocket.transport.ClientTransport;
4141
import io.rsocket.transport.ServerTransport;
@@ -44,7 +44,10 @@
4444
import io.rsocket.util.MultiSubscriberRSocket;
4545
import java.time.Duration;
4646
import java.util.Objects;
47-
import java.util.function.*;
47+
import java.util.function.BiFunction;
48+
import java.util.function.Consumer;
49+
import java.util.function.Function;
50+
import java.util.function.Supplier;
4851
import reactor.core.publisher.Mono;
4952

5053
/** Factory for creating RSocket clients and servers. */
@@ -93,10 +96,7 @@ default <T extends Closeable> Start<T> transport(ServerTransport<T> transport) {
9396
public static class ClientRSocketFactory implements ClientTransportAcceptor {
9497
private static final String CLIENT_TAG = "client";
9598

96-
private Supplier<Function<RSocket, RSocket>> acceptor =
97-
() -> rSocket -> new AbstractRSocket() {};
98-
99-
private BiFunction<ConnectionSetupPayload, RSocket, RSocket> biAcceptor;
99+
private SocketAcceptor acceptor = (setup, sendingSocket) -> Mono.just(new AbstractRSocket() {});
100100

101101
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
102102
private int mtu = 0;
@@ -161,6 +161,11 @@ public ClientRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
161161
return this;
162162
}
163163

164+
public ClientRSocketFactory addSocketAcceptorPlugin(SocketAcceptorInterceptor interceptor) {
165+
plugins.addSocketAcceptorPlugin(interceptor);
166+
return this;
167+
}
168+
164169
/**
165170
* Deprecated as Keep-Alive is not optional according to spec
166171
*
@@ -268,18 +273,25 @@ public Start<RSocket> transport(Supplier<ClientTransport> transportClient) {
268273
}
269274

270275
public ClientTransportAcceptor acceptor(Function<RSocket, RSocket> acceptor) {
271-
this.acceptor = () -> acceptor;
272-
return StartClient::new;
276+
return acceptor(() -> acceptor);
273277
}
274278

275279
public ClientTransportAcceptor acceptor(Supplier<Function<RSocket, RSocket>> acceptor) {
276-
this.acceptor = acceptor;
277-
return StartClient::new;
280+
return acceptor(
281+
(SocketAcceptor)
282+
(setup, sendingSocket) -> Mono.just(acceptor.get().apply(sendingSocket)));
278283
}
279284

285+
@Deprecated
280286
public ClientTransportAcceptor acceptor(
281287
BiFunction<ConnectionSetupPayload, RSocket, RSocket> biAcceptor) {
282-
this.biAcceptor = biAcceptor;
288+
return acceptor(
289+
(SocketAcceptor)
290+
(setup, sendingSocket) -> Mono.just(biAcceptor.apply(setup, sendingSocket)));
291+
}
292+
293+
public ClientTransportAcceptor acceptor(SocketAcceptor acceptor) {
294+
this.acceptor = acceptor;
283295
return StartClient::new;
284296
}
285297

@@ -346,6 +358,8 @@ public Mono<RSocket> start() {
346358
rSocketRequester = new MultiSubscriberRSocket(rSocketRequester);
347359
}
348360

361+
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
362+
349363
ByteBuf setupFrame =
350364
SetupFrameFlyweight.encode(
351365
allocator,
@@ -357,34 +371,38 @@ public Mono<RSocket> start() {
357371
dataMimeType,
358372
setupPayload);
359373

360-
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
361-
362-
RSocket rSocketHandler;
363-
if (biAcceptor != null) {
364-
ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
365-
rSocketHandler = biAcceptor.apply(setup, wrappedRSocketRequester);
366-
} else {
367-
rSocketHandler = acceptor.get().apply(wrappedRSocketRequester);
368-
}
369-
370-
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
371-
372-
ResponderLeaseHandler responderLeaseHandler =
373-
isLeaseEnabled
374-
? new ResponderLeaseHandler.Impl<>(
375-
CLIENT_TAG, allocator, leases.sender(), errorConsumer, leases.stats())
376-
: ResponderLeaseHandler.None;
377-
378-
RSocket rSocketResponder =
379-
new RSocketResponder(
380-
allocator,
381-
multiplexer.asServerConnection(),
382-
wrappedRSocketHandler,
383-
payloadDecoder,
384-
errorConsumer,
385-
responderLeaseHandler);
374+
ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
375+
376+
return plugins
377+
.applySocketAcceptorInterceptor(acceptor)
378+
.accept(setup, wrappedRSocketRequester)
379+
.flatMap(
380+
rSocketHandler -> {
381+
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
382+
383+
ResponderLeaseHandler responderLeaseHandler =
384+
isLeaseEnabled
385+
? new ResponderLeaseHandler.Impl<>(
386+
CLIENT_TAG,
387+
allocator,
388+
leases.sender(),
389+
errorConsumer,
390+
leases.stats())
391+
: ResponderLeaseHandler.None;
392+
393+
RSocket rSocketResponder =
394+
new RSocketResponder(
395+
allocator,
396+
multiplexer.asServerConnection(),
397+
wrappedRSocketHandler,
398+
payloadDecoder,
399+
errorConsumer,
400+
responderLeaseHandler);
386401

387-
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketRequester);
402+
return wrappedConnection
403+
.sendOne(setupFrame)
404+
.thenReturn(wrappedRSocketRequester);
405+
});
388406
});
389407
}
390408

@@ -476,6 +494,11 @@ public ServerRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
476494
return this;
477495
}
478496

497+
public ServerRSocketFactory addSocketAcceptorPlugin(SocketAcceptorInterceptor interceptor) {
498+
plugins.addSocketAcceptorPlugin(interceptor);
499+
return this;
500+
}
501+
479502
public ServerTransportAcceptor acceptor(SocketAcceptor acceptor) {
480503
this.acceptor = acceptor;
481504
return new ServerStart<>();
@@ -644,7 +667,8 @@ private Mono<Void> acceptSetup(
644667
}
645668
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
646669

647-
return acceptor
670+
return plugins
671+
.applySocketAcceptorInterceptor(acceptor)
648672
.accept(setupPayload, wrappedRSocketRequester)
649673
.onErrorResume(
650674
err -> sendError(multiplexer, rejectedSetupError(err)).then(Mono.error(err)))

rsocket-core/src/main/java/io/rsocket/SocketAcceptor.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,21 @@
2020
import reactor.core.publisher.Mono;
2121

2222
/**
23-
* {@code RSocket} is a full duplex protocol where a client and server are identical in terms of
24-
* both having the capability to initiate requests to their peer. This interface provides the
25-
* contract where a server accepts a new {@code RSocket} for sending requests to the peer and
26-
* returns a new {@code RSocket} that will be used to accept requests from it's peer.
23+
* RSocket is a full duplex protocol where a client and server are identical in terms of both having
24+
* the capability to initiate requests to their peer. This interface provides the contract where a
25+
* client or server handles the {@code setup} for a new connection and creates a responder {@code
26+
* RSocket} for accepting requests from the remote peer.
2727
*/
2828
public interface SocketAcceptor {
2929

3030
/**
31-
* Accepts a new {@code RSocket} used to send requests to the peer and returns another {@code
32-
* RSocket} that is used for accepting requests from the peer.
31+
* Handle the {@code SETUP} frame for a new connection and create a responder {@code RSocket} for
32+
* handling requests from the remote peer.
3333
*
34-
* @param setup Setup as sent by the client.
35-
* @param sendingSocket Socket used to send requests to the peer.
36-
* @return Socket to accept requests from the peer.
34+
* @param setup the {@code setup} received from a client in a server scenario, or in a client
35+
* scenario this is the setup about to be sent to the server.
36+
* @param sendingSocket socket for sending requests to the remote peer.
37+
* @return {@code RSocket} to accept requests with.
3738
* @throws SetupException If the acceptor needs to reject the setup of this socket.
3839
*/
3940
Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket);

rsocket-core/src/main/java/io/rsocket/plugins/PluginRegistry.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818

1919
import io.rsocket.DuplexConnection;
2020
import io.rsocket.RSocket;
21+
import io.rsocket.SocketAcceptor;
2122
import java.util.ArrayList;
2223
import java.util.List;
2324

2425
public class PluginRegistry {
2526
private List<DuplexConnectionInterceptor> connections = new ArrayList<>();
2627
private List<RSocketInterceptor> requesters = new ArrayList<>();
2728
private List<RSocketInterceptor> responders = new ArrayList<>();
29+
private List<SocketAcceptorInterceptor> socketAcceptorInterceptors = new ArrayList<>();
2830

2931
public PluginRegistry() {}
3032

@@ -58,6 +60,10 @@ public void addResponderPlugin(RSocketInterceptor interceptor) {
5860
responders.add(interceptor);
5961
}
6062

63+
public void addSocketAcceptorPlugin(SocketAcceptorInterceptor interceptor) {
64+
socketAcceptorInterceptors.add(interceptor);
65+
}
66+
6167
/** Deprecated. Use {@link #applyRequester(RSocket)} instead */
6268
@Deprecated
6369
public RSocket applyClient(RSocket rSocket) {
@@ -86,6 +92,14 @@ public RSocket applyResponder(RSocket rSocket) {
8692
return rSocket;
8793
}
8894

95+
public SocketAcceptor applySocketAcceptorInterceptor(SocketAcceptor acceptor) {
96+
for (SocketAcceptorInterceptor i : socketAcceptorInterceptors) {
97+
acceptor = i.apply(acceptor);
98+
}
99+
100+
return acceptor;
101+
}
102+
89103
public DuplexConnection applyConnection(
90104
DuplexConnectionInterceptor.Type type, DuplexConnection connection) {
91105
for (DuplexConnectionInterceptor i : connections) {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.rsocket.plugins;
17+
18+
import io.rsocket.SocketAcceptor;
19+
import java.util.function.Function;
20+
21+
/**
22+
* Contract to decorate a {@link SocketAcceptor}, providing access to connection {@code setup}
23+
* information and the ability to also decorate the sockets for requesting and responding.
24+
*
25+
* <p>This can be used as an alternative to individual requester and responder {@link
26+
* RSocketInterceptor} plugins.
27+
*/
28+
public @FunctionalInterface interface SocketAcceptorInterceptor
29+
extends Function<SocketAcceptor, SocketAcceptor> {}

rsocket-examples/src/test/java/io/rsocket/integration/IntegrationTest.java

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.rsocket.RSocketFactory;
3030
import io.rsocket.plugins.DuplexConnectionInterceptor;
3131
import io.rsocket.plugins.RSocketInterceptor;
32+
import io.rsocket.plugins.SocketAcceptorInterceptor;
3233
import io.rsocket.test.TestSubscriber;
3334
import io.rsocket.transport.netty.client.TcpClientTransport;
3435
import io.rsocket.transport.netty.server.CloseableChannel;
@@ -48,34 +49,52 @@
4849

4950
public class IntegrationTest {
5051

51-
private static final RSocketInterceptor clientPlugin;
52-
private static final RSocketInterceptor serverPlugin;
52+
private static final RSocketInterceptor requesterPlugin;
53+
private static final RSocketInterceptor responderPlugin;
54+
private static final SocketAcceptorInterceptor clientAcceptorPlugin;
55+
private static final SocketAcceptorInterceptor serverAcceptorPlugin;
5356
private static final DuplexConnectionInterceptor connectionPlugin;
54-
public static volatile boolean calledClient = false;
55-
public static volatile boolean calledServer = false;
57+
public static volatile boolean calledRequester = false;
58+
public static volatile boolean calledResponder = false;
59+
public static volatile boolean calledClientAcceptor = false;
60+
public static volatile boolean calledServerAcceptor = false;
5661
public static volatile boolean calledFrame = false;
5762

5863
static {
59-
clientPlugin =
64+
requesterPlugin =
6065
reactiveSocket ->
6166
new RSocketProxy(reactiveSocket) {
6267
@Override
6368
public Mono<Payload> requestResponse(Payload payload) {
64-
calledClient = true;
69+
calledRequester = true;
6570
return reactiveSocket.requestResponse(payload);
6671
}
6772
};
6873

69-
serverPlugin =
74+
responderPlugin =
7075
reactiveSocket ->
7176
new RSocketProxy(reactiveSocket) {
7277
@Override
7378
public Mono<Payload> requestResponse(Payload payload) {
74-
calledServer = true;
79+
calledResponder = true;
7580
return reactiveSocket.requestResponse(payload);
7681
}
7782
};
7883

84+
clientAcceptorPlugin =
85+
acceptor ->
86+
(setup, sendingSocket) -> {
87+
calledClientAcceptor = true;
88+
return acceptor.accept(setup, sendingSocket);
89+
};
90+
91+
serverAcceptorPlugin =
92+
acceptor ->
93+
(setup, sendingSocket) -> {
94+
calledServerAcceptor = true;
95+
return acceptor.accept(setup, sendingSocket);
96+
};
97+
7998
connectionPlugin =
8099
(type, connection) -> {
81100
calledFrame = true;
@@ -99,7 +118,8 @@ public void startup() {
99118

100119
server =
101120
RSocketFactory.receive()
102-
.addServerPlugin(serverPlugin)
121+
.addResponderPlugin(responderPlugin)
122+
.addSocketAcceptorPlugin(serverAcceptorPlugin)
103123
.addConnectionPlugin(connectionPlugin)
104124
.errorConsumer(
105125
t -> {
@@ -138,7 +158,8 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
138158

139159
client =
140160
RSocketFactory.connect()
141-
.addClientPlugin(clientPlugin)
161+
.addRequesterPlugin(requesterPlugin)
162+
.addSocketAcceptorPlugin(clientAcceptorPlugin)
142163
.addConnectionPlugin(connectionPlugin)
143164
.transport(TcpClientTransport.create(server.address()))
144165
.start()
@@ -154,8 +175,10 @@ public void teardown() {
154175
public void testRequest() {
155176
client.requestResponse(DefaultPayload.create("REQUEST", "META")).block();
156177
assertThat("Server did not see the request.", requestCount.get(), is(1));
157-
assertTrue(calledClient);
158-
assertTrue(calledServer);
178+
assertTrue(calledRequester);
179+
assertTrue(calledResponder);
180+
assertTrue(calledClientAcceptor);
181+
assertTrue(calledServerAcceptor);
159182
assertTrue(calledFrame);
160183
}
161184

0 commit comments

Comments
 (0)