Skip to content

Commit 3eaa92b

Browse files
johnbcoughlinejona86
authored andcommitted
Send an RST_STREAM frame on server deadline
1 parent 9990794 commit 3eaa92b

10 files changed

Lines changed: 138 additions & 24 deletions

netty/src/main/java/io/grpc/transport/netty/CancelStreamCommand.java renamed to netty/src/main/java/io/grpc/transport/netty/CancelClientStreamCommand.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@
4343
/**
4444
* Command sent from a Netty client stream to the handler to cancel the stream.
4545
*/
46-
class CancelStreamCommand {
46+
class CancelClientStreamCommand {
4747
private final NettyClientStream stream;
4848
private final Status reason;
4949

50-
CancelStreamCommand(NettyClientStream stream, Status reason) {
50+
CancelClientStreamCommand(NettyClientStream stream, Status reason) {
5151
this.stream = Preconditions.checkNotNull(stream, "stream");
5252
Preconditions.checkNotNull(reason);
5353
Preconditions.checkArgument(EnumSet.of(CANCELLED, DEADLINE_EXCEEDED).contains(reason.getCode()),
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2015, Google Inc. All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
*
15+
* * Neither the name of Google Inc. nor the names of its
16+
* contributors may be used to endorse or promote products derived from
17+
* this software without specific prior written permission.
18+
*
19+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
*/
31+
32+
package io.grpc.transport.netty;
33+
34+
import com.google.common.base.MoreObjects;
35+
import com.google.common.base.Objects;
36+
import com.google.common.base.Preconditions;
37+
38+
import io.grpc.Status;
39+
40+
/**
41+
* Command sent from a Netty server stream to the handler to cancel the stream.
42+
*/
43+
class CancelServerStreamCommand {
44+
private final NettyServerStream stream;
45+
private final Status reason;
46+
47+
CancelServerStreamCommand(NettyServerStream stream, Status reason) {
48+
this.stream = Preconditions.checkNotNull(stream);
49+
this.reason = Preconditions.checkNotNull(reason);
50+
}
51+
52+
NettyServerStream stream() {
53+
return stream;
54+
}
55+
56+
Status reason() {
57+
return reason;
58+
}
59+
60+
@Override
61+
public boolean equals(Object o) {
62+
if (this == o) {
63+
return true;
64+
}
65+
if (o == null || getClass() != o.getClass()) {
66+
return false;
67+
}
68+
69+
CancelServerStreamCommand that = (CancelServerStreamCommand) o;
70+
71+
return Objects.equal(this.stream, that.stream)
72+
&& Objects.equal(this.reason, that.reason);
73+
}
74+
75+
@Override
76+
public int hashCode() {
77+
return Objects.hashCode(stream, reason);
78+
}
79+
80+
@Override
81+
public String toString() {
82+
return MoreObjects.toStringHelper(this)
83+
.add("stream", stream)
84+
.add("reason", reason)
85+
.toString();
86+
}
87+
}

netty/src/main/java/io/grpc/transport/netty/NettyClientHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
160160
createStream((CreateStreamCommand) msg, promise);
161161
} else if (msg instanceof SendGrpcFrameCommand) {
162162
sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
163-
} else if (msg instanceof CancelStreamCommand) {
164-
cancelStream(ctx, (CancelStreamCommand) msg, promise);
163+
} else if (msg instanceof CancelClientStreamCommand) {
164+
cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
165165
} else if (msg instanceof RequestMessagesCommand) {
166166
((RequestMessagesCommand) msg).requestMessages();
167167
} else if (msg instanceof SendPingCommand) {
@@ -322,7 +322,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
322322
/**
323323
* Cancels this stream.
324324
*/
325-
private void cancelStream(ChannelHandlerContext ctx, CancelStreamCommand cmd,
325+
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
326326
ChannelPromise promise) {
327327
NettyClientStream stream = cmd.stream();
328328
stream.transportReportStatus(cmd.reason(), true, new Metadata.Trailers());

netty/src/main/java/io/grpc/transport/netty/NettyClientStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ void transportDataReceived(ByteBuf frame, boolean endOfStream) {
121121
@Override
122122
protected void sendCancel(Status reason) {
123123
// Send the cancel command to the handler.
124-
writeQueue.enqueue(new CancelStreamCommand(this, reason), true);
124+
writeQueue.enqueue(new CancelClientStreamCommand(this, reason), true);
125125
}
126126

127127
@Override

netty/src/main/java/io/grpc/transport/netty/NettyServerHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
234234
sendResponseHeaders(ctx, (SendResponseHeadersCommand) msg, promise);
235235
} else if (msg instanceof RequestMessagesCommand) {
236236
((RequestMessagesCommand) msg).requestMessages();
237+
} else if (msg instanceof CancelServerStreamCommand) {
238+
cancelStream(ctx, (CancelServerStreamCommand) msg, promise);
237239
} else {
238240
AssertionError e =
239241
new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
@@ -287,6 +289,12 @@ private void sendResponseHeaders(ChannelHandlerContext ctx, SendResponseHeadersC
287289
encoder().writeHeaders(ctx, cmd.streamId(), cmd.headers(), 0, cmd.endOfStream(), promise);
288290
}
289291

292+
private void cancelStream(ChannelHandlerContext ctx, CancelServerStreamCommand cmd,
293+
ChannelPromise promise) {
294+
cmd.stream().abortStream(cmd.reason(), false);
295+
encoder().writeRstStream(ctx, cmd.stream().id(), Http2Error.CANCEL.code(), promise);
296+
}
297+
290298
private Http2Stream requireHttp2Stream(int streamId) {
291299
Http2Stream stream = connection().stream(streamId);
292300
if (stream == null) {

netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,6 @@ protected void returnProcessedBytes(int processedBytes) {
125125

126126
@Override
127127
public void cancel(Status status) {
128-
// TODO(carl-mastrangelo): implement this
128+
writeQueue.enqueue(new CancelServerStreamCommand(this, status), true);
129129
}
130130
}

netty/src/test/java/io/grpc/transport/netty/NettyClientHandlerTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void cancelBufferedStreamShouldChangeClientStreamStatus() throws Exceptio
181181
verify(stream).id(eq(3));
182182
when(stream.id()).thenReturn(3);
183183
// Cancel the stream.
184-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), true);
184+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), true);
185185

186186
assertTrue(createPromise.isSuccess());
187187
verify(stream).transportReportStatus(eq(Status.CANCELLED), eq(true),
@@ -216,7 +216,7 @@ public void createStreamShouldSucceed() throws Exception {
216216
public void cancelShouldSucceed() throws Exception {
217217
createStream();
218218
verify(channel, times(1)).flush();
219-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), true);
219+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), true);
220220

221221
ByteBuf expected = rstStreamFrame(3, (int) Http2Error.CANCEL.code());
222222
verify(ctx).write(eq(expected), eq(promise));
@@ -227,7 +227,7 @@ public void cancelShouldSucceed() throws Exception {
227227
public void cancelDeadlineExceededShouldSucceed() throws Exception {
228228
createStream();
229229
verify(channel, times(1)).flush();
230-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.DEADLINE_EXCEEDED), true);
230+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.DEADLINE_EXCEEDED), true);
231231

232232
ByteBuf expected = rstStreamFrame(3, (int) Http2Error.CANCEL.code());
233233
verify(ctx).write(eq(expected), eq(promise));
@@ -244,7 +244,8 @@ public void cancelWhileBufferedShouldSucceed() throws Exception {
244244
verify(stream).id(idCaptor.capture());
245245
when(stream.id()).thenReturn(idCaptor.getValue());
246246
ChannelPromise cancelPromise = mock(ChannelPromise.class);
247-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), cancelPromise, true);
247+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), cancelPromise,
248+
true);
248249
verify(cancelPromise).setSuccess();
249250
verify(channel, times(2)).flush();
250251
verifyNoMoreInteractions(ctx);
@@ -259,29 +260,30 @@ public void cancelWhileBufferedShouldSucceed() throws Exception {
259260
public void cancelTwiceShouldSucceed() throws Exception {
260261
createStream();
261262

262-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), promise, true);
263+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), promise, true);
263264

