Skip to content

Commit e122454

Browse files
committed
improves RequestInterceptor API to have FrameType in all the calls
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 1398cba commit e122454

12 files changed

Lines changed: 102 additions & 90 deletions

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
141141
p.release();
142142

143143
if (interceptor != null) {
144-
interceptor.onCancel(streamId);
144+
interceptor.onCancel(streamId, FrameType.REQUEST_FNF);
145145
}
146146

147147
return;
@@ -153,7 +153,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
153153
lazyTerminate(STATE, this);
154154

155155
if (interceptor != null) {
156-
interceptor.onTerminate(streamId, e);
156+
interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, e);
157157
}
158158

159159
actual.onError(e);
@@ -163,7 +163,7 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
163163
lazyTerminate(STATE, this);
164164

165165
if (interceptor != null) {
166-
interceptor.onTerminate(streamId, null);
166+
interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, null);
167167
}
168168

169169
actual.onComplete();
@@ -262,7 +262,7 @@ public Void block() {
262262
lazyTerminate(STATE, this);
263263

264264
if (interceptor != null) {
265-
interceptor.onTerminate(streamId, e);
265+
interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, e);
266266
}
267267

268268
throw Exceptions.propagate(e);
@@ -271,7 +271,7 @@ public Void block() {
271271
lazyTerminate(STATE, this);
272272

273273
if (interceptor != null) {
274-
interceptor.onTerminate(streamId, null);
274+
interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, null);
275275
}
276276

277277
return null;

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetResponderSubscriber.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.netty.util.ReferenceCountUtil;
2222
import io.rsocket.Payload;
2323
import io.rsocket.RSocket;
24+
import io.rsocket.frame.FrameType;
2425
import io.rsocket.frame.decoder.PayloadDecoder;
2526
import io.rsocket.plugins.RequestInterceptor;
2627
import org.reactivestreams.Subscription;
@@ -101,7 +102,7 @@ public void onNext(Void voidVal) {}
101102
public void onError(Throwable t) {
102103
final RequestInterceptor requestInterceptor = this.requestInterceptor;
103104
if (requestInterceptor != null) {
104-
requestInterceptor.onTerminate(this.streamId, t);
105+
requestInterceptor.onTerminate(this.streamId, FrameType.REQUEST_FNF, t);
105106
}
106107

107108
logger.debug("Dropped Outbound error", t);
@@ -111,7 +112,7 @@ public void onError(Throwable t) {
111112
public void onComplete() {
112113
final RequestInterceptor requestInterceptor = this.requestInterceptor;
113114
if (requestInterceptor != null) {
114-
requestInterceptor.onTerminate(this.streamId, null);
115+
requestInterceptor.onTerminate(this.streamId, FrameType.REQUEST_FNF, null);
115116
}
116117
}
117118

@@ -131,7 +132,7 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
131132

132133
final RequestInterceptor requestInterceptor = this.requestInterceptor;
133134
if (requestInterceptor != null) {
134-
requestInterceptor.onTerminate(streamId, t);
135+
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_FNF, t);
135136
}
136137

137138
logger.debug("Reassembly has failed", t);
@@ -151,7 +152,7 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
151152

152153
final RequestInterceptor requestInterceptor = this.requestInterceptor;
153154
if (requestInterceptor != null) {
154-
requestInterceptor.onTerminate(this.streamId, t);
155+
requestInterceptor.onTerminate(this.streamId, FrameType.REQUEST_FNF, t);
155156
}
156157

157158
logger.debug("Reassembly has failed", t);
@@ -175,7 +176,7 @@ public final void handleCancel() {
175176

176177
final RequestInterceptor requestInterceptor = this.requestInterceptor;
177178
if (requestInterceptor != null) {
178-
requestInterceptor.onCancel(streamId);
179+
requestInterceptor.onCancel(streamId, FrameType.REQUEST_FNF);
179180
}
180181
}
181182
}

rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ void sendFirstPayload(Payload firstPayload, long initialRequestN) {
260260
this.inboundDone = true;
261261

262262
if (requestInterceptor != null) {
263-
requestInterceptor.onTerminate(streamId, t);
263+
requestInterceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
264264
}
265265

266266
this.inboundSubscriber.onError(t);
@@ -281,7 +281,7 @@ void sendFirstPayload(Payload firstPayload, long initialRequestN) {
281281
connection.sendFrame(streamId, cancelFrame);
282282

283283
if (requestInterceptor != null) {
284-
requestInterceptor.onCancel(streamId);
284+
requestInterceptor.onCancel(streamId, FrameType.REQUEST_CHANNEL);
285285
}
286286
return;
287287
}
@@ -364,7 +364,7 @@ void propagateErrorSafely(Throwable t) {
364364
if (!this.inboundDone) {
365365
final RequestInterceptor interceptor = requestInterceptor;
366366
if (interceptor != null) {
367-
interceptor.onTerminate(this.streamId, t);
367+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, t);
368368
}
369369

370370
this.inboundDone = true;
@@ -386,7 +386,7 @@ public final void cancel() {
386386

387387
final RequestInterceptor requestInterceptor = this.requestInterceptor;
388388
if (requestInterceptor != null) {
389-
requestInterceptor.onCancel(this.streamId);
389+
requestInterceptor.onCancel(this.streamId, FrameType.REQUEST_CHANNEL);
390390
}
391391
}
392392

@@ -449,7 +449,7 @@ public void onError(Throwable t) {
449449
synchronized (this) {
450450
final RequestInterceptor interceptor = requestInterceptor;
451451
if (interceptor != null) {
452-
interceptor.onTerminate(streamId, t);
452+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
453453
}
454454

455455
this.inboundDone = true;
@@ -492,7 +492,7 @@ public void onComplete() {
492492
if (isInboundTerminated) {
493493
final RequestInterceptor interceptor = requestInterceptor;
494494
if (interceptor != null) {
495-
interceptor.onTerminate(streamId, null);
495+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, null);
496496
}
497497
}
498498
}
@@ -515,7 +515,7 @@ public final void handleComplete() {
515515

516516
final RequestInterceptor interceptor = requestInterceptor;
517517
if (interceptor != null) {
518-
interceptor.onTerminate(streamId, null);
518+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, null);
519519
}
520520
}
521521

@@ -538,7 +538,7 @@ public final void handleError(Throwable cause) {
538538
} else if (isInboundTerminated(previousState)) {
539539
final RequestInterceptor interceptor = this.requestInterceptor;
540540
if (interceptor != null) {
541-
interceptor.onTerminate(this.streamId, cause);
541+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, cause);
542542
}
543543

544544
Operators.onErrorDropped(cause, this.inboundSubscriber.currentContext());
@@ -555,7 +555,7 @@ public final void handleError(Throwable cause) {
555555

556556
final RequestInterceptor interceptor = requestInterceptor;
557557
if (interceptor != null) {
558-
interceptor.onTerminate(streamId, cause);
558+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, cause);
559559
}
560560

561561
this.inboundSubscriber.onError(cause);
@@ -599,7 +599,7 @@ public void handleCancel() {
599599
if (inboundTerminated) {
600600
final RequestInterceptor interceptor = requestInterceptor;
601601
if (interceptor != null) {
602-
interceptor.onTerminate(this.streamId, null);
602+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, null);
603603
}
604604
}
605605
}

rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public void cancel() {
310310
if (isOutboundTerminated) {
311311
final RequestInterceptor interceptor = requestInterceptor;
312312
if (interceptor != null) {
313-
interceptor.onTerminate(streamId, null);
313+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, null);
314314
}
315315
}
316316
}
@@ -337,7 +337,7 @@ public final void handleCancel() {
337337

338338
final RequestInterceptor interceptor = this.requestInterceptor;
339339
if (interceptor != null) {
340-
interceptor.onCancel(this.streamId);
340+
interceptor.onCancel(this.streamId, FrameType.REQUEST_CHANNEL);
341341
}
342342
return;
343343
}
@@ -349,7 +349,7 @@ public final void handleCancel() {
349349

350350
final RequestInterceptor interceptor = this.requestInterceptor;
351351
if (interceptor != null) {
352-
interceptor.onCancel(this.streamId);
352+
interceptor.onCancel(this.streamId, FrameType.REQUEST_CHANNEL);
353353
}
354354
}
355355

@@ -464,7 +464,7 @@ public final void handleError(Throwable t) {
464464

465465
final RequestInterceptor interceptor = requestInterceptor;
466466
if (interceptor != null) {
467-
interceptor.onTerminate(this.streamId, t);
467+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, t);
468468
}
469469
}
470470

