Skip to content

Commit 070cffe

Browse files
authored
Track Discarded Payloads (rsocket#777)
* provides extra hooks to ensure we capture all discarded elements Signed-off-by: Oleh Dokuka <[email protected]> * provides leaks tracking tooling Signed-off-by: Oleh Dokuka <[email protected]> * provides leaks tracking tests and tooling Signed-off-by: Oleh Dokuka <[email protected]> * more tests Signed-off-by: Oleh Dokuka <[email protected]> * provides mechanism for terminates queue on calling clear Signed-off-by: Oleh Dokuka <[email protected]> * provides workaround for FluxPublishOn to ensure that all elements are released in case of racing Signed-off-by: Oleh Dokuka <[email protected]> * provides more tests part of the tests are on racing (ignored for now) another few on verification that elements are discarded properly Signed-off-by: Oleh Dokuka <[email protected]> * provide fixes to RequestChannel responder and related tests. Ensures there is no leaks in RSocketRequesterTest and RSocketResponder tests Signed-off-by: Oleh Dokuka <[email protected]> * tries to migrate to junit 5 Signed-off-by: Oleh Dokuka <[email protected]> * fixes leaks in tests Signed-off-by: Oleh Dokuka <[email protected]> * optimizes discarded/dropped BB consumption and releasing Signed-off-by: Oleh Dokuka <[email protected]> * fixes javadocs Signed-off-by: Oleh Dokuka <[email protected]> * removes hooks from Decoder Signed-off-by: Oleh Dokuka <[email protected]> * fixes format Signed-off-by: Oleh Dokuka <[email protected]> * rollbacks some fixes that should be delivered separately Signed-off-by: Oleh Dokuka <[email protected]> * rollbacks some build.gradle refactoring Signed-off-by: Oleh Dokuka <[email protected]> * fixes test Signed-off-by: Oleh Dokuka <[email protected]> * fixes test Signed-off-by: Oleh Dokuka <[email protected]>
1 parent c05eb42 commit 070cffe

File tree

8 files changed

+1049
-102
lines changed

8 files changed

+1049
-102
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222

2323
import io.netty.buffer.ByteBuf;
2424
import io.netty.buffer.ByteBufAllocator;
25+
import io.netty.util.IllegalReferenceCountException;
2526
import io.netty.util.ReferenceCountUtil;
27+
import io.netty.util.ReferenceCounted;
2628
import io.netty.util.collection.IntObjectMap;
2729
import io.rsocket.DuplexConnection;
2830
import io.rsocket.Payload;
@@ -77,6 +79,16 @@ class RSocketRequester implements RSocket {
7779
AtomicReferenceFieldUpdater.newUpdater(
7880
RSocketRequester.class, Throwable.class, "terminationError");
7981
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
82+
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
83+
referenceCounted -> {
84+
if (referenceCounted.refCnt() > 0) {
85+
try {
86+
referenceCounted.release();
87+
} catch (IllegalReferenceCountException e) {
88+
// ignored
89+
}
90+
}
91+
};
8092

8193
static {
8294
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
@@ -259,7 +271,7 @@ public void doOnTerminal(
259271
});
260272
receivers.put(streamId, receiver);
261273

262-
return receiver;
274+
return receiver.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
263275
}
264276

265277
private Flux<Payload> handleRequestStream(final Payload payload) {
@@ -323,7 +335,8 @@ public void accept(long n) {
323335
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
324336
}
325337
})
326-
.doFinally(s -> removeStreamReceiver(streamId));
338+
.doFinally(s -> removeStreamReceiver(streamId))
339+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
327340
}
328341

329342
private Flux<Payload> handleChannel(Flux<Payload> request) {
@@ -424,7 +437,10 @@ public void accept(long n) {
424437
senders.put(streamId, upstreamSubscriber);
425438
receivers.put(streamId, receiver);
426439

427-
inboundFlux.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(upstreamSubscriber);
440+
inboundFlux
441+
.limitRate(Queues.SMALL_BUFFER_SIZE)
442+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
443+
.subscribe(upstreamSubscriber);
428444
if (!payloadReleasedFlag.getAndSet(true)) {
429445
ByteBuf frame =
430446
RequestChannelFrameFlyweight.encode(
@@ -461,7 +477,8 @@ public void accept(long n) {
461477
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
462478
upstreamSubscriber.cancel();
463479
}
464-
});
480+
})
481+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
465482
}
466483

