Skip to content

Commit

Permalink
Rework TxPublisher (#1844)
Browse files Browse the repository at this point in the history
Splt the TxPublisher in many smaller actors with clear responsibilities.
Each tx publishing attempt is its own actor and watches the tx until it
either confirms or becomes evicted, and reports the result to its parent.

The TxPublisher (one per channel) orchestrates publishing attempts and
will in the future decide to RBF txs based on deadline information.
  • Loading branch information
t-bast authored Jun 22, 2021
1 parent afb1b41 commit d43d06f
Show file tree
Hide file tree
Showing 38 changed files with 3,066 additions and 1,271 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ eclair {
fulfill-safety-before-timeout-blocks = 24
min-final-expiry-delta-blocks = 30 // Bolt 11 invoice's min_final_cltv_expiry; must be strictly greater than fulfill-safety-before-timeout-blocks
max-block-processing-delay = 30 seconds // we add a random delay before processing blocks, capped at this value, to prevent herd effect
max-tx-publish-retry-delay = 60 seconds // we add a random delay before retrying failed transaction publication

fee-base-msat = 1000
fee-proportional-millionths = 100 // fee charged per transferred satoshi in millionths of a satoshi (100 = 0.01%)
Expand Down
2 changes: 2 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
fulfillSafetyBeforeTimeout: CltvExpiryDelta,
minFinalExpiryDelta: CltvExpiryDelta,
maxBlockProcessingDelay: FiniteDuration,
maxTxPublishRetryDelay: FiniteDuration,
htlcMinimum: MilliSatoshi,
toRemoteDelay: CltvExpiryDelta,
maxToLocalDelay: CltvExpiryDelta,
Expand Down Expand Up @@ -339,6 +340,7 @@ object NodeParams extends Logging {
fulfillSafetyBeforeTimeout = fulfillSafetyBeforeTimeout,
minFinalExpiryDelta = minFinalExpiryDelta,
maxBlockProcessingDelay = FiniteDuration(config.getDuration("max-block-processing-delay").getSeconds, TimeUnit.SECONDS),
maxTxPublishRetryDelay = FiniteDuration(config.getDuration("max-tx-publish-retry-delay").getSeconds, TimeUnit.SECONDS),
htlcMinimum = htlcMinimum,
toRemoteDelay = CltvExpiryDelta(config.getInt("to-remote-delay-blocks")),
maxToLocalDelay = CltvExpiryDelta(config.getInt("max-to-local-delay-blocks")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.{FundTransactionOptions, FundTransactionResponse, SignTransactionResponse, toSatoshi}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient, JsonRPCError}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BitcoinJsonRPCClient, ExtendedBitcoinClient}
import fr.acinq.eclair.blockchain.fee.{FeeratePerKB, FeeratePerKw}
import fr.acinq.eclair.transactions.Transactions
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -61,7 +61,7 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC
val f = signTransaction(tx)
// if signature fails (e.g. because wallet is encrypted) we need to unlock the utxos
f.recoverWith { case _ =>
unlockOutpoints(tx.txIn.map(_.outPoint))
bitcoinClient.unlockOutpoints(tx.txIn.map(_.outPoint))
.recover { case t: Throwable => // no-op, just add a log in case of failure
logger.warn(s"Cannot unlock failed transaction's UTXOs txid=${tx.txid}", t)
t
Expand Down Expand Up @@ -152,41 +152,12 @@ class BitcoinCoreWallet(rpcClient: BitcoinJsonRPCClient)(implicit ec: ExecutionC
}
}

override def rollback(tx: Transaction): Future[Boolean] = unlockOutpoints(tx.txIn.map(_.outPoint)) // we unlock all utxos used by the tx
override def rollback(tx: Transaction): Future[Boolean] = bitcoinClient.unlockOutpoints(tx.txIn.map(_.outPoint)) // we unlock all utxos used by the tx

override def doubleSpent(tx: Transaction): Future[Boolean] = bitcoinClient.doubleSpent(tx)

/**
* @param outPoints outpoints to unlock
* @return true if all outpoints were successfully unlocked, false otherwise
*/
private def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = {
// we unlock utxos one by one and not as a list as it would fail at the first utxo that is not actually locked and the rest would not be processed
val futures = outPoints
.map(outPoint => Utxo(outPoint.txid, outPoint.index))
.map(utxo => rpcClient
.invoke("lockunspent", true, List(utxo))
.mapTo[JBool]
.transformWith {
case Success(JBool(result)) => Future.successful(result)
case Failure(JsonRPCError(error)) if error.message.contains("expected locked output") =>
Future.successful(true) // we consider that the outpoint was successfully unlocked (since it was not locked to begin with)
case Failure(t) =>
logger.warn(s"Cannot unlock utxo=$utxo", t)
Future.successful(false)
})
val future = Future.sequence(futures)
// return true if all outpoints were unlocked false otherwise
future.map(_.forall(b => b))
}

}

object BitcoinCoreWallet {

// @formatter:off
case class Utxo(txid: ByteVector32, vout: Long)
case class WalletTransaction(address: String, amount: Satoshi, fees: Satoshi, blockHash: ByteVector32, confirmations: Long, txid: ByteVector32, timestamp: Long)
// @formatter:on

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import fr.acinq.eclair.{KamonExt, ShortChannelId}
import org.json4s.JsonAST._
Expand Down Expand Up @@ -149,8 +148,8 @@ object ZmqWatcher {
case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: ByteVector32, minDepth: Long) extends WatchConfirmed[WatchTxConfirmedTriggered]
case class WatchTxConfirmedTriggered(blockHeight: Int, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered

case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: ByteVector32, minDepth: Long, childTx: PublishTx) extends WatchConfirmed[WatchParentTxConfirmedTriggered]
case class WatchParentTxConfirmedTriggered(blockHeight: Int, txIndex: Int, tx: Transaction, childTx: PublishTx) extends WatchConfirmedTriggered
case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: ByteVector32, minDepth: Long) extends WatchConfirmed[WatchParentTxConfirmedTriggered]
case class WatchParentTxConfirmedTriggered(blockHeight: Int, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered

// TODO: not implemented yet: notify me if confirmation number gets below minDepth?
case class WatchFundingLost(replyTo: ActorRef[WatchFundingLostTriggered], txId: ByteVector32, minDepth: Long) extends Watch[WatchFundingLostTriggered]
Expand Down Expand Up @@ -378,7 +377,7 @@ private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client
case w: WatchFundingConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchFundingConfirmedTriggered(height, index, tx))
case w: WatchFundingDeeplyBuried => context.self ! TriggerEvent(w.replyTo, w, WatchFundingDeeplyBuriedTriggered(height, index, tx))
case w: WatchTxConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchTxConfirmedTriggered(height, index, tx))
case w: WatchParentTxConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchParentTxConfirmedTriggered(height, index, tx, w.childTx))
case w: WatchParentTxConfirmed => context.self ! TriggerEvent(w.replyTo, w, WatchParentTxConfirmedTriggered(height, index, tx))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{GetTxWithMetaResponse, Ut
import fr.acinq.eclair.blockchain.fee.{FeeratePerKB, FeeratePerKw}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.wire.protocol.ChannelAnnouncement
import grizzled.slf4j.Logging
import org.json4s.Formats
import org.json4s.JsonAST._
import scodec.bits.ByteVector

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.{Failure, Success, Try}

/**
* Created by PM on 26/04/2016.
Expand All @@ -39,7 +40,7 @@ import scala.util.Try
* Note that all wallet utilities (signing transactions, setting fees, locking outputs, etc) can be found in
* [[fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet]].
*/
class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) extends Logging {

import ExtendedBitcoinClient._

Expand Down Expand Up @@ -151,6 +152,30 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
getRawTransaction(tx.txid).map(_ => tx.txid).recoverWith { case _ => Future.failed(e) }
}

