Skip to content

Commit ddea743

Browse files
author
Xudong Ma
committed
Remove stream id check for writing path, it breaks the starting of pending streams.
And fixes OkHttpClientTransport.mayHaveCreatedStream() for the case that streamId is Integer.MAX_VALUE - 2.
1 parent b42122b commit ddea743

3 files changed

Lines changed: 35 additions & 8 deletions

File tree

okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,9 @@ private void startStream(OkHttpClientStream stream) {
272272
frameWriter.flush();
273273
}
274274
if (nextStreamId >= Integer.MAX_VALUE - 2) {
275+
// Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
276+
// correctly.
277+
nextStreamId = Integer.MAX_VALUE;
275278
onGoAway(Integer.MAX_VALUE, Status.INTERNAL.withDescription("Stream ids exhausted"));
276279
} else {
277280
nextStreamId += 2;

okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,6 @@ void windowUpdate(@Nullable OkHttpClientStream stream, int delta) {
119119
*/
120120
void data(boolean outFinished, int streamId, Buffer source, boolean flush) {
121121
Preconditions.checkNotNull(source, "source");
122-
if (streamId <= 0 || !transport.mayHaveCreatedStream(streamId)) {
123-
throw new IllegalArgumentException("Invalid streamId: " + streamId);
124-
}
125122

126123
OkHttpClientStream stream = transport.getStream(streamId);
127124
if (stream == null) {

okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -603,13 +603,28 @@ public void streamIdExhausted() throws Exception {
603603
int startId = Integer.MAX_VALUE - 2;
604604
initTransport(startId, new ConnectedCallback(false));
605605

606-
MockStreamListener listener1 = new MockStreamListener();
607-
clientTransport.newStream(method, new Metadata.Headers(), listener1);
606+
MockStreamListener listener = new MockStreamListener();
607+
clientTransport.newStream(method, new Metadata.Headers(), listener).request(1);
608608

609+
// New stream should be failed.
609610
assertNewStreamFail();
610611

612+
// The alive stream should be functional, receives a message.
613+
frameHandler().headers(
614+
false, false, startId, 0, grpcResponseHeaders(), HeadersMode.HTTP_20_HEADERS);
615+
assertNotNull(listener.headers);
616+
String message = "hello";
617+
Buffer buffer = createMessageFrame(message);
618+
frameHandler().data(false, startId, buffer, (int) buffer.size());
619+
611620
getStream(startId).cancel(Status.CANCELLED);
612-
listener1.waitUntilStreamClosed();
621+
// Receives the second message after be cancelled.
622+
buffer = createMessageFrame(message);
623+
frameHandler().data(false, startId, buffer, (int) buffer.size());
624+
625+
listener.waitUntilStreamClosed();
626+
// Should only have the first message delivered.
627+
assertEquals(message, listener.messages.get(0));
613628
verify(frameWriter, timeout(TIME_OUT_MS)).rstStream(eq(startId), eq(ErrorCode.CANCEL));
614629
verify(transportListener).transportShutdown(isA(Status.class));
615630
verify(transportListener, timeout(TIME_OUT_MS)).transportTerminated();
@@ -627,6 +642,12 @@ public void pendingStreamSucceed() throws Exception {
627642
// The second stream should be pending.
628643
OkHttpClientStream stream2 =
629644
clientTransport.newStream(method, new Metadata.Headers(), listener2);
645+
String sentMessage = "hello";
646+
InputStream input = new ByteArrayInputStream(sentMessage.getBytes(UTF_8));
647+
assertEquals(5, input.available());
648+
stream2.writeMessage(input);
649+
stream2.flush();
650+
stream2.halfClose();
630651

631652
waitForStreamPending(1);
632653
assertEquals(1, activeStreamCount());
@@ -635,10 +656,16 @@ public void pendingStreamSucceed() throws Exception {
635656
stream1.cancel(Status.CANCELLED);
636657
listener1.waitUntilStreamClosed();
637658

638-
// The second stream should be active now.
659+
// The second stream should be active now, and the pending data should be sent out.
639660
assertEquals(1, activeStreamCount());
640661
assertEquals(0, clientTransport.getPendingStreamSize());
641-
stream2.cancel(Status.CANCELLED);
662+
ArgumentCaptor<Buffer> captor = ArgumentCaptor.forClass(Buffer.class);
663+
verify(frameWriter, timeout(TIME_OUT_MS))
664+
.data(eq(false), eq(5), captor.capture(), eq(5 + HEADER_LENGTH));
665+
Buffer sentFrame = captor.getValue();
666+
assertEquals(createMessageFrame(sentMessage), sentFrame);
667+
verify(frameWriter, timeout(TIME_OUT_MS)).data(eq(true), eq(5), any(Buffer.class), eq(0));
668+
stream2.sendCancel(Status.CANCELLED);
642669
}
643670

644671
@Test

0 commit comments

Comments
 (0)