Skip to content

Commit 4f4f8e4

Browse files
committed
Remove Guava's Service from server transport
ServerImpl.start() now throws IOException to make the error explicit. This was previously being papered over by wrapping the exception in RuntimeException.
1 parent f920bad commit 4f4f8e4

13 files changed

Lines changed: 350 additions & 300 deletions

File tree

core/src/main/java/io/grpc/AbstractChannelBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected static class ChannelEssentials {
127127
* Constructor.
128128
*
129129
* @param transportFactory the created channel uses this factory to create transports
130-
* @param terminationRunnable will be called at the channel's life-cycle events
130+
* @param terminationRunnable will be called at the channel termination
131131
*/
132132
public ChannelEssentials(ClientTransportFactory transportFactory,
133133
@Nullable Runnable terminationRunnable) {

core/src/main/java/io/grpc/AbstractServerBuilder.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@
3434
import static io.grpc.AbstractChannelBuilder.DEFAULT_EXECUTOR;
3535

3636
import com.google.common.base.Preconditions;
37-
import com.google.common.util.concurrent.Service;
38-
39-
import io.grpc.transport.ServerListener;
4037

4138
import java.util.concurrent.ExecutorService;
4239

@@ -114,18 +111,39 @@ public ServerImpl build() {
114111
releaseExecutor = true;
115112
}
116113

117-
ServerImpl server = new ServerImpl(executor, registry);
118-
server.setTransportServer(buildTransportServer(server.serverListener()));
114+
final ServerEssentials essentials = buildEssentials();
115+
ServerImpl server = new ServerImpl(executor, registry, essentials.server);
119116
server.setTerminationRunnable(new Runnable() {
120117
@Override
121118
public void run() {
122119
if (releaseExecutor) {
123120
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
124121
}
122+
if (essentials.terminationRunnable != null) {
123+
essentials.terminationRunnable.run();
124+
}
125125
}
126126
});
127127
return server;
128128
}
129129

130-
protected abstract Service buildTransportServer(ServerListener serverListener);
130+
protected abstract ServerEssentials buildEssentials();
131+
132+
protected static class ServerEssentials {
133+
final io.grpc.transport.Server server;
134+
@Nullable
135+
final Runnable terminationRunnable;
136+
137+
/**
138+
* Constructor.
139+
*
140+
* @param server the created server uses this server to accept transports
141+
* @param terminationRunnable will be called at the server termination
142+
*/
143+
public ServerEssentials(io.grpc.transport.Server server,
144+
@Nullable Runnable terminationRunnable) {
145+
this.server = Preconditions.checkNotNull(server, "server");
146+
this.terminationRunnable = terminationRunnable;
147+
}
148+
}
131149
}

core/src/main/java/io/grpc/ServerImpl.java

Lines changed: 44 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,11 @@
3333

3434
import com.google.common.base.Preconditions;
3535
import com.google.common.base.Throwables;
36-
import com.google.common.util.concurrent.MoreExecutors;
37-
import com.google.common.util.concurrent.Service;
3836

3937
import io.grpc.transport.ServerListener;
4038
import io.grpc.transport.ServerStream;
4139
import io.grpc.transport.ServerStreamListener;
40+
import io.grpc.transport.ServerTransport;
4241
import io.grpc.transport.ServerTransportListener;
4342

