4343import com .squareup .okhttp .OkHttpTlsUpgrader ;
4444import com .squareup .okhttp .internal .spdy .ErrorCode ;
4545import com .squareup .okhttp .internal .spdy .FrameReader ;
46+ import com .squareup .okhttp .internal .spdy .FrameWriter ;
4647import com .squareup .okhttp .internal .spdy .Header ;
4748import com .squareup .okhttp .internal .spdy .HeadersMode ;
4849import com .squareup .okhttp .internal .spdy .Http2 ;
5354import io .grpc .Metadata ;
5455import io .grpc .MethodDescriptor ;
5556import io .grpc .MethodDescriptor .MethodType ;
57+ import io .grpc .SerializingExecutor ;
5658import io .grpc .Status ;
5759import io .grpc .Status .Code ;
5860import io .grpc .transport .ClientStreamListener ;
@@ -130,7 +132,7 @@ class OkHttpClientTransport implements ClientTransport {
130132 private final Random random = new Random ();
131133 private final Ticker ticker ;
132134 private Listener listener ;
133- private FrameReader frameReader ;
135+ private FrameReader testFrameReader ;
134136 private AsyncFrameWriter frameWriter ;
135137 private OutboundFlowController outboundFlow ;
136138 private final Object lock = new Object ();
@@ -139,6 +141,8 @@ class OkHttpClientTransport implements ClientTransport {
139141 private final Map <Integer , OkHttpClientStream > streams =
140142 Collections .synchronizedMap (new HashMap <Integer , OkHttpClientStream >());
141143 private final Executor executor ;
144+ // Wrap on executor, to guarantee some operations be executed serially.
145+ private final SerializingExecutor serializingExecutor ;
142146 private int connectionUnacknowledgedBytesRead ;
143147 private ClientFrameHandler clientFrameHandler ;
144148 // The status used to finish all active streams when the transport is closed.
@@ -157,6 +161,9 @@ class OkHttpClientTransport implements ClientTransport {
157161 @ GuardedBy ("lock" )
158162 private LinkedList <PendingStream > pendingStreams = new LinkedList <PendingStream >();
159163 private final ConnectionSpec connectionSpec ;
164+ private FrameWriter testFrameWriter ;
165+ // Used by test only.
166+ Runnable connectedCallback ;
160167
161168 OkHttpClientTransport (String host , int port , String authorityHost , Executor executor ,
162169 @ Nullable SSLSocketFactory sslSocketFactory , ConnectionSpec connectionSpec ) {
@@ -165,6 +172,7 @@ class OkHttpClientTransport implements ClientTransport {
165172 this .authorityHost = authorityHost ;
166173 defaultAuthority = authorityHost + ":" + port ;
167174 this .executor = Preconditions .checkNotNull (executor );
175+ serializingExecutor = new SerializingExecutor (executor );
168176 // Client initiated streams are odd, server initiated ones are even. Server should not need to
169177 // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
170178 nextStreamId = 3 ;
@@ -177,29 +185,25 @@ class OkHttpClientTransport implements ClientTransport {
177185 * Create a transport connected to a fake peer for test.
178186 */
179187 @ VisibleForTesting
180- OkHttpClientTransport (Executor executor , FrameReader frameReader , AsyncFrameWriter frameWriter ,
181- int nextStreamId , Socket socket ) {
182- this (executor , frameReader , frameWriter , nextStreamId , socket , Ticker .systemTicker ());
183- }
184-
185- /**
186- * Create a transport connected to a fake peer for test, with a custom ticker.
187- */
188- @ VisibleForTesting
189- OkHttpClientTransport (Executor executor , FrameReader frameReader , AsyncFrameWriter frameWriter ,
190- int nextStreamId , Socket socket , Ticker ticker ) {
188+ OkHttpClientTransport (Executor executor , FrameReader frameReader , FrameWriter testFrameWriter ,
189+ int nextStreamId , Socket socket , Ticker ticker , Runnable connectedCallback ) {
191190 host = null ;
192191 port = 0 ;
193192 authorityHost = null ;
194193 defaultAuthority = "notarealauthority:80" ;
195194 this .executor = Preconditions .checkNotNull (executor );
196- this .frameReader = Preconditions .checkNotNull (frameReader );
197- this .frameWriter = Preconditions .checkNotNull (frameWriter );
195+ serializingExecutor = new SerializingExecutor (executor );
196+ this .testFrameReader = Preconditions .checkNotNull (frameReader );
197+ this .testFrameWriter = Preconditions .checkNotNull (testFrameWriter );
198198 this .socket = Preconditions .checkNotNull (socket );
199- this .outboundFlow = new OutboundFlowController (this , frameWriter );
200199 this .nextStreamId = nextStreamId ;
201200 this .ticker = ticker ;
202201 this .connectionSpec = null ;
202+ this .connectedCallback = Preconditions .checkNotNull (connectedCallback );
203+ }
204+
205+ private boolean isForTest () {
206+ return host == null ;
203207 }
204208
205209 @ Override
@@ -315,36 +319,73 @@ private boolean startPendingStreams() {
315319 @ Override
316320 public void start (Listener listener ) {
317321 this .listener = Preconditions .checkNotNull (listener , "listener" );
318- // We set host to null for test.
319- if (host != null ) {
320- BufferedSource source ;
321- BufferedSink sink ;
322- try {
323- socket = new Socket (host , port );
324- if (sslSocketFactory != null ) {
325- socket = OkHttpTlsUpgrader .upgrade (
326- sslSocketFactory , socket , authorityHost , port , connectionSpec );
322+
323+ frameWriter = new AsyncFrameWriter (this , serializingExecutor );
324+ outboundFlow = new OutboundFlowController (this , frameWriter );
325+
326+ // Connecting in the serializingExecutor, so that some stream operations like synStream
327+ // will be executed after connected.
328+ serializingExecutor .execute (new Runnable () {
329+ @ Override
330+ public void run () {
331+ if (isForTest ()) {
332+ clientFrameHandler = new ClientFrameHandler (testFrameReader );
333+ executor .execute (clientFrameHandler );
334+ connectedCallback .run ();
335+ frameWriter .setFrameWriter (testFrameWriter );
336+ return ;
337+ }
338+ BufferedSource source ;
339+ BufferedSink sink ;
340+ Socket sock ;
341+ try {
342+ sock = new Socket (host , port );
343+ if (sslSocketFactory != null ) {
344+ sock = OkHttpTlsUpgrader .upgrade (
345+ sslSocketFactory , sock , authorityHost , port , connectionSpec );
346+ }
347+ sock .setTcpNoDelay (true );
348+ source = Okio .buffer (Okio .source (sock ));
349+ sink = Okio .buffer (Okio .sink (sock ));
350+ } catch (IOException e ) {
351+ onIoException (e );
352+ // (and probably do all of this work asynchronously instead of in calling thread)
353+ throw new RuntimeException (e );
327354 }
328- socket .setTcpNoDelay (true );
329- source = Okio .buffer (Okio .source (socket ));
330- sink = Okio .buffer (Okio .sink (socket ));
331- } catch (IOException e ) {
332- // TODO(jhump): should we instead notify the listener of shutdown+terminated?
333- // (and probably do all of this work asynchronously instead of in calling thread)
334- throw Status .UNAVAILABLE .withDescription ("Failed connecting" ).withCause (e )
335- .asRuntimeException ();
336- }
337- Variant variant = new Http2 ();
338- frameReader = variant .newReader (source , true );
339- frameWriter = new AsyncFrameWriter (variant .newWriter (sink , true ), this , executor );
340- outboundFlow = new OutboundFlowController (this , frameWriter );
341- frameWriter .connectionPreface ();
342- Settings settings = new Settings ();
343- frameWriter .settings (settings );
344- }
345355
346- clientFrameHandler = new ClientFrameHandler ();
347- executor .execute (clientFrameHandler );
356+ FrameWriter rawFrameWriter ;
357+ synchronized (lock ) {
358+ if (stopped ) {
359+ // In case user called shutdown() during the connecting.
360+ try {
361+ sock .close ();
362+ } catch (IOException e ) {
363+ log .log (Level .WARNING , "Failed closing socket" , e );
364+ }
365+ return ;
366+ }
367+ socket = sock ;
368+ }
369+
370+ Variant variant = new Http2 ();
371+ rawFrameWriter = variant .newWriter (sink , true );
372+ frameWriter .setFrameWriter (rawFrameWriter );
373+
374+ try {
375+ // Do these with the raw FrameWriter, so that they will be done in this thread,
376+ // and before any possible pending stream operations.
377+ rawFrameWriter .connectionPreface ();
378+ Settings settings = new Settings ();
379+ rawFrameWriter .settings (settings );
380+ } catch (IOException e ) {
381+ onIoException (e );
382+ throw new RuntimeException (e );
383+ }
384+
385+ clientFrameHandler = new ClientFrameHandler (variant .newReader (source , true ));
386+ executor .execute (clientFrameHandler );
387+ }
388+ });
348389 }
349390
350391 @ Override
@@ -356,9 +397,8 @@ public void shutdown() {
356397 if (normalClose ) {
357398 // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
358399 // The GOAWAY is part of graceful shutdown.
359- if (frameWriter != null ) {
360- frameWriter .goAway (0 , ErrorCode .NO_ERROR , new byte [0 ]);
361- }
400+ frameWriter .goAway (0 , ErrorCode .NO_ERROR , new byte [0 ]);
401+
362402 onGoAway (Integer .MAX_VALUE , Status .UNAVAILABLE .withDescription ("Transport stopped" ));
363403 }
364404 stopIfNecessary ();
@@ -469,6 +509,7 @@ void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode err
469509 void stopIfNecessary () {
470510 boolean shouldStop ;
471511 Http2Ping outstandingPing = null ;
512+ boolean socketConnected ;
472513 synchronized (lock ) {
473514 shouldStop = (goAway && streams .size () == 0 );
474515 if (shouldStop ) {
@@ -481,11 +522,12 @@ void stopIfNecessary() {
481522 ping = null ;
482523 }
483524 }
525+ socketConnected = socket != null ;
484526 }
485527 if (shouldStop ) {
486528 // Wait for the frame writer to close.
487- if ( frameWriter != null ) {
488- frameWriter . close ();
529+ frameWriter . close ();
530+ if ( socketConnected ) {
489531 // Close the socket to break out the reader thread, which will close the
490532 // frameReader and notify the listener.
491533 try {
@@ -529,7 +571,11 @@ static Status toGrpcStatus(ErrorCode code) {
529571 */
530572 @ VisibleForTesting
531573 class ClientFrameHandler implements FrameReader .Handler , Runnable {
532- ClientFrameHandler () {}
574+ FrameReader frameReader ;
575+
576+ ClientFrameHandler (FrameReader frameReader ) {
577+ this .frameReader = frameReader ;
578+ }
533579
534580 @ Override
535581 public void run () {
0 commit comments