Skip to content

Commit

Permalink
Remove kamon tracing (#1662)
Browse files Browse the repository at this point in the history
It's costly, we're not using it, and it's too invasive.
There's no reason to keep it at the moment.
  • Loading branch information
t-bast authored Jan 20, 2021
1 parent 81f15aa commit 54ca292
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 305 deletions.
13 changes: 0 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/PimpKamon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package fr.acinq.eclair

import fr.acinq.eclair.payment.{LocalFailure, PaymentFailure, RemoteFailure, UnreadableRemoteFailure}
import kamon.metric.Timer
import kamon.trace.Span

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -40,15 +38,4 @@ object KamonExt {
res
}

/**
* A helper function that fails a span with proper messages when dealing with payments
*/
def failSpan(span: Span, failure: PaymentFailure) = {
failure match {
case LocalFailure(_, t) => span.fail("local failure", t)
case RemoteFailure(_, e) => span.fail(s"remote failure: origin=${e.originNode} error=${e.failureMessage}")
case UnreadableRemoteFailure(_) => span.fail("unreadable remote failure")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import fr.acinq.eclair.ShortChannelId.coordinates
import fr.acinq.eclair.TxCoordinates
import fr.acinq.eclair.blockchain.{GetTxWithMetaResponse, UtxoStatus, ValidateResult}
import fr.acinq.eclair.wire.ChannelAnnouncement
import kamon.Kamon
import org.json4s.Formats
import org.json4s.JsonAST._

Expand Down Expand Up @@ -170,31 +169,20 @@ class ExtendedBitcoinClient(val rpcClient: BitcoinJsonRPCClient) {

def validate(c: ChannelAnnouncement)(implicit ec: ExecutionContext): Future[ValidateResult] = {
val TxCoordinates(blockHeight, txIndex, outputIndex) = coordinates(c.shortChannelId)
val span = Kamon.spanBuilder("validate-bitcoin-client").start()
for {
_ <- Future.successful(0)
span0 = Kamon.spanBuilder("getblockhash").start()
blockHash <- rpcClient.invoke("getblockhash", blockHeight).map(_.extractOpt[String].map(ByteVector32.fromValidHex).getOrElse(ByteVector32.Zeroes))
_ = span0.finish()
span1 = Kamon.spanBuilder("getblock").start()
txid: ByteVector32 <- rpcClient.invoke("getblock", blockHash).map(json => Try {
val JArray(txs) = json \ "tx"
ByteVector32.fromValidHex(txs(txIndex).extract[String])
}.getOrElse(ByteVector32.Zeroes))
_ = span1.finish()
span2 = Kamon.spanBuilder("getrawtx").start()
tx <- getRawTransaction(txid)
_ = span2.finish()
span3 = Kamon.spanBuilder("utxospendable-mempool").start()
unspent <- isTransactionOutputSpendable(txid, outputIndex, includeMempool = true)
_ = span3.finish()
fundingTxStatus <- if (unspent) {
Future.successful(UtxoStatus.Unspent)
} else {
// if this returns true, it means that the spending tx is *not* in the blockchain
isTransactionOutputSpendable(txid, outputIndex, includeMempool = false).map(res => UtxoStatus.Spent(spendingTxConfirmed = !res))
}
_ = span.finish()
} yield ValidateResult(c, Right((Transaction.read(tx), fundingTxStatus)))
} recover {
case t: Throwable => ValidateResult(c, Left(t))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import fr.acinq.eclair.router.RouteCalculation
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{CltvExpiry, FSMDiagnosticActorLogging, Logs, MilliSatoshi, MilliSatoshiLong, NodeParams}
import kamon.Kamon
import kamon.context.Context

import java.util.UUID
import java.util.concurrent.TimeUnit
Expand All @@ -57,13 +55,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val start = System.currentTimeMillis
private var retriedFailedChannels = false

private val span = Kamon.spanBuilder("multi-part-payment")
.tag(Tags.ParentId, cfg.parentId.toString)
.tag(Tags.PaymentHash, paymentHash.toHex)
.tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
.tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
.start()

startWith(WAIT_FOR_PAYMENT_REQUEST, WaitingForRequest)

when(WAIT_FOR_PAYMENT_REQUEST) {
Expand All @@ -83,11 +74,7 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
val (toSend, maxFee) = remainingToSend(nodeParams, d.request, d.pending.values)
if (routes.map(_.amount).sum == toSend) {
val childPayments = routes.map(route => (UUID.randomUUID(), route)).toMap
Kamon.runWithContextEntry(parentPaymentIdKey, cfg.parentId) {
Kamon.runWithSpan(span, finishSpan = true) {
childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
}
}
childPayments.foreach { case (childId, route) => spawnChildPaymentFsm(childId) ! createChildPayment(self, route, d.request) }
goto(PAYMENT_IN_PROGRESS) using d.copy(remainingAttempts = (d.remainingAttempts - 1).max(0), pending = d.pending ++ childPayments)
} else {
// If a child payment failed while we were waiting for routes, the routes we received don't cover the whole
Expand Down Expand Up @@ -242,7 +229,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
case Left(paymentFailed) =>
log.warning("multi-part payment failed")
reply(origin, paymentFailed)
span.fail("payment failed")
case Right(paymentSent) =>
log.info("multi-part payment succeeded")
reply(origin, paymentSent)
Expand All @@ -254,7 +240,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
if (retriedFailedChannels) {
Metrics.RetryFailedChannelsResult.withTag(Tags.Success, event.isRight).increment()
}
span.finish()
stop(FSM.Normal)
}

Expand All @@ -278,8 +263,6 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,

object MultiPartPaymentLifecycle {

val parentPaymentIdKey = Context.key[UUID]("parentPaymentId", UUID.fromString("00000000-0000-0000-0000-000000000000"))

def props(nodeParams: NodeParams, cfg: SendPaymentConfig, router: ActorRef, register: ActorRef) = Props(new MultiPartPaymentLifecycle(nodeParams, cfg, router, register))

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package fr.acinq.eclair.payment.send

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, FSM, Props, Status}
import akka.event.Logging.MDC
import fr.acinq.bitcoin.ByteVector32
Expand All @@ -36,9 +34,8 @@ import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router._
import fr.acinq.eclair.wire.Onion._
import fr.acinq.eclair.wire._
import kamon.Kamon
import kamon.trace.Span

import java.util.concurrent.TimeUnit
import scala.util.{Failure, Success}

/**
Expand All @@ -52,28 +49,10 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
private val paymentsDb = nodeParams.db.payments
private val start = System.currentTimeMillis

private val span = Kamon.runWithContextEntry(MultiPartPaymentLifecycle.parentPaymentIdKey, cfg.parentId) {
val spanBuilder = if (Kamon.currentSpan().isEmpty) {
Kamon.spanBuilder("single-payment")
} else {
Kamon.spanBuilder("payment-part").asChildOf(Kamon.currentSpan())
}
spanBuilder
.tag(Tags.PaymentId, cfg.id.toString)
.tag(Tags.PaymentHash, paymentHash.toHex)
.tag(Tags.RecipientNodeId, cfg.recipientNodeId.toString())
.tag(Tags.RecipientAmount, cfg.recipientAmount.toLong)
.start()
}

startWith(WAITING_FOR_REQUEST, WaitingForRequest)

when(WAITING_FOR_REQUEST) {
case Event(c: SendPaymentToRoute, WaitingForRequest) =>
span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
span.tag(Tags.Amount, c.finalPayload.amount.toLong)
span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
log.debug("sending {} to route {}", c.finalPayload.amount, c.printRoute())
val send = SendPayment(c.replyTo, c.targetNodeId, c.finalPayload, maxAttempts = 1, assistedRoutes = c.assistedRoutes)
c.route.fold(
Expand All @@ -86,10 +65,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
goto(WAITING_FOR_ROUTE) using WaitingForRoute(send, Nil, Ignore.empty)

case Event(c: SendPayment, WaitingForRequest) =>
span.tag(Tags.TargetNodeId, c.targetNodeId.toString())
span.tag(Tags.Amount, c.finalPayload.amount.toLong)
span.tag(Tags.TotalAmount, c.finalPayload.totalAmount.toLong)
span.tag(Tags.Expiry, c.finalPayload.expiry.toLong)
log.debug("sending {} to {}", c.finalPayload.amount, c.targetNodeId)
router ! RouteRequest(nodeParams.nodeId, c.targetNodeId, c.finalPayload.amount, c.getMaxFee(nodeParams), c.assistedRoutes, routeParams = c.routeParams, paymentContext = Some(cfg.paymentContext))
if (cfg.storeInDb) {
Expand All @@ -109,7 +84,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
log.warning("router error: {}", t.getMessage)
Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(LocalFailure(Nil, t))).increment()
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ LocalFailure(Nil, t)))
myStop()
stop(FSM.Normal)
}

when(WAITING_FOR_PAYMENT_COMPLETE) {
Expand All @@ -122,7 +97,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1)
val p = PartialPayment(id, d.c.finalPayload.amount, d.cmd.amount - d.c.finalPayload.amount, htlc.channelId, Some(cfg.fullRoute(d.route)))
onSuccess(d.c.replyTo, cfg.createPaymentSent(fulfill.paymentPreimage, p :: Nil))
myStop()
stop(FSM.Normal)

case Event(RES_ADD_SETTLED(_, _, fail: HtlcResult.Fail), d: WaitingForComplete) =>
fail match {
Expand All @@ -143,24 +118,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
}

private var stateSpan: Option[Span] = None

onTransition {
case _ -> state2 =>
// whenever there is a transition we stop the current span and start a new one, this way we can track each state
val stateSpanBuilder = Kamon.spanBuilder(state2.toString).asChildOf(span)
nextStateData match {
case d: WaitingForRoute =>
// this means that previous state was WAITING_FOR_COMPLETE
d.failures.lastOption.foreach(failure => stateSpan.foreach(span => KamonExt.failSpan(span, failure)))
case d: WaitingForComplete =>
stateSpanBuilder.tag("route", s"${cfg.fullRoute(d.route).map(_.nextNodeId).mkString("->")}")
case _ => ()
}
stateSpan.foreach(_.finish())
stateSpan = Some(stateSpanBuilder.start())
}

whenUnhandled {
case Event(_: TransportHandler.ReadAck, _) => stay // ignored, router replies with this when we forward a channel_update
}
Expand All @@ -187,7 +144,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
retry(failure, d)
} else {
onFailure(d.c.replyTo, PaymentFailed(id, paymentHash, d.failures :+ LocalFailure(cfg.fullRoute(d.route), t)))
myStop()
stop(FSM.Normal)
}
}

Expand All @@ -205,7 +162,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
// if destination node returns an error, we fail the payment immediately
log.warning(s"received an error message from target nodeId=$nodeId, failing the payment (failure=$failureMessage)")
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ RemoteFailure(cfg.fullRoute(route), e)))
myStop()
stop(FSM.Normal)
case res if failures.size + 1 >= c.maxAttempts =>
// otherwise we never try more than maxAttempts, no matter the kind of error returned
val failure = res match {
Expand All @@ -222,7 +179,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
log.warning(s"too many failed attempts, failing the payment")
onFailure(c.replyTo, PaymentFailed(id, paymentHash, failures :+ failure))
myStop()
stop(FSM.Normal)
case Failure(t) =>
log.warning(s"cannot parse returned error: ${t.getMessage}, route=${route.printNodes()}")
val failure = UnreadableRemoteFailure(cfg.fullRoute(route))
Expand Down Expand Up @@ -300,12 +257,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
}

private def myStop(): State = {
stateSpan.foreach(_.finish())
span.finish()
stop(FSM.Normal)
}

private def onSuccess(replyTo: ActorRef, result: PaymentSent): Unit = {
if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
replyTo ! result
Expand All @@ -317,7 +268,6 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}

private def onFailure(replyTo: ActorRef, result: PaymentFailed): Unit = {
span.fail("payment failed")
if (cfg.storeInDb) paymentsDb.updateOutgoingPayment(result)
replyTo ! result
if (cfg.publishEvent) context.system.eventStream.publish(result)
Expand Down
Loading

0 comments on commit 54ca292

Please sign in to comment.