Skip to content

Commit 28497e3

Browse files
lryanejona86
authored andcommitted
Add support for servers to deliver response headers
Fixes some propagation issues for trailers too Adds some more testing for metadata exchange ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=77979593
1 parent 29d8c03 commit 28497e3

24 files changed

Lines changed: 423 additions & 113 deletions

core/src/main/java/com/google/net/stubby/Metadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public static <T> Key<T> of(String name, Marshaller<T> marshaller) {
449449
*/
450450
private Key(String name, Marshaller<T> marshaller) {
451451
this.name = Preconditions.checkNotNull(name, "name").toLowerCase().intern();
452-
this.asciiName = name.getBytes(US_ASCII);
452+
this.asciiName = this.name.getBytes(US_ASCII);
453453
this.marshaller = Preconditions.checkNotNull(marshaller);
454454
}
455455

core/src/main/java/com/google/net/stubby/ServerCall.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*
1515
* <p>No generic method for determining message receipt or providing acknowledgement is provided.
1616
* Applications are expected to utilize normal payload messages for such signals, as a response
17-
* natually acknowledges its request.
17+
* naturally acknowledges its request.
1818
*
1919
* <p>Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe.
2020
*/
@@ -33,7 +33,7 @@ public abstract class ServerCall<ResponseT> {
3333
// a case then we either get to generate a half close or purposefully omit it.
3434
public abstract static class Listener<RequestT> {
3535
/**
36-
* A request payload has been receiveed. For streaming calls, there may be zero payload
36+
* A request payload has been received. For streaming calls, there may be zero payload
3737
* messages.
3838
*/
3939
@Nullable
@@ -63,16 +63,14 @@ public abstract static class Listener<RequestT> {
6363
}
6464

6565
/**
66-
* Close the call with the provided status. No further sending or receiving will occur. If {@code
67-
* status} is not equal to {@link Status#OK}, then the call is said to have failed.
68-
*
69-
* <p>If {@code status} is not {@link Status#CANCELLED} and no errors or cancellations are known
70-
* to have occured, then a {@link Listener#onComplete} notification should be expected.
71-
* Otherwise {@link Listener#onCancel} has been or will be called.
66+
* Send response header metadata prior to sending a response payload. This method may
67+
* only be called once and cannot be called after calls to {@code Stream#sendPayload}
68+
* or {@code #close}.
7269
*
73-
* @throws IllegalStateException if call is already {@code close}d
70+
* @param headers metadata to send prior to any response body.
71+
* @throws IllegalStateException if {@code close} has been called or a payload has been sent.
7472
*/
75-
public abstract void close(Status status, Metadata.Trailers trailers);
73+
public abstract void sendHeaders(Metadata.Headers headers);
7674

7775
/**
7876
* Send a payload message. Payload messages are the primary form of communication associated with
@@ -83,6 +81,18 @@ public abstract static class Listener<RequestT> {
8381
*/
8482
public abstract void sendPayload(ResponseT payload);
8583

84+
/**
85+
* Close the call with the provided status. No further sending or receiving will occur. If {@code
86+
* status} is not equal to {@link Status#OK}, then the call is said to have failed.
87+
*
88+
* <p>If {@code status} is not {@link Status#CANCELLED} and no errors or cancellations are known
89+
* to have occured, then a {@link Listener#onComplete} notification should be expected.
90+
* Otherwise {@link Listener#onCancel} has been or will be called.
91+
*
92+
* @throws IllegalStateException if call is already {@code close}d
93+
*/
94+
public abstract void close(Status status, Metadata.Trailers trailers);
95+
8696
/**
8797
* Returns {@code true} when the call is cancelled and the server is encouraged to abort
8898
* processing to save resources, since the client will not be processing any further methods.

core/src/main/java/com/google/net/stubby/ServerImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,8 @@ public ServerCallImpl(ServerStream stream, ServerMethodDefinition<ReqT, RespT> m
395395
}
396396

397397
@Override
398-
public void close(Status status, Metadata.Trailers trailers) {
399-
stream.close(status, trailers);
398+
public void sendHeaders(Metadata.Headers headers) {
399+
stream.writeHeaders(headers);
400400
}
401401

402402
@Override
@@ -411,6 +411,11 @@ public void sendPayload(RespT payload) {
411411
}
412412
}
413413

414+
@Override
415+
public void close(Status status, Metadata.Trailers trailers) {
416+
stream.close(status, trailers);
417+
}
418+
414419
@Override
415420
public boolean isCancelled() {
416421
return cancelled;

core/src/main/java/com/google/net/stubby/ServerInterceptors.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,25 @@
22

33
import com.google.common.base.Preconditions;
44
import com.google.common.collect.ImmutableList;
5-
import com.google.net.stubby.ServerMethodDefinition;
6-
import com.google.net.stubby.ServerCallHandler;
7-
import com.google.net.stubby.ServerInterceptor;
8-
import com.google.net.stubby.ServerServiceDefinition;
95

10-
import java.util.List;
6+
import java.util.Arrays;
117
import java.util.Iterator;
8+
import java.util.List;
129

1310
/** Utility class for {@link ServerInterceptor}s. */
1411
public class ServerInterceptors {
1512
// Prevent instantiation
1613
private ServerInterceptors() {}
1714

15+
/**
16+
* Create a new {@code ServerServiceDefinition} whose {@link ServerCallHandler}s will call {@code
17+
* interceptors} before calling the pre-existing {@code ServerCallHandler}.
18+
*/
19+
public static ServerServiceDefinition intercept(ServerServiceDefinition serviceDef,
20+
ServerInterceptor... interceptors) {
21+
return intercept(serviceDef, Arrays.asList(interceptors));
22+
}
23+
1824
/**
1925
* Create a new {@code ServerServiceDefinition} whose {@link ServerCallHandler}s will call {@code
2026
* interceptors} before calling the pre-existing {@code ServerCallHandler}.
@@ -92,4 +98,36 @@ public ServerCall.Listener<ReqT> startCall(String method, ServerCall<RespT> call
9298
}
9399
}
94100
}
101+
102+
/**
103+
* Utility base class for decorating {@link ServerCall} instances.
104+
*/
105+
public static class ForwardingServerCall<RespT> extends ServerCall<RespT> {
106+
107+
private final ServerCall<RespT> delegate;
108+
109+
public ForwardingServerCall(ServerCall<RespT> delegate) {
110+
this.delegate = delegate;
111+
}
112+
113+
@Override
114+
public void sendHeaders(Metadata.Headers headers) {
115+
delegate.sendHeaders(headers);
116+
}
117+
118+
@Override
119+
public void sendPayload(RespT payload) {
120+
delegate.sendPayload(payload);
121+
}
122+
123+
@Override
124+
public void close(Status status, Metadata.Trailers trailers) {
125+
delegate.close(status, trailers);
126+
}
127+
128+
@Override
129+
public boolean isCancelled() {
130+
return delegate.isCancelled();
131+
}
132+
}
95133
}

core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.InputStream;
1313
import java.nio.ByteBuffer;
1414

15+
import javax.annotation.Nullable;
1516
import javax.annotation.concurrent.GuardedBy;
1617

1718
/**
@@ -43,6 +44,11 @@ protected ListenableFuture<Void> receiveMessage(InputStream is, int length) {
4344
return listener.messageRead(is, length);
4445
}
4546

47+
@Override
48+
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
49+
super.writeMessage(message, length, accepted);
50+
}
51+
4652
/** gRPC protocol v1 support */
4753
@Override
4854
protected void receiveStatus(Status status) {

core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.InputStream;
1313
import java.nio.ByteBuffer;
1414

15+
import javax.annotation.Nullable;
1516
import javax.annotation.concurrent.GuardedBy;
1617

1718
/**
@@ -23,10 +24,13 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
2324

2425
private final Object stateLock = new Object();
2526
private volatile StreamState state = StreamState.OPEN;
27+
private boolean headersSent = false;
2628
/** Whether listener.closed() has been called. */
2729
@GuardedBy("stateLock")
2830
private boolean listenerClosed;
29-
/** Whether the stream was closed gracefull by the application (vs. a transport-level failure). */
31+
/**
32+
* Whether the stream was closed gracefully by the application (vs. a transport-level failure).
33+
*/
3034
private boolean gracefulClose;
3135
/** Saved trailers from close() that need to be sent once the framer has sent all messages. */
3236
private Metadata.Trailers stashedTrailers;
@@ -47,6 +51,24 @@ protected void receiveStatus(Status status) {
4751
Preconditions.checkState(status == Status.OK, "Received status can only be OK on server");
4852
}
4953

54+
@Override
55+
public void writeHeaders(Metadata.Headers headers) {
56+
Preconditions.checkNotNull(headers, "headers");
57+
outboundPhase(Phase.HEADERS);
58+
headersSent = true;
59+
internalSendHeaders(headers);
60+
outboundPhase(Phase.MESSAGE);
61+
}
62+
63+
@Override
64+
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
65+
if (!headersSent) {
66+
writeHeaders(new Metadata.Headers());
67+
headersSent = true;
68+
}
69+
super.writeMessage(message, length, accepted);
70+
}
71+
5072
@Override
5173
public final void close(Status status, Metadata.Trailers trailers) {
5274
Preconditions.checkNotNull(status, "status");
@@ -78,12 +100,19 @@ protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
78100
sendFrame(frame, false);
79101
}
80102
if (endOfStream) {
81-
sendTrailers(stashedTrailers);
103+
sendTrailers(stashedTrailers, headersSent);
104+
headersSent = true;
82105
stashedTrailers = null;
83106
}
84107
}
85108
}
86109

110+
/**
111+
* Sends response headers to the remote end points.
112+
* @param headers to be sent to client.
113+
*/
114+
protected abstract void internalSendHeaders(Metadata.Headers headers);
115+
87116
/**
88117
* Sends an outbound frame to the remote end point.
89118
*
@@ -97,8 +126,9 @@ protected final void internalSendFrame(ByteBuffer frame, boolean endOfStream) {
97126
* Sends trailers to the remote end point. This call implies end of stream.
98127
*
99128
* @param trailers metadata to be sent to end point
129+
* @param headersSent true if response headers have already been sent.
100130
*/
101-
protected abstract void sendTrailers(Metadata.Trailers trailers);
131+
protected abstract void sendTrailers(Metadata.Trailers trailers, boolean headersSent);
102132

103133
/**
104134
* The Stream is considered completely closed and there is no further opportunity for error. It

core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void dispose() {
9090
}
9191

9292
@Override
93-
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
93+
public void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
9494
Preconditions.checkNotNull(message, "message");
9595
Preconditions.checkArgument(length >= 0, "length must be >= 0");
9696
outboundPhase(Phase.MESSAGE);

core/src/main/java/com/google/net/stubby/newtransport/HttpUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public final class HttpUtil {
2323
/**
2424
* Content-Type used for GRPC-over-HTTP/2.
2525
*/
26-
public static final String CONTENT_TYPE_PROTORPC = "application/grpc";
26+
public static final String CONTENT_TYPE_GRPC = "application/grpc";
2727

2828
/**
2929
* The HTTP method used for GRPC requests.

core/src/main/java/com/google/net/stubby/newtransport/ServerStream.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,18 @@
88
*/
99
public interface ServerStream extends Stream {
1010

11+
/**
12+
* Writes custom metadata as headers on the response stream sent to the client. This method may
13+
* only be called once and cannot be called after calls to {@code Stream#writePayload}
14+
* or {@code #close}.
15+
*
16+
* @param headers to send to client.
17+
*/
18+
void writeHeaders(Metadata.Headers headers);
19+
1120
/**
1221
* Closes the stream for both reading and writing. A status code of
13-
* {@link com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the
22+
* {@link com.google.net.stubby.Status.Code#OK} implies normal termination of the
1423
* stream. Any other value implies abnormal termination.
1524
*
1625
* @param status details of the closure

core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private void initListener() {
137137
private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream)
138138
throws Http2Exception {
139139
NettyClientStream stream = clientStream(connection().requireStream(streamId));
140-
stream.inboundHeadersRecieved(headers, endStream);
140+
stream.inboundHeadersReceived(headers, endStream);
141141
}
142142

143143
/**

0 commit comments

Comments
 (0)