4443
import java.io.IOException;
@@ -55,9 +54,7 @@
5554
* <pre><code>public class TcpTransportServerFactory {
5655
* public static Server newServer(Executor executor, HandlerRegistry registry,
5756
* String configuration) {
58-
* ServerImpl server = new ServerImpl(executor, registry);
59-
* return server.setTransportServer(
60-
* new TcpTransportServer(server.serverListener(), configuration));
57+
* return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
6158
* }
6259
* }</code></pre>
6360
*
@@ -67,8 +64,6 @@
6764
public class ServerImpl implements Server {
6865
private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
6966

70-
private final ServerListener serverListener = new ServerListenerImpl();
71-
private final ServerTransportListener serverTransportListener = new ServerTransportListenerImpl();
7267
/** Executor for application processing. */
7368
private final Executor executor;
7469
private final HandlerRegistry registry;
@@ -77,46 +72,21 @@ public class ServerImpl implements Server {
7772
private boolean terminated;
7873
private Runnable terminationRunnable;
7974
/** Service encapsulating something similar to an accept() socket. */
80-
private Service transportServer;
75+
private final io.grpc.transport.Server transportServer;
8176
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
82-
private final Collection<Service> transports = new HashSet<Service>();
77+
private final Collection<ServerTransport> transports = new HashSet<ServerTransport>();
8378

8479
/**
85-
* Construct a server. {@link #setTransportServer(Service)} must be called before starting the
86-
* server.
80+
* Construct a server.
8781
*
8882
* @param executor to call methods on behalf of remote clients
8983
* @param registry of methods to expose to remote clients.
9084
*/
91-
public ServerImpl(Executor executor, HandlerRegistry registry) {
92-
this.executor = Preconditions.checkNotNull(executor);
93-
this.registry = Preconditions.checkNotNull(registry);
94-
}
95-
96-
/**
97-
* Set the transport server for the server. {@code transportServer} should be in state NEW and not
98-
* shared with any other {@code Server}s; it will be started and managed by the newly-created
99-
* server instance. Must be called before starting server.
100-
*
101-
* @return this object
102-
*/
103-
public synchronized ServerImpl setTransportServer(Service transportServer) {
104-
if (shutdown) {
105-
throw new IllegalStateException("Already shutdown");
106-
}
107-
Preconditions.checkState(this.transportServer == null, "transportServer already set");
108-
this.transportServer = Preconditions.checkNotNull(transportServer);
109-
Preconditions.checkArgument(
110-
transportServer.state() == Service.State.NEW, "transport server not in NEW state");
111-
transportServer.addListener(new TransportServiceListener(transportServer),
112-
MoreExecutors.directExecutor());
113-
transports.add(transportServer);
114-
return this;
115-
}
116-
117-
/** Listener to be called by transport factories to notify of new transport instances. */
118-
public ServerListener serverListener() {
119-
return serverListener;
85+
public ServerImpl(Executor executor, HandlerRegistry registry,
86+
io.grpc.transport.Server transportServer) {
87+
this.executor = Preconditions.checkNotNull(executor, "executor");
88+
this.registry = Preconditions.checkNotNull(registry, "registry");
89+
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
12090
}
12191

12292
/** Hack to allow executors to auto-shutdown. Not for general use. */
@@ -130,35 +100,27 @@ synchronized void setTerminationRunnable(Runnable runnable) {
130100
*
131101
* @return {@code this} object
132102
* @throws IllegalStateException if already started
103+
* @throws IOException if unable to bind
133104
*/
134-
public synchronized ServerImpl start() {
105+
public synchronized ServerImpl start() throws IOException {
135106
if (started) {
136107
throw new IllegalStateException("Already started");
137108
}
109+
// Start and wait for any port to actually be bound.
110+
transportServer.start(new ServerListenerImpl());
138111
started = true;
139-
try {
140-
// Start and wait for any port to actually be bound.
141-
transportServer.startAsync().awaitRunning();
142-
} catch (IllegalStateException ex) {
143-
Throwable t = transportServer.failureCause();
144-
if (t != null) {
145-
throw Throwables.propagate(t);
146-
}
147-
throw ex;
148-
}
149112
return this;
150113
}
151114

152115
/**
153116
* Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
154117
*/
155118
public synchronized ServerImpl shutdown() {
156-
shutdown = true;
157-
// transports collection can be modified during stopAsync(), even if we hold the lock, due to
158-
// reentrancy.
159-
for (Service transport : transports.toArray(new Service[transports.size()])) {
160-
transport.stopAsync();
119+
if (shutdown) {
120+
return this;
161121
}
122+
transportServer.shutdown();
123+
shutdown = true;
162124
return this;
163125
}
164126

@@ -226,10 +188,15 @@ public synchronized boolean isTerminated() {
226188
*
227189
* @param transport service to remove
228190
*/
229-
private synchronized void transportClosed(Service transport) {
191+
private synchronized void transportClosed(ServerTransport transport) {
230192
if (!transports.remove(transport)) {
231193
throw new AssertionError("Transport already removed");
232194
}
195+
checkForTermination();
196+
}
197+
198+
/** Notify of complete shutdown if necessary. */
199+
private synchronized void checkForTermination() {
233200
if (shutdown && transports.isEmpty()) {
234201
terminated = true;
235202
notifyAll();
@@ -241,50 +208,39 @@ private synchronized void transportClosed(Service transport) {
241208

242209
private class ServerListenerImpl implements ServerListener {
243210
@Override
244-
public ServerTransportListener transportCreated(Service transport) {
245-
Service.State transportState = transport.state();
246-
Preconditions.checkArgument(
247-
transportState == Service.State.STARTING || transportState == Service.State.RUNNING,
248-
"Created transport should be starting or running");
249-
synchronized (this) {
250-
if (shutdown) {
251-
transport.stopAsync();
252-
return serverTransportListener;
253-
}
211+
public ServerTransportListener transportCreated(ServerTransport transport) {
212+
synchronized (ServerImpl.this) {
254213
transports.add(transport);
255214
}
256-
// transports collection can be modified during this call, even if we hold the lock, due to
257-
// reentrancy.
258-
transport.addListener(new TransportServiceListener(transport),
259-
MoreExecutors.directExecutor());
260-
// We assume that transport.state() won't change by another thread before the listener was
261-
// registered.
262-
Preconditions.checkState(
263-
transport.state() == transportState, "transport changed state unexpectedly!");
264-
return serverTransportListener;
215+
return new ServerTransportListenerImpl(transport);
216+
}
217+
218+
@Override
219+
public void serverShutdown() {
220+
synchronized (ServerImpl.this) {
221+
// transports collection can be modified during shutdown(), even if we hold the lock, due
222+
// to reentrancy.
223+
for (ServerTransport transport
224+
: transports.toArray(new ServerTransport[transports.size()])) {
225+
transport.shutdown();
226+
}
227+
checkForTermination();
228+
}
265229
}
266230
}
267231

268-
/** Listens for lifecycle changes to a "TCP connection." */
269-
private class TransportServiceListener extends Service.Listener {
270-
private final Service transport;
232+
private class ServerTransportListenerImpl implements ServerTransportListener {
233+
private final ServerTransport transport;
271234

272-
public TransportServiceListener(Service transport) {
235+
public ServerTransportListenerImpl(ServerTransport transport) {
273236
this.transport = transport;
274237
}
275238

276239
@Override
277-
public void failed(Service.State from, Throwable failure) {
240+
public void transportTerminated() {
278241
transportClosed(transport);
279242
}
280243

281-
@Override
282-
public void terminated(Service.State from) {
283-
transportClosed(transport);
284-
}
285-
}
286-
287-
private class ServerTransportListenerImpl implements ServerTransportListener {
288244
@Override
289245
public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
290246
final Metadata.Headers headers) {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2015, Google Inc. All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
*
15+
* * Neither the name of Google Inc. nor the names of its
16+
* contributors may be used to endorse or promote products derived from
17+
* this software without specific prior written permission.
18+
*
19+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
*/
31+
32+
package io.grpc.transport;
33+
34+
import java.io.IOException;
35+
36+
/**
37+
* A server accepts new incomming connections. This is would commonly encapsulate a bound socket
38+
* that {@code accept(}}s new connections.
39+
*/
40+
public interface Server {
41+
/**
42+
* Starts transport. Implementations must not call {@code listener} until after {@code start()}
43+
* returns. The method only returns after it has done the equivalent of bind()ing, so it will be
44+
* able to service any connections created after returning.
45+
*
46+
* @param listener non-{@code null} listener of server events
47+
* @throws IOException if unable to bind
48+
*/
49+
void start(ServerListener listener) throws IOException;
50+
51+
/**
52+
* Initiates an orderly shutdown of the server. Existing transports continue, but new transports
53+
* will not be created (once {@link ServerListener#serverShutdown()} callback called).
54+
*/
55+
void shutdown();
56+
}

core/src/main/java/io/grpc/transport/ServerListener.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@
3131

3232
package io.grpc.transport;
3333

34-
import com.google.common.util.concurrent.Service;
35-
3634
/**
37-
* A listener to a server for transport creation events.
35+
* A listener to a server for transport creation events. Notifications must occur from the transport
36+
* thread.
3837
*/
3938
public interface ServerListener {
4039

@@ -44,5 +43,12 @@ public interface ServerListener {
4443
* @param transport the new transport to be observed.
4544
* @return a listener for stream creation events on the transport.
4645
*/
47-
ServerTransportListener transportCreated(Service transport);
46+
ServerTransportListener transportCreated(ServerTransport transport);
47+
48+
/**
49+
* The server is shutting down. No new transports will be processed, but existing streams may
50+
* continue. Shutdown is only caused by a call to {@link Server#shutdown()}. All resources have
51+
* been released.
52+
*/
53+
void serverShutdown();
4854
}

0 commit comments

Comments
 (0)