@@ -490,7 +490,7 @@ public void handleComplete() {
490490
if (isOutboundTerminated) {
491491
final RequestInterceptor interceptor = this.requestInterceptor;
492492
if (interceptor != null) {
493-
interceptor.onTerminate(this.streamId, null);
493+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, null);
494494
}
495495
}
496496
}
@@ -514,7 +514,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
514514
} else if (isOutboundTerminated(previousState)) {
515515
final RequestInterceptor interceptor = this.requestInterceptor;
516516
if (interceptor != null) {
517-
interceptor.onTerminate(this.streamId, t);
517+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, t);
518518
}
519519

520520
Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
@@ -530,7 +530,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
530530

531531
final RequestInterceptor interceptor = requestInterceptor;
532532
if (interceptor != null) {
533-
interceptor.onTerminate(streamId, t);
533+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
534534
}
535535
return;
536536
}
@@ -572,7 +572,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
572572
} else if (isOutboundTerminated(previousState)) {
573573
final RequestInterceptor interceptor = this.requestInterceptor;
574574
if (interceptor != null) {
575-
interceptor.onTerminate(this.streamId, e);
575+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, e);
576576
}
577577

578578
Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
@@ -591,7 +591,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
591591

592592
final RequestInterceptor interceptor = this.requestInterceptor;
593593
if (interceptor != null) {
594-
interceptor.onTerminate(streamId, e);
594+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, e);
595595
}
596596

597597
return;
@@ -620,7 +620,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
620620
} else if (isOutboundTerminated(previousState)) {
621621
final RequestInterceptor interceptor = this.requestInterceptor;
622622
if (interceptor != null) {
623-
interceptor.onTerminate(this.streamId, t);
623+
interceptor.onTerminate(this.streamId, FrameType.REQUEST_CHANNEL, t);
624624
}
625625

626626
Operators.onErrorDropped(t, this.inboundSubscriber.currentContext());
@@ -638,7 +638,7 @@ public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload)
638638

639639
final RequestInterceptor interceptor = requestInterceptor;
640640
if (interceptor != null) {
641-
interceptor.onTerminate(streamId, t);
641+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
642642
}
643643

644644
return;
@@ -690,7 +690,7 @@ public void onNext(Payload p) {
690690

691691
final RequestInterceptor interceptor = this.requestInterceptor;
692692
if (interceptor != null) {
693-
interceptor.onTerminate(streamId, e);
693+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, e);
694694
}
695695

696696
Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
@@ -705,7 +705,7 @@ public void onNext(Payload p) {
705705

706706
final RequestInterceptor interceptor = this.requestInterceptor;
707707
if (interceptor != null) {
708-
interceptor.onTerminate(streamId, e);
708+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, e);
709709
}
710710
return;
711711
}
@@ -720,7 +720,7 @@ public void onNext(Payload p) {
720720
} else if (isOutboundTerminated(previousState)) {
721721
final RequestInterceptor interceptor = this.requestInterceptor;
722722
if (interceptor != null) {
723-
interceptor.onTerminate(streamId, e);
723+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, e);
724724
}
725725

726726
Operators.onErrorDropped(e, this.inboundSubscriber.currentContext());
@@ -736,7 +736,7 @@ public void onNext(Payload p) {
736736

737737
final RequestInterceptor interceptor = requestInterceptor;
738738
if (interceptor != null) {
739-
interceptor.onTerminate(streamId, e);
739+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, e);
740740
}
741741
return;
742742
}
@@ -749,7 +749,7 @@ public void onNext(Payload p) {
749749
long previousState = this.tryTerminate(false);
750750
final RequestInterceptor interceptor = requestInterceptor;
751751
if (interceptor != null && !isTerminated(previousState)) {
752-
interceptor.onTerminate(streamId, t);
752+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
753753
}
754754
}
755755
}
@@ -810,7 +810,7 @@ && isFirstFrameSent(previousState)
810810

811811
final RequestInterceptor interceptor = this.requestInterceptor;
812812
if (interceptor != null) {
813-
interceptor.onTerminate(streamId, t);
813+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, t);
814814
}
815815
}
816816

@@ -840,7 +840,7 @@ public void onComplete() {
840840
if (isInboundTerminated) {
841841
final RequestInterceptor interceptor = this.requestInterceptor;
842842
if (interceptor != null) {
843-
interceptor.onTerminate(streamId, null);
843+
interceptor.onTerminate(streamId, FrameType.REQUEST_CHANNEL, null);
844844
}
845845
}
846846
}

0 commit comments

Comments
 (0)