Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.api.gax.core.ApiClock;
import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
import com.google.common.collect.Lists;
Expand All @@ -36,7 +35,6 @@
import com.google.pubsub.v1.SubscriberGrpc.SubscriberFutureStub;
import com.google.pubsub.v1.Subscription;
import io.grpc.Channel;
import io.grpc.auth.MoreCallCredentials;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -64,7 +62,6 @@ final class PollingSubscriberConnection extends AbstractApiService implements Ac

public PollingSubscriberConnection(
String subscription,
Credentials credentials,
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Expand All @@ -75,9 +72,7 @@ public PollingSubscriberConnection(
ApiClock clock) {
this.subscription = subscription;
this.executor = executor;
stub =
SubscriberGrpc.newFutureStub(channel)
.withCallCredentials(MoreCallCredentials.from(credentials));
stub = SubscriberGrpc.newFutureStub(channel);
messageDispatcher =
new MessageDispatcher(
receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.google.api.gax.core.ApiClock;
import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -34,7 +33,6 @@
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ClientResponseObserver;
Expand All @@ -58,7 +56,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;

private final Channel channel;
private final Credentials credentials;

private final String subscription;
private final ScheduledExecutorService executor;
Expand All @@ -67,7 +64,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

public StreamingSubscriberConnection(
String subscription,
Credentials credentials,
MessageReceiver receiver,
Duration ackExpirationPadding,
Duration maxAckExtensionPeriod,
Expand All @@ -79,7 +75,6 @@ public StreamingSubscriberConnection(
ApiClock clock) {
this.subscription = subscription;
this.executor = executor;
this.credentials = credentials;
this.channel = channel;
this.messageDispatcher =
new MessageDispatcher(
Expand Down Expand Up @@ -152,9 +147,7 @@ private void initialize() {
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>)
(ClientCalls.asyncBidiStreamingCall(
channel.newCall(
SubscriberGrpc.METHOD_STREAMING_PULL,
CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))),
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, CallOptions.DEFAULT),
responseObserver));
logger.log(
Level.FINER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.api.gax.core.CurrentMillisClock;
import com.google.api.gax.core.FlowControlSettings;
import com.google.api.gax.core.FlowController;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.api.stats.Distribution;
Expand All @@ -32,12 +33,9 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.pubsub.v1.SubscriptionName;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -98,8 +96,8 @@ public class Subscriber extends AbstractApiService {
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
private final int numChannels;
private final FlowController flowController;
private final ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder;
private final Credentials credentials;
private final ChannelProvider channelProvider;
private final List<ManagedChannel> channels;
private final MessageReceiver receiver;
private final List<StreamingSubscriberConnection> streamingSubscriberConnections;
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
Expand Down Expand Up @@ -134,29 +132,10 @@ public void close() throws IOException {
});
}

// TODO(pongad): remove this when we move to ManagedChannelBuilder
String defaultEndpoint = SubscriptionAdminSettings.getDefaultEndpoint();
int colonPos = defaultEndpoint.indexOf(':');

channelBuilder =
builder.channelBuilder.isPresent()
? builder.channelBuilder.get()
: NettyChannelBuilder.forAddress(
defaultEndpoint.substring(0, colonPos),
Integer.parseInt(defaultEndpoint.substring(colonPos+1)))
.maxMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.flowControlWindow(5000000) // 2.5 MB
.negotiationType(NegotiationType.TLS)
.sslContext(GrpcSslContexts.forClient().ciphers(null).build())
.executor(executor);

credentials =
builder.credentials.isPresent()
? builder.credentials.get()
: GoogleCredentials.getApplicationDefault()
.createScoped(SubscriptionAdminSettings.getDefaultServiceScopes());
channelProvider = builder.channelProvider;

numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
channels = new ArrayList<ManagedChannel>(numChannels);
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);
}
Expand Down Expand Up @@ -218,6 +197,29 @@ public ApiService startAsync() {
@Override
protected void doStart() {
logger.log(Level.FINE, "Starting subscriber group.");

try {
for (int i = 0; i < numChannels; i++) {
final ManagedChannel channel =
channelProvider.needsExecutor()
? channelProvider.getChannel(executor)
: channelProvider.getChannel();
channels.add(channel);
if (channelProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() {
channel.shutdown();
}
});
}
}
} catch (IOException e) {
// doesn't matter what we throw, the Service will just catch it and fail to start.
throw new IllegalStateException(e);
}

// Streaming pull is not enabled on the service yet.
// startStreamingConnections();
startPollingConnections();
Expand All @@ -244,13 +246,12 @@ private void startStreamingConnections() {
streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
cachedSubscriptionNameString,
credentials,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
streamAckDeadlineSeconds,
ackLatencyDistribution,
channelBuilder.build(),
channels.get(i),
flowController,
executor,
clock));
Expand Down Expand Up @@ -321,12 +322,11 @@ private void startPollingConnections() {
pollingSubscriberConnections.add(
new PollingSubscriberConnection(
cachedSubscriptionNameString,
credentials,
receiver,
ackExpirationPadding,
maxAckExtensionPeriod,
ackLatencyDistribution,
channelBuilder.build(),
channels.get(i),
flowController,
executor,
clock));
Expand Down Expand Up @@ -433,8 +433,10 @@ public static final class Builder {
FlowControlSettings flowControlSettings = FlowControlSettings.getDefaultInstance();

ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder =
Optional.absent();
ChannelProvider channelProvider =
SubscriptionAdminSettings.defaultChannelProviderBuilder()
.setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
.build();
Optional<ApiClock> clock = Optional.absent();

Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
Expand All @@ -453,15 +455,15 @@ public Builder setCredentials(Credentials credentials) {
}

/**
* ManagedChannelBuilder to use to create Channels.
* {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub
* endpoint.
*
* <p>Must point at Cloud Pub/Sub endpoint.
* <p>For performance, this client benefits from having multiple channels open at once. Users
* are encouraged to provide instances of {@code ChannelProvider} that creates new channels
* instead of returning pre-initialized ones.
*/
public Builder setChannelBuilder(
ManagedChannelBuilder<? extends ManagedChannelBuilder<?>> channelBuilder) {
this.channelBuilder =
Optional.<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>>of(
Preconditions.checkNotNull(channelBuilder));
public Builder setChannelProvider(ChannelProvider channelProvider) {
this.channelProvider = Preconditions.checkNotNull(channelProvider);
return this;
}

Expand Down Expand Up @@ -521,4 +523,3 @@ public Subscriber build() throws IOException {
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ public ManagedChannel getChannel(Executor executor) {

private FakeScheduledExecutorService fakeExecutor;

private FakeCredentials testCredentials;

private FakePublisherServiceImpl testPublisherServiceImpl;

private ServerImpl testServer;
Expand Down
Loading