3333
3434import com .google .common .base .Preconditions ;
3535import com .google .common .base .Throwables ;
36- import com .google .common .util .concurrent .MoreExecutors ;
37- import com .google .common .util .concurrent .Service ;
3836
3937import io .grpc .transport .ServerListener ;
4038import io .grpc .transport .ServerStream ;
4139import io .grpc .transport .ServerStreamListener ;
40+ import io .grpc .transport .ServerTransport ;
4241import io .grpc .transport .ServerTransportListener ;
4342
4443import java .io .IOException ;
5554 * <pre><code>public class TcpTransportServerFactory {
5655 * public static Server newServer(Executor executor, HandlerRegistry registry,
5756 * String configuration) {
58- * ServerImpl server = new ServerImpl(executor, registry);
59- * return server.setTransportServer(
60- * new TcpTransportServer(server.serverListener(), configuration));
57+ * return new ServerImpl(executor, registry, new TcpTransportServer(configuration));
6158 * }
6259 * }</code></pre>
6360 *
6764public class ServerImpl implements Server {
6865 private static final ServerStreamListener NOOP_LISTENER = new NoopListener ();
6966
70- private final ServerListener serverListener = new ServerListenerImpl ();
71- private final ServerTransportListener serverTransportListener = new ServerTransportListenerImpl ();
7267 /** Executor for application processing. */
7368 private final Executor executor ;
7469 private final HandlerRegistry registry ;
@@ -77,46 +72,21 @@ public class ServerImpl implements Server {
7772 private boolean terminated ;
7873 private Runnable terminationRunnable ;
7974 /** Service encapsulating something similar to an accept() socket. */
80- private Service transportServer ;
75+ private final io . grpc . transport . Server transportServer ;
8176 /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
82- private final Collection <Service > transports = new HashSet <Service >();
77+ private final Collection <ServerTransport > transports = new HashSet <ServerTransport >();
8378
8479 /**
85- * Construct a server. {@link #setTransportServer(Service)} must be called before starting the
86- * server.
80+ * Construct a server.
8781 *
8882 * @param executor to call methods on behalf of remote clients
8983 * @param registry of methods to expose to remote clients.
9084 */
91- public ServerImpl (Executor executor , HandlerRegistry registry ) {
92- this .executor = Preconditions .checkNotNull (executor );
93- this .registry = Preconditions .checkNotNull (registry );
94- }
95-
96- /**
97- * Set the transport server for the server. {@code transportServer} should be in state NEW and not
98- * shared with any other {@code Server}s; it will be started and managed by the newly-created
99- * server instance. Must be called before starting server.
100- *
101- * @return this object
102- */
103- public synchronized ServerImpl setTransportServer (Service transportServer ) {
104- if (shutdown ) {
105- throw new IllegalStateException ("Already shutdown" );
106- }
107- Preconditions .checkState (this .transportServer == null , "transportServer already set" );
108- this .transportServer = Preconditions .checkNotNull (transportServer );
109- Preconditions .checkArgument (
110- transportServer .state () == Service .State .NEW , "transport server not in NEW state" );
111- transportServer .addListener (new TransportServiceListener (transportServer ),
112- MoreExecutors .directExecutor ());
113- transports .add (transportServer );
114- return this ;
115- }
116-
117- /** Listener to be called by transport factories to notify of new transport instances. */
118- public ServerListener serverListener () {
119- return serverListener ;
85+ public ServerImpl (Executor executor , HandlerRegistry registry ,
86+ io .grpc .transport .Server transportServer ) {
87+ this .executor = Preconditions .checkNotNull (executor , "executor" );
88+ this .registry = Preconditions .checkNotNull (registry , "registry" );
89+ this .transportServer = Preconditions .checkNotNull (transportServer , "transportServer" );
12090 }
12191
12292 /** Hack to allow executors to auto-shutdown. Not for general use. */
@@ -130,35 +100,27 @@ synchronized void setTerminationRunnable(Runnable runnable) {
130100 *
131101 * @return {@code this} object
132102 * @throws IllegalStateException if already started
103+ * @throws IOException if unable to bind
133104 */
134- public synchronized ServerImpl start () {
105+ public synchronized ServerImpl start () throws IOException {
135106 if (started ) {
136107 throw new IllegalStateException ("Already started" );
137108 }
109+ // Start and wait for any port to actually be bound.
110+ transportServer .start (new ServerListenerImpl ());
138111 started = true ;
139- try {
140- // Start and wait for any port to actually be bound.
141- transportServer .startAsync ().awaitRunning ();
142- } catch (IllegalStateException ex ) {
143- Throwable t = transportServer .failureCause ();
144- if (t != null ) {
145- throw Throwables .propagate (t );
146- }
147- throw ex ;
148- }
149112 return this ;
150113 }
151114
152115 /**
153116 * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
154117 */
155118 public synchronized ServerImpl shutdown () {
156- shutdown = true ;
157- // transports collection can be modified during stopAsync(), even if we hold the lock, due to
158- // reentrancy.
159- for (Service transport : transports .toArray (new Service [transports .size ()])) {
160- transport .stopAsync ();
119+ if (shutdown ) {
120+ return this ;
161121 }
122+ transportServer .shutdown ();
123+ shutdown = true ;
162124 return this ;
163125 }
164126
@@ -226,10 +188,15 @@ public synchronized boolean isTerminated() {
226188 *
227189 * @param transport service to remove
228190 */
229- private synchronized void transportClosed (Service transport ) {
191+ private synchronized void transportClosed (ServerTransport transport ) {
230192 if (!transports .remove (transport )) {
231193 throw new AssertionError ("Transport already removed" );
232194 }
195+ checkForTermination ();
196+ }
197+
198+ /** Notify of complete shutdown if necessary. */
199+ private synchronized void checkForTermination () {
233200 if (shutdown && transports .isEmpty ()) {
234201 terminated = true ;
235202 notifyAll ();
@@ -241,50 +208,39 @@ private synchronized void transportClosed(Service transport) {
241208
242209 private class ServerListenerImpl implements ServerListener {
243210 @ Override
244- public ServerTransportListener transportCreated (Service transport ) {
245- Service .State transportState = transport .state ();
246- Preconditions .checkArgument (
247- transportState == Service .State .STARTING || transportState == Service .State .RUNNING ,
248- "Created transport should be starting or running" );
249- synchronized (this ) {
250- if (shutdown ) {
251- transport .stopAsync ();
252- return serverTransportListener ;
253- }
211+ public ServerTransportListener transportCreated (ServerTransport transport ) {
212+ synchronized (ServerImpl .this ) {
254213 transports .add (transport );
255214 }
256- // transports collection can be modified during this call, even if we hold the lock, due to
257- // reentrancy.
258- transport .addListener (new TransportServiceListener (transport ),
259- MoreExecutors .directExecutor ());
260- // We assume that transport.state() won't change by another thread before the listener was
261- // registered.
262- Preconditions .checkState (
263- transport .state () == transportState , "transport changed state unexpectedly!" );
264- return serverTransportListener ;
215+ return new ServerTransportListenerImpl (transport );
216+ }
217+
218+ @ Override
219+ public void serverShutdown () {
220+ synchronized (ServerImpl .this ) {
221+ // transports collection can be modified during shutdown(), even if we hold the lock, due
222+ // to reentrancy.
223+ for (ServerTransport transport
224+ : transports .toArray (new ServerTransport [transports .size ()])) {
225+ transport .shutdown ();
226+ }
227+ checkForTermination ();
228+ }
265229 }
266230 }
267231
268- /** Listens for lifecycle changes to a "TCP connection." */
269- private class TransportServiceListener extends Service .Listener {
270- private final Service transport ;
232+ private class ServerTransportListenerImpl implements ServerTransportListener {
233+ private final ServerTransport transport ;
271234
272- public TransportServiceListener ( Service transport ) {
235+ public ServerTransportListenerImpl ( ServerTransport transport ) {
273236 this .transport = transport ;
274237 }
275238
276239 @ Override
277- public void failed ( Service . State from , Throwable failure ) {
240+ public void transportTerminated ( ) {
278241 transportClosed (transport );
279242 }
280243
281- @ Override
282- public void terminated (Service .State from ) {
283- transportClosed (transport );
284- }
285- }
286-
287- private class ServerTransportListenerImpl implements ServerTransportListener {
288244 @ Override
289245 public ServerStreamListener streamCreated (final ServerStream stream , final String methodName ,
290246 final Metadata .Headers headers ) {
0 commit comments