1010
1111import java .time .Duration ;
1212import java .util .function .Consumer ;
13+ import java .util .function .Function ;
1314import java .util .function .Supplier ;
1415
1516/**
@@ -85,16 +86,19 @@ interface MimeType<T> {
8586 }
8687
8788 class ClientRSocketFactory implements
88- KeepAlive <ClientRSocketFactory >,
89- MimeType <ClientRSocketFactory >,
90- Acceptor <ClientTransport , RSocket , RSocket >,
91- Transport <ClientTransport , RSocket >,
92- Fragmentation <ClientRSocketFactory , ClientTransport , RSocket , RSocket >,
93- ErrorConsumer <ClientRSocketFactory , ClientTransport , RSocket , RSocket >,
94- SetupPayload <ClientRSocketFactory > {
95-
96- private Supplier <RSocket > acceptor = () -> new AbstractRSocket () {
97- };
89+ KeepAlive <ClientRSocketFactory >,
90+ MimeType <ClientRSocketFactory >,
91+ Acceptor <ClientTransport , Function <RSocket , RSocket >, RSocket >,
92+ Transport <ClientTransport , RSocket >,
93+ Fragmentation <ClientRSocketFactory , ClientTransport , Function <RSocket , RSocket >, RSocket >,
94+ ErrorConsumer <ClientRSocketFactory , ClientTransport , Function <RSocket , RSocket >, RSocket >,
95+ SetupPayload <ClientRSocketFactory > {
96+
97+ private Supplier <Function <RSocket , RSocket >> acceptor =
98+ ()
99+ -> rSocket
100+ -> new AbstractRSocket () {};
101+
98102 private Supplier <io .rsocket .transport .ClientTransport > transportClient ;
99103 private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
100104 private int mtu = 0 ;
@@ -174,7 +178,7 @@ public Start<RSocket> transport(Supplier<io.rsocket.transport.ClientTransport> t
174178 }
175179
176180 @ Override
177- public Transport <io .rsocket .transport .ClientTransport , RSocket > acceptor (Supplier <RSocket > acceptor ) {
181+ public Transport <io .rsocket .transport .ClientTransport , RSocket > acceptor (Supplier <Function < RSocket , RSocket > > acceptor ) {
178182 this .acceptor = acceptor ;
179183 return new ClientTransport ();
180184 }
@@ -201,56 +205,61 @@ protected class StartClient implements Start<RSocket> {
201205 @ Override
202206 public Mono <RSocket > start () {
203207 return transportClient
204- .get ()
205- .connect ()
206- .then (connection -> {
207- Frame setupFrame = Frame .Setup
208- .from (
209- flags ,
210- (int ) ackTimeout .toMillis (),
211- (int ) ackTimeout .toMillis () * missedAcks ,
212- dataMineType ,
213- metadataMimeType ,
214- setupPayload );
215-
216-
217- ClientServerInputMultiplexer multiplexer ;
218- if (mtu > 0 ) {
219- multiplexer = new ClientServerInputMultiplexer (new FragmentationDuplexConnection (connection , mtu ));
220- } else {
221- multiplexer = new ClientServerInputMultiplexer (connection );
222- }
223-
224- RSocketClient rSocketClient
225- = new RSocketClient (
226- multiplexer .asClientConnection (),
227- errorConsumer ,
228- StreamIdSupplier .clientSupplier (),
229- tickPeriod ,
230- ackTimeout ,
231- missedAcks );
232-
233- return Plugins
234- .SERVER_REACTIVE_SOCKET_INTERCEPTOR
235- .apply (acceptor .get ())
236- .doOnNext (rSocket ->
237- new RSocketServer (
238- multiplexer .asServerConnection (),
239- rSocket ,
240- errorConsumer )
241- )
242- .then (connection
243- .sendOne (setupFrame )
244- .then (Plugins .CLIENT_REACTIVE_SOCKET_INTERCEPTOR .apply (rSocketClient )));
245- });
208+ .get ()
209+ .connect ()
210+ .then (connection -> {
211+ Frame setupFrame = Frame .Setup
212+ .from (
213+ flags ,
214+ (int ) ackTimeout .toMillis (),
215+ (int ) ackTimeout .toMillis () * missedAcks ,
216+ dataMineType ,
217+ metadataMimeType ,
218+ setupPayload );
219+
220+
221+ ClientServerInputMultiplexer multiplexer ;
222+ if (mtu > 0 ) {
223+ multiplexer = new ClientServerInputMultiplexer (new FragmentationDuplexConnection (connection , mtu ));
224+ } else {
225+ multiplexer = new ClientServerInputMultiplexer (connection );
226+ }
227+
228+ RSocketClient rSocketClient
229+ = new RSocketClient (
230+ multiplexer .asClientConnection (),
231+ errorConsumer ,
232+ StreamIdSupplier .clientSupplier (),
233+ tickPeriod ,
234+ ackTimeout ,
235+ missedAcks );
236+
237+ return Plugins
238+ .CLIENT_REACTIVE_SOCKET_INTERCEPTOR
239+ .apply (rSocketClient )
240+ .then (wrappedClientRSocket -> {
241+ RSocket unwrappedServerSocket = acceptor .get ().apply (wrappedClientRSocket );
242+ return Plugins
243+ .SERVER_REACTIVE_SOCKET_INTERCEPTOR
244+ .apply (unwrappedServerSocket )
245+ .doOnNext (rSocket ->
246+ new RSocketServer (
247+ multiplexer .asServerConnection (),
248+ rSocket ,
249+ errorConsumer )
250+ )
251+ .then (connection .sendOne (setupFrame ))
252+ .then (Mono .just (wrappedClientRSocket ));
253+ });
254+ });
246255 }
247256 }
248257 }
249258
250259 class ServerRSocketFactory implements
251- Acceptor <ServerTransport , SocketAcceptor , Closeable >,
252- Fragmentation <ServerRSocketFactory , ServerTransport , SocketAcceptor , Closeable >,
253- ErrorConsumer <ServerRSocketFactory , ServerTransport , SocketAcceptor , Closeable > {
260+ Acceptor <ServerTransport , SocketAcceptor , Closeable >,
261+ Fragmentation <ServerRSocketFactory , ServerTransport , SocketAcceptor , Closeable >,
262+ ErrorConsumer <ServerRSocketFactory , ServerTransport , SocketAcceptor , Closeable > {
254263
255264 private Supplier <SocketAcceptor > acceptor ;
256265 private Supplier <io .rsocket .transport .ServerTransport > transportServer ;
@@ -290,48 +299,48 @@ private class ServerStart implements Start {
290299 @ Override
291300 public Mono <Closeable > start () {
292301 return transportServer
293- .get ()
294- .start (connection -> {
295- ClientServerInputMultiplexer multiplexer ;
296- if (mtu > 0 ) {
297- multiplexer = new ClientServerInputMultiplexer (new FragmentationDuplexConnection (connection , mtu ));
298- } else {
299- multiplexer = new ClientServerInputMultiplexer (connection );
300- }
301-
302- return multiplexer
303- .asStreamZeroConnection ()
304- .receive ()
305- .next ()
306- .then (setupFrame -> {
307- ConnectionSetupPayload setupPayload = ConnectionSetupPayload .create (setupFrame );
308-
309- RSocketClient rSocketClient
310- = new RSocketClient (
311- multiplexer .asServerConnection (),
312- errorConsumer ,
313- StreamIdSupplier .serverSupplier ());
314-
315- Mono <RSocket > wrappedRSocketClient
316- = Plugins
317- .CLIENT_REACTIVE_SOCKET_INTERCEPTOR
318- .apply (rSocketClient );
319-
320- return wrappedRSocketClient
321- .then (sender ->
322- acceptor
323- .get ()
324- .accept (setupPayload , sender )
325- .then (Plugins .SERVER_REACTIVE_SOCKET_INTERCEPTOR ::apply )
326- )
327- .map (handler ->
328- new RSocketServer (
329- multiplexer .asClientConnection (),
330- handler ,
331- errorConsumer ))
332- .then ();
333- });
334- });
302+ .get ()
303+ .start (connection -> {
304+ ClientServerInputMultiplexer multiplexer ;
305+ if (mtu > 0 ) {
306+ multiplexer = new ClientServerInputMultiplexer (new FragmentationDuplexConnection (connection , mtu ));
307+ } else {
308+ multiplexer = new ClientServerInputMultiplexer (connection );
309+ }
310+
311+ return multiplexer
312+ .asStreamZeroConnection ()
313+ .receive ()
314+ .next ()
315+ .then (setupFrame -> {
316+ ConnectionSetupPayload setupPayload = ConnectionSetupPayload .create (setupFrame );
317+
318+ RSocketClient rSocketClient
319+ = new RSocketClient (
320+ multiplexer .asServerConnection (),
321+ errorConsumer ,
322+ StreamIdSupplier .serverSupplier ());
323+
324+ Mono <RSocket > wrappedRSocketClient
325+ = Plugins
326+ .CLIENT_REACTIVE_SOCKET_INTERCEPTOR
327+ .apply (rSocketClient );
328+
329+ return wrappedRSocketClient
330+ .then (sender ->
331+ acceptor
332+ .get ()
333+ .accept (setupPayload , sender )
334+ .then (Plugins .SERVER_REACTIVE_SOCKET_INTERCEPTOR ::apply )
335+ )
336+ .map (handler ->
337+ new RSocketServer (
338+ multiplexer .asClientConnection (),
339+ handler ,
340+ errorConsumer ))
341+ .then ();
342+ });
343+ });
335344
336345 }
337346 }
0 commit comments