467484
private Mono<Void> handleMetadataPush(Payload payload) {

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222
import io.netty.buffer.ByteBufAllocator;
23+
import io.netty.util.IllegalReferenceCountException;
2324
import io.netty.util.ReferenceCountUtil;
25+
import io.netty.util.ReferenceCounted;
2426
import io.netty.util.collection.IntObjectMap;
2527
import io.rsocket.DuplexConnection;
2628
import io.rsocket.Payload;
@@ -45,6 +47,16 @@
4547

4648
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
4749
class RSocketResponder implements ResponderRSocket {
50+
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
51+
referenceCounted -> {
52+
if (referenceCounted.refCnt() > 0) {
53+
try {
54+
referenceCounted.release();
55+
} catch (IllegalReferenceCountException e) {
56+
// ignored
57+
}
58+
}
59+
};
4860

4961
private final DuplexConnection connection;
5062
private final RSocket requestHandler;
@@ -418,7 +430,7 @@ protected void hookFinally(SignalType type) {
418430
};
419431

420432
sendingSubscriptions.put(streamId, subscriber);
421-
response.subscribe(subscriber);
433+
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
422434
}
423435

424436
private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
@@ -471,7 +483,10 @@ protected void hookFinally(SignalType type) {
471483
};
472484

473485
sendingSubscriptions.put(streamId, subscriber);
474-
response.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(subscriber);
486+
response
487+
.limitRate(Queues.SMALL_BUFFER_SIZE)
488+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
489+
.subscribe(subscriber);
475490
}
476491

477492
private void handleChannel(int streamId, Payload payload, int initialRequestN) {
@@ -499,7 +514,8 @@ public void accept(long l) {
499514
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
500515
}
501516
})
502-
.doFinally(signalType -> channelProcessors.remove(streamId));
517+
.doFinally(signalType -> channelProcessors.remove(streamId))
518+
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
503519

504520
// not chained, as the payload should be enqueued in the Unicast processor before this method
505521
// returns

rsocket-core/src/main/java/io/rsocket/util/CharByteBufUtil.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private static char[] checkCharSequenceBounds(char[] seq, int start, int end) {
9999
}
100100

