Skip to content
This repository was archived by the owner on May 14, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions gax-java/gax/src/main/java/com/google/api/gax/rpc/Watchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -69,7 +70,7 @@ public final class Watchdog implements Runnable, BackgroundResource {
// Dummy value to convert the ConcurrentHashMap into a Set
private static Object PRESENT = new Object();
private final ConcurrentHashMap<WatchdogStream, Object> openStreams = new ConcurrentHashMap<>();

private final Phaser phaser;
private final ApiClock clock;
private final Duration scheduleInterval;
private final ScheduledExecutorService executor;
Expand All @@ -87,6 +88,8 @@ private Watchdog(ApiClock clock, Duration scheduleInterval, ScheduledExecutorSer
this.clock = Preconditions.checkNotNull(clock, "clock can't be null");
this.scheduleInterval = scheduleInterval;
this.executor = executor;
// Register the main thread
this.phaser = new Phaser(1);
}

private void start() {
Expand Down Expand Up @@ -116,10 +119,15 @@ public <ResponseT> ResponseObserver<ResponseT> watch(

@Override
public void run() {
// Register the current thread
phaser.register();
try {
runUnsafe();
} catch (Throwable t) {
LOG.log(Level.SEVERE, "Caught throwable in periodic Watchdog run. Continuing.", t);
} finally {
// Unregister the current thread
phaser.arriveAndDeregister();
}
}

Expand All @@ -136,7 +144,9 @@ private void runUnsafe() {

@Override
public void shutdown() {
future.cancel(false);
if (future.cancel(false)) {
phaser.arriveAndDeregister();
}
}

@Override
Expand All @@ -151,13 +161,19 @@ public boolean isTerminated() {

@Override
public void shutdownNow() {
future.cancel(true);
if (future.cancel(true)) {
phaser.arriveAndDeregister();
}
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
try {
future.get(duration, unit);
// Default phase is 0, this method wait until all parties arrive and unregister(meaning
// waiting for the current "run" method to complete), then
// terminate the Phaser
phaser.awaitAdvanceInterruptibly(0, duration, unit);
return true;
} catch (ExecutionException | CancellationException e) {
return true;
Expand All @@ -183,6 +199,7 @@ enum State {
}

class WatchdogStream<ResponseT> extends StateCheckingResponseObserver<ResponseT> {

private final Object lock = new Object();

private final Duration waitTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,26 @@ public void testTimedOutBeforeStart() throws InterruptedException {
assertThat(error).isInstanceOf(WatchdogTimeoutException.class);
}

@Test
public void awaitTermination_shouldReturnFalseIfShutDownIsNotCalledFirst() throws Exception {
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
boolean awaitTermination = watchdog.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertThat(awaitTermination).isFalse();
}

@Test
public void awaitTermination_shouldReturnTrue() throws Exception {
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
watchdog.watch(new AccumulatingObserver<>(), waitTime, idleTime);
// Make sure the run() method is run before calling shutdown()
Thread.sleep(2000);
watchdog.shutdown();
boolean awaitTermination = watchdog.awaitTermination(1000, TimeUnit.MILLISECONDS);
assertThat(awaitTermination).isTrue();
assertThat(watchdog.isTerminated()).isTrue();
}

@Test
public void testMultiple() throws Exception {
// Start stream1
Expand Down