Skip to content

Commit a8f5a75

Browse files
mostroverkhovrobertroeser
authored andcommitted
Rename internal components to match their purpose (rsocket#643)
* Rename internal components to match their purpose Introduce addRequesterPlugin and addHandlerPlugin to RSocketFactory instead of addClientPlugin, addServerPlugin; derprecate latter Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 312f4ac commit a8f5a75

10 files changed

Lines changed: 119 additions & 71 deletions

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -134,14 +134,25 @@ public ClientRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor inte
134134
plugins.addConnectionPlugin(interceptor);
135135
return this;
136136
}
137-
137+
/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
138+
@Deprecated
138139
public ClientRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
139-
plugins.addClientPlugin(interceptor);
140+
return addRequesterPlugin(interceptor);
141+
}
142+
143+
public ClientRSocketFactory addRequesterPlugin(RSocketInterceptor interceptor) {
144+
plugins.addRequesterPlugin(interceptor);
140145
return this;
141146
}
142147

148+
/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
149+
@Deprecated
143150
public ClientRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
144-
plugins.addServerPlugin(interceptor);
151+
return addResponderPlugin(interceptor);
152+
}
153+
154+
public ClientRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
155+
plugins.addResponderPlugin(interceptor);
145156
return this;
146157
}
147158

@@ -291,8 +302,8 @@ public Mono<RSocket> start() {
291302
ClientServerInputMultiplexer multiplexer =
292303
new ClientServerInputMultiplexer(wrappedConnection, plugins);
293304

294-
RSocketClient rSocketClient =
295-
new RSocketClient(
305+
RSocketRequester rSocketRequester =
306+
new RSocketRequester(
296307
allocator,
297308
multiplexer.asClientConnection(),
298309
payloadDecoder,
@@ -314,27 +325,27 @@ public Mono<RSocket> start() {
314325
setupPayload.sliceMetadata(),
315326
setupPayload.sliceData());
316327

317-
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
328+
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
318329

319-
RSocket unwrappedServerSocket;
330+
RSocket rSocketHandler;
320331
if (biAcceptor != null) {
321332
ConnectionSetupPayload setup = ConnectionSetupPayload.create(setupFrame);
322-
unwrappedServerSocket = biAcceptor.apply(setup, wrappedRSocketClient);
333+
rSocketHandler = biAcceptor.apply(setup, wrappedRSocketRequester);
323334
} else {
324-
unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient);
335+
rSocketHandler = acceptor.get().apply(wrappedRSocketRequester);
325336
}
326337

327-
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
338+
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
328339

329-
RSocketServer rSocketServer =
330-
new RSocketServer(
340+
RSocketResponder rSocketResponder =
341+
new RSocketResponder(
331342
allocator,
332343
multiplexer.asServerConnection(),
333-
wrappedRSocketServer,
344+
wrappedRSocketHandler,
334345
payloadDecoder,
335346
errorConsumer);
336347

337-
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketClient);
348+
return wrappedConnection.sendOne(setupFrame).thenReturn(wrappedRSocketRequester);
338349
});
339350
}
340351

@@ -397,14 +408,25 @@ public ServerRSocketFactory addConnectionPlugin(DuplexConnectionInterceptor inte
397408
plugins.addConnectionPlugin(interceptor);
398409
return this;
399410
}
400-
411+
/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
412+
@Deprecated
401413
public ServerRSocketFactory addClientPlugin(RSocketInterceptor interceptor) {
402-
plugins.addClientPlugin(interceptor);
414+
return addRequesterPlugin(interceptor);
415+
}
416+
417+
public ServerRSocketFactory addRequesterPlugin(RSocketInterceptor interceptor) {
418+
plugins.addRequesterPlugin(interceptor);
403419
return this;
404420
}
405421

422+
/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
423+
@Deprecated
406424
public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
407-
plugins.addServerPlugin(interceptor);
425+
return addResponderPlugin(interceptor);
426+
}
427+
428+
public ServerRSocketFactory addResponderPlugin(RSocketInterceptor interceptor) {
429+
plugins.addResponderPlugin(interceptor);
408430
return this;
409431
}
410432

