Skip to content

Commit 7235a39

Browse files
committed
Remove Service API from ChannelImpl
This change loses asynchronous notification of channel state-change and a way to wait until the channel is actually connected. Both of these are expected to be added back as part of a health API. The important distinction from Service is that ChannelImpl never permanently fails and can revert from being started to connecting again. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83875407
1 parent 198a756 commit 7235a39

8 files changed

Lines changed: 206 additions & 103 deletions

File tree

core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java

Lines changed: 50 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131

3232
package com.google.net.stubby;
3333

34+
import static com.google.net.stubby.AbstractServiceBuilder.DEFAULT_EXECUTOR;
35+
3436
import com.google.common.base.Preconditions;
35-
import com.google.common.util.concurrent.MoreExecutors;
36-
import com.google.common.util.concurrent.Service;
3737
import com.google.net.stubby.transport.ClientTransportFactory;
3838

3939
import java.util.concurrent.ExecutorService;
@@ -45,16 +45,52 @@
4545
*
4646
* @param <BuilderT> The concrete type of this builder.
4747
*/
48-
public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder<BuilderT>>
49-
extends AbstractServiceBuilder<ChannelImpl, BuilderT> {
48+
public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder<BuilderT>> {
49+
@Nullable
50+
private ExecutorService userExecutor;
5051

51-
@Override
52-
protected final ChannelImpl buildImpl(ExecutorService executor) {
53-
ChannelEssentials essentials = buildEssentials();
54-
ChannelImpl channel = new ChannelImpl(essentials.transportFactory, executor);
55-
if (essentials.listener != null) {
56-
channel.addListener(essentials.listener, MoreExecutors.directExecutor());
52+
/**
53+
* Provides a custom executor.
54+
*
55+
* <p>It's an optional parameter. If the user has not provided an executor when the channel is
56+
* built, the builder will use a static cached thread pool.
57+
*
58+
* <p>The channel won't take ownership of the given executor. It's caller's responsibility to
59+
* shut down the executor when it's desired.
60+
*/
61+
@SuppressWarnings("unchecked")
62+
public final BuilderT executor(ExecutorService executor) {
63+
userExecutor = executor;
64+
return (BuilderT) this;
65+
}
66+
67+
/**
68+
* Builds a channel using the given parameters.
69+
*/
70+
public ChannelImpl build() {
71+
final ExecutorService executor;
72+
final boolean releaseExecutor;
73+
if (userExecutor != null) {
74+
executor = userExecutor;
75+
releaseExecutor = false;
76+
} else {
77+
executor = SharedResourceHolder.get(DEFAULT_EXECUTOR);
78+
releaseExecutor = true;
5779
}
80+
81+
final ChannelEssentials essentials = buildEssentials();
82+
ChannelImpl channel = new ChannelImpl(essentials.transportFactory, executor);
83+
channel.setTerminationRunnable(new Runnable() {
84+
@Override
85+
public void run() {
86+
if (releaseExecutor) {
87+
SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
88+
}
89+
if (essentials.terminationRunnable != null) {
90+
essentials.terminationRunnable.run();
91+
}
92+
}
93+
});
5894
return channel;
5995
}
6096

@@ -63,16 +99,16 @@ protected final ChannelImpl buildImpl(ExecutorService executor) {
6399
*/
64100
protected static class ChannelEssentials {
65101
final ClientTransportFactory transportFactory;
66-
@Nullable final Service.Listener listener;
102+
@Nullable final Runnable terminationRunnable;
67103

68104
/**
69105
* @param transportFactory the created channel uses this factory to create transports
70-
* @param listener will be called at the channel's life-cycle events
106+
* @param terminationRunnable will be called at the channel's life-cycle events
71107
*/
72108
public ChannelEssentials(ClientTransportFactory transportFactory,
73-
@Nullable Service.Listener listener) {
109+
@Nullable Runnable terminationRunnable) {
74110
this.transportFactory = Preconditions.checkNotNull(transportFactory);
75-
this.listener = listener;
111+
this.terminationRunnable = terminationRunnable;
76112
}
77113
}
78114

core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
abstract class AbstractServiceBuilder<ProductT extends Service,
5858
BuilderT extends AbstractServiceBuilder<ProductT, BuilderT>> {
5959

60-
private static final Resource<ExecutorService> DEFAULT_EXECUTOR =
60+
static final Resource<ExecutorService> DEFAULT_EXECUTOR =
6161
new Resource<ExecutorService>() {
6262
private static final String name = "grpc-default-executor";
6363
@Override

core/src/main/java/com/google/net/stubby/ChannelImpl.java

Lines changed: 130 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@
3131

3232
package com.google.net.stubby;
3333

34-
import static com.google.common.util.concurrent.Service.State.RUNNING;
35-
import static com.google.common.util.concurrent.Service.State.STARTING;
36-
3734
import com.google.common.base.Preconditions;
38-
import com.google.common.util.concurrent.AbstractService;
3935
import com.google.common.util.concurrent.FutureCallback;
4036
import com.google.common.util.concurrent.Futures;
4137
import com.google.common.util.concurrent.ListenableFuture;
4238
import com.google.common.util.concurrent.MoreExecutors;
39+
import com.google.common.util.concurrent.Service.Listener;
40+
import com.google.common.util.concurrent.Service.State;
4341
import com.google.common.util.concurrent.SettableFuture;
4442
import com.google.net.stubby.transport.ClientStream;
4543
import com.google.net.stubby.transport.ClientStreamListener;
@@ -52,6 +50,7 @@
5250
import java.util.Collection;
5351
import java.util.concurrent.Callable;
5452
import java.util.concurrent.ExecutorService;
53+
import java.util.concurrent.TimeUnit;
5554
import java.util.logging.Level;
5655
import java.util.logging.Logger;
5756

@@ -60,10 +59,17 @@
6059

6160
/** A communication channel for making outgoing RPCs. */
6261
@ThreadSafe
63-
public final class ChannelImpl extends AbstractService implements Channel {
62+
public final class ChannelImpl implements Channel {
6463

6564
private static final Logger log = Logger.getLogger(ChannelImpl.class.getName());
6665

66+
private static class NoopClientStream implements ClientStream {
67+
@Override public void writeMessage(InputStream message, int length, Runnable accepted) {}
68+
@Override public void flush() {}
69+
@Override public void cancel() {}
70+
@Override public void halfClose() {}
71+
}
72+
6773
private final ClientTransportFactory transportFactory;
6874
private final ExecutorService executor;
6975
/**
@@ -76,79 +82,136 @@ public final class ChannelImpl extends AbstractService implements Channel {
7682
/** The transport for new outgoing requests. */
7783
@GuardedBy("this")
7884
private ClientTransport activeTransport;
85+
@GuardedBy("this")
86+
private boolean shutdown;
87+
@GuardedBy("this")
88+
private boolean terminated;
89+
private Runnable terminationRunnable;
7990

8091
public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
8192
this.transportFactory = transportFactory;
8293
this.executor = executor;
8394
}
8495

85-
@Override
86-
protected void doStart() {
87-
obtainActiveTransport(true);
96+
/** Hack to allow executors to auto-shutdown. Not for general use. */
97+
// TODO(user): Replace with a real API.
98+
void setTerminationRunnable(Runnable runnable) {
99+
this.terminationRunnable = runnable;
88100
}
89101

90-
@Override
91-
protected synchronized void doStop() {
92-
if (transports.isEmpty()) {
93-
notifyStopped();
94-
} else {
95-
// The last TransportListener will call notifyStopped().
96-
if (activeTransport != null) {
97-
activeTransport.stopAsync();
98-
activeTransport = null;
102+
/**
103+
* Initiates an orderly shutdown in which preexisting calls continue but new calls are immediately
104+
* cancelled.
105+
*/
106+
public synchronized ChannelImpl shutdown() {
107+
shutdown = true;
108+
if (activeTransport != null) {
109+
activeTransport.stopAsync();
110+
activeTransport = null;
111+
} else if (transports.isEmpty()) {
112+
terminated = true;
113+
notifyAll();
114+
if (terminationRunnable != null) {
115+
terminationRunnable.run();
99116
}
100117
}
118+
return this;
101119
}
102120

121+
/**
122+
* Initiates a forceful shutdown in which preexisting and new calls are cancelled. Although
123+
* forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
124+
* return {@code false} immediately after this method returns.
125+
*
126+
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
127+
*/
128+
// TODO(user): cancel preexisting calls.
129+
public synchronized ChannelImpl shutdownNow() {
130+
shutdown();
131+
return this;
132+
}
133+
134+
/**
135+
* Returns whether the channel is shutdown. Shutdown channels immediately cancel any new calls,
136+
* but may still have some calls being processed.
137+
*
138+
* @see #shutdown()
139+
* @see #isTerminated()
140+
*/
141+
public synchronized boolean isShutdown() {
142+
return shutdown;
143+
}
144+
145+
/**
146+
* Waits for the channel to become terminated, giving up if the timeout is reached.
147+
*
148+
* @return whether the channel is terminated, as would be done by {@link #isTerminated()}.
149+
*/
150+
public synchronized boolean awaitTerminated(long timeout, TimeUnit unit)
151+
throws InterruptedException {
152+
long timeoutNanos = unit.toNanos(timeout);
153+
long endTimeNanos = System.nanoTime() + timeoutNanos;
154+
while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
155+
TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos);
156+
}
157+
return terminated;
158+
}
159+
160+
/**
161+
* Returns whether the channel is terminated. Terminated channels have no running calls and
162+
* relevant resources released (like TCP connections).
163+
*
164+
* @see #isShutdown()
165+
*/
166+
public synchronized boolean isTerminated() {
167+
return terminated;
168+
}
169+
170+
/**
171+
* Creates a new outgoing call on the channel.
172+
*/
103173
@Override
104174
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
105175
return new CallImpl<ReqT, RespT>(method, new SerializingExecutor(executor));
106176
}
107177

108-
private synchronized ClientTransport obtainActiveTransport(boolean notifyWhenRunning) {
109-
if (activeTransport == null) {
110-
State state = state();
111-
if (state != RUNNING && state != STARTING) {
112-
throw new IllegalStateException("Not running");
113-
}
114-
ClientTransport newTransport = transportFactory.newClientTransport();
115-
activeTransport = newTransport;
116-
transports.add(newTransport);
117-
// activeTransport reference can be changed during calls to the transport, even if we hold the
118-
// lock, due to reentrancy.
119-
newTransport.addListener(new TransportListener(newTransport, notifyWhenRunning),
120-
MoreExecutors.directExecutor());
121-
newTransport.startAsync();
122-
return newTransport;
178+
private synchronized ClientTransport obtainActiveTransport() {
179+
if (shutdown) {
180+
return null;
181+
}
182+
if (activeTransport != null) {
183+
return activeTransport;
123184
}
124-
return activeTransport;
185+
ClientTransport newTransport = transportFactory.newClientTransport();
186+
activeTransport = newTransport;
187+
transports.add(newTransport);
188+
// activeTransport reference can be changed during calls to the transport, even if we hold the
189+
// lock, due to reentrancy.
190+
newTransport.addListener(new TransportListener(newTransport),
191+
MoreExecutors.directExecutor());
192+
newTransport.startAsync();
193+
return newTransport;
125194
}
126195

127-
private synchronized void transportFailedOrStopped(ClientTransport transport, Throwable t) {
128-
if (transport.state() == State.FAILED) {
129-
log.log(Level.SEVERE, "client transport failed " + transport.getClass().getName(),
130-
transport.failureCause());
131-
}
196+
private synchronized void transportFailedOrTerminated(ClientTransport transport) {
132197
if (activeTransport == transport) {
133198
activeTransport = null;
134199
}
135200
transports.remove(transport);
136-
if (state() != RUNNING && transports.isEmpty()) {
137-
if (t != null) {
138-
notifyFailed(t);
139-
} else {
140-
notifyStopped();
201+
if (shutdown && transports.isEmpty()) {
202+
terminated = true;
203+
notifyAll();
204+
if (terminationRunnable != null) {
205+
terminationRunnable.run();
141206
}
142207
}
143208
}
144209

145210
private class TransportListener extends Listener {
146211
private final ClientTransport transport;
147-
private final boolean notifyWhenRunning;
148212

149-
public TransportListener(ClientTransport transport, boolean notifyWhenRunning) {
213+
public TransportListener(ClientTransport transport) {
150214
this.transport = transport;
151-
this.notifyWhenRunning = notifyWhenRunning;
152215
}
153216

154217
@Override
@@ -162,20 +225,17 @@ public void stopping(State from) {
162225

163226
@Override
164227
public void failed(State from, Throwable failure) {
165-
transportFailedOrStopped(transport, failure);
228+
log.log(Level.SEVERE, "Client transport failed", failure);
229+
transportFailedOrTerminated(transport);
166230
}
167231

168232
@Override
169233
public void terminated(State from) {
170-
transportFailedOrStopped(transport, null);
234+
transportFailedOrTerminated(transport);
171235
}
172236

173237
@Override
174-
public void running() {
175-
if (notifyWhenRunning) {
176-
notifyStarted();
177-
}
178-
}
238+
public void running() {}
179239
}
180240

181241
private class CallImpl<ReqT, RespT> extends Call<ReqT, RespT> {
@@ -191,8 +251,23 @@ public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor execut
191251
@Override
192252
public void start(Listener<RespT> observer, Metadata.Headers headers) {
193253
Preconditions.checkState(stream == null, "Already started");
194-
stream = obtainActiveTransport(false).newStream(method, headers,
195-
new ClientStreamListenerImpl(observer));
254+
ClientStreamListener listener = new ClientStreamListenerImpl(observer);
255+
ClientTransport transport = obtainActiveTransport();
256+
if (transport == null) {
257+
stream = new NoopClientStream();
258+
listener.closed(Status.CANCELLED.withDescription("Channel is shutdown"),
259+
new Metadata.Trailers());
260+
return;
261+
}
262+
try {
263+
stream = transport.newStream(method, headers, listener);
264+
} catch (IllegalStateException ex) {
265+
// We can race with the transport and end up trying to use a terminated transport.
266+
// TODO(user): Improve the API to remove the possibility of the race.
267+
stream = new NoopClientStream();
268+
listener.closed(Status.fromThrowable(ex), new Metadata.Trailers());
269+
return;
270+
}
196271
}
197272

198273
@Override

0 commit comments

Comments
 (0)