264265
ByteBuf expected = rstStreamFrame(3, (int) Http2Error.CANCEL.code());
265266
verify(ctx).write(eq(expected), any(ChannelPromise.class));
266267

267268
promise = mock(ChannelPromise.class);
268269

269-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), promise, true);
270+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), promise, true);
270271
verify(promise).setSuccess();
271272
}
272273

273274
@Test
274275
public void cancelTwiceDifferentReasons() throws Exception {
275276
createStream();
276277

277-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.DEADLINE_EXCEEDED), promise, true);
278+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.DEADLINE_EXCEEDED), promise,
279+
true);
278280

279281
ByteBuf expected = rstStreamFrame(3, (int) Http2Error.CANCEL.code());
280282
verify(ctx).write(eq(expected), any(ChannelPromise.class));
281283

282284
promise = mock(ChannelPromise.class);
283285

284-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), promise, true);
286+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), promise, true);
285287
verify(promise).setSuccess();
286288
}
287289

@@ -383,7 +385,7 @@ public void cancelStreamShouldCreateAndThenFailBufferedStream() throws Exception
383385
writeQueue.enqueue(new CreateStreamCommand(grpcHeaders, stream), true);
384386
verify(stream).id(3);
385387
when(stream.id()).thenReturn(3);
386-
writeQueue.enqueue(new CancelStreamCommand(stream, Status.CANCELLED), true);
388+
writeQueue.enqueue(new CancelClientStreamCommand(stream, Status.CANCELLED), true);
387389
verify(stream).transportReportStatus(eq(Status.CANCELLED), eq(true),
388390
any(Metadata.Trailers.class));
389391
}

