3333
3434import com .google .common .base .Preconditions ;
3535import com .google .common .base .Throwables ;
36+ import com .google .common .util .concurrent .Futures ;
3637
3738import io .grpc .transport .ServerListener ;
3839import io .grpc .transport .ServerStream ;
4243
4344import java .io .IOException ;
4445import java .io .InputStream ;
46+ import java .util .ArrayList ;
4547import java .util .Collection ;
4648import java .util .HashSet ;
4749import java .util .concurrent .Executor ;
50+ import java .util .concurrent .Executors ;
51+ import java .util .concurrent .Future ;
52+ import java .util .concurrent .ScheduledExecutorService ;
4853import java .util .concurrent .TimeUnit ;
4954
5055/**
6469public final class ServerImpl extends Server {
6570 private static final ServerStreamListener NOOP_LISTENER = new NoopListener ();
6671
72+ private static final Future <?> DEFAULT_TIMEOUT_FUTURE = Futures .immediateCancelledFuture ();
73+
6774 /** Executor for application processing. */
6875 private final Executor executor ;
6976 private final HandlerRegistry registry ;
@@ -77,6 +84,8 @@ public final class ServerImpl extends Server {
7784 /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
7885 private final Collection <ServerTransport > transports = new HashSet <ServerTransport >();
7986
87+ private final ScheduledExecutorService timeoutService ;
88+
8089 /**
8190 * Construct a server.
8291 *
@@ -88,6 +97,8 @@ public ServerImpl(Executor executor, HandlerRegistry registry,
8897 this .executor = Preconditions .checkNotNull (executor , "executor" );
8998 this .registry = Preconditions .checkNotNull (registry , "registry" );
9099 this .transportServer = Preconditions .checkNotNull (transportServer , "transportServer" );
100+ // TODO(carl-mastrangelo): replace this with the shared scheduler once PR #576 is merged.
101+ this .timeoutService = Executors .newScheduledThreadPool (1 );
91102 }
92103
93104 /** Hack to allow executors to auto-shutdown. Not for general use. */
@@ -122,6 +133,7 @@ public synchronized ServerImpl shutdown() {
122133 }
123134 transportServer .shutdown ();
124135 shutdown = true ;
136+ timeoutService .shutdown ();
125137 return this ;
126138 }
127139
@@ -224,8 +236,7 @@ public void serverShutdown() {
224236 synchronized (ServerImpl .this ) {
225237 // transports collection can be modified during shutdown(), even if we hold the lock, due
226238 // to reentrancy.
227- for (ServerTransport transport
228- : transports .toArray (new ServerTransport [transports .size ()])) {
239+ for (ServerTransport transport : new ArrayList <ServerTransport >(transports )) {
229240 transport .shutdown ();
230241 }
231242 transportServerTerminated = true ;
@@ -249,39 +260,61 @@ public void transportTerminated() {
249260 @ Override
250261 public ServerStreamListener streamCreated (final ServerStream stream , final String methodName ,
251262 final Metadata .Headers headers ) {
263+ final Future <?> timeout = scheduleTimeout (stream , headers );
252264 SerializingExecutor serializingExecutor = new SerializingExecutor (executor );
253265 final JumpToApplicationThreadServerStreamListener jumpListener
254266 = new JumpToApplicationThreadServerStreamListener (serializingExecutor , stream );
255267 // Run in serializingExecutor so jumpListener.setListener() is called before any callbacks
256268 // are delivered, including any errors. Callbacks can still be triggered, but they will be
257269 // queued.
258270 serializingExecutor .execute (new Runnable () {
259- @ Override
260- public void run () {
261- ServerStreamListener listener = NOOP_LISTENER ;
262- try {
263- HandlerRegistry .Method method = registry .lookupMethod (methodName );
264- if (method == null ) {
265- stream .close (
266- Status .UNIMPLEMENTED .withDescription ("Method not found: " + methodName ),
267- new Metadata .Trailers ());
268- return ;
269- }
270- listener = startCall (stream , methodName , method .getMethodDefinition (), headers );
271- } catch (Throwable t ) {
272- stream .close (Status .fromThrowable (t ), new Metadata .Trailers ());
273- throw Throwables .propagate (t );
274- } finally {
275- jumpListener .setListener (listener );
271+ @ Override
272+ public void run () {
273+ ServerStreamListener listener = NOOP_LISTENER ;
274+ try {
275+ HandlerRegistry .Method method = registry .lookupMethod (methodName );
276+ if (method == null ) {
277+ stream .close (
278+ Status .UNIMPLEMENTED .withDescription ("Method not found: " + methodName ),
279+ new Metadata .Trailers ());
280+ timeout .cancel (true );
281+ return ;
276282 }
283+ listener = startCall (stream , methodName , method .getMethodDefinition (), timeout ,
284+ headers );
285+ } catch (Throwable t ) {
286+ stream .close (Status .fromThrowable (t ), new Metadata .Trailers ());
287+ timeout .cancel (true );
288+ throw Throwables .propagate (t );
289+ } finally {
290+ jumpListener .setListener (listener );
277291 }
278- });
292+ }
293+ });
279294 return jumpListener ;
280295 }
281296
297+ private Future <?> scheduleTimeout (final ServerStream stream , Metadata .Headers headers ) {
298+ Long timeoutMicros = headers .get (ChannelImpl .TIMEOUT_KEY );
299+ if (timeoutMicros == null ) {
300+ return DEFAULT_TIMEOUT_FUTURE ;
301+ }
302+ return timeoutService .schedule (new Runnable () {
303+ @ Override
304+ public void run () {
305+ // This should rarely get run, since the client will likely cancel the stream before
306+ // the timeout is reached.
307+ stream .cancel (Status .DEADLINE_EXCEEDED );
308+ }
309+ },
310+ timeoutMicros ,
311+ TimeUnit .MICROSECONDS );
312+ }
313+
282314 /** Never returns {@code null}. */
283315 private <ReqT , RespT > ServerStreamListener startCall (ServerStream stream , String fullMethodName ,
284- ServerMethodDefinition <ReqT , RespT > methodDef , Metadata .Headers headers ) {
316+ ServerMethodDefinition <ReqT , RespT > methodDef , Future <?> timeout ,
317+ Metadata .Headers headers ) {
285318 // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
286319 final ServerCallImpl <ReqT , RespT > call = new ServerCallImpl <ReqT , RespT >(
287320 stream , methodDef .getMethodDescriptor ());
@@ -291,7 +324,7 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String
291324 throw new NullPointerException (
292325 "startCall() returned a null listener for method " + fullMethodName );
293326 }
294- return call .newServerStreamListener (listener );
327+ return call .newServerStreamListener (listener , timeout );
295328 }
296329 }
297330
@@ -403,7 +436,7 @@ public void run() {
403436 }
404437 }
405438
406- private class ServerCallImpl <ReqT , RespT > extends ServerCall <RespT > {
439+ private static class ServerCallImpl <ReqT , RespT > extends ServerCall <RespT > {
407440 private final ServerStream stream ;
408441 private final MethodDescriptor <ReqT , RespT > method ;
409442 private volatile boolean cancelled ;
@@ -450,8 +483,9 @@ public boolean isCancelled() {
450483 return cancelled ;
451484 }
452485
453- private ServerStreamListenerImpl newServerStreamListener (ServerCall .Listener <ReqT > listener ) {
454- return new ServerStreamListenerImpl (listener );
486+ private ServerStreamListenerImpl newServerStreamListener (ServerCall .Listener <ReqT > listener ,
487+ Future <?> timeout ) {
488+ return new ServerStreamListenerImpl (listener , timeout );
455489 }
456490
457491 /**
@@ -460,9 +494,11 @@ private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<Req
460494 */
461495 private class ServerStreamListenerImpl implements ServerStreamListener {
462496 private final ServerCall .Listener <ReqT > listener ;
497+ private final Future <?> timeout ;
463498
464- public ServerStreamListenerImpl (ServerCall .Listener <ReqT > listener ) {
499+ public ServerStreamListenerImpl (ServerCall .Listener <ReqT > listener , Future <?> timeout ) {
465500 this .listener = Preconditions .checkNotNull (listener , "listener must not be null" );
501+ this .timeout = timeout ;
466502 }
467503
468504 @ Override
@@ -493,6 +529,7 @@ public void halfClosed() {
493529
494530 @ Override
495531 public void closed (Status status ) {
532+ timeout .cancel (true );
496533 if (status .isOk ()) {
497534 listener .onComplete ();
498535 } else {
0 commit comments