Skip to content

Commit f9f5b6a

Browse files
author
Xudong Ma
committed
okhttp: make transport.start() async.
1 parent a6585e3 commit f9f5b6a

3 files changed

Lines changed: 402 additions & 181 deletions

File tree

okhttp/src/main/java/io/grpc/transport/okhttp/AsyncFrameWriter.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package io.grpc.transport.okhttp;
3333

34+
import com.google.common.base.Preconditions;
3435
import com.google.common.util.concurrent.SettableFuture;
3536

3637
import com.squareup.okhttp.internal.spdy.ErrorCode;
@@ -45,20 +46,26 @@
4546
import java.io.IOException;
4647
import java.util.List;
4748
import java.util.concurrent.ExecutionException;
48-
import java.util.concurrent.Executor;
4949

5050
class AsyncFrameWriter implements FrameWriter {
51-
private final FrameWriter frameWriter;
52-
private final Executor executor;
51+
private FrameWriter frameWriter;
52+
// Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
53+
// just waiting on each other.
54+
private final SerializingExecutor executor;
5355
private final OkHttpClientTransport transport;
5456

55-
public AsyncFrameWriter(FrameWriter frameWriter, OkHttpClientTransport transport,
56-
Executor executor) {
57-
this.frameWriter = frameWriter;
57+
public AsyncFrameWriter(OkHttpClientTransport transport, SerializingExecutor executor) {
5858
this.transport = transport;
59-
// Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
60-
// just waiting on each other.
61-
this.executor = new SerializingExecutor(executor);
59+
this.executor = executor;
60+
}
61+
62+
/**
63+
* Set the real frameWriter, should only be called by thread of executor.
64+
*/
65+
void setFrameWriter(FrameWriter frameWriter) {
66+
Preconditions.checkState(this.frameWriter == null,
67+
"AsyncFrameWriter's setFrameWriter() should only be called once.");
68+
this.frameWriter = frameWriter;
6269
}
6370

6471
@Override
@@ -206,7 +213,9 @@ public void close() {
206213
@Override
207214
public void run() {
208215
try {
209-
frameWriter.close();
216+
if (frameWriter != null) {
217+
frameWriter.close();
218+
}
210219
} catch (IOException e) {
211220
closeFuture.setException(e);
212221
} finally {
@@ -228,6 +237,9 @@ private abstract class WriteRunnable implements Runnable {
228237
@Override
229238
public final void run() {
230239
try {
240+
if (frameWriter == null) {
241+
throw new IOException("Unable to perform write due to unavailable frameWriter.");
242+
}
231243
doRun();
232244
} catch (IOException ex) {
233245
transport.onIoException(ex);
@@ -240,6 +252,7 @@ public final void run() {
240252

241253
@Override
242254
public int maxDataLength() {
243-
return frameWriter.maxDataLength();
255+
return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */
256+
: frameWriter.maxDataLength();
244257
}
245258
}

okhttp/src/main/java/io/grpc/transport/okhttp/OkHttpClientTransport.java

Lines changed: 95 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.squareup.okhttp.OkHttpTlsUpgrader;
4444
import com.squareup.okhttp.internal.spdy.ErrorCode;
4545
import com.squareup.okhttp.internal.spdy.FrameReader;
46+
import com.squareup.okhttp.internal.spdy.FrameWriter;
4647
import com.squareup.okhttp.internal.spdy.Header;
4748
import com.squareup.okhttp.internal.spdy.HeadersMode;
4849
import com.squareup.okhttp.internal.spdy.Http2;
@@ -53,6 +54,7 @@
5354
import io.grpc.Metadata;
5455
import io.grpc.MethodDescriptor;
5556
import io.grpc.MethodDescriptor.MethodType;
57+
import io.grpc.SerializingExecutor;
5658
import io.grpc.Status;
5759
import io.grpc.Status.Code;
5860
import io.grpc.transport.ClientStreamListener;
@@ -130,7 +132,7 @@ class OkHttpClientTransport implements ClientTransport {
130132
private final Random random = new Random();
131133
private final Ticker ticker;
132134
private Listener listener;
133-
private FrameReader frameReader;
135+
private FrameReader testFrameReader;
134136
private AsyncFrameWriter frameWriter;
135137
private OutboundFlowController outboundFlow;
136138
private final Object lock = new Object();
@@ -139,6 +141,8 @@ class OkHttpClientTransport implements ClientTransport {
139141
private final Map<Integer, OkHttpClientStream> streams =
140142
Collections.synchronizedMap(new HashMap<Integer, OkHttpClientStream>());
141143
private final Executor executor;
144+
// Wrap on executor, to guarantee some operations be executed serially.
145+
private final SerializingExecutor serializingExecutor;
142146
private int connectionUnacknowledgedBytesRead;
143147
private ClientFrameHandler clientFrameHandler;
144148
// The status used to finish all active streams when the transport is closed.
@@ -157,6 +161,9 @@ class OkHttpClientTransport implements ClientTransport {
157161
@GuardedBy("lock")
158162
private LinkedList<PendingStream> pendingStreams = new LinkedList<PendingStream>();
159163
private final ConnectionSpec connectionSpec;
164+
private FrameWriter testFrameWriter;
165+
// Used by test only.
166+
Runnable connectedCallback;
160167

161168
OkHttpClientTransport(String host, int port, String authorityHost, Executor executor,
162169
@Nullable SSLSocketFactory sslSocketFactory, ConnectionSpec connectionSpec) {
@@ -165,6 +172,7 @@ class OkHttpClientTransport implements ClientTransport {
165172
this.authorityHost = authorityHost;
166173
defaultAuthority = authorityHost + ":" + port;
167174
this.executor = Preconditions.checkNotNull(executor);
175+
serializingExecutor = new SerializingExecutor(executor);
168176
// Client initiated streams are odd, server initiated ones are even. Server should not need to
169177
// use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
170178
nextStreamId = 3;
@@ -177,29 +185,25 @@ class OkHttpClientTransport implements ClientTransport {
177185
* Create a transport connected to a fake peer for test.
178186
*/
179187
@VisibleForTesting
180-
OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
181-
int nextStreamId, Socket socket) {
182-
this(executor, frameReader, frameWriter, nextStreamId, socket, Ticker.systemTicker());
183-
}
184-
185-
/**
186-
* Create a transport connected to a fake peer for test, with a custom ticker.
187-
*/
188-
@VisibleForTesting
189-
OkHttpClientTransport(Executor executor, FrameReader frameReader, AsyncFrameWriter frameWriter,
190-
int nextStreamId, Socket socket, Ticker ticker) {
188+
OkHttpClientTransport(Executor executor, FrameReader frameReader, FrameWriter testFrameWriter,
189+
int nextStreamId, Socket socket, Ticker ticker, Runnable connectedCallback) {
191190
host = null;
192191
port = 0;
193192
authorityHost = null;
194193
defaultAuthority = "notarealauthority:80";
195194
this.executor = Preconditions.checkNotNull(executor);
196-
this.frameReader = Preconditions.checkNotNull(frameReader);
197-
this.frameWriter = Preconditions.checkNotNull(frameWriter);
195+
serializingExecutor = new SerializingExecutor(executor);
196+
this.testFrameReader = Preconditions.checkNotNull(frameReader);
197+
this.testFrameWriter = Preconditions.checkNotNull(testFrameWriter);
198198
this.socket = Preconditions.checkNotNull(socket);
199-
this.outboundFlow = new OutboundFlowController(this, frameWriter);
200199
this.nextStreamId = nextStreamId;
201200
this.ticker = ticker;
202201
this.connectionSpec = null;
202+
this.connectedCallback = Preconditions.checkNotNull(connectedCallback);
203+
}
204+
205+
private boolean isForTest() {
206+
return host == null;
203207
}
204208

205209
@Override
@@ -315,36 +319,73 @@ private boolean startPendingStreams() {
315319
@Override
316320
public void start(Listener listener) {
317321
this.listener = Preconditions.checkNotNull(listener, "listener");
318-
// We set host to null for test.
319-
if (host != null) {
320-
BufferedSource source;
321-
BufferedSink sink;
322-
try {
323-
socket = new Socket(host, port);
324-
if (sslSocketFactory != null) {
325-
socket = OkHttpTlsUpgrader.upgrade(
326-
sslSocketFactory, socket, authorityHost, port, connectionSpec);
322+
323+
frameWriter = new AsyncFrameWriter(this, serializingExecutor);
324+
outboundFlow = new OutboundFlowController(this, frameWriter);
325+
326+
// Connecting in the serializingExecutor, so that some stream operations like synStream
327+
// will be executed after connected.
328+
serializingExecutor.execute(new Runnable() {
329+
@Override
330+
public void run() {
331+
if (isForTest()) {
332+
clientFrameHandler = new ClientFrameHandler(testFrameReader);
333+
executor.execute(clientFrameHandler);
334+
connectedCallback.run();
335+
frameWriter.setFrameWriter(testFrameWriter);
336+
return;
337+
}
338+
BufferedSource source;
339+
BufferedSink sink;
340+
Socket sock;
341+
try {
342+
sock = new Socket(host, port);
343+
if (sslSocketFactory != null) {
344+
sock = OkHttpTlsUpgrader.upgrade(
345+
sslSocketFactory, sock, authorityHost, port, connectionSpec);
346+
}
347+
sock.setTcpNoDelay(true);
348+
source = Okio.buffer(Okio.source(sock));
349+
sink = Okio.buffer(Okio.sink(sock));
350+
} catch (IOException e) {
351+
onIoException(e);
352+
// (and probably do all of this work asynchronously instead of in calling thread)
353+
throw new RuntimeException(e);
327354
}
328-
socket.setTcpNoDelay(true);
329-
source = Okio.buffer(Okio.source(socket));
330-
sink = Okio.buffer(Okio.sink(socket));
331-
} catch (IOException e) {
332-
// TODO(jhump): should we instead notify the listener of shutdown+terminated?
333-
// (and probably do all of this work asynchronously instead of in calling thread)
334-
throw Status.UNAVAILABLE.withDescription("Failed connecting").withCause(e)
335-
.asRuntimeException();
336-
}
337-
Variant variant = new Http2();
338-
frameReader = variant.newReader(source, true);
339-
frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor);
340-
outboundFlow = new OutboundFlowController(this, frameWriter);
341-
frameWriter.connectionPreface();
342-
Settings settings = new Settings();
343-
frameWriter.settings(settings);
344-
}
345355

346-
clientFrameHandler = new ClientFrameHandler();
347-
executor.execute(clientFrameHandler);
356+
FrameWriter rawFrameWriter;
357+
synchronized (lock) {
358+
if (stopped) {
359+
// In case user called shutdown() during the connecting.
360+
try {
361+
sock.close();
362+
} catch (IOException e) {
363+
log.log(Level.WARNING, "Failed closing socket", e);
364+
}
365+
return;
366+
}
367+
socket = sock;
368+
}
369+
370+
Variant variant = new Http2();
371+
rawFrameWriter = variant.newWriter(sink, true);
372+
frameWriter.setFrameWriter(rawFrameWriter);
373+
374+
try {
375+
// Do these with the raw FrameWriter, so that they will be done in this thread,
376+
// and before any possible pending stream operations.
377+
rawFrameWriter.connectionPreface();
378+
Settings settings = new Settings();
379+
rawFrameWriter.settings(settings);
380+
} catch (IOException e) {
381+
onIoException(e);
382+
throw new RuntimeException(e);
383+
}
384+
385+
clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
386+
executor.execute(clientFrameHandler);
387+
}
388+
});
348389
}
349390

350391
@Override
@@ -356,9 +397,8 @@ public void shutdown() {
356397
if (normalClose) {
357398
// Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
358399
// The GOAWAY is part of graceful shutdown.
359-
if (frameWriter != null) {
360-
frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
361-
}
400+
frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
401+
362402
onGoAway(Integer.MAX_VALUE, Status.UNAVAILABLE.withDescription("Transport stopped"));
363403
}
364404
stopIfNecessary();
@@ -469,6 +509,7 @@ void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode err
469509
void stopIfNecessary() {
470510
boolean shouldStop;
471511
Http2Ping outstandingPing = null;
512+
boolean socketConnected;
472513
synchronized (lock) {
473514
shouldStop = (goAway && streams.size() == 0);
474515
if (shouldStop) {
@@ -481,11 +522,12 @@ void stopIfNecessary() {
481522
ping = null;
482523
}
483524
}
525+
socketConnected = socket != null;
484526
}
485527
if (shouldStop) {
486528
// Wait for the frame writer to close.
487-
if (frameWriter != null) {
488-
frameWriter.close();
529+
frameWriter.close();
530+
if (socketConnected) {
489531
// Close the socket to break out the reader thread, which will close the
490532
// frameReader and notify the listener.
491533
try {
@@ -529,7 +571,11 @@ static Status toGrpcStatus(ErrorCode code) {
529571
*/
530572
@VisibleForTesting
531573
class ClientFrameHandler implements FrameReader.Handler, Runnable {
532-
ClientFrameHandler() {}
574+
FrameReader frameReader;
575+
576+
ClientFrameHandler(FrameReader frameReader) {
577+
this.frameReader = frameReader;
578+
}
533579

534580
@Override
535581
public void run() {

0 commit comments

Comments
 (0)