Various improvements and fixes (#1817)
* Reduce log level for explorer API errors
* Reduce log level for remote peer invalid open_channel
* Don't send duplicate commands in PostRestartHtlcCleaner: if there
  is already a pending HTLC settlement command in the DB, the post-restart
  handler should let the channel replay it instead of sending
  a conflicting command.
* Workaround for lnd bug in reestablish: sometimes lnd sends
  announcement_signatures before sending its channel_reestablish.
  This is a minor spec violation; we can simply delay the message and
  handle it later (hopefully once we've received their channel_reestablish).
* Log shared secrets in Sphinx error: Breez sometimes returns errors
  that we fail to parse. Unfortunately we didn't correctly log the shared
  secrets because the variable was shadowed, so we can't investigate
  further for now.
* Fix utxo metric checks: if we're unable to fetch the number of
  unconfirmed parents for a utxo, this shouldn't cause the global utxo
  check to fail. We log a warning and let operations continue to ensure
  the metric is updated.
* Handle ChannelIdAssigned when disconnected: there may be a race
  condition where a peer disconnects in the middle of a channel id assignment.
  In that case, we still want to record the up-to-date mapping.
t-bast authored May 25, 2021
1 parent 98cae45 commit e8c33ba
Showing 10 changed files with 114 additions and 47 deletions.
@@ -401,7 +401,11 @@ private class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client
  def getUnconfirmedAncestorCount(utxo: Utxo): Future[(ByteVector32, Long)] = client.rpcClient.invoke("getmempoolentry", utxo.txId).map(json => {
    val JInt(ancestorCount) = json \ "ancestorcount"
    (utxo.txId, ancestorCount.toLong)
-  })
+  }).recover {
+    case ex: Throwable =>
+      log.warn(s"could not retrieve unconfirmed ancestor count for txId=${utxo.txId} amount=${utxo.amount}:", ex)
+      (utxo.txId, 0)
+  }
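As context for the JInt extraction above, here is a minimal standalone sketch (not eclair code; it assumes the json4s-jackson parser that eclair's bitcoind RPC client is built on) of reading ancestorcount from a getmempoolentry response. Per bitcoind, ancestorcount is the number of in-mempool ancestors, including the transaction itself:

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object MempoolEntryDemo extends App {
  // Abbreviated getmempoolentry response (real responses carry many more fields).
  val json = parse("""{"vsize": 141, "ancestorcount": 3, "descendantcount": 1}""")
  val JInt(ancestorCount) = json \ "ancestorcount"
  println(ancestorCount.toLong) // 3
}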

def getUnconfirmedAncestorCountMap(utxos: Seq[Utxo]): Future[Map[ByteVector32, Long]] = Future.sequence(utxos.filter(_.confirmations == 0).map(getUnconfirmedAncestorCount)).map(_.toMap)
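The recover above is what keeps this aggregation alive: Future.sequence fails as soon as any member future fails, so one bad getmempoolentry call would otherwise abort the whole utxo check. A minimal standalone sketch (not eclair code) of the difference:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Try

object SequenceFailFastDemo extends App {
  val futures = Seq(Future.successful(1), Future.failed[Int](new RuntimeException("rpc error")), Future.successful(3))
  // One failing member fails the aggregate, losing the successful results.
  println(Try(Await.result(Future.sequence(futures), 1.second))) // Failure(... rpc error)
  // Recovering each member to a default first keeps the aggregate alive, as in the fix above.
  val recovered = futures.map(_.recover { case _ => 0 })
  println(Await.result(Future.sequence(recovered), 1.second)) // List(1, 0, 3)
}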

@@ -85,7 +85,7 @@ object ExplorerApi {
Behaviors.stopped

case WrappedFailure(e) =>
-          context.log.error(s"${explorer.name} failed: ", e)
+          context.log.warn(s"${explorer.name} failed: ", e)
Metrics.WatchdogError.withTag(Tags.Source, explorer.name).increment()
Behaviors.stopped
}
13 changes: 11 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
@@ -1443,7 +1443,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
closingType_opt match {
case Some(closingType) =>
log.info(s"channel closed (type=$closingType)")
log.info(s"channel closed (type=${closingType_opt.map(c => EventType.Closed(c).label).getOrElse("UnknownYet")})")
context.system.eventStream.publish(ChannelClosed(self, d.channelId, closingType, d.commitments))
goto(CLOSED) using d1 storing()
case None =>
@@ -1701,6 +1701,14 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
peer ! Peer.Disconnect(remoteNodeId)
stay

+    // This handler is a workaround for an issue in lnd similar to the one above: they sometimes send announcement_signatures
+    // before channel_reestablish, which is a minor spec violation. It doesn't halt the channel, we can simply postpone
+    // that message.
+    case Event(remoteAnnSigs: AnnouncementSignatures, _) =>
+      log.warning("received announcement_signatures before channel_reestablish (known lnd bug): delaying...")
+      context.system.scheduler.scheduleOnce(5 seconds, self, remoteAnnSigs)
+      stay

case Event(c: CurrentBlockCount, d: HasCommitments) => handleNewBlock(c, d)

case Event(c: CurrentFeerates, d: HasCommitments) =>
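The new handler above is an instance of a common Akka pattern: when a message arrives before its prerequisite, schedule it back to yourself instead of failing. A minimal standalone sketch (not eclair code; message types reduced to strings):

import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

class Postponer extends Actor {
  import context.dispatcher // execution context for the scheduler
  var reestablished = false
  def receive: Receive = {
    case "channel_reestablish" => reestablished = true
    case "announcement_signatures" if !reestablished =>
      // Too early (the lnd quirk): redeliver to ourselves and retry later.
      context.system.scheduler.scheduleOnce(5.seconds, self, "announcement_signatures")
    case "announcement_signatures" => println("processing announcement_signatures")
  }
}

object PostponerDemo extends App {
  val system = ActorSystem("demo")
  val postponer = system.actorOf(Props[Postponer]())
  postponer ! "announcement_signatures" // arrives first: delayed
  postponer ! "channel_reestablish"     // prerequisite arrives
  Thread.sleep(6000)                    // delayed message is processed after ~5s
  system.terminate()
}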
@@ -2125,11 +2133,12 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
private def handleLocalError(cause: Throwable, d: Data, msg: Option[Any]) = {
cause match {
case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request")
+      case _ if stateName == WAIT_FOR_OPEN_CHANNEL => log.warning(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
case _ => log.error(s"${cause.getMessage} while processing msg=${msg.getOrElse("n/a").getClass.getSimpleName} in state=$stateName")
}
cause match {
case _: ChannelException => ()
-      case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData ")
+      case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData")
}
val error = Error(d.channelId, cause.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = true))
@@ -356,7 +356,7 @@ object Sphinx extends Logging {
require(packet.length == PacketLength, s"invalid error packet length ${packet.length}, must be $PacketLength")

@tailrec
-    def loop(packet: ByteVector, sharedSecrets: Seq[(ByteVector32, PublicKey)]): DecryptedFailurePacket = sharedSecrets match {
+    def loop(packet: ByteVector, secrets: Seq[(ByteVector32, PublicKey)]): DecryptedFailurePacket = secrets match {
case Nil => throw new RuntimeException(s"couldn't parse error packet=$packet with sharedSecrets=$sharedSecrets")
case (secret, pubkey) :: tail =>
val packet1 = wrap(packet, secret)
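The rename above is the entire fix for the shadowed-variable bug mentioned in the commit message: the inner parameter used to be called sharedSecrets as well, so the case Nil message interpolated the inner list, which at that point is always empty. A minimal standalone sketch (not eclair code) of the pitfall:

object ShadowingDemo extends App {
  def decrypt(sharedSecrets: Seq[String]): String = {
    // The parameter below shadows the outer sharedSecrets: when we reach Nil,
    // the interpolation prints the (empty) inner value, not the full outer list.
    def loop(sharedSecrets: Seq[String]): String = sharedSecrets match {
      case Nil => s"couldn't parse, sharedSecrets=$sharedSecrets"
      case _ :: tail => loop(tail)
    }
    loop(sharedSecrets)
  }
  println(decrypt(Seq("secret1", "secret2"))) // couldn't parse, sharedSecrets=List()
}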
9 changes: 7 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
@@ -16,8 +16,7 @@

package fr.acinq.eclair.io

-import akka.actor.typed
-import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated}
+import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem, FSM, OneForOneStrategy, PossiblyHarmful, Props, Status, SupervisorStrategy, Terminated, typed}
import akka.event.Logging.MDC
import akka.event.{BusLogging, DiagnosticLoggingAdapter}
import akka.util.Timeout
@@ -86,6 +85,12 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: EclairWa
stay using d.copy(channels = channels1)
}

+    // This event is usually handled while we're connected, but if our peer disconnects right when we're emitting this,
+    // we still want to record the channelId mapping.
+    case Event(ChannelIdAssigned(channel, _, temporaryChannelId, channelId), d: DisconnectedData) =>
+      log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
+      stay using d.copy(channels = d.channels + (FinalChannelId(channelId) -> channel))

case Event(_: LightningMessage, _) => stay // we probably just got disconnected and that's the last messages we received
}

@@ -70,7 +70,13 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial
val nonStandardRelayedOutHtlcs: Map[Origin, Set[(ByteVector32, Long)]] = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getHtlcsRelayedOut(htlcsIn, nodeParams, log) }.flatten.toMap
val relayedOut: Map[Origin, Set[(ByteVector32, Long)]] = getHtlcsRelayedOut(channels, htlcsIn) ++ nonStandardRelayedOutHtlcs

-    val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin)))
+    val settledHtlcs: Set[(ByteVector32, Long)] = nodeParams.db.pendingCommands.listSettlementCommands().map { case (channelId, cmd) => (channelId, cmd.id) }.toSet
+    val notRelayed = htlcsIn.filterNot(htlcIn => {
+      // If an HTLC has been relayed and then settled downstream, it will not have a matching entry in relayedOut.
+      // When that happens, there will be an HTLC settlement command in the pendingRelay DB, and we will let the channel
+      // replay it instead of sending a conflicting command.
+      relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin)) || settledHtlcs.contains((htlcIn.add.channelId, htlcIn.add.id))
+    })
cleanupRelayDb(htlcsIn, nodeParams.db.pendingCommands)

log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}")
@@ -332,6 +338,7 @@ object PostRestartHtlcCleaner {
def groupByOrigin(htlcsOut: Seq[(Origin, ByteVector32, Long)], htlcsIn: Seq[IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] =
htlcsOut
.groupBy { case (origin, _, _) => origin }
+      .view
.mapValues(_.map { case (_, channelId, htlcId) => (channelId, htlcId) }.toSet)
// We are only interested in HTLCs that are pending upstream (not fulfilled nor failed yet).
// It may be the case that we have unresolved HTLCs downstream that have been resolved upstream when the downstream
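The lone .view added in this hunk looks like the standard response to Scala 2.13 deprecating Map.mapValues (an assumption here, not stated in the commit): .view.mapValues returns a lazy MapView, and .toMap restores a strict Map where one is needed. A minimal sketch:

object MapValuesDemo extends App {
  val byOrigin = Map("origin-1" -> Seq(("chan-a", 0L)), "origin-2" -> Seq(("chan-b", 7L)))
  // Scala 2.13: mapValues is available on .view and is lazy; .toMap makes it strict again.
  val grouped: Map[String, Set[(String, Long)]] = byOrigin.view.mapValues(_.toSet).toMap
  println(grouped) // Map(origin-1 -> Set((chan-a,0)), origin-2 -> Set((chan-b,7)))
}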
@@ -177,7 +177,7 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
RemoteFailure(cfg.fullRoute(route), e)
case Failure(t) =>
log.warning(s"cannot parse returned error: ${t.getMessage}")
log.warning(s"cannot parse returned error ${fail.reason.toHex} with sharedSecrets=$sharedSecrets: ${t.getMessage}")
UnreadableRemoteFailure(cfg.fullRoute(route))
}
log.warning(s"too many failed attempts, failing the payment")
@@ -24,9 +24,9 @@ import fr.acinq.eclair.Features.StaticRemoteKey
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.UInt64.Conversions._
import fr.acinq.eclair._
-import fr.acinq.eclair.blockchain.{CurrentBlockCount, CurrentFeerates}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw}
+import fr.acinq.eclair.blockchain.{CurrentBlockCount, CurrentFeerates}
import fr.acinq.eclair.channel.Channel._
import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx}
import fr.acinq.eclair.channel._
@@ -39,7 +39,7 @@ import fr.acinq.eclair.router.Announcements
import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.transactions.Transactions.{DefaultCommitmentFormat, HtlcSuccessTx, weight2fee}
-import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate, ClosingSigned, CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc}
+import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelUpdate, ClosingSigned, CommitSig, Error, FailureMessageCodecs, PermanentChannelFailure, RevokeAndAck, Shutdown, TemporaryNodeFailure, UpdateAddHtlc, UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFee, UpdateFulfillHtlc}
import org.scalatest.funsuite.FixtureAnyFunSuiteLike
import org.scalatest.{Outcome, Tag}
import scodec.bits._
@@ -1364,6 +1364,25 @@ class NormalStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(initialState == bob.stateData)
}

test("recv CMD_FAIL_HTLC (htlc pending fulfill)") { f =>
import f._

val sender = TestProbe()
val (r, htlc) = addHtlc(50000000 msat, alice, bob, alice2bob, bob2alice)
crossSign(alice, bob, alice2bob, bob2alice)

// HTLC is fulfilled but alice doesn't send its revocation.
bob ! CMD_FULFILL_HTLC(htlc.id, r)
bob ! CMD_SIGN()
bob2alice.expectMsgType[UpdateFulfillHtlc]
bob2alice.expectMsgType[CommitSig]

// We cannot fail the HTLC, we must wait for the fulfill to be acked.
val c = CMD_FAIL_HTLC(htlc.id, Right(TemporaryNodeFailure), replyTo_opt = Some(sender.ref))
bob ! c
sender.expectMsg(RES_FAILURE(c, UnknownHtlcId(channelId(bob), htlc.id)))
}

test("recv CMD_FAIL_HTLC (acknowledge in case of failure)") { f =>
import f._
val sender = TestProbe()
17 changes: 17 additions & 0 deletions eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala
@@ -367,6 +367,23 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with Paralle
assert(init.fundingAmount === 15000.sat)
assert(init.pushAmount === 100.msat)
}

test("handle final channelId assigned in state DISCONNECTED") { f =>
import f._
val probe = TestProbe()
connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal))
peer ! ConnectionDown(peerConnection.ref)
probe.send(peer, Peer.GetPeerInfo)
val peerInfo1 = probe.expectMsgType[Peer.PeerInfo]
assert(peerInfo1.state === "DISCONNECTED")
assert(peerInfo1.channels === 1)
peer ! ChannelIdAssigned(probe.ref, remoteNodeId, randomBytes32(), randomBytes32())
probe.send(peer, Peer.GetPeerInfo)
val peerInfo2 = probe.expectMsgType[Peer.PeerInfo]
assert(peerInfo2.state === "DISCONNECTED")
assert(peerInfo2.channels === 2)
}

}

object PeerSpec {