netty/src/test/java/io/grpc/transport/netty/NettyClientStreamTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ public void cancelShouldSendCommand() {
104104
// Set stream id to indicate it has been created
105105
stream().id(STREAM_ID);
106106
stream().cancel(Status.CANCELLED);
107-
ArgumentCaptor<CancelStreamCommand> commandCaptor =
108-
ArgumentCaptor.forClass(CancelStreamCommand.class);
107+
ArgumentCaptor<CancelClientStreamCommand> commandCaptor =
108+
ArgumentCaptor.forClass(CancelClientStreamCommand.class);
109109
verify(writeQueue).enqueue(commandCaptor.capture(), eq(true));
110110
assertEquals(commandCaptor.getValue().reason(), Status.CANCELLED);
111111
}
@@ -115,16 +115,16 @@ public void deadlineExceededCancelShouldSendCommand() {
115115
// Set stream id to indicate it has been created
116116
stream().id(STREAM_ID);
117117
stream().cancel(Status.DEADLINE_EXCEEDED);
118-
ArgumentCaptor<CancelStreamCommand> commandCaptor =
119-
ArgumentCaptor.forClass(CancelStreamCommand.class);
118+
ArgumentCaptor<CancelClientStreamCommand> commandCaptor =
119+
ArgumentCaptor.forClass(CancelClientStreamCommand.class);
120120
verify(writeQueue).enqueue(commandCaptor.capture(), eq(true));
121121
assertEquals(commandCaptor.getValue().reason(), Status.DEADLINE_EXCEEDED);
122122
}
123123

124124
@Test
125125
public void cancelShouldStillSendCommandIfStreamNotCreatedToCancelCreation() {
126126
stream().cancel(Status.CANCELLED);
127-
verify(writeQueue).enqueue(isA(CancelStreamCommand.class), eq(true));
127+
verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true));
128128
}
129129

130130
@Test
@@ -244,13 +244,13 @@ public void invalidInboundHeadersCancelStream() throws Exception {
244244

245245
// We are now waiting for 100 bytes of error context on the stream, cancel has not yet been
246246
// sent
247-
verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
247+
verify(channel, never()).writeAndFlush(any(CancelClientStreamCommand.class));
248248
stream().transportDataReceived(Unpooled.buffer(100).writeZero(100), false);
249-
verify(channel, never()).writeAndFlush(any(CancelStreamCommand.class));
249+
verify(channel, never()).writeAndFlush(any(CancelClientStreamCommand.class));
250250
stream().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false);
251251

252252
// Now verify that cancel is sent and an error is reported to the listener
253-
verify(writeQueue).enqueue(any(CancelStreamCommand.class), eq(true));
253+
verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true));
254254
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
255255
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
256256
assertEquals(Status.UNKNOWN.getCode(), captor.getValue().getCode());

netty/src/test/java/io/grpc/transport/netty/NettyServerHandlerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,15 @@ public void connectionWindowShouldBeOverridden() throws Exception {
335335
assertEquals(flowControlWindow, actualInitialWindowSize);
336336
}
337337

338+
@Test
339+
public void cancelShouldSendRstStream() throws Exception {
340+
createStream();
341+
writeQueue.enqueue(new CancelServerStreamCommand(stream, Status.DEADLINE_EXCEEDED), true);
342+
ByteBuf expected = rstStreamFrame(stream.id(), (int) Http2Error.CANCEL.code());
343+
ByteBuf actual = captureWrite(ctx);
344+
assertEquals(expected, actual);
345+
}
346+
338347
private void createStream() throws Exception {
339348
Http2Headers headers = new DefaultHttp2Headers()
340349
.method(HTTP_METHOD)

netty/src/test/java/io/grpc/transport/netty/NettyServerStreamTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,14 @@ public void emptyFramerShouldSendNoPayload() throws Exception {
234234
true);
235235
}
236236

237+
@Test
238+
public void cancelStreamShouldSucceed() {
239+
stream().cancel(Status.DEADLINE_EXCEEDED);
240+
verify(writeQueue).enqueue(
241+
new CancelServerStreamCommand(stream(), Status.DEADLINE_EXCEEDED),
242+
true);
243+
}
244+
237245
@Override
238246
protected NettyServerStream createStream() {
239247
when(handler.getWriteQueue()).thenReturn(writeQueue);

0 commit comments

Comments
 (0)