Skip to content

Commit

Permalink
Use warning messages for connection issues (#1863)
Browse files Browse the repository at this point in the history
lightning/bolts#834 recommends sending
warning messages instead of connection-level errors in some cases, which
avoids unnecessary channel closure.
  • Loading branch information
t-bast authored Jul 6, 2021
1 parent 291c128 commit d9a03a5
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 23 deletions.
11 changes: 6 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1701,18 +1701,19 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// a channel_reestablish when reconnecting a channel that recently got confirmed, and instead send a funding_locked
// first and then go silent. This is due to a race condition on their side, so we trigger a reconnection, hoping that
// we will eventually receive their channel_reestablish.
case Event(_: FundingLocked, _) =>
case Event(_: FundingLocked, d) =>
log.warning("received funding_locked before channel_reestablish (known lnd bug): disconnecting...")
peer ! Peer.Disconnect(remoteNodeId)
stay
// NB: we use a small delay to ensure we've sent our warning before disconnecting.
context.system.scheduler.scheduleOnce(2 second, peer, Peer.Disconnect(remoteNodeId))
stay sending Warning(d.channelId, "spec violation: you sent funding_locked before channel_reestablish")

// This handler is a workaround for an issue in lnd similar to the one above: they sometimes send announcement_signatures
// before channel_reestablish, which is a minor spec violation. It doesn't halt the channel, we can simply postpone
// that message.
case Event(remoteAnnSigs: AnnouncementSignatures, _) =>
case Event(remoteAnnSigs: AnnouncementSignatures, d) =>
log.warning("received announcement_signatures before channel_reestablish (known lnd bug): delaying...")
context.system.scheduler.scheduleOnce(5 seconds, self, remoteAnnSigs)
stay
stay sending Warning(d.channelId, "spec violation: you sent announcement_signatures before channel_reestablish")

case Event(ProcessCurrentBlockCount(c), d: HasCommitments) => handleNewBlock(c, d)

Expand Down
7 changes: 2 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: EclairWa
stay

case Event(warning: Warning, _: ConnectedData) =>
log.warning("peer sent warning: {}", warning.channelId, warning.toAscii)
log.warning("peer sent warning: {}", warning.toAscii)
// NB: we don't forward warnings to the channel actors, they shouldn't take any automatic action.
// It's up to the node operator to decide what to do to address the warning.
stay
Expand Down Expand Up @@ -318,7 +318,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: EclairWa
}

