Skip to content

Commit ba103fb

Browse files
author
Xudong Ma
committed
OkHttp: make the pending stream cancellable.
1 parent 7d3d80e commit ba103fb

3 files changed

Lines changed: 47 additions & 2 deletions

File tree

okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientStream.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.squareup.okhttp.internal.spdy.ErrorCode;
3838
import com.squareup.okhttp.internal.spdy.Header;
3939

40+
import io.grpc.Metadata;
4041
import io.grpc.MethodDescriptor.MethodType;
4142
import io.grpc.Status;
4243
import io.grpc.transport.ClientStreamListener;
@@ -90,6 +91,8 @@ static OkHttpClientStream newStream(ClientStreamListener listener,
9091
private List<Header> requestHeaders;
9192
@GuardedBy("lock")
9293
private Queue<PendingData> pendingData = new LinkedList<PendingData>();
94+
@GuardedBy("lock")
95+
private boolean cancelSent = false;
9396

9497

9598
private OkHttpClientStream(ClientStreamListener listener,
@@ -131,7 +134,7 @@ public Integer id() {
131134
@GuardedBy("lock")
132135
public void start(Integer id) {
133136
checkNotNull(id, "id");
134-
checkState(this.id == null, "Can only set id once");
137+
checkState(this.id == null, "the stream has been started with id %s", this.id);
135138
this.id = id;
136139
frameWriter.synStream(false, false, id, 0, requestHeaders);
137140
requestHeaders = null;
@@ -228,7 +231,25 @@ protected void returnProcessedBytes(int processedBytes) {
228231

229232
@Override
230233
protected void sendCancel(Status reason) {
231-
transport.finishStream(id(), reason, ErrorCode.CANCEL);
234+
synchronized (lock) {
235+
if (cancelSent) {
236+
return;
237+
}
238+
cancelSent = true;
239+
if (pendingData != null) {
240+
// stream is pending.
241+
transport.removePendingStream(this);
242+
// release holding data, so they can be GCed or returned to pool earlier.
243+
requestHeaders = null;
244+
for (PendingData data : pendingData) {
245+
data.buffer.clear();
246+
}
247+
pendingData = null;
248+
transportReportStatus(reason, true, new Metadata.Trailers());
249+
} else {
250+
transport.finishStream(id(), reason, ErrorCode.CANCEL);
251+
}
252+
}
232253
}
233254

234255
@Override

okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,14 @@ private boolean startPendingStreams() {
293293
return hasStreamStarted;
294294
}
295295

296+
/**
297+
* Removes given pending stream, used when a pending stream is cancelled.
298+
*/
299+
@GuardedBy("lock")
300+
void removePendingStream(OkHttpClientStream pendingStream) {
301+
pendingStreams.remove(pendingStream);
302+
}
303+
296304
@Override
297305
public void start(Listener listener) {
298306
this.listener = Preconditions.checkNotNull(listener, "listener");

okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -641,6 +641,22 @@ public void pendingStreamSucceed() throws Exception {
641641
stream2.cancel(Status.CANCELLED);
642642
}
643643

644+
@Test
645+
public void pendingStreamCancelled() throws Exception {
646+
initTransport();
647+
setMaxConcurrentStreams(0);
648+
MockStreamListener listener = new MockStreamListener();
649+
OkHttpClientStream stream
650+
= clientTransport.newStream(method, new Metadata.Headers(), listener);
651+
waitForStreamPending(1);
652+
stream.sendCancel(Status.CANCELLED);
653+
// The second cancel should be an no-op.
654+
stream.sendCancel(Status.UNKNOWN);
655+
listener.waitUntilStreamClosed();
656+
assertEquals(0, clientTransport.getPendingStreamSize());
657+
assertEquals(Status.CANCELLED.getCode(), listener.status.getCode());
658+
}
659+
644660
@Test
645661
public void pendingStreamFailedByGoAway() throws Exception {
646662
initTransport();

0 commit comments

Comments
 (0)