101101
/**
102-
* Encode a {@link char[]} in <a href="http://en.wikipedia.org/wiki/UTF-8">UTF-8</a> and write it
102+
* Encode a {@code char[]} in <a href="http://en.wikipedia.org/wiki/UTF-8">UTF-8</a> and write it
103103
* into {@link ByteBuf}.
104104
*
105105
* <p>This method returns the actual number of bytes written.
@@ -109,9 +109,8 @@ public static int writeUtf8(ByteBuf buf, char[] seq) {
109109
}
110110

111111
/**
112-
* Equivalent to <code>{@link #writeUtf8(ByteBuf, char[])
113-
* writeUtf8(buf, seq.subSequence(start, end), reserveBytes)}</code> but avoids subsequence object
114-
* allocation if possible.
112+
* Equivalent to {@link #writeUtf8(ByteBuf, char[]) writeUtf8(buf, seq.subSequence(start, end),
113+
* reserveBytes)} but avoids subsequence object allocation if possible.
115114
*
116115
* @return actual number of bytes written
117116
*/
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package io.rsocket.buffer;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.buffer.CompositeByteBuf;
6+
import java.util.List;
7+
import java.util.concurrent.ConcurrentLinkedQueue;
8+
import org.assertj.core.api.Assertions;
9+
10+
/**
11+
* Additional Utils which allows to decorate a ByteBufAllocator and track/assertOnLeaks all created
12+
* ByteBuffs
13+
*/
14+
public class LeaksTrackingByteBufAllocator implements ByteBufAllocator {
15+
16+
/**
17+
* Allows to instrument any given the instance of ByteBufAllocator
18+
*
19+
* @param allocator
20+
* @return
21+
*/
22+
public static LeaksTrackingByteBufAllocator instrument(ByteBufAllocator allocator) {
23+
return new LeaksTrackingByteBufAllocator(allocator);
24+
}
25+
26+
final ConcurrentLinkedQueue<ByteBuf> tracker = new ConcurrentLinkedQueue<>();
27+
28+
final ByteBufAllocator delegate;
29+
30+
private LeaksTrackingByteBufAllocator(ByteBufAllocator delegate) {
31+
this.delegate = delegate;
32+
}
33+
34+
public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
35+
try {
36+
Assertions.assertThat(tracker)
37+
.allSatisfy(
38+
buf -> {
39+
if (buf instanceof CompositeByteBuf) {
40+
if (buf.refCnt() > 0) {
41+
List<ByteBuf> decomposed =
42+
((CompositeByteBuf) buf).decompose(0, buf.readableBytes());
43+
for (int i = 0; i < decomposed.size(); i++) {
44+
Assertions.assertThat(decomposed.get(i))
45+
.matches(bb -> bb.refCnt() == 0, "Got unreleased CompositeByteBuf");
46+
}
47+
}
48+
49+
} else {
50+
Assertions.assertThat(buf)
51+
.matches(bb -> bb.refCnt() == 0, "buffer should be released");
52+
}
53+
});
54+
} finally {
55+
tracker.clear();
56+
}
57+
return this;
58+
}
59+
60+
// Delegating logic with tracking of buffers
61+
62+
@Override
63+
public ByteBuf buffer() {
64+
return track(delegate.buffer());
65+
}
66+
67+
@Override
68+
public ByteBuf buffer(int initialCapacity) {
69+
return track(delegate.buffer(initialCapacity));
70+
}
71+
72+
@Override
73+
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
74+
return track(delegate.buffer(initialCapacity, maxCapacity));
75+
}
76+
77+
@Override
78+
public ByteBuf ioBuffer() {
79+
return track(delegate.ioBuffer());
80+
}
81+
82+
@Override
83+
public ByteBuf ioBuffer(int initialCapacity) {
84+
return track(delegate.ioBuffer(initialCapacity));
85+
}
86+
87+
@Override
88+
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
89+
return track(delegate.ioBuffer(initialCapacity, maxCapacity));
90+
}
91+
92+
@Override
93+
public ByteBuf heapBuffer() {
94+
return track(delegate.heapBuffer());
95+
}
96+
97+
@Override
98+
public ByteBuf heapBuffer(int initialCapacity) {
99+
return track(delegate.heapBuffer(initialCapacity));
100+
}
101+
102+
@Override
103+
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
104+
return track(delegate.heapBuffer(initialCapacity, maxCapacity));
105+
}
106+
107+
@Override
108+
public ByteBuf directBuffer() {
109+
return track(delegate.directBuffer());
110+
}
111+
112+
@Override
113+
public ByteBuf directBuffer(int initialCapacity) {
114+
return track(delegate.directBuffer(initialCapacity));
115+
}
116+
117+
@Override
118+
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
119+
return track(delegate.directBuffer(initialCapacity, maxCapacity));
120+
}
121+
122+
@Override
123+
public CompositeByteBuf compositeBuffer() {
124+
return track(delegate.compositeBuffer());
125+
}
126+
127+
@Override
128+
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
129+
return track(delegate.compositeBuffer(maxNumComponents));
130+
}
131+
132+
@Override
133+
public CompositeByteBuf compositeHeapBuffer() {
134+
return track(delegate.compositeHeapBuffer());
135+
}
136+
137+
@Override
138+
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
139+
return track(delegate.compositeHeapBuffer(maxNumComponents));
140+
}
141+
142+
@Override
143+
public CompositeByteBuf compositeDirectBuffer() {
144+
return track(delegate.compositeDirectBuffer());
145+
}
146+
147+
@Override
148+
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
149+
return track(delegate.compositeDirectBuffer(maxNumComponents));
150+
}
151+
152+
@Override
153+
public boolean isDirectBufferPooled() {
154+
return delegate.isDirectBufferPooled();
155+
}
156+
157+
@Override
158+
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
159+
return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
160+
}
161+
162+
<T extends ByteBuf> T track(T buffer) {
163+
tracker.offer(buffer);
164+
165+
return buffer;
166+
}
167+
}

rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package io.rsocket.core;
1818

19+
import io.netty.buffer.ByteBufAllocator;
1920
import io.rsocket.RSocket;
21+
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
2022
import io.rsocket.test.util.TestDuplexConnection;
2123
import io.rsocket.test.util.TestSubscriber;
2224
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -32,6 +34,7 @@ public abstract class AbstractSocketRule<T extends RSocket> extends ExternalReso
3234
protected Subscriber<Void> connectSub;
3335
protected T socket;
3436
protected ConcurrentLinkedQueue<Throwable> errors;
37+
protected LeaksTrackingByteBufAllocator allocator;
3538

3639
@Override
3740
public Statement apply(final Statement base, Description description) {
@@ -41,21 +44,30 @@ public void evaluate() throws Throwable {
4144
connection = new TestDuplexConnection();
4245
connectSub = TestSubscriber.create();
4346
errors = new ConcurrentLinkedQueue<>();
47+
allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
4448
init();
4549
base.evaluate();
4650
}
4751
};
4852
}
4953

5054
protected void init() {
51-
socket = newRSocket();
55+
socket = newRSocket(allocator);
5256
}
5357

54-
protected abstract T newRSocket();
58+
protected abstract T newRSocket(LeaksTrackingByteBufAllocator allocator);
5559

5660
public void assertNoConnectionErrors() {
5761
if (errors.size() > 1) {
5862
Assert.fail("No connection errors expected: " + errors.peek().toString());
5963
}
6064
}
65+
66+
public ByteBufAllocator alloc() {
67+
return allocator;
68+
}
69+
70+
public void assertHasNoLeaks() {
71+
allocator.assertHasNoLeaks();
72+
}
6173
}

0 commit comments

Comments
 (0)