Skip to content

Commit 414193f

Browse files
Send root-cause errors from connection to onClose of RSocket (#797)
Co-Authored-By: Rossen Stoyanchev <[email protected]>
1 parent b3087ef commit 414193f

5 files changed

Lines changed: 80 additions & 83 deletions

File tree

rsocket-core/src/main/java/io/rsocket/Closeable.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package io.rsocket;
1818

19+
import org.reactivestreams.Subscriber;
1920
import reactor.core.Disposable;
2021
import reactor.core.publisher.Mono;
2122

22-
/** */
23+
/** An interface which allows listening to when a specific instance of this interface is closed */
2324
public interface Closeable extends Disposable {
2425
/**
25-
* Returns a {@code Publisher} that completes when this {@code RSocket} is closed. A {@code
26-
* RSocket} can be closed by explicitly calling {@link RSocket#dispose()} or when the underlying
27-
* transport connection is closed.
26+
* Returns a {@link Mono} that terminates when the instance is terminated by any reason. Note, in
27+
* case of error termination, the cause of error will be propagated as an error signal through
28+
* {@link org.reactivestreams.Subscriber#onError(Throwable)}. Otherwise, {@link
29+
* Subscriber#onComplete()} will be called.
2830
*
29-
* @return A {@code Publisher} that completes when this {@code RSocket} close is complete.
31+
* @return a {@link Mono} to track completion with success or error of the underlying resource.
32+
* When the underlying resource is an `RSocket`, the {@code Mono} exposes stream 0 (i.e.
33+
* connection level) errors.
3034
*/
3135
Mono<Void> onClose();
3236
}

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class RSocketConnector {
5050
private static final int MIN_MTU_SIZE = 64;
5151

5252
private static final BiConsumer<RSocket, Invalidatable> INVALIDATE_FUNCTION =
53-
(r, i) -> r.onClose().subscribe(null, null, i::invalidate);
53+
(r, i) -> r.onClose().subscribe(null, __ -> i.invalidate(), i::invalidate);
5454

5555
private Payload setupPayload = EmptyPayload.INSTANCE;
5656
private String metadataMimeType = "application/binary";

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import io.rsocket.lease.RequesterLeaseHandler;
5454
import io.rsocket.util.MonoLifecycleHandler;
5555
import java.nio.channels.ClosedChannelException;
56+
import java.util.concurrent.CancellationException;
5657
import java.util.concurrent.atomic.AtomicBoolean;
5758
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5859
import java.util.function.Consumer;
@@ -67,6 +68,7 @@
6768
import reactor.core.publisher.BaseSubscriber;
6869
import reactor.core.publisher.Flux;
6970
import reactor.core.publisher.Mono;
71+
import reactor.core.publisher.MonoProcessor;
7072
import reactor.core.publisher.SignalType;
7173
import reactor.core.publisher.UnicastProcessor;
7274
import reactor.util.concurrent.Queues;
@@ -106,6 +108,7 @@ class RSocketRequester implements RSocket {
106108
private final ByteBufAllocator allocator;
107109
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
108110
private volatile Throwable terminationError;
111+
private final MonoProcessor<Void> onClose;
109112

110113
RSocketRequester(
111114
DuplexConnection connection,
@@ -126,14 +129,15 @@ class RSocketRequester implements RSocket {
126129
this.leaseHandler = leaseHandler;
127130
this.senders = new SynchronizedIntObjectHashMap<>();
128131
this.receivers = new SynchronizedIntObjectHashMap<>();
132+
this.onClose = MonoProcessor.create();
129133

130134
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
131135
this.sendProcessor = new UnboundedProcessor<>();
132136

133137
connection
134138
.onClose()
135-
.doFinally(signalType -> tryTerminateOnConnectionClose())
136-
.subscribe(null, errorConsumer);
139+
.or(onClose)
140+
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
137141
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
138142

139143
connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
@@ -181,17 +185,17 @@ public double availability() {
181185

182186
@Override
183187
public void dispose() {
184-
connection.dispose();
188+
tryTerminate(() -> new CancellationException("Disposed"));
185189
}
186190

187191
@Override
188192
public boolean isDisposed() {
189-
return connection.isDisposed();
193+
return onClose.isDisposed();
190194
}
191195

192196
@Override
193197
public Mono<Void> onClose() {
194-
return connection.onClose();
198+
return onClose;
195199
}
196200

197201
private Mono<Void> handleFireAndForget(Payload payload) {
@@ -619,6 +623,10 @@ private void tryTerminateOnKeepAlive(KeepAlive keepAlive) {
619623
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis())));
620624
}
621625

626+
private void tryTerminateOnConnectionError(Throwable e) {
627+
tryTerminate(() -> e);
628+
}
629+
622630
private void tryTerminateOnConnectionClose() {
623631
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
624632
}
@@ -627,16 +635,16 @@ private void tryTerminateOnZeroError(ByteBuf errorFrame) {
627635
tryTerminate(() -> Exceptions.from(0, errorFrame));
628636
}
629637

630-
private void tryTerminate(Supplier<Exception> errorSupplier) {
638+
private void tryTerminate(Supplier<Throwable> errorSupplier) {
631639
if (terminationError == null) {
632-
Exception e = errorSupplier.get();
640+
Throwable e = errorSupplier.get();
633641
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
634642
terminate(e);
635643
}
636644
}
637645
}
638646

639-
private void terminate(Exception e) {
647+
private void terminate(Throwable e) {
640648
connection.dispose();
641649
leaseHandler.dispose();
642650

@@ -668,6 +676,7 @@ private void terminate(Exception e) {
668676
receivers.clear();
669677
sendProcessor.dispose();
670678
errorConsumer.accept(e);
679+
onClose.onError(e);
671680
}
672681

673682
private void removeStreamReceiver(int streamId) {

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 49 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@
3434
import io.rsocket.internal.SynchronizedIntObjectHashMap;
3535
import io.rsocket.internal.UnboundedProcessor;
3636
import io.rsocket.lease.ResponderLeaseHandler;
37+
import java.nio.channels.ClosedChannelException;
38+
import java.util.concurrent.CancellationException;
39+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3740
import java.util.function.Consumer;
3841
import java.util.function.LongConsumer;
42+
import java.util.function.Supplier;
3943
import javax.annotation.Nullable;
4044
import org.reactivestreams.Processor;
4145
import org.reactivestreams.Publisher;
@@ -58,13 +62,21 @@ class RSocketResponder implements ResponderRSocket {
5862
}
5963
}
6064
};
65+
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
6166

6267
private final DuplexConnection connection;
6368
private final RSocket requestHandler;
6469
private final ResponderRSocket responderRSocket;
6570
private final PayloadDecoder payloadDecoder;
6671
private final Consumer<Throwable> errorConsumer;
6772
private final ResponderLeaseHandler leaseHandler;
73+
private final Disposable leaseHandlerDisposable;
74+
private final MonoProcessor<Void> onClose;
75+
76+
private volatile Throwable terminationError;
77+
private static final AtomicReferenceFieldUpdater<RSocketResponder, Throwable> TERMINATION_ERROR =
78+
AtomicReferenceFieldUpdater.newUpdater(
79+
RSocketResponder.class, Throwable.class, "terminationError");
6880

6981
private final int mtu;
7082

@@ -94,28 +106,21 @@ class RSocketResponder implements ResponderRSocket {
94106
this.leaseHandler = leaseHandler;
95107
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
96108
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
109+
this.onClose = MonoProcessor.create();
97110

98111
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
99112
// connections
100113
this.sendProcessor = new UnboundedProcessor<>();
101114

102-
connection
103-
.send(sendProcessor)
104-
.doFinally(this::handleSendProcessorCancel)
105-
.subscribe(null, this::handleSendProcessorError);
115+
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
106116

107-
Disposable receiveDisposable = connection.receive().subscribe(this::handleFrame, errorConsumer);
108-
Disposable sendLeaseDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);
117+
connection.receive().subscribe(this::handleFrame, errorConsumer);
118+
leaseHandlerDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);
109119

110120
this.connection
111121
.onClose()
112-
.doFinally(
113-
s -> {
114-
cleanup();
115-
receiveDisposable.dispose();
116-
sendLeaseDisposable.dispose();
117-
})
118-
.subscribe(null, errorConsumer);
122+
.or(onClose)
123+
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
119124
}
120125

121126
private void handleSendProcessorError(Throwable t) {
@@ -142,32 +147,21 @@ private void handleSendProcessorError(Throwable t) {
142147
});
143148
}
144149

145-
private void handleSendProcessorCancel(SignalType t) {
146-
if (SignalType.ON_ERROR == t) {
147-
return;
148-
}
150+
private void tryTerminateOnConnectionError(Throwable e) {
151+
tryTerminate(() -> e);
152+
}
149153

150-
sendingSubscriptions
151-
.values()
152-
.forEach(
153-
subscription -> {
154-
try {
155-
subscription.cancel();
156-
} catch (Throwable e) {
157-
errorConsumer.accept(e);
158-
}
159-
});
154+
private void tryTerminateOnConnectionClose() {
155+
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
156+
}
160157

161-
channelProcessors
162-
.values()
163-
.forEach(
164-
subscription -> {
165-
try {
166-
subscription.onComplete();
167-
} catch (Throwable e) {
168-
errorConsumer.accept(e);
169-
}
170-
});
158+
private void tryTerminate(Supplier<Throwable> errorSupplier) {
159+
if (terminationError == null) {
160+
Throwable e = errorSupplier.get();
161+
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
162+
cleanup(e);
163+
}
164+
}
171165
}
172166

173167
@Override
@@ -250,23 +244,25 @@ public Mono<Void> metadataPush(Payload payload) {
250244

251245
@Override
252246
public void dispose() {
253-
connection.dispose();
247+
tryTerminate(() -> new CancellationException("Disposed"));
254248
}
255249

256250
@Override
257251
public boolean isDisposed() {
258-
return connection.isDisposed();
252+
return onClose.isDisposed();
259253
}
260254

261255
@Override
262256
public Mono<Void> onClose() {
263-
return connection.onClose();
257+
return onClose;
264258
}
265259

266-
private void cleanup() {
260+
private void cleanup(Throwable e) {
267261
cleanUpSendingSubscriptions();
268-
cleanUpChannelProcessors();
262+
cleanUpChannelProcessors(e);
269263

264+
connection.dispose();
265+
leaseHandlerDisposable.dispose();
270266
requestHandler.dispose();
271267
sendProcessor.dispose();
272268
}
@@ -276,8 +272,17 @@ private synchronized void cleanUpSendingSubscriptions() {
276272
sendingSubscriptions.clear();
277273
}
278274

279-
private synchronized void cleanUpChannelProcessors() {
280-
channelProcessors.values().forEach(Processor::onComplete);
275+
private synchronized void cleanUpChannelProcessors(Throwable e) {
276+
channelProcessors
277+
.values()
278+
.forEach(
279+
payloadPayloadProcessor -> {
280+
try {
281+
payloadPayloadProcessor.onError(e);
282+
} catch (Throwable t) {
283+
// noops
284+
}
285+
});
281286
channelProcessors.clear();
282287
}
283288

rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,11 @@
3535
import java.nio.channels.ClosedChannelException;
3636
import java.time.Duration;
3737
import java.util.concurrent.atomic.AtomicInteger;
38-
import java.util.function.Consumer;
3938
import org.assertj.core.api.Assertions;
4039
import org.junit.jupiter.api.Test;
4140
import org.reactivestreams.Publisher;
4241
import reactor.core.publisher.Flux;
4342
import reactor.core.publisher.Mono;
44-
import reactor.core.publisher.ReplayProcessor;
4543
import reactor.core.scheduler.Schedulers;
4644
import reactor.test.StepVerifier;
4745

@@ -105,7 +103,6 @@ public void reconnectOnMissingSession() {
105103

106104
DisconnectableClientTransport clientTransport =
107105
new DisconnectableClientTransport(clientTransport(closeable.address()));
108-
ErrorConsumer errorConsumer = new ErrorConsumer();
109106
int clientSessionDurationSeconds = 10;
110107

111108
RSocket rSocket = newClientRSocket(clientTransport, clientSessionDurationSeconds).block();
@@ -118,12 +115,11 @@ public void reconnectOnMissingSession() {
118115
.expectError()
119116
.verify(Duration.ofSeconds(5));
120117

121-
StepVerifier.create(errorConsumer.errors().next())
122-
.expectNextMatches(
118+
StepVerifier.create(rSocket.onClose())
119+
.expectErrorMatches(
123120
err ->
124121
err instanceof RejectedResumeException
125122
&& "unknown resume token".equals(err.getMessage()))
126-
.expectComplete()
127123
.verify(Duration.ofSeconds(5));
128124
}
129125

@@ -134,23 +130,19 @@ void serverMissingResume() {
134130
.bind(serverTransport(SERVER_HOST, SERVER_PORT))
135131
.block();
136132

137-
ErrorConsumer errorConsumer = new ErrorConsumer();
138-
139133
RSocket rSocket =
140134
RSocketConnector.create()
141135
.resume(new Resume())
142136
.connect(clientTransport(closeableChannel.address()))
143137
.block();
144138

145-
StepVerifier.create(errorConsumer.errors().next().doFinally(s -> closeableChannel.dispose()))
146-
.expectNextMatches(
139+
StepVerifier.create(rSocket.onClose().doFinally(s -> closeableChannel.dispose()))
140+
.expectErrorMatches(
147141
err ->
148142
err instanceof UnsupportedSetupException
149143
&& "resume not supported".equals(err.getMessage()))
150-
.expectComplete()
151144
.verify(Duration.ofSeconds(5));
152145

153-
StepVerifier.create(rSocket.onClose()).expectComplete().verify(Duration.ofSeconds(5));
154146
Assertions.assertThat(rSocket.isDisposed()).isTrue();
155147
}
156148

@@ -162,19 +154,6 @@ static ServerTransport<CloseableChannel> serverTransport(String host, int port)
162154
return TcpServerTransport.create(host, port);
163155
}
164156

165-
private static class ErrorConsumer implements Consumer<Throwable> {
166-
private final ReplayProcessor<Throwable> errors = ReplayProcessor.create();
167-
168-
public Flux<Throwable> errors() {
169-
return errors;
170-
}
171-
172-
@Override
173-
public void accept(Throwable throwable) {
174-
errors.onNext(throwable);
175-
}
176-
}
177-
178157
private static Flux<Payload> testRequest() {
179158
return Flux.interval(Duration.ofMillis(50))
180159
.map(v -> DefaultPayload.create("client_request"))

0 commit comments

Comments
 (0)