11package io .rsocket .buffer ;
22
3+ import static java .util .concurrent .locks .LockSupport .parkNanos ;
4+
35import io .netty .buffer .ByteBuf ;
46import io .netty .buffer .ByteBufAllocator ;
57import io .netty .buffer .CompositeByteBuf ;
8+ import io .netty .util .ResourceLeakDetector ;
9+ import java .lang .reflect .Field ;
10+ import java .time .Duration ;
11+ import java .util .ArrayList ;
12+ import java .util .Collections ;
13+ import java .util .Set ;
614import java .util .concurrent .ConcurrentLinkedQueue ;
715import org .assertj .core .api .Assertions ;
816
@@ -19,24 +27,89 @@ public class LeaksTrackingByteBufAllocator implements ByteBufAllocator {
1927 * @return
2028 */
2129 public static LeaksTrackingByteBufAllocator instrument (ByteBufAllocator allocator ) {
22- return new LeaksTrackingByteBufAllocator (allocator );
30+ return new LeaksTrackingByteBufAllocator (allocator , Duration .ZERO , "" );
31+ }
32+
33+ /**
34+ * Allows to instrument any given the instance of ByteBufAllocator
35+ *
36+ * @param allocator
37+ * @return
38+ */
39+ public static LeaksTrackingByteBufAllocator instrument (
40+ ByteBufAllocator allocator , Duration awaitZeroRefCntDuration , String tag ) {
41+ return new LeaksTrackingByteBufAllocator (allocator , awaitZeroRefCntDuration , tag );
2342 }
2443
2544 final ConcurrentLinkedQueue <ByteBuf > tracker = new ConcurrentLinkedQueue <>();
2645
2746 final ByteBufAllocator delegate ;
2847
29- private LeaksTrackingByteBufAllocator (ByteBufAllocator delegate ) {
48+ final Duration awaitZeroRefCntDuration ;
49+
50+ final String tag ;
51+
52+ private LeaksTrackingByteBufAllocator (
53+ ByteBufAllocator delegate , Duration awaitZeroRefCntDuration , String tag ) {
3054 this .delegate = delegate ;
55+ this .awaitZeroRefCntDuration = awaitZeroRefCntDuration ;
56+ this .tag = tag ;
3157 }
3258
3359 public LeaksTrackingByteBufAllocator assertHasNoLeaks () {
3460 try {
35- Assertions .assertThat (tracker )
36- .allSatisfy (
37- buf ->
38- Assertions .assertThat (buf )
39- .matches (bb -> bb .refCnt () == 0 , "buffer should be released" ));
61+ ArrayList <ByteBuf > unreleased = new ArrayList <>();
62+ for (ByteBuf bb : tracker ) {
63+ if (bb .refCnt () != 0 ) {
64+ unreleased .add (bb );
65+ }
66+ }
67+
68+ final Duration awaitZeroRefCntDuration = this .awaitZeroRefCntDuration ;
69+ if (!unreleased .isEmpty () && !awaitZeroRefCntDuration .isZero ()) {
70+ final long startTime = System .currentTimeMillis ();
71+ final long endTimeInMillis = startTime + awaitZeroRefCntDuration .toMillis ();
72+ boolean hasUnreleased ;
73+ while (System .currentTimeMillis () <= endTimeInMillis ) {
74+ hasUnreleased = false ;
75+ for (ByteBuf bb : unreleased ) {
76+ if (bb .refCnt () != 0 ) {
77+ hasUnreleased = true ;
78+ break ;
79+ }
80+ }
81+
82+ if (!hasUnreleased ) {
83+ System .out .println (tag + " all the buffers are released..." );
84+ return this ;
85+ }
86+
87+ System .out .println (tag + " await buffers to be released" );
88+ for (int i = 0 ; i < 100 ; i ++) {
89+ System .gc ();
90+ parkNanos (1000 );
91+ System .gc ();
92+ }
93+ }
94+ }
95+
96+ Assertions .assertThat (unreleased )
97+ .allMatch (
98+ bb -> {
99+ final boolean checkResult = bb .refCnt () == 0 ;
100+
101+ if (!checkResult ) {
102+ try {
103+ System .out .println (tag + " " + resolveTrackingInfo (bb ));
104+ } catch (Exception e ) {
105+ e .printStackTrace ();
106+ }
107+ }
108+
109+ return checkResult ;
110+ },
111+ tag );
112+ System .out .println (tag + " all the buffers are released..." );
40113 } finally {
41114 tracker .clear ();
42115 }
@@ -150,4 +223,60 @@ <T extends ByteBuf> T track(T buffer) {
150223
151224 return buffer ;
152225 }
226+
227+ static final Class <?> simpleLeakAwareCompositeByteBufClass ;
228+ static final Field leakFieldForComposite ;
229+ static final Class <?> simpleLeakAwareByteBufClass ;
230+ static final Field leakFieldForNormal ;
231+ static final Field allLeaksField ;
232+
233+ static {
234+ try {
235+ {
236+ final Class <?> aClass = Class .forName ("io.netty.buffer.SimpleLeakAwareCompositeByteBuf" );
237+ final Field leakField = aClass .getDeclaredField ("leak" );
238+
239+ leakField .setAccessible (true );
240+
241+ simpleLeakAwareCompositeByteBufClass = aClass ;
242+ leakFieldForComposite = leakField ;
243+ }
244+
245+ {
246+ final Class <?> aClass = Class .forName ("io.netty.buffer.SimpleLeakAwareByteBuf" );
247+ final Field leakField = aClass .getDeclaredField ("leak" );
248+
249+ leakField .setAccessible (true );
250+
251+ simpleLeakAwareByteBufClass = aClass ;
252+ leakFieldForNormal = leakField ;
253+ }
254+
255+ {
256+ final Class <?> aClass =
257+ Class .forName ("io.netty.util.ResourceLeakDetector$DefaultResourceLeak" );
258+ final Field field = aClass .getDeclaredField ("allLeaks" );
259+
260+ field .setAccessible (true );
261+
262+ allLeaksField = field ;
263+ }
264+ } catch (Exception e ) {
265+ throw new RuntimeException (e );
266+ }
267+ }
268+
269+ @ SuppressWarnings ("unchecked" )
270+ static Set <Object > resolveTrackingInfo (ByteBuf byteBuf ) throws Exception {
271+ if (ResourceLeakDetector .getLevel ().ordinal ()
272+ >= ResourceLeakDetector .Level .ADVANCED .ordinal ()) {
273+ if (simpleLeakAwareCompositeByteBufClass .isInstance (byteBuf )) {
274+ return (Set <Object >) allLeaksField .get (leakFieldForComposite .get (byteBuf ));
275+ } else if (simpleLeakAwareByteBufClass .isInstance (byteBuf )) {
276+ return (Set <Object >) allLeaksField .get (leakFieldForNormal .get (byteBuf ));
277+ }
278+ }
279+
280+ return Collections .emptySet ();
281+ }
153282}
0 commit comments