Skip to content

Commit

Permalink
xds: Add counter and gauge metrics (#11661)
Browse files Browse the repository at this point in the history
Adds the following xDS client metrics defined in [A78](https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient).

Counters
- grpc.xds_client.server_failure
- grpc.xds_client.resource_updates_valid
- grpc.xds_client.resource_updates_invalid

Gauges
- grpc.xds_client.connected
- grpc.xds_client.resources
  • Loading branch information
DNVindhya authored Nov 26, 2024
1 parent 92de2f3 commit 20d09ce
Show file tree
Hide file tree
Showing 23 changed files with 1,381 additions and 184 deletions.
27 changes: 25 additions & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public static final class Args {
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;

private Args(
Integer defaultPort,
Expand All @@ -299,7 +300,8 @@ private Args(
@Nullable ScheduledExecutorService scheduledExecutorService,
@Nullable ChannelLogger channelLogger,
@Nullable Executor executor,
@Nullable String overrideAuthority) {
@Nullable String overrideAuthority,
@Nullable MetricRecorder metricRecorder) {
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
this.syncContext = checkNotNull(syncContext, "syncContext not set");
Expand All @@ -308,6 +310,7 @@ private Args(
this.channelLogger = channelLogger;
this.executor = executor;
this.overrideAuthority = overrideAuthority;
this.metricRecorder = metricRecorder;
}

/**
Expand Down Expand Up @@ -405,6 +408,14 @@ public String getOverrideAuthority() {
return overrideAuthority;
}

/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
@Nullable
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}


@Override
public String toString() {
Expand All @@ -417,6 +428,7 @@ public String toString() {
.add("channelLogger", channelLogger)
.add("executor", executor)
.add("overrideAuthority", overrideAuthority)
.add("metricRecorder", metricRecorder)
.toString();
}

Expand All @@ -435,6 +447,7 @@ public Builder toBuilder() {
builder.setChannelLogger(channelLogger);
builder.setOffloadExecutor(executor);
builder.setOverrideAuthority(overrideAuthority);
builder.setMetricRecorder(metricRecorder);
return builder;
}

Expand All @@ -461,6 +474,7 @@ public static final class Builder {
private ChannelLogger channelLogger;
private Executor executor;
private String overrideAuthority;
private MetricRecorder metricRecorder;

Builder() {
}
Expand Down Expand Up @@ -547,6 +561,14 @@ public Builder setOverrideAuthority(String authority) {
return this;
}

/**
* See {@link Args#getMetricRecorder()}. This is an optional field.
*/
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = metricRecorder;
return this;
}

/**
* Builds an {@link Args}.
*
Expand All @@ -556,7 +578,8 @@ public Args build() {
return
new Args(
defaultPort, proxyDetector, syncContext, serviceConfigParser,
scheduledExecutorService, channelLogger, executor, overrideAuthority);
scheduledExecutorService, channelLogger, executor, overrideAuthority,
metricRecorder);
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions api/src/test/java/io/grpc/NameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class NameResolverTest {
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
private final Executor executor = Executors.newSingleThreadExecutor();
private final String overrideAuthority = "grpc.io";
private final MetricRecorder metricRecorder = new MetricRecorder() {};
@Mock NameResolver.Listener mockListener;

@Test
Expand All @@ -77,6 +78,7 @@ public void args() {
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);

NameResolver.Args args2 = args.toBuilder().build();
assertThat(args2.getDefaultPort()).isEqualTo(defaultPort);
Expand All @@ -87,6 +89,7 @@ public void args() {
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
assertThat(args2.getOverrideAuthority()).isSameInstanceAs(overrideAuthority);
assertThat(args.getMetricRecorder()).isSameInstanceAs(metricRecorder);

assertThat(args2).isNotSameInstanceAs(args);
assertThat(args2).isNotEqualTo(args);
Expand All @@ -102,6 +105,7 @@ private NameResolver.Args createArgs() {
.setChannelLogger(channelLogger)
.setOffloadExecutor(executor)
.setOverrideAuthority(overrideAuthority)
.setMetricRecorder(metricRecorder)
.build();
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,8 @@ ClientStream newSubstream(
builder.maxHedgedAttempts,
loadBalancerFactory);
this.authorityOverride = builder.authorityOverride;
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
this.nameResolverArgs =
NameResolver.Args.newBuilder()
.setDefaultPort(builder.getDefaultPort())
Expand All @@ -599,6 +601,7 @@ ClientStream newSubstream(
.setChannelLogger(channelLogger)
.setOffloadExecutor(this.offloadExecutorHolder)
.setOverrideAuthority(this.authorityOverride)
.setMetricRecorder(this.metricRecorder)
.build();
this.nameResolver = getNameResolver(
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
Expand Down Expand Up @@ -671,8 +674,6 @@ public CallTracer create() {
}
serviceConfigUpdated = true;
}
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
}

@VisibleForTesting
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,30 @@ public void metricRecorder_recordsToMetricSink() {
eq(optionalLabelValues));
}

@Test
public void metricRecorder_fromNameResolverArgs_recordsToMetricSink() {
MetricSink mockSink1 = mock(MetricSink.class);
MetricSink mockSink2 = mock(MetricSink.class);
channelBuilder.addMetricSink(mockSink1);
channelBuilder.addMetricSink(mockSink2);
createChannel();

LongCounterMetricInstrument counter = metricInstrumentRegistry.registerLongCounter(
"test_counter", "Time taken by metric recorder", "s",
ImmutableList.of("grpc.method"), Collections.emptyList(), false);
List<String> requiredLabelValues = ImmutableList.of("testMethod");
List<String> optionalLabelValues = Collections.emptyList();

NameResolver.Args args = helper.getNameResolverArgs();
assertThat(args.getMetricRecorder()).isNotNull();
args.getMetricRecorder()
.addLongCounter(counter, 10, requiredLabelValues, optionalLabelValues);
verify(mockSink1).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
verify(mockSink2).addLongCounter(eq(counter), eq(10L), eq(requiredLabelValues),
eq(optionalLabelValues));
}

@Test
public void shutdownWithNoTransportsEverCreated() {
channelBuilder.nameResolverFactory(
Expand Down Expand Up @@ -2240,6 +2264,7 @@ public void lbHelper_getNameResolverArgs() {
assertThat(args.getSynchronizationContext())
.isSameInstanceAs(helper.getSynchronizationContext());
assertThat(args.getServiceConfigParser()).isNotNull();
assertThat(args.getMetricRecorder()).isNotNull();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.grpc.xds;

import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.client.XdsClient;
import io.grpc.xds.client.XdsInitializationException;
Expand All @@ -36,6 +37,11 @@ public static void setDefaultProviderBootstrapOverride(Map<String, ?> bootstrap)

public static ObjectPool<XdsClient> getOrCreate(String target)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
return getOrCreate(target, new MetricRecorder() {});
}

public static ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder);
}
}
28 changes: 20 additions & 8 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.grpc.MetricRecorder;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -51,6 +52,8 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private static final boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
System.getenv("GRPC_LOG_XDS_NODE_ID"));
private static final Logger log = Logger.getLogger(XdsClientImpl.class.getName());
private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();

private final Bootstrapper bootstrapper;
private final Object lock = new Object();
Expand Down Expand Up @@ -82,7 +85,8 @@ public ObjectPool<XdsClient> get(String target) {
}

@Override
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
throws XdsInitializationException {
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
if (ref == null) {
synchronized (lock) {
Expand All @@ -98,7 +102,7 @@ public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitialization
if (bootstrapInfo.servers().isEmpty()) {
throw new XdsInitializationException("No xDS server provided");
}
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
targetToXdsClientMap.put(target, ref);
}
}
Expand All @@ -111,31 +115,33 @@ public ImmutableList<String> getTargets() {
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
}


private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
new ExponentialBackoffPolicy.Provider();
private final BootstrapInfo bootstrapInfo;
private final String target; // The target associated with the xDS client.
private final MetricRecorder metricRecorder;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
@GuardedBy("lock")
private XdsClient xdsClient;
@GuardedBy("lock")
private int refCount;
@GuardedBy("lock")
private XdsClientMetricReporterImpl metricReporter;

@VisibleForTesting
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
MetricRecorder metricRecorder) {
this.bootstrapInfo = checkNotNull(bootstrapInfo);
this.target = target;
this.metricRecorder = metricRecorder;
}

@Override
Expand All @@ -146,6 +152,7 @@ public XdsClient getObject() {
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
}
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
xdsClient = new XdsClientImpl(
DEFAULT_XDS_TRANSPORT_FACTORY,
bootstrapInfo,
Expand All @@ -154,7 +161,9 @@ public XdsClient getObject() {
GrpcUtil.STOPWATCH_SUPPLIER,
TimeProvider.SYSTEM_TIME_PROVIDER,
MessagePrinter.INSTANCE,
new TlsContextManagerImpl(bootstrapInfo));
new TlsContextManagerImpl(bootstrapInfo),
metricReporter);
metricReporter.setXdsClient(xdsClient);
}
refCount++;
return xdsClient;
Expand All @@ -168,6 +177,9 @@ public XdsClient returnObject(Object object) {
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
metricReporter.close();
metricReporter = null;
targetToXdsClientMap.remove(target);
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
return null;
Expand Down
Loading

0 comments on commit 20d09ce

Please sign in to comment.