Skip to content

Commit 0003e44

Browse files
Add simple server timeout support
Reintroduce throws Add timeoutExecutor shutdown Use a default future Move timeout cancellation Cancel the timeout in error cases
1 parent ad7820c commit 0003e44

6 files changed

Lines changed: 85 additions & 36 deletions

File tree

core/src/main/java/io/grpc/ChannelImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,15 +306,15 @@ public void transportTerminated() {
306306
}
307307
}
308308

309-
private class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
309+
private final class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
310310
private final MethodDescriptor<ReqT, RespT> method;
311311
private final SerializingExecutor callExecutor;
312312
private final boolean unaryRequest;
313313
private final CallOptions callOptions;
314314
private ClientStream stream;
315315
private volatile ScheduledFuture<?> deadlineCancellationFuture;
316316

317-
public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
317+
private CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
318318
CallOptions callOptions) {
319319
this.method = method;
320320
this.callExecutor = executor;
@@ -403,6 +403,7 @@ public void sendPayload(ReqT payload) {
403403
stream.writeMessage(payloadIs);
404404
failed = false;
405405
} finally {
406+
// TODO(notcarl): Find out if payloadIs needs to be closed.
406407
if (failed) {
407408
cancel();
408409
}

core/src/main/java/io/grpc/Metadata.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242

4343
import java.util.Arrays;
4444
import java.util.List;
45-
import java.util.Map;
4645
import java.util.Set;
4746

4847
import javax.annotation.concurrent.NotThreadSafe;
@@ -204,9 +203,9 @@ public byte[][] serialize() {
204203
Preconditions.checkState(serializable, "Can't serialize raw metadata");
205204
byte[][] serialized = new byte[store.size() * 2][];
206205
int i = 0;
207-
for (Map.Entry<String, MetadataEntry> entry : store.entries()) {
208-
serialized[i++] = entry.getValue().key.asciiName();
209-
serialized[i++] = entry.getValue().getSerialized();
206+
for (MetadataEntry entry : store.values()) {
207+
serialized[i++] = entry.key.asciiName();
208+
serialized[i++] = entry.getSerialized();
210209
}
211210
return serialized;
212211
}

core/src/main/java/io/grpc/ServerImpl.java

Lines changed: 63 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import com.google.common.base.Preconditions;
3535
import com.google.common.base.Throwables;
36+
import com.google.common.util.concurrent.Futures;
3637

3738
import io.grpc.transport.ServerListener;
3839
import io.grpc.transport.ServerStream;
@@ -42,9 +43,13 @@
4243

4344
import java.io.IOException;
4445
import java.io.InputStream;
46+
import java.util.ArrayList;
4547
import java.util.Collection;
4648
import java.util.HashSet;
4749
import java.util.concurrent.Executor;
50+
import java.util.concurrent.Executors;
51+
import java.util.concurrent.Future;
52+
import java.util.concurrent.ScheduledExecutorService;
4853
import java.util.concurrent.TimeUnit;
4954

5055
/**
@@ -64,6 +69,8 @@
6469
public final class ServerImpl extends Server {
6570
private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
6671

72+
private static final Future<?> DEFAULT_TIMEOUT_FUTURE = Futures.immediateCancelledFuture();
73+
6774
/** Executor for application processing. */
6875
private final Executor executor;
6976
private final HandlerRegistry registry;
@@ -77,6 +84,8 @@ public final class ServerImpl extends Server {
7784
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
7885
private final Collection<ServerTransport> transports = new HashSet<ServerTransport>();
7986

87+
private final ScheduledExecutorService timeoutService;
88+
8089
/**
8190
* Construct a server.
8291
*
@@ -88,6 +97,8 @@ public ServerImpl(Executor executor, HandlerRegistry registry,
8897
this.executor = Preconditions.checkNotNull(executor, "executor");
8998
this.registry = Preconditions.checkNotNull(registry, "registry");
9099
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
100+
// TODO(carl-mastrangelo): replace this with the shared scheduler once PR #576 is merged.
101+
this.timeoutService = Executors.newScheduledThreadPool(1);
91102
}
92103

93104
/** Hack to allow executors to auto-shutdown. Not for general use. */
@@ -122,6 +133,7 @@ public synchronized ServerImpl shutdown() {
122133
}
123134
transportServer.shutdown();
124135
shutdown = true;
136+
timeoutService.shutdown();
125137
return this;
126138
}
127139

@@ -224,8 +236,7 @@ public void serverShutdown() {
224236
synchronized (ServerImpl.this) {
225237
// transports collection can be modified during shutdown(), even if we hold the lock, due
226238
// to reentrancy.
227-
for (ServerTransport transport
228-
: transports.toArray(new ServerTransport[transports.size()])) {
239+
for (ServerTransport transport : new ArrayList<ServerTransport>(transports)) {
229240
transport.shutdown();
230241
}
231242
transportServerTerminated = true;
@@ -249,39 +260,61 @@ public void transportTerminated() {
249260
@Override
250261
public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
251262
final Metadata.Headers headers) {
263+
final Future<?> timeout = scheduleTimeout(stream, headers);
252264
SerializingExecutor serializingExecutor = new SerializingExecutor(executor);
253265
final JumpToApplicationThreadServerStreamListener jumpListener
254266
= new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
255267
// Run in serializingExecutor so jumpListener.setListener() is called before any callbacks
256268
// are delivered, including any errors. Callbacks can still be triggered, but they will be
257269
// queued.
258270
serializingExecutor.execute(new Runnable() {
259-
@Override
260-
public void run() {
261-
ServerStreamListener listener = NOOP_LISTENER;
262-
try {
263-
HandlerRegistry.Method method = registry.lookupMethod(methodName);
264-
if (method == null) {
265-
stream.close(
266-
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
267-
new Metadata.Trailers());
268-
return;
269-
}
270-
listener = startCall(stream, methodName, method.getMethodDefinition(), headers);
271-
} catch (Throwable t) {
272-
stream.close(Status.fromThrowable(t), new Metadata.Trailers());
273-
throw Throwables.propagate(t);
274-
} finally {
275-
jumpListener.setListener(listener);
271+
@Override
272+
public void run() {
273+
ServerStreamListener listener = NOOP_LISTENER;
274+
try {
275+
HandlerRegistry.Method method = registry.lookupMethod(methodName);
276+
if (method == null) {
277+
stream.close(
278+
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
279+
new Metadata.Trailers());
280+
timeout.cancel(true);
281+
return;
276282
}
283+
listener = startCall(stream, methodName, method.getMethodDefinition(), timeout,
284+
headers);
285+
} catch (Throwable t) {
286+
stream.close(Status.fromThrowable(t), new Metadata.Trailers());
287+
timeout.cancel(true);
288+
throw Throwables.propagate(t);
289+
} finally {
290+
jumpListener.setListener(listener);
277291
}
278-
});
292+
}
293+
});
279294
return jumpListener;
280295
}
281296

297+
private Future<?> scheduleTimeout(final ServerStream stream, Metadata.Headers headers) {
298+
Long timeoutMicros = headers.get(ChannelImpl.TIMEOUT_KEY);
299+
if (timeoutMicros == null) {
300+
return DEFAULT_TIMEOUT_FUTURE;
301+
}
302+
return timeoutService.schedule(new Runnable() {
303+
@Override
304+
public void run() {
305+
// This should rarely get run, since the client will likely cancel the stream before
306+
// the timeout is reached.
307+
stream.cancel(Status.DEADLINE_EXCEEDED);
308+
}
309+
},
310+
timeoutMicros,
311+
TimeUnit.MICROSECONDS);
312+
}
313+
282314
/** Never returns {@code null}. */
283315
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
284-
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata.Headers headers) {
316+
ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
317+
Metadata.Headers headers) {
285318
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
286319
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
287320
stream, methodDef.getMethodDescriptor());
@@ -291,7 +324,7 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String
291324
throw new NullPointerException(
292325
"startCall() returned a null listener for method " + fullMethodName);
293326
}
294-
return call.newServerStreamListener(listener);
327+
return call.newServerStreamListener(listener, timeout);
295328
}
296329
}
297330

@@ -403,7 +436,7 @@ public void run() {
403436
}
404437
}
405438

406-
private class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
439+
private static class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
407440
private final ServerStream stream;
408441
private final MethodDescriptor<ReqT, RespT> method;
409442
private volatile boolean cancelled;
@@ -450,8 +483,9 @@ public boolean isCancelled() {
450483
return cancelled;
451484
}
452485

453-
private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener) {
454-
return new ServerStreamListenerImpl(listener);
486+
private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener,
487+
Future<?> timeout) {
488+
return new ServerStreamListenerImpl(listener, timeout);
455489
}
456490

