Skip to content

Commit bb9699e

Browse files
zhangkun83ejona86
authored andcommitted
Fixes a few issues in netty server transport:
- Creates and passes a transport instance to ServerListener.transportCreated(). - Keeps the "/" prefix of the fully qualified method name when passing it to the handler registry. - Adds necessary "this." when accessing a member variable in ServerCalls. - BlockingResponseStream.buffer should be added with BlockingResponseStream.this as as the mark of end of data. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=76922440
1 parent 53acb1c commit bb9699e

6 files changed

Lines changed: 78 additions & 43 deletions

File tree

core/src/main/java/com/google/net/stubby/ServerImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,9 @@ public void run() {
234234
try {
235235
HandlerRegistry.Method method = registry.lookupMethod(methodName);
236236
if (method == null) {
237-
stream.close(Status.UNIMPLEMENTED, new Metadata.Trailers());
237+
stream.close(
238+
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
239+
new Metadata.Trailers());
238240
return;
239241
}
240242
listener = startCall(stream, methodName, method.getMethodDefinition(), headers);

core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public static String getFullMethodNameFromPath(String path) {
5454
if (!path.startsWith("/")) {
5555
return null;
5656
}
57-
return path.substring(1);
57+
return path;
5858
}
5959

6060
private TransportFrameUtil() {}

core/src/main/java/com/google/net/stubby/newtransport/netty/NettyServer.java

Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,6 @@
88
import com.google.net.stubby.newtransport.ServerListener;
99
import com.google.net.stubby.newtransport.ServerTransportListener;
1010

11-
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
12-
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
13-
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
14-
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
15-
import io.netty.handler.codec.http2.Http2Connection;
16-
import io.netty.handler.codec.http2.Http2FrameLogger;
17-
import io.netty.handler.codec.http2.Http2FrameReader;
18-
import io.netty.handler.codec.http2.Http2FrameWriter;
19-
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
20-
import io.netty.handler.codec.http2.Http2OutboundFlowController;
21-
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
22-
import io.netty.util.internal.logging.InternalLogLevel;
2311

2412
import io.netty.bootstrap.ServerBootstrap;
2513
import io.netty.channel.Channel;
@@ -30,8 +18,6 @@
3018
import io.netty.channel.nio.NioEventLoopGroup;
3119
import io.netty.channel.socket.SocketChannel;
3220
import io.netty.channel.socket.nio.NioServerSocketChannel;
33-
import io.netty.handler.codec.http2.DefaultHttp2Connection;
34-
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
3521

3622
/**
3723
* Implementation of the {@link com.google.common.util.concurrent.Service} interface for a
@@ -57,9 +43,9 @@ public NettyServer(final ServerListener serverListener, int port, EventLoopGroup
5743
this.channelInitializer = new ChannelInitializer<SocketChannel>() {
5844
@Override
5945
public void initChannel(SocketChannel ch) throws Exception {
60-
// TODO(user): pass a real transport object
61-
ServerTransportListener transportListener = serverListener.transportCreated(null);
62-
ch.pipeline().addLast(newHandler(transportListener));
46+
NettyServerTransport transport = new NettyServerTransport();
47+
transport.startAsync();
48+
transport.bind(ch, serverListener);
6349
}
6450
};
6551
this.bossGroup = bossGroup;
@@ -113,26 +99,4 @@ public void operationComplete(ChannelFuture future) throws Exception {
11399
workerGroup.shutdownGracefully();
114100
}
115101
}
116-
117-
private static NettyServerHandler newHandler(ServerTransportListener transportListener) {
118-
Http2Connection connection =
119-
new DefaultHttp2Connection(true, new DefaultHttp2StreamRemovalPolicy());
120-
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
121-
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
122-
123-
Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
124-
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
125-
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
126-
127-
DefaultHttp2InboundFlowController inboundFlow =
128-
new DefaultHttp2InboundFlowController(connection, frameWriter);
129-
Http2OutboundFlowController outboundFlow =
130-
new DefaultHttp2OutboundFlowController(connection, frameWriter);
131-
return new NettyServerHandler(transportListener,
132-
connection,
133-
frameReader,
134-
frameWriter,
135-
inboundFlow,
136-
outboundFlow);
137-
}
138102
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.google.net.stubby.newtransport.netty;
2+
3+
import com.google.common.base.Preconditions;
4+
import com.google.common.util.concurrent.AbstractService;
5+
import com.google.net.stubby.newtransport.ServerListener;
6+
import com.google.net.stubby.newtransport.ServerTransportListener;
7+
8+
import io.netty.channel.socket.SocketChannel;
9+
import io.netty.handler.codec.http2.DefaultHttp2Connection;
10+
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
11+
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
12+
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
13+
import io.netty.handler.codec.http2.DefaultHttp2OutboundFlowController;
14+
import io.netty.handler.codec.http2.DefaultHttp2StreamRemovalPolicy;
15+
import io.netty.handler.codec.http2.Http2Connection;
16+
import io.netty.handler.codec.http2.Http2FrameLogger;
17+
import io.netty.handler.codec.http2.Http2FrameReader;
18+
import io.netty.handler.codec.http2.Http2FrameWriter;
19+
import io.netty.handler.codec.http2.Http2InboundFrameLogger;
20+
import io.netty.handler.codec.http2.Http2OutboundFlowController;
21+
import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
22+
import io.netty.util.internal.logging.InternalLogLevel;
23+
24+
/**
25+
* The Netty-based server transport.
26+
*/
27+
class NettyServerTransport extends AbstractService {
28+
29+
NettyServerHandler handler;
30+
31+
@Override
32+
protected void doStart() {
33+
notifyStarted();
34+
}
35+
36+
@Override
37+
protected void doStop() {
38+
// TODO(user): signal GO_AWAY and optionally terminate the socket after a timeout
39+
notifyStopped();
40+
}
41+
42+
/**
43+
* This must be called when the transport is starting or running.
44+
*/
45+
void bind(SocketChannel ch, ServerListener serverListener) {
46+
Preconditions.checkState(handler == null, "Handler already registered");
47+
ServerTransportListener transportListener = serverListener.transportCreated(this);
48+
Http2Connection connection =
49+
new DefaultHttp2Connection(true, new DefaultHttp2StreamRemovalPolicy());
50+
Http2FrameReader frameReader = new DefaultHttp2FrameReader();
51+
Http2FrameWriter frameWriter = new DefaultHttp2FrameWriter();
52+
53+
Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
54+
frameReader = new Http2InboundFrameLogger(frameReader, frameLogger);
55+
frameWriter = new Http2OutboundFrameLogger(frameWriter, frameLogger);
56+
57+
DefaultHttp2InboundFlowController inboundFlow =
58+
new DefaultHttp2InboundFlowController(connection, frameWriter);
59+
Http2OutboundFlowController outboundFlow =
60+
new DefaultHttp2OutboundFlowController(connection, frameWriter);
61+
handler = new NettyServerHandler(transportListener,
62+
connection,
63+
frameReader,
64+
frameWriter,
65+
inboundFlow,
66+
outboundFlow);
67+
ch.pipeline().addLast(handler);
68+
}
69+
}

stub/src/main/java/com/google/net/stubby/stub/Calls.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public ListenableFuture<Void> onPayload(T value) {
357357
public void onClose(Status status, Metadata.Trailers trailers) {
358358
Preconditions.checkState(!done, "Call already closed");
359359
if (status.isOk()) {
360-
buffer.add(this);
360+
buffer.add(BlockingResponseStream.this);
361361
} else {
362362
buffer.add(status.asRuntimeException());
363363
}

stub/src/main/java/com/google/net/stubby/stub/ServerCalls.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public ServerCall.Listener<ReqT> startCall(
3232
ReqT request;
3333
@Override
3434
public ListenableFuture<Void> onPayload(ReqT request) {
35-
if (request == null) {
35+
if (this.request == null) {
3636
// We delay calling method.invoke() until onHalfClose(), because application may call
3737
// close(OK) inside invoke(), while close(OK) is not allowed before onHalfClose().
3838
this.request = request;

0 commit comments

Comments
 (0)