Skip to content

Commit

Permalink
feat: Add totalSlotMs to JobStatistics (#3250)
Browse files Browse the repository at this point in the history
* feat: Add totalSlotMs to JobStatistics

* fix: testQuery IT flakiness

* Fix query statistics test
  • Loading branch information
PhongChuong authored Apr 19, 2024
1 parent 0b0b414 commit 75ea095
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/** A Google BigQuery Job statistics. */
public abstract class JobStatistics implements Serializable {

private static final long serialVersionUID = 1433024714741660399L;
private static final long serialVersionUID = 1433024714741660400L;

private final Long creationTime;
private final Long endTime;
Expand All @@ -51,6 +51,7 @@ public abstract class JobStatistics implements Serializable {
private final List<ReservationUsage> reservationUsage;
private final TransactionInfo transactionInfo;
private final SessionInfo sessionInfo;
private final Long totalSlotMs;

/** A Google BigQuery Copy Job statistics. */
public static class CopyStatistics extends JobStatistics {
Expand Down Expand Up @@ -390,7 +391,7 @@ static LoadStatistics fromPb(com.google.api.services.bigquery.model.JobStatistic
/** A Google BigQuery Query Job statistics. */
public static class QueryStatistics extends JobStatistics {

private static final long serialVersionUID = 7539354109226732353L;
private static final long serialVersionUID = 7539354109226732354L;

private final BiEngineStats biEngineStats;
private final Integer billingTier;
Expand All @@ -407,7 +408,6 @@ public static class QueryStatistics extends JobStatistics {
private final Long totalBytesBilled;
private final Long totalBytesProcessed;
private final Long totalPartitionsProcessed;
private final Long totalSlotMs;
private final List<QueryStage> queryPlan;
private final List<TimelineSample> timeline;
private final Schema schema;
Expand Down Expand Up @@ -567,7 +567,6 @@ static final class Builder extends JobStatistics.Builder<QueryStatistics, Builde
private Long totalBytesBilled;
private Long totalBytesProcessed;
private Long totalPartitionsProcessed;
private Long totalSlotMs;
private List<QueryStage> queryPlan;
private List<TimelineSample> timeline;
private Schema schema;
Expand Down Expand Up @@ -599,7 +598,6 @@ private Builder(com.google.api.services.bigquery.model.JobStatistics statisticsP
this.totalBytesBilled = statisticsPb.getQuery().getTotalBytesBilled();
this.totalBytesProcessed = statisticsPb.getQuery().getTotalBytesProcessed();
this.totalPartitionsProcessed = statisticsPb.getQuery().getTotalPartitionsProcessed();
this.totalSlotMs = statisticsPb.getQuery().getTotalSlotMs();
if (statisticsPb.getQuery().getStatementType() != null) {
this.statementType = StatementType.valueOf(statisticsPb.getQuery().getStatementType());
}
Expand Down Expand Up @@ -719,11 +717,6 @@ Builder setTotalPartitionsProcessed(Long totalPartitionsProcessed) {
return self();
}

Builder setTotalSlotMs(Long totalSlotMs) {
this.totalSlotMs = totalSlotMs;
return self();
}

Builder setQueryPlan(List<QueryStage> queryPlan) {
this.queryPlan = queryPlan;
return self();
Expand Down Expand Up @@ -777,7 +770,6 @@ private QueryStatistics(Builder builder) {
this.totalBytesBilled = builder.totalBytesBilled;
this.totalBytesProcessed = builder.totalBytesProcessed;
this.totalPartitionsProcessed = builder.totalPartitionsProcessed;
this.totalSlotMs = builder.totalSlotMs;
this.queryPlan = builder.queryPlan;
this.timeline = builder.timeline;
this.schema = builder.schema;
Expand Down Expand Up @@ -874,11 +866,6 @@ public Long getTotalPartitionsProcessed() {
return totalPartitionsProcessed;
}

/** Returns the slot-milliseconds consumed by the query. */
public Long getTotalSlotMs() {
return totalSlotMs;
}

/**
* Returns the query plan as a list of stages or {@code null} if a query plan is not available.
* Each stage involves a number of steps that read from data sources, perform a series of
Expand Down Expand Up @@ -984,7 +971,6 @@ com.google.api.services.bigquery.model.JobStatistics toPb() {
queryStatisticsPb.setTotalBytesBilled(totalBytesBilled);
queryStatisticsPb.setTotalBytesProcessed(totalBytesProcessed);
queryStatisticsPb.setTotalPartitionsProcessed(totalPartitionsProcessed);
queryStatisticsPb.setTotalSlotMs(totalSlotMs);
if (ddlTargetTable != null) {
queryStatisticsPb.setDdlTargetTable(ddlTargetTable.toPb());
}
Expand Down Expand Up @@ -1589,6 +1575,7 @@ abstract static class Builder<T extends JobStatistics, B extends Builder<T, B>>
private List<ReservationUsage> reservationUsage;
private TransactionInfo transactionInfo;
private SessionInfo sessionInfo;
private Long totalSlotMs;

protected Builder() {}

Expand All @@ -1598,6 +1585,9 @@ protected Builder(com.google.api.services.bigquery.model.JobStatistics statistic
this.startTime = statisticsPb.getStartTime();
this.numChildJobs = statisticsPb.getNumChildJobs();
this.parentJobId = statisticsPb.getParentJobId();
if (statisticsPb.getTotalSlotMs() != null) {
this.totalSlotMs = statisticsPb.getTotalSlotMs();
}
if (statisticsPb.getScriptStatistics() != null) {
this.scriptStatistics = ScriptStatistics.fromPb(statisticsPb.getScriptStatistics());
}
Expand Down Expand Up @@ -1633,6 +1623,11 @@ B setStartTime(Long startTime) {
return self();
}

B setTotalSlotMs(Long totalSlotMs) {
this.totalSlotMs = totalSlotMs;
return self();
}

abstract T build();
}

Expand All @@ -1646,6 +1641,7 @@ protected JobStatistics(Builder builder) {
this.reservationUsage = builder.reservationUsage;
this.transactionInfo = builder.transactionInfo;
this.sessionInfo = builder.sessionInfo;
this.totalSlotMs = builder.totalSlotMs;
}

/** Returns the creation time of the job in milliseconds since epoch. */
Expand Down Expand Up @@ -1699,6 +1695,11 @@ public SessionInfo getSessionInfo() {
return sessionInfo;
}

/** Returns the slot-milliseconds for the job. */
public Long getTotalSlotMs() {
return totalSlotMs;
}

ToStringHelper toStringHelper() {
return MoreObjects.toStringHelper(this)
.add("creationTime", creationTime)
Expand All @@ -1709,7 +1710,8 @@ ToStringHelper toStringHelper() {
.add("scriptStatistics", scriptStatistics)
.add("reservationUsage", reservationUsage)
.add("transactionInfo", transactionInfo)
.add("sessionInfo", sessionInfo);
.add("sessionInfo", sessionInfo)
.add("totalSlotMs", totalSlotMs);
}

@Override
Expand All @@ -1727,7 +1729,8 @@ final int baseHashCode() {
scriptStatistics,
reservationUsage,
transactionInfo,
sessionInfo);
sessionInfo,
totalSlotMs);
}

final boolean baseEquals(JobStatistics jobStatistics) {
Expand All @@ -1742,6 +1745,7 @@ com.google.api.services.bigquery.model.JobStatistics toPb() {
statistics.setStartTime(startTime);
statistics.setNumChildJobs(numChildJobs);
statistics.setParentJobId(parentJobId);
statistics.setTotalSlotMs(totalSlotMs);
if (scriptStatistics != null) {
statistics.setScriptStatistics(scriptStatistics.toPb());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class JobStatisticsTest {
.setStartTime(START_TIME)
.setCopiedRows(COPIED_ROW)
.setCopiedLogicalBytes(COPIED_LOGICAL_BYTES)
.setTotalSlotMs(TOTAL_SLOT_MS)
.build();
private static final ExtractStatistics EXTRACT_STATISTICS =
ExtractStatistics.newBuilder()
Expand All @@ -110,6 +111,7 @@ public class JobStatisticsTest {
.setStartTime(START_TIME)
.setDestinationUriFileCounts(FILE_COUNT)
.setInputBytes(INPUT_BYTES)
.setTotalSlotMs(TOTAL_SLOT_MS)
.build();
private static final LoadStatistics LOAD_STATISTICS =
LoadStatistics.newBuilder()
Expand All @@ -121,6 +123,7 @@ public class JobStatisticsTest {
.setOutputBytes(OUTPUT_BYTES)
.setOutputRows(OUTPUT_ROWS)
.setBadRecords(BAD_RECORDS)
.setTotalSlotMs(TOTAL_SLOT_MS)
.build();
private static final LoadStatistics LOAD_STATISTICS_INCOMPLETE =
LoadStatistics.newBuilder()
Expand All @@ -130,6 +133,7 @@ public class JobStatisticsTest {
.setInputBytes(INPUT_BYTES)
.setInputFiles(INPUT_FILES)
.setBadRecords(BAD_RECORDS)
.setTotalSlotMs(TOTAL_SLOT_MS)
.build();
private static final List<String> SUBSTEPS1 = ImmutableList.of("substep1", "substep2");
private static final List<String> SUBSTEPS2 = ImmutableList.of("substep3", "substep4");
Expand Down Expand Up @@ -272,18 +276,21 @@ public void testBuilder() {
assertEquals(CREATION_TIME, EXTRACT_STATISTICS.getCreationTime());
assertEquals(START_TIME, EXTRACT_STATISTICS.getStartTime());
assertEquals(END_TIME, EXTRACT_STATISTICS.getEndTime());
assertEquals(TOTAL_SLOT_MS, EXTRACT_STATISTICS.getTotalSlotMs());
assertEquals(FILE_COUNT, EXTRACT_STATISTICS.getDestinationUriFileCounts());
assertEquals(INPUT_BYTES, EXTRACT_STATISTICS.getInputBytes());

assertEquals(CREATION_TIME, COPY_STATISTICS.getCreationTime());
assertEquals(START_TIME, COPY_STATISTICS.getStartTime());
assertEquals(END_TIME, COPY_STATISTICS.getEndTime());
assertEquals(TOTAL_SLOT_MS, COPY_STATISTICS.getTotalSlotMs());
assertEquals(COPIED_LOGICAL_BYTES, COPY_STATISTICS.getCopiedLogicalBytes());
assertEquals(COPIED_ROW, COPY_STATISTICS.getCopiedRows());

assertEquals(CREATION_TIME, LOAD_STATISTICS.getCreationTime());
assertEquals(START_TIME, LOAD_STATISTICS.getStartTime());
assertEquals(END_TIME, LOAD_STATISTICS.getEndTime());
assertEquals(TOTAL_SLOT_MS, LOAD_STATISTICS.getTotalSlotMs());
assertEquals(INPUT_BYTES, LOAD_STATISTICS.getInputBytes());
assertEquals(INPUT_FILES, LOAD_STATISTICS.getInputFiles());
assertEquals(OUTPUT_BYTES, LOAD_STATISTICS.getOutputBytes());
Expand All @@ -293,6 +300,7 @@ public void testBuilder() {
assertEquals(CREATION_TIME, QUERY_STATISTICS.getCreationTime());
assertEquals(START_TIME, QUERY_STATISTICS.getStartTime());
assertEquals(END_TIME, QUERY_STATISTICS.getEndTime());
assertEquals(TOTAL_SLOT_MS, QUERY_STATISTICS.getTotalSlotMs());
assertEquals(BI_ENGINE_STATS, QUERY_STATISTICS.getBiEngineStats());
assertEquals(BILLING_TIER, QUERY_STATISTICS.getBillingTier());
assertEquals(CACHE_HIT, QUERY_STATISTICS.getCacheHit());
Expand All @@ -308,7 +316,6 @@ public void testBuilder() {
assertEquals(TOTAL_BYTES_BILLED, QUERY_STATISTICS.getTotalBytesBilled());
assertEquals(TOTAL_BYTES_PROCESSED, QUERY_STATISTICS.getTotalBytesProcessed());
assertEquals(TOTAL_PARTITION_PROCESSED, QUERY_STATISTICS.getTotalPartitionsProcessed());
assertEquals(TOTAL_SLOT_MS, QUERY_STATISTICS.getTotalSlotMs());
assertEquals(QUERY_PLAN, QUERY_STATISTICS.getQueryPlan());
assertEquals(TIMELINE, QUERY_STATISTICS.getTimeline());

Expand Down Expand Up @@ -472,6 +479,7 @@ private void compareStatistics(JobStatistics expected, JobStatistics value) {
assertEquals(expected.getNumChildJobs(), value.getNumChildJobs());
assertEquals(expected.getParentJobId(), value.getParentJobId());
assertEquals(expected.getScriptStatistics(), value.getScriptStatistics());
assertEquals(expected.getTotalSlotMs(), value.getTotalSlotMs());
}

private void compareScriptStatistics(ScriptStatistics expected, ScriptStatistics value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3160,6 +3160,23 @@ public void testQuery() throws InterruptedException {
assertNotNull(statistics.getQueryPlan());
}

@Test
public void testQueryStatistics() throws InterruptedException {
// Use CURRENT_TIMESTAMP to avoid potential caching.
String query = "SELECT CURRENT_TIMESTAMP() AS ts";
QueryJobConfiguration config =
QueryJobConfiguration.newBuilder(query)
.setDefaultDataset(DatasetId.of(DATASET))
.setUseQueryCache(false)
.build();
Job job = bigquery.create(JobInfo.of(JobId.of(), config));
job = job.waitFor();

JobStatistics.QueryStatistics statistics = job.getStatistics();
assertNotNull(statistics.getQueryPlan());
assertThat(statistics.getTotalSlotMs()).isGreaterThan(0L);
}

@Test
public void testExecuteSelectDefaultConnectionSettings() throws SQLException {
// Use the default connection settings
Expand Down Expand Up @@ -4429,6 +4446,7 @@ public void testLoadSessionSupport() throws InterruptedException {

Job loadJob = bigquery.getJob(job.getJobId());
JobStatistics.LoadStatistics statistics = loadJob.getStatistics();
assertThat(statistics.getTotalSlotMs()).isGreaterThan(0L);
String sessionId = statistics.getSessionInfo().getSessionId();
assertNotNull(sessionId);

Expand Down Expand Up @@ -5678,6 +5696,7 @@ public void testExtractJob() throws InterruptedException, TimeoutException {
assertEquals(1L, extractStatistics.getDestinationUriFileCounts().size());
assertEquals(
loadStatistics.getOutputBytes().longValue(), extractStatistics.getInputBytes().longValue());
assertThat(extractStatistics.getTotalSlotMs()).isGreaterThan(0L);

String extractedCsv =
new String(storage.readAllBytes(BUCKET, EXTRACT_FILE), StandardCharsets.UTF_8);
Expand Down

0 comments on commit 75ea095

Please sign in to comment.