Skip to content

Commit

Permalink
Catch all connection failures and reconnect (#1760)
Browse files Browse the repository at this point in the history
The `ReconnectionTask` was only catching
`ConnectionResult.Failure.ConnectionFailed`, which is a subset of
possible failures. It should instead have caught
`ConnectionResult.Failure`.

All authentication and initialization failures were not caught and
didn't trigger reconnections.

Co-authored-by: Bastien Teinturier <[email protected]>
  • Loading branch information
pm47 and t-bast authored Apr 12, 2021
1 parent 48c0c4c commit 357f7f9
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ class ReconnectionTask(nodeParams: NodeParams, remoteNodeId: PublicKey) extends
startWith(IDLE, IdleData(Nothing))

when(CONNECTING) {
case Event(_: PeerConnection.ConnectionResult.ConnectionFailed, d: ConnectingData) =>
log.info(s"connection failed, next reconnection in ${d.nextReconnectionDelay.toSeconds} seconds")
case Event(failure: PeerConnection.ConnectionResult.Failure, d: ConnectingData) =>
log.info(s"connection failed (reason=$failure), next reconnection in ${d.nextReconnectionDelay.toSeconds} seconds")
setReconnectTimer(d.nextReconnectionDelay)
goto(WAITING) using WaitingData(nextReconnectionDelay(d.nextReconnectionDelay, nodeParams.maxReconnectInterval))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import scala.concurrent.duration._

class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with ParallelTestExecution {

val fakeIPAddress = NodeAddress.fromParts("1.2.3.4", 42000).get
val channels = Map(Peer.FinalChannelId(randomBytes32) -> system.deadLetters)
private val fakeIPAddress = NodeAddress.fromParts("1.2.3.4", 42000).get
private val channels = Map(Peer.FinalChannelId(randomBytes32) -> system.deadLetters)

val PeerNothingData = Peer.Nothing
val PeerDisconnectedData = Peer.DisconnectedData(channels)
val PeerConnectedData = Peer.ConnectedData(fakeIPAddress.socketAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) })
private val PeerNothingData = Peer.Nothing
private val PeerDisconnectedData = Peer.DisconnectedData(channels)
private val PeerConnectedData = Peer.ConnectedData(fakeIPAddress.socketAddress, system.deadLetters, null, null, channels.map { case (k: ChannelId, v) => (k, v) })

case class FixtureParam(nodeParams: NodeParams, remoteNodeId: PublicKey, reconnectionTask: TestFSMRef[ReconnectionTask.State, ReconnectionTask.Data, ReconnectionTask], monitor: TestProbe)

Expand Down Expand Up @@ -168,6 +168,56 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
assert(waitingData3.nextReconnectionDelay === (waitingData0.nextReconnectionDelay * 8))
}

test("all kind of connection failures should be caught by the reconnection task", Tag("auto_reconnect")) { f =>
import f._

val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, connectingData: ReconnectionTask.ConnectingData) = monitor.expectMsgType[TransitionWithData]

val failures = List(
PeerConnection.ConnectionResult.ConnectionFailed(connectingData.to),
PeerConnection.ConnectionResult.NoAddressFound,
PeerConnection.ConnectionResult.InitializationFailed("incompatible features"),
PeerConnection.ConnectionResult.AuthenticationFailed("authentication timeout")
)

failures.foreach { failure =>
// we simulate a connection error
reconnectionTask ! failure
// a new reconnection task will be scheduled
val TransitionWithData(ReconnectionTask.CONNECTING, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
// we send the tick manually so we don't wait
reconnectionTask ! ReconnectionTask.TickReconnect
// this triggers a reconnection
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, _) = monitor.expectMsgType[TransitionWithData]
}
}

test("concurrent incoming/outgoing reconnection", Tag("auto_reconnect")) { f =>
import f._

val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, _: ReconnectionTask.ConnectingData) = monitor.expectMsgType[TransitionWithData]

// at this point, we are attempting to connect to the peer
// let's assume that an incoming connection arrives from the peer right before our outgoing connection, but we haven't
// yet received the peer transition
reconnectionTask ! PeerConnection.ConnectionResult.AlreadyConnected
// we will schedule a reconnection
val TransitionWithData(ReconnectionTask.CONNECTING, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
// but immediately after that we finally get notified that the peer is connected
peer.send(reconnectionTask, Peer.Transition(PeerDisconnectedData, PeerConnectedData))
// we cancel the reconnection and go to idle state
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.IDLE, _, _) = monitor.expectMsgType[TransitionWithData]

}

test("reconnect using the address from node_announcement") { f =>
import f._

Expand Down

0 comments on commit 357f7f9

Please sign in to comment.