457491
/**
@@ -460,9 +494,11 @@ private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<Req
460494
*/
461495
private class ServerStreamListenerImpl implements ServerStreamListener {
462496
private final ServerCall.Listener<ReqT> listener;
497+
private final Future<?> timeout;
463498

464-
public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener) {
499+
public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener, Future<?> timeout) {
465500
this.listener = Preconditions.checkNotNull(listener, "listener must not be null");
501+
this.timeout = timeout;
466502
}
467503

468504
@Override
@@ -493,6 +529,7 @@ public void halfClosed() {
493529

494530
@Override
495531
public void closed(Status status) {
532+
timeout.cancel(true);
496533
if (status.isOk()) {
497534
listener.onComplete();
498535
} else {

core/src/main/java/io/grpc/transport/ClientTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ ClientStream newStream(MethodDescriptor<?, ?> method,
6363
ClientStreamListener listener);
6464

6565
/**
66-
* Starts transport. Implementations must not call {@code listener} until after {@code start()}
66+
* Starts transport. Implementations must not call {@code listener} until after {@link #start}
6767
* returns.
6868
*
6969
* @param listener non-{@code null} listener of transport events
@@ -81,7 +81,7 @@ ClientStream newStream(MethodDescriptor<?, ?> method,
8181

8282
/**
8383
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
84-
* fail (once {@link Listener#transportShutdown()} callback called).
84+
* fail (once {@link Listener#transportShutdown} callback called).
8585
*/
8686
void shutdown();
8787

core/src/main/java/io/grpc/transport/ServerStream.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public interface ServerStream extends Stream {
4141

4242
/**
4343
* Writes custom metadata as headers on the response stream sent to the client. This method may
44-
* only be called once and cannot be called after calls to {@code Stream#writePayload}
45-
* or {@code #close}.
44+
* only be called once and cannot be called after calls to {@link Stream#writeMessage}
45+
* or {@link #close}.
4646
*
4747
* @param headers to send to client.
4848
*/
@@ -57,4 +57,10 @@ public interface ServerStream extends Stream {
5757
* @param trailers an additional block of metadata to pass to the client on stream closure.
5858
*/
5959
void close(Status status, Metadata.Trailers trailers);
60+
61+
62+
/**
63+
* Tears down the stream, typically in the event of a timeout.
64+
*/
65+
public void cancel(Status status);
6066
}

netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import static com.google.common.base.Preconditions.checkNotNull;
3535

3636
import io.grpc.Metadata;
37+
import io.grpc.Status;
3738
import io.grpc.transport.AbstractServerStream;
3839
import io.grpc.transport.WritableBuffer;
3940
import io.netty.buffer.ByteBuf;
@@ -121,4 +122,9 @@ protected void returnProcessedBytes(int processedBytes) {
121122
handler.returnProcessedBytes(http2Stream, processedBytes);
122123
writeQueue.scheduleFlush();
123124
}
125+
126+
@Override
127+
public void cancel(Status status) {
128+
// TODO(carl-mastrangelo): implement this
129+
}
124130
}

0 commit comments

Comments
 (0)