/**
* @param outPoints outpoints to unlock.
* @return true if all outpoints were successfully unlocked, false otherwise.
*/
def unlockOutpoints(outPoints: Seq[OutPoint])(implicit ec: ExecutionContext): Future[Boolean] = {
// we unlock utxos one by one and not as a list as it would fail at the first utxo that is not actually locked and the rest would not be processed
val futures = outPoints
.map(outPoint => Utxo(outPoint.txid, outPoint.index))
.map(utxo => rpcClient
.invoke("lockunspent", true, List(utxo))
.mapTo[JBool]
.transformWith {
case Success(JBool(result)) => Future.successful(result)
case Failure(JsonRPCError(error)) if error.message.contains("expected locked output") =>
Future.successful(true) // we consider that the outpoint was successfully unlocked (since it was not locked to begin with)
case Failure(t) =>
logger.warn(s"cannot unlock utxo=$utxo:", t)
Future.successful(false)
})
val future = Future.sequence(futures)
// return true if all outpoints were unlocked false otherwise
future.map(_.forall(b => b))
}

def isTransactionOutputSpendable(txid: ByteVector32, outputIndex: Int, includeMempool: Boolean)(implicit ec: ExecutionContext): Future[Boolean] =
for {
json <- rpcClient.invoke("gettxout", txid, outputIndex, includeMempool)
Expand Down Expand Up @@ -206,8 +231,9 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {
def getMempool()(implicit ec: ExecutionContext): Future[Seq[Transaction]] =
for {
txids <- rpcClient.invoke("getrawmempool").map(json => json.extract[List[String]].map(ByteVector32.fromValidHex))
txs <- Future.sequence(txids.map(getTransaction(_)))
} yield txs
// NB: if a transaction is evicted before we've called getTransaction, we need to ignore it instead of failing.
txs <- Future.sequence(txids.map(getTransaction(_).map(Some(_)).recover { case _ => None }))
} yield txs.flatten

def getMempoolTx(txid: ByteVector32)(implicit ec: ExecutionContext): Future[MempoolTx] = {
rpcClient.invoke("getmempoolentry", txid).map(json => {
Expand Down Expand Up @@ -296,6 +322,8 @@ object ExtendedBitcoinClient {
*/
case class MempoolTx(txid: ByteVector32, vsize: Long, weight: Long, replaceable: Boolean, fees: Satoshi, ancestorCount: Int, ancestorFees: Satoshi, descendantCount: Int, descendantFees: Satoshi)

case class Utxo(txid: ByteVector32, vout: Long)

def toSatoshi(btcAmount: BigDecimal): Satoshi = Satoshi(btcAmount.bigDecimal.scaleByPowerOfTen(8).longValue)

}
36 changes: 18 additions & 18 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.channel.Helpers.{Closing, Funding}
import fr.acinq.eclair.channel.Monitoring.Metrics.ProcessMessage
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SetChannelId, SignAndPublishTx}
import fr.acinq.eclair.channel.publish.TxPublisher
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishRawTx, PublishReplaceableTx, PublishTx, SetChannelId}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
Expand Down Expand Up @@ -63,7 +64,7 @@ object Channel {

case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], bitcoinClient: ExtendedBitcoinClient) extends TxPublisherFactory {
override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] = {
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, watcher, bitcoinClient)).onFailure(typed.SupervisorStrategy.restart), "tx-publisher")
context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient, watcher))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher")
}
}

