3131
3232package com .google .net .stubby ;
3333
34- import static com .google .common .util .concurrent .Service .State .RUNNING ;
35- import static com .google .common .util .concurrent .Service .State .STARTING ;
36-
3734import com .google .common .base .Preconditions ;
38- import com .google .common .util .concurrent .AbstractService ;
3935import com .google .common .util .concurrent .FutureCallback ;
4036import com .google .common .util .concurrent .Futures ;
4137import com .google .common .util .concurrent .ListenableFuture ;
4238import com .google .common .util .concurrent .MoreExecutors ;
39+ import com .google .common .util .concurrent .Service .Listener ;
40+ import com .google .common .util .concurrent .Service .State ;
4341import com .google .common .util .concurrent .SettableFuture ;
4442import com .google .net .stubby .transport .ClientStream ;
4543import com .google .net .stubby .transport .ClientStreamListener ;
5250import java .util .Collection ;
5351import java .util .concurrent .Callable ;
5452import java .util .concurrent .ExecutorService ;
53+ import java .util .concurrent .TimeUnit ;
5554import java .util .logging .Level ;
5655import java .util .logging .Logger ;
5756
6059
6160/** A communication channel for making outgoing RPCs. */
6261@ ThreadSafe
63- public final class ChannelImpl extends AbstractService implements Channel {
62+ public final class ChannelImpl implements Channel {
6463
6564 private static final Logger log = Logger .getLogger (ChannelImpl .class .getName ());
6665
66+ private static class NoopClientStream implements ClientStream {
67+ @ Override public void writeMessage (InputStream message , int length , Runnable accepted ) {}
68+ @ Override public void flush () {}
69+ @ Override public void cancel () {}
70+ @ Override public void halfClose () {}
71+ }
72+
6773 private final ClientTransportFactory transportFactory ;
6874 private final ExecutorService executor ;
6975 /**
@@ -76,79 +82,136 @@ public final class ChannelImpl extends AbstractService implements Channel {
7682 /** The transport for new outgoing requests. */
7783 @ GuardedBy ("this" )
7884 private ClientTransport activeTransport ;
85+ @ GuardedBy ("this" )
86+ private boolean shutdown ;
87+ @ GuardedBy ("this" )
88+ private boolean terminated ;
89+ private Runnable terminationRunnable ;
7990
8091 public ChannelImpl (ClientTransportFactory transportFactory , ExecutorService executor ) {
8192 this .transportFactory = transportFactory ;
8293 this .executor = executor ;
8394 }
8495
85- @ Override
86- protected void doStart () {
87- obtainActiveTransport (true );
96+ /** Hack to allow executors to auto-shutdown. Not for general use. */
97+ // TODO(user): Replace with a real API.
98+ void setTerminationRunnable (Runnable runnable ) {
99+ this .terminationRunnable = runnable ;
88100 }
89101
90- @ Override
91- protected synchronized void doStop () {
92- if (transports .isEmpty ()) {
93- notifyStopped ();
94- } else {
95- // The last TransportListener will call notifyStopped().
96- if (activeTransport != null ) {
97- activeTransport .stopAsync ();
98- activeTransport = null ;
102+ /**
103+ * Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
104+ * cancelled.
105+ */
106+ public synchronized ChannelImpl shutdown () {
107+ shutdown = true ;
108+ if (activeTransport != null ) {
109+ activeTransport .stopAsync ();
110+ activeTransport = null ;
111+ } else if (transports .isEmpty ()) {
112+ terminated = true ;
113+ notifyAll ();
114+ if (terminationRunnable != null ) {
115+ terminationRunnable .run ();
99116 }
100117 }
118+ return this ;
101119 }
102120
121+ /**
122+ * Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
123+ * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
124+ * return {@code false} immediately after this method returns.
125+ *
126+ * <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
127+ */
128+ // TODO(user): cancel preexisting calls.
129+ public synchronized ChannelImpl shutdownNow () {
130+ shutdown ();
131+ return this ;
132+ }
133+
134+ /**
135+ * Returns whether the channel is shutdown. Shutdown channels immediately cancel any new calls,
136+ * but may still have some calls being processed.
137+ *
138+ * @see #shutdown()
139+ * @see #isTerminated()
140+ */
141+ public synchronized boolean isShutdown () {
142+ return shutdown ;
143+ }
144+
145+ /**
146+ * Waits for the channel to become terminated, giving up if the timeout is reached.
147+ *
148+ * @return whether the channel is terminated, as would be done by {@link #isTerminated()}.
149+ */
150+ public synchronized boolean awaitTerminated (long timeout , TimeUnit unit )
151+ throws InterruptedException {
152+ long timeoutNanos = unit .toNanos (timeout );
153+ long endTimeNanos = System .nanoTime () + timeoutNanos ;
154+ while (!terminated && (timeoutNanos = endTimeNanos - System .nanoTime ()) > 0 ) {
155+ TimeUnit .NANOSECONDS .timedWait (this , timeoutNanos );
156+ }
157+ return terminated ;
158+ }
159+
160+ /**
161+ * Returns whether the channel is terminated. Terminated channels have no running calls and
162+ * relevant resources released (like TCP connections).
163+ *
164+ * @see #isShutdown()
165+ */
166+ public synchronized boolean isTerminated () {
167+ return terminated ;
168+ }
169+
170+ /**
171+ * Creates a new outgoing call on the channel.
172+ */
103173 @ Override
104174 public <ReqT , RespT > Call <ReqT , RespT > newCall (MethodDescriptor <ReqT , RespT > method ) {
105175 return new CallImpl <ReqT , RespT >(method , new SerializingExecutor (executor ));
106176 }
107177
108- private synchronized ClientTransport obtainActiveTransport (boolean notifyWhenRunning ) {
109- if (activeTransport == null ) {
110- State state = state ();
111- if (state != RUNNING && state != STARTING ) {
112- throw new IllegalStateException ("Not running" );
113- }
114- ClientTransport newTransport = transportFactory .newClientTransport ();
115- activeTransport = newTransport ;
116- transports .add (newTransport );
117- // activeTransport reference can be changed during calls to the transport, even if we hold the
118- // lock, due to reentrancy.
119- newTransport .addListener (new TransportListener (newTransport , notifyWhenRunning ),
120- MoreExecutors .directExecutor ());
121- newTransport .startAsync ();
122- return newTransport ;
178+ private synchronized ClientTransport obtainActiveTransport () {
179+ if (shutdown ) {
180+ return null ;
181+ }
182+ if (activeTransport != null ) {
183+ return activeTransport ;
123184 }
124- return activeTransport ;
185+ ClientTransport newTransport = transportFactory .newClientTransport ();
186+ activeTransport = newTransport ;
187+ transports .add (newTransport );
188+ // activeTransport reference can be changed during calls to the transport, even if we hold the
189+ // lock, due to reentrancy.
190+ newTransport .addListener (new TransportListener (newTransport ),
191+ MoreExecutors .directExecutor ());
192+ newTransport .startAsync ();
193+ return newTransport ;
125194 }
126195
127- private synchronized void transportFailedOrStopped (ClientTransport transport , Throwable t ) {
128- if (transport .state () == State .FAILED ) {
129- log .log (Level .SEVERE , "client transport failed " + transport .getClass ().getName (),
130- transport .failureCause ());
131- }
196+ private synchronized void transportFailedOrTerminated (ClientTransport transport ) {
132197 if (activeTransport == transport ) {
133198 activeTransport = null ;
134199 }
135200 transports .remove (transport );
136- if (state () != RUNNING && transports .isEmpty ()) {
137- if ( t != null ) {
138- notifyFailed ( t );
139- } else {
140- notifyStopped ();
201+ if (shutdown && transports .isEmpty ()) {
202+ terminated = true ;
203+ notifyAll ( );
204+ if ( terminationRunnable != null ) {
205+ terminationRunnable . run ();
141206 }
142207 }
143208 }
144209
145210 private class TransportListener extends Listener {
146211 private final ClientTransport transport ;
147- private final boolean notifyWhenRunning ;
148212
149- public TransportListener (ClientTransport transport , boolean notifyWhenRunning ) {
213+ public TransportListener (ClientTransport transport ) {
150214 this .transport = transport ;
151- this .notifyWhenRunning = notifyWhenRunning ;
152215 }
153216
154217 @ Override
@@ -162,20 +225,17 @@ public void stopping(State from) {
162225
163226 @ Override
164227 public void failed (State from , Throwable failure ) {
165- transportFailedOrStopped (transport , failure );
228+ log .log (Level .SEVERE , "Client transport failed" , failure );
229+ transportFailedOrTerminated (transport );
166230 }
167231
168232 @ Override
169233 public void terminated (State from ) {
170- transportFailedOrStopped (transport , null );
234+ transportFailedOrTerminated (transport );
171235 }
172236
173237 @ Override
174- public void running () {
175- if (notifyWhenRunning ) {
176- notifyStarted ();
177- }
178- }
238+ public void running () {}
179239 }
180240
181241 private class CallImpl <ReqT , RespT > extends Call <ReqT , RespT > {
@@ -191,8 +251,23 @@ public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor execut
191251 @ Override
192252 public void start (Listener <RespT > observer , Metadata .Headers headers ) {
193253 Preconditions .checkState (stream == null , "Already started" );
194- stream = obtainActiveTransport (false ).newStream (method , headers ,
195- new ClientStreamListenerImpl (observer ));
254+ ClientStreamListener listener = new ClientStreamListenerImpl (observer );
255+ ClientTransport transport = obtainActiveTransport ();
256+ if (transport == null ) {
257+ stream = new NoopClientStream ();
258+ listener .closed (Status .CANCELLED .withDescription ("Channel is shutdown" ),
259+ new Metadata .Trailers ());
260+ return ;
261+ }
262+ try {
263+ stream = transport .newStream (method , headers , listener );
264+ } catch (IllegalStateException ex ) {
265+ // We can race with the transport and end up trying to use a terminated transport.
266+ // TODO(user): Improve the API to remove the possibility of the race.
267+ stream = new NoopClientStream ();
268+ listener .closed (Status .fromThrowable (ex ), new Metadata .Trailers ());
269+ return ;
270+ }
196271 }
197272
198273 @ Override
0 commit comments