Skip to content

Commit 76005e7

Browse files
gregwhitakeryschimke
authored andcommitted
Exposed server rsocket to client acceptor (rsocket#300)
Currently, it is not possible for the client side acceptor to call the server rsocket. This change exposes the server rsocket to the client acceptor.
1 parent df45135 commit 76005e7

2 files changed

Lines changed: 129 additions & 120 deletions

File tree

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 107 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.time.Duration;
1212
import java.util.function.Consumer;
13+
import java.util.function.Function;
1314
import java.util.function.Supplier;
1415

1516
/**
@@ -85,16 +86,19 @@ interface MimeType<T> {
8586
}
8687

8788
class ClientRSocketFactory implements
88-
KeepAlive<ClientRSocketFactory>,
89-
MimeType<ClientRSocketFactory>,
90-
Acceptor<ClientTransport, RSocket, RSocket>,
91-
Transport<ClientTransport, RSocket>,
92-
Fragmentation<ClientRSocketFactory, ClientTransport, RSocket, RSocket>,
93-
ErrorConsumer<ClientRSocketFactory, ClientTransport, RSocket, RSocket>,
94-
SetupPayload<ClientRSocketFactory> {
95-
96-
private Supplier<RSocket> acceptor = () -> new AbstractRSocket() {
97-
};
89+
KeepAlive<ClientRSocketFactory>,
90+
MimeType<ClientRSocketFactory>,
91+
Acceptor<ClientTransport, Function<RSocket, RSocket>, RSocket>,
92+
Transport<ClientTransport, RSocket>,
93+
Fragmentation<ClientRSocketFactory, ClientTransport, Function<RSocket, RSocket>, RSocket>,
94+
ErrorConsumer<ClientRSocketFactory, ClientTransport, Function<RSocket, RSocket>, RSocket>,
95+
SetupPayload<ClientRSocketFactory> {
96+
97+
private Supplier<Function<RSocket, RSocket>> acceptor =
98+
()
99+
-> rSocket
100+
-> new AbstractRSocket() {};
101+
98102
private Supplier<io.rsocket.transport.ClientTransport> transportClient;
99103
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
100104
private int mtu = 0;
@@ -174,7 +178,7 @@ public Start<RSocket> transport(Supplier<io.rsocket.transport.ClientTransport> t
174178
}
175179

176180
@Override
177-
public Transport<io.rsocket.transport.ClientTransport, RSocket> acceptor(Supplier<RSocket> acceptor) {
181+
public Transport<io.rsocket.transport.ClientTransport, RSocket> acceptor(Supplier<Function<RSocket, RSocket>> acceptor) {
178182
this.acceptor = acceptor;
179183
return new ClientTransport();
180184
}
@@ -201,56 +205,61 @@ protected class StartClient implements Start<RSocket> {
201205
@Override
202206
public Mono<RSocket> start() {
203207
return transportClient
204-
.get()
205-
.connect()
206-
.then(connection -> {
207-
Frame setupFrame = Frame.Setup
208-
.from(
209-
flags,
210-
(int) ackTimeout.toMillis(),
211-
(int) ackTimeout.toMillis() * missedAcks,
212-
dataMineType,
213-
metadataMimeType,
214-
setupPayload);
215-
216-
217-
ClientServerInputMultiplexer multiplexer;
218-
if (mtu > 0) {
219-
multiplexer = new ClientServerInputMultiplexer(new FragmentationDuplexConnection(connection, mtu));
220-
} else {
221-
multiplexer = new ClientServerInputMultiplexer(connection);
222-
}
223-
224-
RSocketClient rSocketClient
225-
= new RSocketClient(
226-
multiplexer.asClientConnection(),
227-
errorConsumer,
228-
StreamIdSupplier.clientSupplier(),
229-
tickPeriod,
230-
ackTimeout,
231-
missedAcks);
232-
233-
return Plugins
234-
.SERVER_REACTIVE_SOCKET_INTERCEPTOR
235-
.apply(acceptor.get())
236-
.doOnNext(rSocket ->
237-
new RSocketServer(
238-
multiplexer.asServerConnection(),
239-
rSocket,
240-
errorConsumer)
241-
)
242-
.then(connection
243-
.sendOne(setupFrame)
244-
.then(Plugins.CLIENT_REACTIVE_SOCKET_INTERCEPTOR.apply(rSocketClient)));
245-
});
208+
.get()
209+
.connect()
210+
.then(connection -> {
211+
Frame setupFrame = Frame.Setup
212+
.from(
213+
flags,
214+
(int) ackTimeout.toMillis(),
215+
(int) ackTimeout.toMillis() * missedAcks,
216+
dataMineType,
217+
metadataMimeType,
218+
setupPayload);
219+
220+
221+
ClientServerInputMultiplexer multiplexer;
222+
if (mtu > 0) {
223+
multiplexer = new ClientServerInputMultiplexer(new FragmentationDuplexConnection(connection, mtu));
224+
} else {
225+
multiplexer = new ClientServerInputMultiplexer(connection);
226+
}
227+
228+
RSocketClient rSocketClient
229+
= new RSocketClient(
230+
multiplexer.asClientConnection(),
231+
errorConsumer,
232+
StreamIdSupplier.clientSupplier(),
233+
tickPeriod,
234+
ackTimeout,
235+
missedAcks);
236+
237+
return Plugins
238+
.CLIENT_REACTIVE_SOCKET_INTERCEPTOR
239+
.apply(rSocketClient)
240+
.then(wrappedClientRSocket -> {
241+
RSocket unwrappedServerSocket = acceptor.get().apply(wrappedClientRSocket);
242+
return Plugins
243+
.SERVER_REACTIVE_SOCKET_INTERCEPTOR
244+
.apply(unwrappedServerSocket)
245+
.doOnNext(rSocket ->
246+
new RSocketServer(
247+
multiplexer.asServerConnection(),
248+
rSocket,
249+
errorConsumer)
250+
)
251+
.then(connection.sendOne(setupFrame))
252+
.then(Mono.just(wrappedClientRSocket));
253+
});
254+
});
246255
}
247256
}
248257
}
249258

250259
class ServerRSocketFactory implements
251-
Acceptor<ServerTransport, SocketAcceptor, Closeable>,
252-
Fragmentation<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable>,
253-
ErrorConsumer<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable> {
260+
Acceptor<ServerTransport, SocketAcceptor, Closeable>,
261+
Fragmentation<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable>,
262+
ErrorConsumer<ServerRSocketFactory, ServerTransport, SocketAcceptor, Closeable> {
254263

255264
private Supplier<SocketAcceptor> acceptor;
256265
private Supplier<io.rsocket.transport.ServerTransport> transportServer;
@@ -290,48 +299,48 @@ private class ServerStart implements Start {
290299
@Override
291300
public Mono<Closeable> start() {
292301
return transportServer
293-
.get()
294-
.start(connection -> {
295-
ClientServerInputMultiplexer multiplexer;
296-
if (mtu > 0) {
297-
multiplexer = new ClientServerInputMultiplexer(new FragmentationDuplexConnection(connection, mtu));
298-
} else {
299-
multiplexer = new ClientServerInputMultiplexer(connection);
300-
}
301-
302-
return multiplexer
303-
.asStreamZeroConnection()
304-
.receive()
305-
.next()
306-
.then(setupFrame -> {
307-
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
308-
309-
RSocketClient rSocketClient
310-
= new RSocketClient(
311-
multiplexer.asServerConnection(),
312-
errorConsumer,
313-
StreamIdSupplier.serverSupplier());
314-
315-
Mono<RSocket> wrappedRSocketClient
316-
= Plugins
317-
.CLIENT_REACTIVE_SOCKET_INTERCEPTOR
318-
.apply(rSocketClient);
319-
320-
return wrappedRSocketClient
321-
.then(sender ->
322-
acceptor
323-
.get()
324-
.accept(setupPayload, sender)
325-
.then(Plugins.SERVER_REACTIVE_SOCKET_INTERCEPTOR::apply)
326-
)
327-
.map(handler ->
328-
new RSocketServer(
329-
multiplexer.asClientConnection(),
330-
handler,
331-
errorConsumer))
332-
.then();
333-
});
334-
});
302+
.get()
303+
.start(connection -> {
304+
ClientServerInputMultiplexer multiplexer;
305+
if (mtu > 0) {
306+
multiplexer = new ClientServerInputMultiplexer(new FragmentationDuplexConnection(connection, mtu));
307+
} else {
308+
multiplexer = new ClientServerInputMultiplexer(connection);
309+
}
310+
311+
return multiplexer
312+
.asStreamZeroConnection()
313+
.receive()
314+
.next()
315+
.then(setupFrame -> {
316+
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
317+
318+
RSocketClient rSocketClient
319+
= new RSocketClient(
320+
multiplexer.asServerConnection(),
321+
errorConsumer,
322+
StreamIdSupplier.serverSupplier());
323+
324+
Mono<RSocket> wrappedRSocketClient
325+
= Plugins
326+
.CLIENT_REACTIVE_SOCKET_INTERCEPTOR
327+
.apply(rSocketClient);
328+
329+
return wrappedRSocketClient
330+
.then(sender ->
331+
acceptor
332+
.get()
333+
.accept(setupPayload, sender)
334+
.then(Plugins.SERVER_REACTIVE_SOCKET_INTERCEPTOR::apply)
335+
)
336+
.map(handler ->
337+
new RSocketServer(
338+
multiplexer.asClientConnection(),
339+
handler,
340+
errorConsumer))
341+
.then();
342+
});
343+
});
335344

336345
}
337346
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/duplex/DuplexClient.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -30,30 +30,30 @@ public final class DuplexClient {
3030

3131
public static void main(String[] args) {
3232
RSocketFactory
33-
.receive()
34-
.acceptor((setup, reactiveSocket) -> {
35-
reactiveSocket.requestStream(new PayloadImpl("Hello-Bidi"))
36-
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
37-
.log()
38-
.subscribe();
39-
40-
return Mono.just(new AbstractRSocket() {});
41-
})
42-
.transport(TcpServerTransport.create("localhost", 7000))
43-
.start()
44-
.subscribe();
33+
.receive()
34+
.acceptor((setup, reactiveSocket) -> {
35+
reactiveSocket.requestStream(new PayloadImpl("Hello-Bidi"))
36+
.map(payload -> StandardCharsets.UTF_8.decode(payload.getData()).toString())
37+
.log()
38+
.subscribe();
39+
40+
return Mono.just(new AbstractRSocket() {});
41+
})
42+
.transport(TcpServerTransport.create("localhost", 7000))
43+
.start()
44+
.subscribe();
4545

4646
RSocket socket = RSocketFactory
47-
.connect()
48-
.acceptor(new AbstractRSocket() {
49-
@Override
50-
public Flux<Payload> requestStream(Payload payload) {
51-
return Flux.interval(Duration.ofSeconds(1)).map(aLong -> new PayloadImpl("Bi-di Response => " + aLong));
52-
}
53-
})
54-
.transport(TcpClientTransport.create("localhost", 7000))
55-
.start()
56-
.block();
47+
.connect()
48+
.acceptor(rSocket -> new AbstractRSocket() {
49+
@Override
50+
public Flux<Payload> requestStream(Payload payload) {
51+
return Flux.interval(Duration.ofSeconds(1)).map(aLong -> new PayloadImpl("Bi-di Response => " + aLong));
52+
}
53+
})
54+
.transport(TcpClientTransport.create("localhost", 7000))
55+
.start()
56+
.block();
5757

5858
socket.onClose().block();
5959
}

0 commit comments

Comments
 (0)