Expand Down Expand Up @@ -1380,7 +1381,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, penaltyTxs) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx))
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx, None))
penaltyTxs.foreach(claimTx => blockchain ! WatchOutputSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, hints = Set(claimTx.tx.txid)))
rev1
}
Expand All @@ -1394,7 +1395,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
val (localCommitPublished1, claimHtlcTx_opt) = Closing.claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
claimHtlcTx_opt.foreach(claimHtlcTx => {
txPublisher ! PublishRawTx(claimHtlcTx)
txPublisher ! PublishRawTx(claimHtlcTx, None)
blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks)
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
Expand Down Expand Up @@ -2046,7 +2047,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Some(fundingTx) =>
// if we are funder, we never give up
log.info(s"republishing the funding tx...")
txPublisher ! PublishRawTx(fundingTx, "funding-tx")
txPublisher ! PublishRawTx(fundingTx, fundingTx.txIn.head.outPoint, "funding-tx", None)
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)
context.system.scheduler.scheduleOnce(1 day, blockchain.toClassic, GetTxWithMeta(self, txid))
Expand Down Expand Up @@ -2199,7 +2200,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

private def doPublish(closingTx: ClosingTx): Unit = {
txPublisher ! PublishRawTx(closingTx)
txPublisher ! PublishRawTx(closingTx, None)
blockchain ! WatchTxConfirmed(self, closingTx.tx.txid, nodeParams.minDepthBlocks)
}

