Skip to content

Commit

Permalink
Add a random delay before processing blocks (#1825)
Browse files Browse the repository at this point in the history
The goal is to reduce herd effects when there are lots of channels.

Co-authored-by: Bastien Teinturier <[email protected]>
  • Loading branch information
pm47 and t-bast authored May 26, 2021
1 parent 6f6c458 commit 43a89f8
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 12 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ eclair {
// expiry-delta-blocks.
fulfill-safety-before-timeout-blocks = 24
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect

fee-base-msat = 1000
fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%)
Expand Down
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
expiryDelta: CltvExpiryDelta,
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
minFinalExpiryDelta: CltvExpiryDelta,
maxBlockProcessingDelay: FiniteDuration,
htlcMinimum: MilliSatoshi,
toRemoteDelay: CltvExpiryDelta,
maxToLocalDelay: CltvExpiryDelta,
Expand Down Expand Up @@ -337,6 +338,7 @@ object NodeParams extends Logging {
expiryDelta = expiryDelta,
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
minFinalExpiryDelta = minFinalExpiryDelta,
maxBlockProcessingDelay = FiniteDuration(config.getDuration("max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
htlcMinimum = htlcMinimum,
toRemoteDelay = CltvExpiryDelta(config.getInt("to-remote-delay-blocks")),
maxToLocalDelay = CltvExpiryDelta(config.getInt("max-to-local-delay-blocks")),
Expand Down
27 changes: 17 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,19 @@ import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.db.pg.PgUtils.PgLock.logger
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment.PaymentSettlingOnChain
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.Transactions.{ClosingTx, TxOwner}
import fr.acinq.eclair.transactions._
import fr.acinq.eclair.wire.protocol._
import org.sqlite.SQLiteException
import scodec.bits.ByteVector

import java.sql.SQLException
import scala.collection.immutable.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Random, Success, Try}

/**
* Created by PM on 20/08/2015.
Expand Down Expand Up @@ -125,6 +123,9 @@ object Channel {
*/
case class OutgoingMessage(msg: LightningMessage, peerConnection: ActorRef)

/** We don't immediately process [[CurrentBlockCount]] to avoid herd effects */
case class ProcessCurrentBlockCount(c: CurrentBlockCount)

}

class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId: PublicKey, blockchain: typed.ActorRef[ZmqWatcher.Command], relayer: ActorRef, txPublisherFactory: Channel.TxPublisherFactory, origin_opt: Option[ActorRef] = None)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) extends FSM[State, Data] with FSMDiagnosticActorLogging[State, Data] {
Expand All @@ -150,6 +151,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

// this will be used to detect htlc timeouts
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])
// the constant delay by which we delay processing of blocks (it will be smoothened among all channels)
private val blockProcessingDelay = Random.nextLong(nodeParams.maxBlockProcessingDelay.toMillis + 1).millis
// this will be used to make sure the current commitment fee is up-to-date
context.system.eventStream.subscribe(self, classOf[CurrentFeerates])

Expand Down Expand Up @@ -631,7 +634,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(BITCOIN_FUNDING_PUBLISH_FAILED, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => handleFundingPublishFailed(d)

case Event(c: CurrentBlockCount, d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => d.fundingTx match {
case Event(ProcessCurrentBlockCount(c), d: DATA_WAIT_FOR_FUNDING_CONFIRMED) => d.fundingTx match {
case Some(_) => stay // we are funder, we're still waiting for the funding tx to be confirmed
case None if c.blockCount - d.waitingSinceBlock > FUNDING_TIMEOUT_FUNDEE =>
log.warning(s"funding tx hasn't been published in ${c.blockCount - d.waitingSinceBlock} blocks")
Expand Down Expand Up @@ -943,7 +946,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
}

case Event(c: CurrentBlockCount, d: DATA_NORMAL) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: DATA_NORMAL) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: DATA_NORMAL) => handleCurrentFeerate(c, d)

Expand Down Expand Up @@ -1221,7 +1224,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(r: RevocationTimeout, d: DATA_SHUTDOWN) => handleRevocationTimeout(r, d)

case Event(c: CurrentBlockCount, d: DATA_SHUTDOWN) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: DATA_SHUTDOWN) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: DATA_SHUTDOWN) => handleCurrentFeerate(c, d)

Expand Down Expand Up @@ -1519,7 +1522,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// note: this can only happen if state is NORMAL or SHUTDOWN
// -> in NEGOTIATING there are no more htlcs
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
Expand Down Expand Up @@ -1710,7 +1713,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
context.system.scheduler.scheduleOnce(5 seconds, self, remoteAnnSigs)
stay

case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)
case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
handleOfflineFeerate(c, d)
Expand Down Expand Up @@ -1792,8 +1795,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// we only care about this event in NORMAL and SHUTDOWN state, and there may be cases where the task is not cancelled
case Event(_: RevocationTimeout, _) => stay

// we reschedule with a random delay to prevent herd effect when there are a lot of channels
case Event(c: CurrentBlockCount, _) =>
context.system.scheduler.scheduleOnce(blockProcessingDelay, self, ProcessCurrentBlockCount(c))
stay

// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
case Event(CurrentBlockCount(_), _) => stay
case Event(ProcessCurrentBlockCount(_), _) => stay

// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
case Event(CurrentFeerates(_), _) => stay
Expand Down Expand Up @@ -2568,4 +2576,3 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

}


Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ object TestConstants {
expiryDelta = CltvExpiryDelta(144),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxBlockProcessingDelay = 10 millis,
htlcMinimum = 0 msat,
minDepthBlocks = 3,
toRemoteDelay = CltvExpiryDelta(144),
Expand Down Expand Up @@ -215,6 +216,7 @@ object TestConstants {
expiryDelta = CltvExpiryDelta(144),
fulfillSafetyBeforeTimeout = CltvExpiryDelta(6),
minFinalExpiryDelta = CltvExpiryDelta(18),
maxBlockProcessingDelay = 10 millis,
htlcMinimum = 1000 msat,
minDepthBlocks = 3,
toRemoteDelay = CltvExpiryDelta(144),
Expand Down Expand Up @@ -287,4 +289,4 @@ object TestTags {
// Tests that call an external API (which may start failing independently of our code).
object ExternalApi extends Tag("external-api")

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit
"eclair.bitcoind.wallet" -> defaultWallet,
"eclair.mindepth-blocks" -> 2,
"eclair.max-htlc-value-in-flight-msat" -> 100000000000L,
"eclair.router.broadcast-interval" -> "2 second",
"eclair.max-block-processing-delay" -> "2 seconds",
"eclair.router.broadcast-interval" -> "2 seconds",
"eclair.auto-reconnect" -> false,
"eclair.to-remote-delay-blocks" -> 24,
"eclair.multi-part-payment-expiry" -> "20 seconds").asJava).withFallback(ConfigFactory.load())
Expand Down

0 comments on commit 43a89f8

Please sign in to comment.