def replyUnknownChannel(peerConnection: ActorRef, unknownChannelId: ByteVector32): Unit = {
val msg = Error(unknownChannelId, UNKNOWN_CHANNEL_MESSAGE)
val msg = Warning(unknownChannelId, "unknown channel")
logMessage(msg, "OUT")
peerConnection ! msg
}
Expand Down Expand Up @@ -361,10 +361,7 @@ class Peer(val nodeParams: NodeParams, remoteNodeId: PublicKey, wallet: EclairWa

object Peer {

// @formatter:off
val CHANNELID_ZERO: ByteVector32 = ByteVector32.Zeroes
val UNKNOWN_CHANNEL_MESSAGE: ByteVector = ByteVector.view("unknown channel".getBytes())
// @formatter:on

trait ChannelFactory {
def spawn(context: ActorContext, remoteNodeId: PublicKey, origin_opt: Option[ActorRef]): ActorRef
Expand Down
17 changes: 10 additions & 7 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair.crypto.Noise.KeyPair
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.io.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.io.Peer.CHANNELID_ZERO
import fr.acinq.eclair.remote.EclairInternalsSerializer.RemoteTypes
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire.protocol
Expand Down Expand Up @@ -195,6 +194,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! Pong(ByteVector.fill(pongLength)(0.toByte))
} else {
log.warning(s"ignoring invalid ping with pongLength=${ping.pongLength}")
d.transport ! Warning(s"invalid pong length (${ping.pongLength})")
}
stay

Expand All @@ -207,8 +207,9 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
log.debug(s"received pong with latency=$latency")
cancelTimer(PingTimeout.toString())
// we don't need to call scheduleNextPing here, the next ping was already scheduled when we received that pong
case None =>
log.debug(s"received unexpected pong with size=${data.length}")
case _ =>
log.debug(s"received unexpected pong with length=${data.length}")
d.transport ! Warning(s"invalid pong with length=${data.length}")
}
stay using d.copy(expectedPong_opt = None)

Expand Down Expand Up @@ -264,6 +265,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! TransportHandler.ReadAck(msg)
if (msg.chainHash != d.chainHash) {
log.warning("received gossip_timestamp_range message for chain {}, we're on {}", msg.chainHash, d.chainHash)
d.transport ! Warning(s"invalid gossip_timestamp_range chain (${msg.chainHash})")
stay
} else {
log.info(s"setting up gossipTimestampFilter=$msg")
Expand Down Expand Up @@ -306,16 +308,16 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
case _ => "unknown"
}
log.error(s"peer sent us a routing message with invalid sig: r=$r bin=$bin")
// for now we just return an error, maybe ban the peer in the future?
// for now we just send a warning, maybe ban the peer in the future?
// TODO: this doesn't actually disconnect the peer, once we introduce peer banning we should actively disconnect
d.transport ! Error(CHANNELID_ZERO, ByteVector.view(s"bad announcement sig! bin=$bin".getBytes()))
d.transport ! Warning(s"invalid announcement sig (bin=$bin)")
d.behavior
case GossipDecision.InvalidAnnouncement(c) =>
// they seem to be sending us fake announcements?
log.error(s"couldn't find funding tx with valid scripts for shortChannelId=${c.shortChannelId}")
// for now we just return an error, maybe ban the peer in the future?
// for now we just send a warning, maybe ban the peer in the future?
// TODO: this doesn't actually disconnect the peer, once we introduce peer banning we should actively disconnect
d.transport ! Error(CHANNELID_ZERO, ByteVector.view(s"couldn't verify channel! shortChannelId=${c.shortChannelId}".getBytes()))
d.transport ! Warning(s"invalid announcement, couldn't verify channel (shortChannelId=${c.shortChannelId})")
d.behavior
case GossipDecision.ChannelClosed(_) =>
if (d.behavior.ignoreNetworkAnnouncement) {
Expand All @@ -325,6 +327,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.behavior.copy(fundingTxAlreadySpentCount = d.behavior.fundingTxAlreadySpentCount + 1)
} else {
log.warning(s"peer sent us too many channel announcements with funding tx already spent (count=${d.behavior.fundingTxAlreadySpentCount + 1}), ignoring network announcements for $IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD")
d.transport ! Warning("too many channel announcements with funding tx already spent, please check your bitcoin node")
setTimer(ResumeAnnouncements.toString, ResumeAnnouncements, IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD, repeat = false)
d.behavior.copy(fundingTxAlreadySpentCount = d.behavior.fundingTxAlreadySpentCount + 1, ignoreNetworkAnnouncement = true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
val ping = Ping(Int.MaxValue, randomBytes(127))
transport.send(peerConnection, ping)
transport.expectMsg(TransportHandler.ReadAck(ping))
assert(transport.expectMsgType[Warning].channelId === Peer.CHANNELID_ZERO)
transport.expectNoMsg()
}

Expand Down Expand Up @@ -332,8 +333,13 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
router.send(peerConnection, GossipDecision.ChannelClosed(c))
}
// peer will temporary ignore announcements coming from bob
var warningSent = false
for (ann <- channels ++ updates) {
transport.send(peerConnection, ann)
if (!warningSent) {
transport.expectMsgType[Warning]
warningSent = true
}
transport.expectMsg(TransportHandler.ReadAck(ann))
}
router.expectNoMsg(1 second)
Expand All @@ -354,16 +360,16 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi
// now let's assume that the router isn't happy with those channels because the announcement is invalid
router.send(peerConnection, GossipDecision.InvalidAnnouncement(channels(0)))
// peer will return a connection-wide error, including the hex-encoded representation of the bad message
val error1 = transport.expectMsgType[Error]
assert(error1.channelId === Peer.CHANNELID_ZERO)
assert(new String(error1.data.toArray).startsWith("couldn't verify channel! shortChannelId="))
val warn1 = transport.expectMsgType[Warning]
assert(warn1.channelId === Peer.CHANNELID_ZERO)
assert(new String(warn1.data.toArray).startsWith("invalid announcement, couldn't verify channel"))

// let's assume that one of the sigs were invalid
router.send(peerConnection, GossipDecision.InvalidSignature(channels(0)))
// peer will return a connection-wide error, including the hex-encoded representation of the bad message
val error2 = transport.expectMsgType[Error]
assert(error2.channelId === Peer.CHANNELID_ZERO)
assert(new String(error2.data.toArray).startsWith("bad announcement sig! bin=0100"))
val warn2 = transport.expectMsgType[Warning]
assert(warn2.channelId === Peer.CHANNELID_ZERO)
assert(new String(warn2.data.toArray).startsWith("invalid announcement sig"))
}

}
Expand Down

0 comments on commit d9a03a5

Please sign in to comment.