Skip to content

Commit

Permalink
Rename pending_relay to pending_commands (#1822)
Browse files Browse the repository at this point in the history
Naming was confusing because it led to believe messages were related to
htlcs that have not yet been relayed, whereas those are settlement
messages, meaning that htlcs have relayed and are pending resolution
upstream.

The database has been renamed to a more generic `PendingCommandsDb`
because we may store other types of commands for which we need reliable
delivery.
  • Loading branch information
pm47 authored May 25, 2021
1 parent d437ea1 commit 98cae45
Show file tree
Hide file tree
Showing 22 changed files with 272 additions and 180 deletions.
12 changes: 6 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SetChannelI
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.db.pg.PgUtils.PgLock.logger
import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment.PaymentSettlingOnChain
Expand Down Expand Up @@ -1875,15 +1875,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

onTransition {
case _ -> CLOSING =>
PendingRelayDb.getPendingFailsAndFulfills(nodeParams.db.pendingRelay, nextStateData.asInstanceOf[HasCommitments].channelId) match {
PendingCommandsDb.getSettlementCommands(nodeParams.db.pendingCommands, nextStateData.asInstanceOf[HasCommitments].channelId) match {
case Nil =>
log.debug("nothing to replay")
case cmds =>
log.info("replaying {} unacked fulfills/fails", cmds.size)
cmds.foreach(self ! _) // they all have commit = false
}
case SYNCING -> (NORMAL | SHUTDOWN) =>
PendingRelayDb.getPendingFailsAndFulfills(nodeParams.db.pendingRelay, nextStateData.asInstanceOf[HasCommitments].channelId) match {
PendingCommandsDb.getSettlementCommands(nodeParams.db.pendingCommands, nextStateData.asInstanceOf[HasCommitments].channelId) match {
case Nil =>
log.debug("nothing to replay")
case cmds =>
Expand Down Expand Up @@ -2109,7 +2109,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
} else {
// There might be pending fulfill commands that we haven't relayed yet.
// Since this involves a DB call, we only want to check it if all the previous checks failed (this is the slow path).
val pendingRelayFulfills = nodeParams.db.pendingRelay.listPendingRelay(d.channelId).collect { case c: CMD_FULFILL_HTLC => c.id }
val pendingRelayFulfills = nodeParams.db.pendingCommands.listSettlementCommands(d.channelId).collect { case c: CMD_FULFILL_HTLC => c.id }
val offendingPendingRelayFulfills = almostTimedOutIncoming.filter(htlc => pendingRelayFulfills.contains(htlc.id))
if (offendingPendingRelayFulfills.nonEmpty) {
handleLocalError(HtlcsWillTimeoutUpstream(d.channelId, offendingPendingRelayFulfills), d, Some(c))
Expand Down Expand Up @@ -2520,13 +2520,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
*/
def acking(channelId: ByteVector32, cmd: HtlcSettlementCommand): FSM.State[fr.acinq.eclair.channel.State, Data] = {
log.debug("scheduling acknowledgement of cmd id={}", cmd.id)
context.system.scheduler.scheduleOnce(10 seconds)(PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, channelId, cmd))(context.system.dispatcher)
context.system.scheduler.scheduleOnce(10 seconds)(PendingCommandsDb.ackSettlementCommand(nodeParams.db.pendingCommands, channelId, cmd))(context.system.dispatcher)
state
}

def acking(updates: List[UpdateMessage]): FSM.State[fr.acinq.eclair.channel.State, Data] = {
log.debug("scheduling acknowledgement of cmds ids={}", updates.collect { case s: HtlcSettlementMessage => s.id }.mkString(","))
context.system.scheduler.scheduleOnce(10 seconds)(PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, updates))(context.system.dispatcher)
context.system.scheduler.scheduleOnce(10 seconds)(PendingCommandsDb.ackSettlementCommands(nodeParams.db.pendingCommands, updates))(context.system.dispatcher)
state
}

Expand Down
10 changes: 5 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ trait Databases {
def channels: ChannelsDb
def peers: PeersDb
def payments: PaymentsDb
def pendingRelay: PendingRelayDb
def pendingCommands: PendingCommandsDb
//@formatter:on
}

Expand All @@ -59,7 +59,7 @@ object Databases extends Logging {
channels: SqliteChannelsDb,
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
pendingRelay: SqlitePendingRelayDb,
pendingCommands: SqlitePendingCommandsDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
statement => {
Expand All @@ -75,7 +75,7 @@ object Databases extends Logging {
channels = new SqliteChannelsDb(eclairJdbc),
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
pendingRelay = new SqlitePendingRelayDb(eclairJdbc),
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
backupConnection = eclairJdbc
)
}
Expand All @@ -85,7 +85,7 @@ object Databases extends Logging {
channels: PgChannelsDb,
peers: PgPeersDb,
payments: PgPaymentsDb,
pendingRelay: PgPendingRelayDb,
pendingCommands: PgPendingCommandsDb,
dataSource: HikariDataSource,
lock: PgLock) extends Databases with ExclusiveLock {
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
Expand Down Expand Up @@ -119,7 +119,7 @@ object Databases extends Logging {
channels = new PgChannelsDb,
peers = new PgPeersDb,
payments = new PgPaymentsDb,
pendingRelay = new PgPendingRelayDb,
pendingCommands = new PgPendingCommandsDb,
dataSource = ds,
lock = lock)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package fr.acinq.eclair.db

import java.io.Closeable

import akka.actor.{ActorContext, ActorRef}
import akka.actor.ActorRef
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel._
import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc, UpdateMessage}

import java.io.Closeable

/**
* This database stores CMD_FULFILL_HTLC and CMD_FAIL_HTLC that we have received from downstream
* (either directly via UpdateFulfillHtlc or by extracting the value from the
Expand All @@ -36,48 +36,48 @@ import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc, U
* to handle all corner cases.
*
*/
trait PendingRelayDb extends Closeable {
trait PendingCommandsDb extends Closeable {

def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit
def addSettlementCommand(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit

def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit
def removeSettlementCommand(channelId: ByteVector32, htlcId: Long): Unit

def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand]
def listSettlementCommands(channelId: ByteVector32): Seq[HtlcSettlementCommand]

def listPendingRelay(): Set[(ByteVector32, Long)]
def listSettlementCommands(): Seq[(ByteVector32, HtlcSettlementCommand)]

}

object PendingRelayDb {
object PendingCommandsDb {
/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
* in a database because we don't want to lose preimages, or to forget to fail
* incoming htlcs, which would lead to unwanted channel closings.
*/
def safeSend(register: ActorRef, db: PendingRelayDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
def safeSend(register: ActorRef, db: PendingCommandsDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
// htlc settlement commands don't have replyTo
register ! Register.Forward(ActorRef.noSender, channelId, cmd)
// we store the command in a db (note that this happens *after* forwarding the command to the channel, so we don't add latency)
db.addPendingRelay(channelId, cmd)
db.addSettlementCommand(channelId, cmd)
}

def ackCommand(db: PendingRelayDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
db.removePendingRelay(channelId, cmd.id)
def ackSettlementCommand(db: PendingCommandsDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
db.removeSettlementCommand(channelId, cmd.id)
}

def ackPendingFailsAndFulfills(db: PendingRelayDb, updates: List[UpdateMessage])(implicit log: LoggingAdapter): Unit = updates.collect {
def ackSettlementCommands(db: PendingCommandsDb, updates: List[UpdateMessage])(implicit log: LoggingAdapter): Unit = updates.collect {
case u: UpdateFulfillHtlc =>
log.debug(s"fulfill acked for htlcId=${u.id}")
db.removePendingRelay(u.channelId, u.id)
db.removeSettlementCommand(u.channelId, u.id)
case u: UpdateFailHtlc =>
log.debug(s"fail acked for htlcId=${u.id}")
db.removePendingRelay(u.channelId, u.id)
db.removeSettlementCommand(u.channelId, u.id)
case u: UpdateFailMalformedHtlc =>
log.debug(s"fail-malformed acked for htlcId=${u.id}")
db.removePendingRelay(u.channelId, u.id)
db.removeSettlementCommand(u.channelId, u.id)
}

def getPendingFailsAndFulfills(db: PendingRelayDb, channelId: ByteVector32)(implicit log: LoggingAdapter): Seq[HtlcSettlementCommand] = {
db.listPendingRelay(channelId)
def getSettlementCommands(db: PendingCommandsDb, channelId: ByteVector32)(implicit log: LoggingAdapter): Seq[HtlcSettlementCommand] = {
db.listSettlementCommands(channelId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit

override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
using(pg.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=?")) { statement =>
statement.setString(1, channelId.toHex)
statement.executeUpdate()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,48 @@ import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel.{Command, HtlcSettlementCommand}
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.PendingRelayDb
import fr.acinq.eclair.db.PendingCommandsDb
import fr.acinq.eclair.db.pg.PgUtils._
import fr.acinq.eclair.wire.internal.CommandCodecs.cmdCodec
import grizzled.slf4j.Logging

import java.sql.Statement
import javax.sql.DataSource
import scala.collection.immutable.Queue

class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRelayDb {
class PgPendingCommandsDb(implicit ds: DataSource, lock: PgLock) extends PendingCommandsDb with Logging {

import PgUtils.ExtendedResultSet._
import PgUtils._
import lock._

val DB_NAME = "pending_relay"
val CURRENT_VERSION = 1
val CURRENT_VERSION = 2

inTransaction { pg =>
using(pg.createStatement()) { statement =>

def migration12(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE pending_relay RENAME TO pending_settlement_commands")
}

getVersion(statement, DB_NAME) match {
case None =>
// note: should we use a foreign key to local_channels table here?
statement.executeUpdate("CREATE TABLE pending_relay (channel_id TEXT NOT NULL, htlc_id BIGINT NOT NULL, data BYTEA NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
statement.executeUpdate("CREATE TABLE pending_settlement_commands (channel_id TEXT NOT NULL, htlc_id BIGINT NOT NULL, data BYTEA NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
case Some(v@1) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration12(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

override def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add", DbBackends.Postgres) {
override def addSettlementCommand(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO pending_relay VALUES (?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
using(pg.prepareStatement("INSERT INTO pending_settlement_commands VALUES (?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
statement.setString(1, channelId.toHex)
statement.setLong(2, cmd.id)
statement.setBytes(3, cmdCodec.encode(cmd).require.toByteArray)
Expand All @@ -61,35 +71,35 @@ class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRel
}
}

override def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove", DbBackends.Postgres) {
override def removeSettlementCommand(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=? AND htlc_id=?")) { statement =>
using(pg.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=? AND htlc_id=?")) { statement =>
statement.setString(1, channelId.toHex)
statement.setLong(2, htlcId)
statement.executeUpdate()
}
}
}

override def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel", DbBackends.Postgres) {
override def listSettlementCommands(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT htlc_id, data FROM pending_relay WHERE channel_id=?")) { statement =>
using(pg.prepareStatement("SELECT htlc_id, data FROM pending_settlement_commands WHERE channel_id=?")) { statement =>
statement.setString(1, channelId.toHex)
val rs = statement.executeQuery()
codecSequence(rs, cmdCodec)
}
}
}

override def listPendingRelay(): Set[(ByteVector32, Long)] = withMetrics("pending-relay/list", DbBackends.Postgres) {
override def listSettlementCommands(): Seq[(ByteVector32, HtlcSettlementCommand)] = withMetrics("pending-relay/list", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT channel_id, htlc_id FROM pending_relay")) { statement =>
using(pg.prepareStatement("SELECT channel_id, data FROM pending_settlement_commands")) { statement =>
val rs = statement.executeQuery()
var q: Queue[(ByteVector32, Long)] = Queue()
var q: Queue[(ByteVector32, HtlcSettlementCommand)] = Queue()
while (rs.next()) {
q = q :+ (rs.getByteVector32FromHex("channel_id"), rs.getLong("htlc_id"))
q = q :+ (rs.getByteVector32FromHex("channel_id"), cmdCodec.decode(rs.getByteVector("data").bits).require.value)
}
q.toSet
q
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
}

override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Sqlite) {
using(sqlite.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
using(sqlite.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=?")) { statement =>
statement.setBytes(1, channelId.toArray)
statement.executeUpdate()
}
Expand Down
Loading

0 comments on commit 98cae45

Please sign in to comment.