Skip to content

Commit 653ffd9

Browse files
committed
ServerImpl
Interfaces were corrected. ChannelImpl was updated to use same style of exception handling of ServerImpl. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=76702816
1 parent eb3fc58 commit 653ffd9

10 files changed

Lines changed: 889 additions & 71 deletions

File tree

core/src/main/java/com/google/net/stubby/ChannelImpl.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package com.google.net.stubby;
22

33
import com.google.common.base.Preconditions;
4-
import com.google.common.base.Throwables;
54
import com.google.common.util.concurrent.AbstractService;
5+
import com.google.common.util.concurrent.Futures;
6+
import com.google.common.util.concurrent.FutureCallback;
67
import com.google.common.util.concurrent.ListenableFuture;
78
import com.google.common.util.concurrent.MoreExecutors;
89
import com.google.common.util.concurrent.SettableFuture;
@@ -161,19 +162,27 @@ private int available(InputStream is) {
161162
try {
162163
return is.available();
163164
} catch (IOException ex) {
164-
throw Throwables.propagate(ex);
165+
throw new RuntimeException(ex);
165166
}
166167
}
167168

168169
@Override
169170
public void sendPayload(ReqT payload, SettableFuture<Void> accepted) {
170171
Preconditions.checkState(stream != null, "Not started");
171-
InputStream payloadIs = method.streamRequest(payload);
172-
if (accepted == null) {
173-
stream.writeMessage(payloadIs, available(payloadIs), null);
174-
} else {
175-
inProcessFutures.add(accepted);
176-
stream.writeMessage(payloadIs, available(payloadIs), new AcceptedRunnable(accepted));
172+
boolean failed = true;
173+
try {
174+
InputStream payloadIs = method.streamRequest(payload);
175+
if (accepted == null) {
176+
stream.writeMessage(payloadIs, available(payloadIs), null);
177+
} else {
178+
inProcessFutures.add(accepted);
179+
stream.writeMessage(payloadIs, available(payloadIs), new AcceptedRunnable(accepted));
180+
}
181+
failed = false;
182+
} finally {
183+
if (failed) {
184+
cancel();
185+
}
177186
}
178187
stream.flush();
179188
}
@@ -211,18 +220,19 @@ public void run() {
211220
if (theirs == null) {
212221
ours.set(null);
213222
} else {
214-
theirs.addListener(new Runnable() {
223+
Futures.addCallback(theirs, new FutureCallback<Void>() {
215224
@Override
216-
public void run() {
217-
// TODO(user): If their Future fails, should we Call.cancel()?
225+
public void onSuccess(Void result) {
218226
ours.set(null);
219227
}
228+
@Override
229+
public void onFailure(Throwable t) {
230+
ours.setException(t);
231+
}
220232
}, MoreExecutors.directExecutor());
221233
}
222234
} catch (Throwable t) {
223-
ours.set(null);
224-
CallImpl.this.cancel();
225-
Throwables.propagate(t);
235+
ours.setException(t);
226236
}
227237
}
228238
});

core/src/main/java/com/google/net/stubby/ServerCall.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,6 @@ public abstract class ServerCall<ResponseT> {
3232
// before client half closes. It may be that we treat such a case as an error. If we permit such
3333
// a case then we either get to generate a half close or purposefully omit it.
3434
public abstract static class Listener<RequestT> {
35-
/**
36-
* Called upon receiving all header information from the remote end-point.
37-
* <p>This method should return quickly, as the same thread may be used to process other
38-
* streams.
39-
*
40-
* @param headers the fully buffered received headers.
41-
* @return a processing completion future, or {@code null} to indicate that processing of the
42-
* headers is immediately complete.
43-
*/
44-
@Nullable
45-
public abstract ListenableFuture<Void> headersRead(Metadata.Headers headers);
46-
4735
/**
4836
* A request payload has been receiveed. For streaming calls, there may be zero payload
4937
* messages.

core/src/main/java/com/google/net/stubby/ServerCallHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@ public interface ServerCallHandler<ReqT, RespT> {
2020
* @param call object for responding
2121
* @return listener for processing incoming messages for {@code call}
2222
*/
23-
ServerCall.Listener<ReqT> startCall(String fullMethodName, ServerCall<RespT> call);
23+
ServerCall.Listener<ReqT> startCall(String fullMethodName, ServerCall<RespT> call,
24+
Metadata.Headers headers);
2425
}

0 commit comments

Comments
 (0)