Skip to content

Commit

Permalink
fix: simplify remaining deadline metric impl (#2410)
Browse files Browse the repository at this point in the history
* fix: clean up remaining deadline metric

* make naming consistent with gax

* add a comment and skip record 0

* add comment

* update calculation and test

* fix test
  • Loading branch information
mutianf authored Nov 13, 2024
1 parent a11d56f commit 9796d57
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import com.google.cloud.bigtable.data.v2.stub.changestream.GenerateInitialChangeStreamPartitionsUserCallable;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable;
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsTracerFactory;
Expand Down Expand Up @@ -549,9 +548,7 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readRowsSettings().getRetrySettings().getTotalTimeout()));
.withRetrySettings(settings.readRowsSettings().getRetrySettings()));
}

/**
Expand Down Expand Up @@ -588,9 +585,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readRowSettings().getRetrySettings().getTotalTimeout()));
.withRetrySettings(settings.readRowSettings().getRetrySettings()));
} else {
ServerStreamingCallable<ReadRowsRequest, RowT> readRowsCallable =
createReadRowsBaseCallable(
Expand All @@ -612,9 +607,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
readRowCallable,
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readRowSettings().getRetrySettings().getTotalTimeout()),
.withRetrySettings(settings.readRowSettings().getRetrySettings()),
clientContext.getTracerFactory(),
getSpanName("ReadRow"),
/*allowNoResponses=*/ true);
Expand Down Expand Up @@ -733,9 +726,7 @@ private <RowT> UnaryCallable<Query, List<RowT>> createBulkReadRowsCallable(
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.bulkReadRowsSettings().getRetrySettings().getTotalTimeout()));
.withRetrySettings(settings.readRowsSettings().getRetrySettings()));
}

/**
Expand Down Expand Up @@ -805,9 +796,7 @@ public ApiFuture<List<KeyOffset>> futureCall(String s, ApiCallContext apiCallCon
.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.sampleRowKeysSettings().getRetrySettings().getTotalTimeout())));
.withRetrySettings(settings.sampleRowKeysSettings().getRetrySettings())));
}

/**
Expand Down Expand Up @@ -933,9 +922,7 @@ private UnaryCallable<BulkMutation, MutateRowsAttemptResult> createMutateRowsBas
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.bulkMutateRowsSettings().getRetrySettings().getTotalTimeout()));
.withRetrySettings(settings.bulkMutateRowsSettings().getRetrySettings()));
}

/**
Expand Down Expand Up @@ -1143,12 +1130,8 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings
.generateInitialChangeStreamPartitionsSettings()
.getRetrySettings()
.getTotalTimeout()));
.withRetrySettings(
settings.generateInitialChangeStreamPartitionsSettings().getRetrySettings()));
}

/**
Expand Down Expand Up @@ -1223,9 +1206,7 @@ private UnaryCallable<ReadModifyWriteRow, Row> createReadModifyWriteRowCallable(
return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.readChangeStreamSettings().getRetrySettings().getTotalTimeout()));
.withRetrySettings(settings.readChangeStreamSettings().getRetrySettings()));
}

/**
Expand Down Expand Up @@ -1314,9 +1295,7 @@ public Map<String, String> extract(ExecuteQueryRequest executeQueryRequest) {
traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.executeQuerySettings().getRetrySettings().getTotalTimeout())),
.withRetrySettings(settings.executeQuerySettings().getRetrySettings())),
requestContext);
}

Expand Down Expand Up @@ -1396,11 +1375,7 @@ public ApiFuture<RespT> futureCall(ReqT reqT, ApiCallContext apiCallContext) {
getSpanName(methodDescriptor.getBareMethodName()));

return traced.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
callSettings.getRetrySettings().getTotalTimeout()));
clientContext.getDefaultCallContext().withRetrySettings(callSettings.getRetrySettings()));
}

private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnaryCallableNew(
Expand Down Expand Up @@ -1429,11 +1404,7 @@ private <BaseReqT, BaseRespT, ReqT, RespT> UnaryCallable<ReqT, RespT> createUnar

return new BigtableUnaryOperationCallable<>(
transformed,
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
callSettings.getRetrySettings().getTotalTimeout()),
clientContext.getDefaultCallContext().withRetrySettings(callSettings.getRetrySettings()),
clientContext.getTracerFactory(),
getSpanName(methodDescriptor.getBareMethodName()),
/* allowNoResponse= */ false);
Expand Down Expand Up @@ -1470,9 +1441,7 @@ public Map<String, String> extract(PingAndWarmRequest request) {
return pingAndWarm.withDefaultCallContext(
clientContext
.getDefaultCallContext()
.withOption(
BigtableTracer.OPERATION_TIMEOUT_KEY,
settings.pingAndWarmSettings().getRetrySettings().getTotalTimeout()));
.withRetrySettings(settings.pingAndWarmSettings().getRetrySettings()));
}