@@ -525,29 +547,29 @@ private Mono<Void> acceptSetup(
525547
(keepAliveHandler, wrappedMultiplexer) -> {
526548
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create(setupFrame);
527549

528-
RSocketClient rSocketClient =
529-
new RSocketClient(
550+
RSocketRequester rSocketRequester =
551+
new RSocketRequester(
530552
allocator,
531553
wrappedMultiplexer.asServerConnection(),
532554
payloadDecoder,
533555
errorConsumer,
534556
StreamIdSupplier.serverSupplier());
535557

536-
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
558+
RSocket wrappedRSocketRequester = plugins.applyRequester(rSocketRequester);
537559

538560
return acceptor
539-
.accept(setupPayload, wrappedRSocketClient)
561+
.accept(setupPayload, wrappedRSocketRequester)
540562
.onErrorResume(
541563
err -> sendError(multiplexer, rejectedSetupError(err)).then(Mono.error(err)))
542564
.doOnNext(
543-
unwrappedServerSocket -> {
544-
RSocket wrappedRSocketServer = plugins.applyServer(unwrappedServerSocket);
565+
rSocketHandler -> {
566+
RSocket wrappedRSocketHandler = plugins.applyResponder(rSocketHandler);
545567

546-
RSocketServer rSocketServer =
547-
new RSocketServer(
568+
RSocketResponder rSocketResponder =
569+
new RSocketResponder(
548570
allocator,
549571
wrappedMultiplexer.asClientConnection(),
550-
wrappedRSocketServer,
572+
wrappedRSocketHandler,
551573
payloadDecoder,
552574
errorConsumer,
553575
setupPayload.keepAliveInterval(),

rsocket-core/src/main/java/io/rsocket/RSocketClient.java renamed to rsocket-core/src/main/java/io/rsocket/RSocketRequester.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,10 @@
4545
import org.reactivestreams.Subscriber;
4646
import reactor.core.publisher.*;
4747

48-
/** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */
49-
class RSocketClient implements RSocket {
48+
/**
49+
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
50+
*/
51+
class RSocketRequester implements RSocket {
5052

5153
private final DuplexConnection connection;
5254
private final PayloadDecoder payloadDecoder;
@@ -60,7 +62,7 @@ class RSocketClient implements RSocket {
6062
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
6163

6264
/*client requester*/
63-
RSocketClient(
65+
RSocketRequester(
6466
ByteBufAllocator allocator,
6567
DuplexConnection connection,
6668
PayloadDecoder payloadDecoder,
@@ -99,7 +101,7 @@ class RSocketClient implements RSocket {
99101
}
100102

101103
/*server requester*/
102-
RSocketClient(
104+
RSocketRequester(
103105
ByteBufAllocator allocator,
104106
DuplexConnection connection,
105107
PayloadDecoder payloadDecoder,

rsocket-core/src/main/java/io/rsocket/RSocketServer.java renamed to rsocket-core/src/main/java/io/rsocket/RSocketResponder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import reactor.core.Exceptions;
4444
import reactor.core.publisher.*;
4545

46-
/** Server side RSocket. Receives {@link ByteBuf}s from a {@link RSocketClient} */
47-
class RSocketServer implements ResponderRSocket {
46+
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
47+
class RSocketResponder implements ResponderRSocket {
4848

4949
private final DuplexConnection connection;
5050
private final RSocket requestHandler;
@@ -61,7 +61,7 @@ class RSocketServer implements ResponderRSocket {
6161
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
6262

6363
/*client responder*/
64-
RSocketServer(
64+
RSocketResponder(
6565
ByteBufAllocator allocator,
6666
DuplexConnection connection,
6767
RSocket requestHandler,
@@ -71,7 +71,7 @@ class RSocketServer implements ResponderRSocket {
7171
}
7272

7373
/*server responder*/
74-
RSocketServer(
74+
RSocketResponder(
7575
ByteBufAllocator allocator,
7676
DuplexConnection connection,
7777
RSocket requestHandler,

rsocket-core/src/main/java/io/rsocket/plugins/PluginRegistry.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,63 @@
2323

2424
public class PluginRegistry {
2525
private List<DuplexConnectionInterceptor> connections = new ArrayList<>();
26-
private List<RSocketInterceptor> clients = new ArrayList<>();
27-
private List<RSocketInterceptor> servers = new ArrayList<>();
26+
private List<RSocketInterceptor> requesters = new ArrayList<>();
27+
private List<RSocketInterceptor> responders = new ArrayList<>();
2828

2929
public PluginRegistry() {}
3030

3131
public PluginRegistry(PluginRegistry defaults) {
3232
this.connections.addAll(defaults.connections);
33-
this.clients.addAll(defaults.clients);
34-
this.servers.addAll(defaults.servers);
33+
this.requesters.addAll(defaults.requesters);
34+
this.responders.addAll(defaults.responders);
3535
}
3636

3737
public void addConnectionPlugin(DuplexConnectionInterceptor interceptor) {
3838
connections.add(interceptor);
3939
}
4040

41+
/** Deprecated. Use {@link #addRequesterPlugin(RSocketInterceptor)} instead */
42+
@Deprecated
4143
public void addClientPlugin(RSocketInterceptor interceptor) {
42-
clients.add(interceptor);
44+
addRequesterPlugin(interceptor);
4345
}
4446

47+
public void addRequesterPlugin(RSocketInterceptor interceptor) {
48+
requesters.add(interceptor);
49+
}
50+
51+
/** Deprecated. Use {@link #addResponderPlugin(RSocketInterceptor)} instead */
52+
@Deprecated
4553
public void addServerPlugin(RSocketInterceptor interceptor) {
46-
servers.add(interceptor);
54+
addResponderPlugin(interceptor);
55+
}
56+
57+
public void addResponderPlugin(RSocketInterceptor interceptor) {
58+
responders.add(interceptor);
4759
}
4860

61+
/** Deprecated. Use {@link #applyRequester(RSocket)} instead */
62+
@Deprecated
4963
public RSocket applyClient(RSocket rSocket) {
50-
for (RSocketInterceptor i : clients) {
64+
return applyRequester(rSocket);
65+
}
66+
67+
public RSocket applyRequester(RSocket rSocket) {
68+
for (RSocketInterceptor i : requesters) {
5169
rSocket = i.apply(rSocket);
5270
}
5371

5472
return rSocket;
5573
}
5674

75+
/** Deprecated. Use {@link #applyResponder(RSocket)} instead */
76+
@Deprecated
5777
public RSocket applyServer(RSocket rSocket) {
58-
for (RSocketInterceptor i : servers) {
78+
return applyResponder(rSocket);
79+
}
80+
81+
public RSocket applyResponder(RSocket rSocket) {
82+
for (RSocketInterceptor i : responders) {
5983
rSocket = i.apply(rSocket);
6084
}
6185

rsocket-core/src/test/java/io/rsocket/KeepAliveTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ static Supplier<RSocketState> requester(int tickPeriod, int timeout) {
6565
return () -> {
6666
TestDuplexConnection connection = new TestDuplexConnection();
6767
Errors errors = new Errors();
68-
RSocketClient rSocket =
69-
new RSocketClient(
68+
RSocketRequester rSocket =
69+
new RSocketRequester(
7070
ByteBufAllocator.DEFAULT,
7171
connection,
7272
DefaultPayload::create,
@@ -84,8 +84,8 @@ static Supplier<RSocketState> responder(int tickPeriod, int timeout) {
8484
TestDuplexConnection connection = new TestDuplexConnection();
8585
AbstractRSocket handler = new AbstractRSocket() {};
8686
Errors errors = new Errors();
87-
RSocketServer rSocket =
88-
new RSocketServer(
87+
RSocketResponder rSocket =
88+
new RSocketResponder(
8989
ByteBufAllocator.DEFAULT,
9090
connection,
9191
handler,
@@ -110,8 +110,8 @@ static Supplier<ResumableRSocketState> resumableRequester(int tickPeriod, int ti
110110
false);
111111

112112
Errors errors = new Errors();
113-
RSocketClient rSocket =
114-
new RSocketClient(
113+
RSocketRequester rSocket =
114+
new RSocketRequester(
115115
ByteBufAllocator.DEFAULT,
116116
resumableConnection,
117117
DefaultPayload::create,
@@ -136,8 +136,8 @@ static Supplier<ResumableRSocketState> resumableResponder(int tickPeriod, int ti
136136
Duration.ofSeconds(10),
137137
false);
138138
Errors errors = new Errors();
139-
RSocketServer rSocket =
140-
new RSocketServer(
139+
RSocketResponder rSocket =
140+
new RSocketResponder(
141141
ByteBufAllocator.DEFAULT,
142142
resumableConnection,
143143
handler,

rsocket-core/src/test/java/io/rsocket/RSocketClientTerminationTest.java renamed to rsocket-core/src/test/java/io/rsocket/RSocketRequesterTerminationTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.rsocket;
22

3-
import io.rsocket.RSocketClientTest.ClientSocketRule;
3+
import io.rsocket.RSocketRequesterTest.ClientSocketRule;
44
import io.rsocket.util.EmptyPayload;
55
import java.nio.channels.ClosedChannelException;
66
import java.time.Duration;
@@ -16,18 +16,18 @@
1616
import reactor.test.StepVerifier;
1717

1818
@RunWith(Parameterized.class)
19-
public class RSocketClientTerminationTest {
19+
public class RSocketRequesterTerminationTest {
2020

2121
@Rule public final ClientSocketRule rule = new ClientSocketRule();
2222
private Function<RSocket, ? extends Publisher<?>> interaction;
2323

24-
public RSocketClientTerminationTest(Function<RSocket, ? extends Publisher<?>> interaction) {
24+
public RSocketRequesterTerminationTest(Function<RSocket, ? extends Publisher<?>> interaction) {
2525
this.interaction = interaction;
2626
}
2727

2828
@Test
2929
public void testCurrentStreamIsTerminatedOnConnectionClose() {
30-
RSocketClient rSocket = rule.socket;
30+
RSocketRequester rSocket = rule.socket;
3131

3232
Mono.delay(Duration.ofSeconds(1)).doOnNext(v -> rule.connection.dispose()).subscribe();
3333

@@ -38,7 +38,7 @@ public void testCurrentStreamIsTerminatedOnConnectionClose() {
3838

3939
@Test
4040
public void testSubsequentStreamIsTerminatedAfterConnectionClose() {
41-
RSocketClient rSocket = rule.socket;
41+
RSocketRequester rSocket = rule.socket;
4242

4343
rule.connection.dispose();
4444
StepVerifier.create(interaction.apply(rSocket))

rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java renamed to rsocket-core/src/test/java/io/rsocket/RSocketRequesterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import reactor.core.publisher.MonoProcessor;
4848
import reactor.core.publisher.UnicastProcessor;
4949

50-
public class RSocketClientTest {
50+
public class RSocketRequesterTest {
5151

5252
@Rule public final ClientSocketRule rule = new ClientSocketRule();
5353

@@ -223,10 +223,10 @@ public int sendRequestResponse(Publisher<Payload> response) {
223223
return streamId;
224224
}
225225

226-
public static class ClientSocketRule extends AbstractSocketRule<RSocketClient> {
226+
public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester> {
227227
@Override
228-
protected RSocketClient newRSocket() {
229-
return new RSocketClient(
228+
protected RSocketRequester newRSocket() {
229+
return new RSocketRequester(
230230
ByteBufAllocator.DEFAULT,
231231
connection,
232232
DefaultPayload::create,

0 commit comments

Comments
 (0)