Skip to content

Commit

Permalink
Check blockchain watchdogs regularly (#1808)
Browse files Browse the repository at this point in the history
We want to check secondary blockchain sources when we haven't received
blocks in a while.

Fixes #1803
  • Loading branch information
t-bast authored May 17, 2021
1 parent ec276f8 commit 9c3ee59
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ import scala.util.Random
/** Monitor secondary blockchain sources to detect when we're being eclipsed. */
object BlockchainWatchdog {

// @formatter:off
case class BlockHeaderAt(blockCount: Long, blockHeader: BlockHeader)
case object NoBlockReceivedTimer

// @formatter:off
sealed trait BlockchainWatchdogEvent
/**
* We are missing too many blocks compared to one of our blockchain watchdogs.
Expand All @@ -50,21 +51,29 @@ object BlockchainWatchdog {
case class LatestHeaders(currentBlockCount: Long, blockHeaders: Set[BlockHeaderAt], source: String) extends Command
private[watchdogs] case class WrappedCurrentBlockCount(currentBlockCount: Long) extends Command
private case class CheckLatestHeaders(currentBlockCount: Long) extends Command
private case class NoBlockReceivedSince(lastBlockCount: Long) extends Command
// @formatter:on

/**
* @param chainHash chain we're interested in.
* @param maxRandomDelay to avoid the herd effect whenever a block is created, we add a random delay before we query
* secondary blockchain sources. This parameter specifies the maximum delay we'll allow.
*/
def apply(chainHash: ByteVector32, maxRandomDelay: FiniteDuration): Behavior[Command] = {
def apply(chainHash: ByteVector32, maxRandomDelay: FiniteDuration, blockTimeout: FiniteDuration = 15 minutes): Behavior[Command] = {
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockCount](cbc => WrappedCurrentBlockCount(cbc.blockCount)))
Behaviors.withTimers { timers =>
// We start a timer to check blockchain watchdogs regularly even when we don't receive any block.
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(0), blockTimeout)
Behaviors.receiveMessage {
case NoBlockReceivedSince(lastBlockCount) =>
context.self ! CheckLatestHeaders(lastBlockCount)
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(lastBlockCount), blockTimeout)
Behaviors.same
case WrappedCurrentBlockCount(blockCount) =>
val delay = Random.nextInt(maxRandomDelay.toSeconds.toInt).seconds
timers.startSingleTimer(CheckLatestHeaders(blockCount), delay)
timers.startSingleTimer(NoBlockReceivedTimer, NoBlockReceivedSince(blockCount), blockTimeout)
Behaviors.same
case CheckLatestHeaders(blockCount) =>
val id = UUID.randomUUID()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,29 @@ class BlockchainWatchdogSpec extends ScalaTestWithActorTestKit(ConfigFactory.loa
testKit.stop(watchdog)
}

test("fetch block headers when we don't receive blocks", TestTags.ExternalApi) {
val eventListener = TestProbe[DangerousBlocksSkew]()
system.eventStream ! EventStream.Subscribe(eventListener.ref)
val blockTimeout = 5 seconds
val watchdog = testKit.spawn(BlockchainWatchdog(Block.TestnetGenesisBlock.hash, 1 second, blockTimeout))

watchdog ! WrappedCurrentBlockCount(500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
eventListener.expectNoMessage(100 millis)

// If we don't receive blocks, we check blockchain sources.
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
eventListener.expectNoMessage(100 millis)

// And we keep checking blockchain sources until we receive a block.
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
assert(eventListener.expectMessageType[DangerousBlocksSkew].recentHeaders.currentBlockCount === 500000)
eventListener.expectNoMessage(100 millis)
}

}

0 comments on commit 9c3ee59

Please sign in to comment.