private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> withRetries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.BaseApiTracer;
import java.time.Duration;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
* A Bigtable specific {@link ApiTracer} that includes additional contexts. This class is a base
Expand All @@ -32,10 +31,6 @@ public class BigtableTracer extends BaseApiTracer {

private volatile int attempt = 0;

@InternalApi("for internal use only")
public static final ApiCallContext.Key<Duration> OPERATION_TIMEOUT_KEY =
ApiCallContext.Key.create("OPERATION_TIMEOUT");

@Override
public void attemptStarted(int attemptNumber) {
this.attempt = attemptNumber;
Expand Down Expand Up @@ -108,10 +103,11 @@ public void grpcMessageSent() {
}

/**
* Record the operation timeout from user settings for calculating remaining deadline. This will
* be called in BuiltinMetricsTracer.
* Record the operation timeout from user settings for calculating remaining deadline. Currently,
* it's called in BuiltinMetricsTracer on attempt start from {@link BigtableTracerUnaryCallable}
* and {@link BigtableTracerStreamingCallable}.
*/
public void setOperationTimeout(Duration operationTimeout) {
public void setTotalTimeoutDuration(Duration totalTimeoutDuration) {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
Expand All @@ -27,7 +26,6 @@
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
* This callable will
Expand Down Expand Up @@ -61,13 +59,11 @@ public void call(
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
// tracer should always be an instance of bigtable tracer
if (context.getTracer() instanceof BigtableTracer) {
BigtableTracer tracer = (BigtableTracer) context.getTracer();
BigtableTracerResponseObserver<ResponseT> innerObserver =
new BigtableTracerResponseObserver<>(
responseObserver, (BigtableTracer) context.getTracer(), responseMetadata);
GrpcCallContext callContext = (GrpcCallContext) context;
Duration deadline = callContext.getOption(BigtableTracer.OPERATION_TIMEOUT_KEY);
if (deadline != null) {
((BigtableTracer) context.getTracer()).setOperationTimeout(deadline);
new BigtableTracerResponseObserver<>(responseObserver, tracer, responseMetadata);
if (context.getRetrySettings() != null) {
tracer.setTotalTimeoutDuration(context.getRetrySettings().getTotalTimeoutDuration());
}
innerCallable.call(
request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import javax.annotation.Nonnull;
import org.threeten.bp.Duration;

/**
* This callable will:
Expand Down Expand Up @@ -56,14 +54,13 @@ public BigtableTracerUnaryCallable(@Nonnull UnaryCallable<RequestT, ResponseT> i
public ApiFuture<ResponseT> futureCall(RequestT request, ApiCallContext context) {
// tracer should always be an instance of BigtableTracer
if (context.getTracer() instanceof BigtableTracer) {
BigtableTracer tracer = (BigtableTracer) context.getTracer();
final GrpcResponseMetadata responseMetadata = new GrpcResponseMetadata();
BigtableTracerUnaryCallback<ResponseT> callback =
new BigtableTracerUnaryCallback<ResponseT>(
(BigtableTracer) context.getTracer(), responseMetadata);
GrpcCallContext callContext = (GrpcCallContext) context;
Duration deadline = callContext.getOption(BigtableTracer.OPERATION_TIMEOUT_KEY);
if (deadline != null) {
((BigtableTracer) context.getTracer()).setOperationTimeout(deadline);
if (context.getRetrySettings() != null) {
tracer.setTotalTimeoutDuration(context.getRetrySettings().getTotalTimeoutDuration());
}
ApiFuture<ResponseT> future =
innerCallable.futureCall(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.bigtable.Version;
import com.google.common.base.Stopwatch;
import com.google.common.math.IntMath;
import io.grpc.Deadline;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
Expand All @@ -37,7 +38,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
Expand Down Expand Up @@ -90,8 +90,8 @@ class BuiltinMetricsTracer extends BigtableTracer {
private Long serverLatencies = null;
private final AtomicLong grpcMessageSentDelay = new AtomicLong(0);

private Duration operationTimeout = Duration.ofMillis(0);
private long remainingOperationTimeout = 0;
private Deadline operationDeadline = null;
private volatile long remainingDeadlineAtAttemptStart = 0;

// OpenCensus (and server) histogram buckets use [start, end), however OpenTelemetry uses (start,
// end]. To work around this, we measure all the latencies in nanoseconds and convert them
Expand Down Expand Up @@ -175,6 +175,9 @@ public void attemptStarted(Object request, int attemptNumber) {
this.attempt = attemptNumber;
attemptCount++;
attemptTimer = Stopwatch.createStarted();
if (operationDeadline != null) {
remainingDeadlineAtAttemptStart = operationDeadline.timeRemaining(TimeUnit.MILLISECONDS);
}
if (request != null) {
this.tableId = Util.extractTableId(request);
}
Expand All @@ -185,11 +188,6 @@ public void attemptStarted(Object request, int attemptNumber) {
}
}
}
// OperationTimeout is only set after the first attempt.
if (attemptCount > 1) {
remainingOperationTimeout =
operationTimeout.toMillis() - operationTimer.elapsed(TimeUnit.MILLISECONDS);
}
}

@Override
Expand Down Expand Up @@ -301,12 +299,16 @@ public void grpcMessageSent() {
grpcMessageSentDelay.set(attemptTimer.elapsed(TimeUnit.NANOSECONDS));
}

/*
This is called by BigtableTracerCallables that sets operation timeout from user settings.
*/
@Override
public void setOperationTimeout(Duration operationTimeout) {
this.operationTimeout = operationTimeout;
public void setTotalTimeoutDuration(java.time.Duration totalTimeoutDuration) {
// This method is called by BigtableTracerStreamingCallable and
// BigtableTracerUnaryCallable which is called per attempt. We only set
// the operationDeadline on the first attempt and when totalTimeout is set.
if (operationDeadline == null && !totalTimeoutDuration.isZero()) {
this.operationDeadline =
Deadline.after(totalTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS);
this.remainingDeadlineAtAttemptStart = totalTimeoutDuration.toMillis();
}
}

@Override
Expand Down Expand Up @@ -403,15 +405,10 @@ private void recordAttemptCompletion(@Nullable Throwable status) {
attemptLatenciesHistogram.record(
convertToMs(attemptTimer.elapsed(TimeUnit.NANOSECONDS)), attributes);

if (attemptCount <= 1) {
remainingDeadlineHistogram.record(operationTimeout.toMillis(), attributes);
} else if (remainingOperationTimeout >= 0) {
remainingDeadlineHistogram.record(remainingOperationTimeout, attributes);
} else if (operationTimeout.toMillis() != 0) {
// If the operationTimeout is set but remaining deadline is < 0, log a warning. This should
// never happen.
logger.log(
Level.WARNING, "The remaining deadline was less than 0: " + remainingOperationTimeout);
// When operationDeadline is set, it's possible that the deadline is passed by the time we send
// a new attempt. In this case we'll record 0.
if (operationDeadline != null) {
remainingDeadlineHistogram.record(Math.max(0, remainingDeadlineAtAttemptStart), attributes);
}

if (serverLatencies != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public void grpcMessageSent() {
}

@Override
public void setOperationTimeout(Duration operationTimeout) {
public void setTotalTimeoutDuration(java.time.Duration totalTimeoutDuration) {
for (BigtableTracer tracer : bigtableTracers) {
tracer.setOperationTimeout(operationTimeout);
tracer.setTotalTimeoutDuration(totalTimeoutDuration);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ public class BuiltinMetricsTracerTest {
private static final long APPLICATION_LATENCY = 200;
private static final long SLEEP_VARIABILITY = 15;
private static final String CLIENT_NAME = "java-bigtable/" + Version.VERSION;

private static final long CHANNEL_BLOCKING_LATENCY = 200;

@Rule public final MockitoRule mockitoRule = MockitoJUnit.rule();
Expand Down Expand Up @@ -221,7 +220,7 @@ public void sendHeaders(Metadata headers) {
.readRowsSettings()
.retrySettings()
.setTotalTimeoutDuration(Duration.ofMillis(9000))
.setMaxRpcTimeoutDuration(Duration.ofMillis(6000))
.setMaxRpcTimeoutDuration(Duration.ofMillis(9000))
.setRpcTimeoutMultiplier(1)
.setInitialRpcTimeoutDuration(Duration.ofMillis(6000))
.setInitialRetryDelayDuration(Duration.ofMillis(10))
Expand Down Expand Up @@ -809,7 +808,9 @@ public void testRemainingDeadline() {
.get(0);

double okRemainingDeadline = okHistogramPointData.getSum();
assertThat(okRemainingDeadline).isWithin(200).of(8500);
// first attempt latency + retry delay
double expected = 9000 - SERVER_LATENCY - CHANNEL_BLOCKING_LATENCY - 10;
assertThat(okRemainingDeadline).isIn(Range.closed(expected - 500, expected + 10));
}

private static class FakeService extends BigtableGrpc.BigtableImplBase {
Expand Down

0 comments on commit 9796d57

Please sign in to comment.