Skip to content

Commit 1216de6

Browse files
author
Louis Ryan
committed
Add support to Netty builders for other channel types. Demonstrate and test use of this with local channels
1 parent 89cb2d1 commit 1216de6

6 files changed

Lines changed: 125 additions & 32 deletions

File tree

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2014, Google Inc. All rights reserved.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions are
6+
* met:
7+
*
8+
* * Redistributions of source code must retain the above copyright
9+
* notice, this list of conditions and the following disclaimer.
10+
* * Redistributions in binary form must reproduce the above
11+
* copyright notice, this list of conditions and the following disclaimer
12+
* in the documentation and/or other materials provided with the
13+
* distribution.
14+
*
15+
* * Neither the name of Google Inc. nor the names of its
16+
* contributors may be used to endorse or promote products derived from
17+
* this software without specific prior written permission.
18+
*
19+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21+
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22+
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23+
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26+
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27+
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28+
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
*/
31+
package io.grpc.testing.integration;
32+
33+
import org.junit.AfterClass;
34+
import org.junit.BeforeClass;
35+
36+
import io.grpc.ChannelImpl;
37+
import io.grpc.transport.netty.NegotiationType;
38+
import io.grpc.transport.netty.NettyChannelBuilder;
39+
import io.grpc.transport.netty.NettyServerBuilder;
40+
import io.netty.channel.local.LocalAddress;
41+
import io.netty.channel.local.LocalChannel;
42+
import io.netty.channel.local.LocalServerChannel;
43+
44+
/**
45+
* Run transport tests over the Netty in-process channel.
46+
*/
47+
public class Http2NettyLocalChannelTest extends AbstractTransportTest {
48+
49+
@BeforeClass
50+
public static void startServer() {
51+
startStaticServer(
52+
NettyServerBuilder
53+
.forAddress(new LocalAddress("in-process-1"))
54+
.channelType(LocalServerChannel.class));
55+
}
56+
57+
@AfterClass
58+
public static void stopServer() {
59+
stopStaticServer();
60+
}
61+
62+
@Override
63+
protected ChannelImpl createChannel() {
64+
return NettyChannelBuilder
65+
.forAddress(new LocalAddress("in-process-1"))
66+
.negotiationType(NegotiationType.PLAINTEXT)
67+
.channelType(LocalChannel.class)
68+
.build();
69+
}
70+
}

netty/src/main/java/io/grpc/transport/netty/NettyChannelBuilder.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,14 @@
3131

3232
package io.grpc.transport.netty;
3333

34+
import com.google.common.base.Preconditions;
35+
3436
import io.grpc.AbstractChannelBuilder;
3537
import io.grpc.SharedResourceHolder;
3638
import io.grpc.transport.ClientTransportFactory;
39+
import io.netty.channel.Channel;
3740
import io.netty.channel.EventLoopGroup;
41+
import io.netty.channel.socket.nio.NioSocketChannel;
3842
import io.netty.handler.ssl.SslContext;
3943

4044
import java.net.InetSocketAddress;
@@ -48,6 +52,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
4852
private final SocketAddress serverAddress;
4953

5054
private NegotiationType negotiationType = NegotiationType.TLS;
55+
private Class<? extends Channel> channelType = NioSocketChannel.class;
5156
private EventLoopGroup userEventLoopGroup;
5257
private SslContext sslContext;
5358

@@ -69,6 +74,14 @@ private NettyChannelBuilder(SocketAddress serverAddress) {
6974
this.serverAddress = serverAddress;
7075
}
7176