Expand Down Expand Up @@ -2229,12 +2230,9 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
* This helper method will publish txs only if they haven't yet reached minDepth
*/
private def publishIfNeeded(txs: Iterable[PublishTx], irrevocablySpent: Map[OutPoint, Transaction]): Unit = {
val (skip, process) = txs.partition(publishTx => Closing.inputsAlreadySpent(publishTx.tx, irrevocablySpent))
process.foreach { publishTx =>
log.info(s"publishing txid=${publishTx.tx.txid}")
txPublisher ! publishTx
}
skip.foreach(publishTx => log.info(s"no need to republish txid=${publishTx.tx.txid}, it has already been confirmed"))
val (skip, process) = txs.partition(publishTx => Closing.inputAlreadySpent(publishTx.input, irrevocablySpent))
process.foreach { publishTx => txPublisher ! publishTx }
skip.foreach(publishTx => log.info("no need to republish tx spending {}:{}, it has already been confirmed", publishTx.input.txid, publishTx.input.index))
}

/**
Expand Down Expand Up @@ -2264,13 +2262,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(localCommitPublished: LocalCommitPublished, commitments: Commitments): Unit = {
import localCommitPublished._

val commitInput = commitments.commitInput.outPoint
val publishQueue = commitments.commitmentFormat match {
case Transactions.DefaultCommitmentFormat =>
List(PublishRawTx(commitTx, "commit-tx")) ++ (claimMainDelayedOutputTx ++ htlcTxs.values.flatten ++ claimHtlcDelayedTxs).map(tx => PublishRawTx(tx))
val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishRawTx(tx, Some(commitTx.txid)))
List(PublishRawTx(commitTx, commitInput, "commit-tx", None)) ++ (claimMainDelayedOutputTx.map(tx => PublishRawTx(tx, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx, None)))
case Transactions.AnchorOutputsCommitmentFormat =>
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => SignAndPublishTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => SignAndPublishTx(tx, commitments) }
List(PublishRawTx(commitTx, "commit-tx")) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx))
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => PublishReplaceableTx(tx, commitments) }
List(PublishRawTx(commitTx, commitInput, "commit-tx", None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx, None))
}
publishIfNeeded(publishQueue, irrevocablySpent)

Expand Down Expand Up @@ -2333,7 +2333,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(remoteCommitPublished: RemoteCommitPublished): Unit = {
import remoteCommitPublished._

val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(tx))
val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx(tx, None))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down Expand Up @@ -2372,7 +2372,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = {
import revokedCommitPublished._

val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx))
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx, None))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import akka.actor.{ActorRef, PossiblyHarmful}
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.TxPublisher.PublishTx
import fr.acinq.eclair.payment.OutgoingPacket.Upstream
import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.CommitmentSpec
Expand Down
Loading

0 comments on commit d43d06f

Please sign in to comment.