77+
/**
78+
* Specify the channel type to use, by default we use {@link NioSocketChannel}.
79+
*/
80+
public NettyChannelBuilder channelType(Class<? extends Channel> channelType) {
81+
this.channelType = Preconditions.checkNotNull(channelType);
82+
return this;
83+
}
84+
7285
/**
7386
* Sets the negotiation type for the HTTP/2 connection.
7487
*
@@ -104,7 +117,7 @@ protected ChannelEssentials buildEssentials() {
104117
final EventLoopGroup group = (userEventLoopGroup == null)
105118
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP) : userEventLoopGroup;
106119
ClientTransportFactory transportFactory = new NettyClientTransportFactory(
107-
serverAddress, negotiationType, group, sslContext);
120+
serverAddress, channelType, negotiationType, group, sslContext);
108121
Runnable terminationRunnable = null;
109122
if (userEventLoopGroup == null) {
110123
terminationRunnable = new Runnable() {

netty/src/main/java/io/grpc/transport/netty/NettyClientTransport.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@
4747
import io.netty.channel.ChannelFuture;
4848
import io.netty.channel.ChannelFutureListener;
4949
import io.netty.channel.EventLoopGroup;
50-
import io.netty.channel.local.LocalAddress;
51-
import io.netty.channel.local.LocalChannel;
5250
import io.netty.channel.socket.nio.NioSocketChannel;
5351
import io.netty.handler.codec.AsciiString;
5452
import io.netty.handler.codec.http2.DefaultHttp2Connection;
@@ -85,6 +83,7 @@ class NettyClientTransport implements ClientTransport {
8583
private static final Logger log = Logger.getLogger(NettyClientTransport.class.getName());
8684

8785
private final SocketAddress address;
86+
private final Class<? extends Channel> channelType;
8887
private final EventLoopGroup group;
8988
private final Http2Negotiator.Negotiation negotiation;
9089
private final NettyClientHandler handler;
@@ -109,22 +108,23 @@ class NettyClientTransport implements ClientTransport {
109108
@GuardedBy("this")
110109
private boolean terminated;
111110

112-
NettyClientTransport(SocketAddress address, NegotiationType negotiationType,
113-
EventLoopGroup group, SslContext sslContext) {
111+
NettyClientTransport(SocketAddress address, Class<? extends Channel> channelType,
112+
NegotiationType negotiationType, EventLoopGroup group, SslContext sslContext) {
114113
Preconditions.checkNotNull(negotiationType, "negotiationType");
115114
this.address = Preconditions.checkNotNull(address, "address");
116115
this.group = Preconditions.checkNotNull(group, "group");
116+
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
117117

118118
InetSocketAddress inetAddress = null;
119119
if (address instanceof InetSocketAddress) {
120120
inetAddress = (InetSocketAddress) address;
121121
authority = new AsciiString(inetAddress.getHostString() + ":" + inetAddress.getPort());
122-
} else if (address instanceof LocalAddress) {
123-
authority = new AsciiString(address.toString());
124-
Preconditions.checkArgument(negotiationType != NegotiationType.TLS,
125-
"TLS not supported for in-process transport");
126122
} else {
127-
throw new IllegalStateException("Unknown socket address type " + address.toString());
123+
Preconditions.checkState(negotiationType != NegotiationType.TLS,
124+
"TLS not supported for non-internet socket types");
125+
// Specialized address types are allowed to support custom Channel types so just assume their
126+
// toString() values are valid :authority values
127+
authority = new AsciiString(address.toString());
128128
}
129129

130130
DefaultHttp2StreamRemovalPolicy streamRemovalPolicy = new DefaultHttp2StreamRemovalPolicy();
@@ -201,10 +201,8 @@ public void start(Listener transportListener) {
201201
listener = Preconditions.checkNotNull(transportListener, "listener");
202202
Bootstrap b = new Bootstrap();
203203
b.group(group);
204-
if (address instanceof LocalAddress) {
205-
b.channel(LocalChannel.class);
206-
} else {
207-
b.channel(NioSocketChannel.class);
204+
b.channel(channelType);
205+
if (NioSocketChannel.class.isAssignableFrom(channelType)) {
208206
b.option(SO_KEEPALIVE, true);
209207
}
210208
b.handler(negotiation.initializer());

netty/src/main/java/io/grpc/transport/netty/NettyClientTransportFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.common.base.Preconditions;
3535

3636
import io.grpc.transport.ClientTransportFactory;
37+
import io.netty.channel.Channel;
3738
import io.netty.channel.EventLoopGroup;
3839
import io.netty.handler.ssl.SslContext;
3940

@@ -46,19 +47,21 @@ class NettyClientTransportFactory implements ClientTransportFactory {
4647

4748
private final SocketAddress address;
4849
private final NegotiationType negotiationType;
50+
private final Class<? extends Channel> channelType;
4951
private final EventLoopGroup group;
5052
private final SslContext sslContext;
5153

52-
public NettyClientTransportFactory(SocketAddress address, NegotiationType negotiationType,
53-
EventLoopGroup group, SslContext sslContext) {
54+
public NettyClientTransportFactory(SocketAddress address, Class<? extends Channel> channelType,
55+
NegotiationType negotiationType, EventLoopGroup group, SslContext sslContext) {
5456
this.address = Preconditions.checkNotNull(address, "address");
5557
this.group = Preconditions.checkNotNull(group, "group");
5658
this.negotiationType = Preconditions.checkNotNull(negotiationType, "negotiationType");
59+
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
5760
this.sslContext = sslContext;
5861
}
5962

6063
@Override
6164
public NettyClientTransport newClientTransport() {
62-
return new NettyClientTransport(address, negotiationType, group, sslContext);
65+
return new NettyClientTransport(address, channelType, negotiationType, group, sslContext);
6366
}
6467
}

netty/src/main/java/io/grpc/transport/netty/NettyServer.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
import io.netty.channel.ChannelFutureListener;
4545
import io.netty.channel.ChannelInitializer;
4646
import io.netty.channel.EventLoopGroup;
47-
import io.netty.channel.local.LocalAddress;
48-
import io.netty.channel.local.LocalServerChannel;
47+
import io.netty.channel.ServerChannel;
4948
import io.netty.channel.socket.nio.NioServerSocketChannel;
5049
import io.netty.handler.ssl.SslContext;
5150

@@ -59,22 +58,25 @@
5958
*/
6059
public class NettyServer extends AbstractService {
6160
private final SocketAddress address;
61+
private final Class<? extends ServerChannel> channelType;
6262
private final ChannelInitializer<Channel> channelInitializer;
6363
private final EventLoopGroup bossGroup;
6464
private final EventLoopGroup workerGroup;
6565
private Channel channel;
6666

67-
public NettyServer(ServerListener serverListener, SocketAddress address, EventLoopGroup bossGroup,
67+
public NettyServer(ServerListener serverListener, SocketAddress address,
68+
Class<? extends ServerChannel> channelType, EventLoopGroup bossGroup,
6869
EventLoopGroup workerGroup) {
69-
this(serverListener, address, bossGroup, workerGroup, null);
70+
this(serverListener, address, channelType, bossGroup, workerGroup, null);
7071
}
7172

7273
public NettyServer(final ServerListener serverListener, SocketAddress address,
73-
EventLoopGroup bossGroup,
74+
Class<? extends ServerChannel> channelType, EventLoopGroup bossGroup,
7475
EventLoopGroup workerGroup, @Nullable final SslContext sslContext) {
75-
Preconditions.checkNotNull(bossGroup, "bossGroup");
76-
Preconditions.checkNotNull(workerGroup, "workerGroup");
7776
this.address = address;
77+
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
78+
this.bossGroup = Preconditions.checkNotNull(bossGroup, "bossGroup");
79+
this.workerGroup = Preconditions.checkNotNull(workerGroup, "workerGroup");
7880
this.channelInitializer = new ChannelInitializer<Channel>() {
7981
@Override
8082
public void initChannel(Channel ch) throws Exception {
@@ -83,18 +85,14 @@ public void initChannel(Channel ch) throws Exception {
8385
// TODO(nmittler): Should we wait for transport shutdown before shutting down server?
8486
}
8587
};
86-
this.bossGroup = bossGroup;
87-
this.workerGroup = workerGroup;
8888
}
8989

9090
@Override
9191
protected void doStart() {
9292
ServerBootstrap b = new ServerBootstrap();
9393
b.group(bossGroup, workerGroup);
94-
if (address instanceof LocalAddress) {
95-
b.channel(LocalServerChannel.class);
96-
} else {
97-
b.channel(NioServerSocketChannel.class);
94+
b.channel(channelType);
95+
if (NioServerSocketChannel.class.isAssignableFrom(channelType)) {
9896
b.option(SO_BACKLOG, 128);
9997
b.childOption(SO_KEEPALIVE, true);
10098
}

netty/src/main/java/io/grpc/transport/netty/NettyServerBuilder.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
package io.grpc.transport.netty;
3333

34+
import com.google.common.base.Preconditions;
3435
import com.google.common.util.concurrent.MoreExecutors;
3536
import com.google.common.util.concurrent.Service;
3637

@@ -39,6 +40,8 @@
3940
import io.grpc.SharedResourceHolder;
4041
import io.grpc.transport.ServerListener;
4142
import io.netty.channel.EventLoopGroup;
43+
import io.netty.channel.ServerChannel;
44+
import io.netty.channel.socket.nio.NioServerSocketChannel;
4245
import io.netty.handler.ssl.SslContext;
4346
import io.grpc.ServerImpl;
4447

@@ -51,7 +54,7 @@
5154
public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> {
5255

5356
private final SocketAddress address;
54-
57+
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
5558
private EventLoopGroup userBossEventLoopGroup;
5659
private EventLoopGroup userWorkerEventLoopGroup;
5760
private SslContext sslContext;
@@ -101,6 +104,14 @@ private NettyServerBuilder(SocketAddress address) {
101104
this.address = address;
102105
}
103106

107+
/**
108+
* Specify the channel type to use, by default we use {@link NioServerSocketChannel}.
109+
*/
110+
public NettyServerBuilder channelType(Class<? extends ServerChannel> channelType) {
111+
this.channelType = Preconditions.checkNotNull(channelType);
112+
return this;
113+
}
114+
104115
/**
105116
* Provides the boss EventGroupLoop to the server.
106117
*
@@ -162,7 +173,7 @@ protected Service buildTransportServer(ServerListener serverListener) {
162173
final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null)
163174
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP)
164175
: userWorkerEventLoopGroup;
165-
NettyServer server = new NettyServer(serverListener, address, bossEventLoopGroup,
176+
NettyServer server = new NettyServer(serverListener, address, channelType, bossEventLoopGroup,
166177
workerEventLoopGroup, sslContext);
167178
if (userBossEventLoopGroup == null) {
168179
server.addListener(new ClosureHook() {

0 commit comments

Comments
 (0)