Postgres: fix concurrency in channels db (#1762)
* preserve pg lock exception cause

* specialize connections by backend type

* added concurrency test on channels table

This test reveals a concurrency issue in the upsert logic of the local
channels db: the following error is thrown when we update many
channels concurrently:

```
Canceled on identification as a pivot, during conflict out checking
```

* use pg upsert construct

This is the pattern recommended by the PostgreSQL documentation
(https://www.postgresql.org/docs/current/plpgsql-control-structures.html#PLPGSQL-UPSERT-EXAMPLE):

> It is recommended that applications use INSERT with ON CONFLICT DO UPDATE rather than actually using this pattern.

A standalone SQL sketch of the old and new patterns follows this list.

* reproduce and fix same issue in peers db
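
For context, here is a minimal SQL sketch of the old and new patterns on the `local_channels` table from the diff below. The table, columns and statements are taken from the diff; the failure-mode comments are an inference from the error quoted above, which is a serialization failure raised by PostgreSQL's serializable-isolation conflict detection, and are not stated in the commit itself:

```
-- Old pattern: two statements with an application-side check in between.
-- When the UPDATE matches no row it still counts as a read; under serializable
-- isolation the predicate locks taken by that read can create read/write
-- dependencies with concurrent INSERTs on the same table, and when PostgreSQL
-- detects a dangerous pattern it cancels one transaction with the error above.
UPDATE local_channels SET data = ? WHERE channel_id = ?;
-- (application-side) if the update count was 0, run the INSERT below:
INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, FALSE);

-- New pattern: a single atomic statement, the documented upsert construct.
INSERT INTO local_channels (channel_id, data, is_closed)
VALUES (?, ?, FALSE)
ON CONFLICT (channel_id)
DO UPDATE SET data = EXCLUDED.data;
```
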
pm47 authored Apr 12, 2021
1 parent 1e2abae commit 6518bb4
Showing 6 changed files with 89 additions and 40 deletions.
20 changes: 10 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
@@ -67,16 +67,16 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) {
withLock { pg =>
val data = stateDataCodec.encode(state).require.toByteArray
using(pg.prepareStatement("UPDATE local_channels SET data=? WHERE channel_id=?")) { update =>
update.setBytes(1, data)
update.setString(2, state.channelId.toHex)
if (update.executeUpdate() == 0) {
using(pg.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, FALSE)")) { statement =>
statement.setString(1, state.channelId.toHex)
statement.setBytes(2, data)
statement.executeUpdate()
}
}
using(pg.prepareStatement(
"""
| INSERT INTO local_channels (channel_id, data, is_closed)
| VALUES (?, ?, FALSE)
| ON CONFLICT (channel_id)
| DO UPDATE SET data = EXCLUDED.data ;
| """.stripMargin)) { statement =>
statement.setString(1, state.channelId.toHex)
statement.setBytes(2, data)
statement.executeUpdate()
}
}
}
20 changes: 10 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala
@@ -46,16 +46,16 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb {
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeaddress: NodeAddress): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
withLock { pg =>
val data = CommonCodecs.nodeaddress.encode(nodeaddress).require.toByteArray
using(pg.prepareStatement("UPDATE peers SET data=? WHERE node_id=?")) { update =>
update.setBytes(1, data)
update.setString(2, nodeId.value.toHex)
if (update.executeUpdate() == 0) {
using(pg.prepareStatement("INSERT INTO peers VALUES (?, ?)")) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, data)
statement.executeUpdate()
}
}
using(pg.prepareStatement(
"""
| INSERT INTO peers (node_id, data)
| VALUES (?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET data = EXCLUDED.data ;
| """.stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, data)
statement.executeUpdate()
}
}
}
5 changes: 4 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgUtils.scala
@@ -60,7 +60,10 @@ object PgUtils extends JdbcUtils {
logger.error(s"cannot obtain lock on the database ($other).")
}

case class LockException(lockFailure: LockFailure) extends RuntimeException("a lock exception occurred")
case class LockException(lockFailure: LockFailure) extends RuntimeException("a lock exception occurred", lockFailure match {
case LockFailure.GeneralLockException(cause) => cause // when the origin is an exception, we provide it to have a nice stack trace
case _ => null
})

/**
* This handler is useful in tests
13 changes: 9 additions & 4 deletions eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala
@@ -8,6 +8,8 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.db.sqlite.SqliteUtils
import fr.acinq.eclair.db._
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler
import org.postgresql.jdbc.PgConnection
import org.sqlite.SQLiteConnection

import java.io.File
import java.sql.{Connection, DriverManager, Statement}
@@ -36,13 +38,16 @@ sealed trait TestDatabases extends Databases {

object TestDatabases {

def sqliteInMemory(): Connection = DriverManager.getConnection("jdbc:sqlite::memory:")
def sqliteInMemory(): SQLiteConnection = DriverManager.getConnection("jdbc:sqlite::memory:").asInstanceOf[SQLiteConnection]

def inMemoryDb(connection: Connection = sqliteInMemory()): Databases = Databases.SqliteDatabases(connection, connection, connection)
def inMemoryDb(): Databases = {
val connection = sqliteInMemory()
Databases.SqliteDatabases(connection, connection, connection)
}

case class TestSqliteDatabases() extends TestDatabases {
// @formatter:off
override val connection: Connection = sqliteInMemory()
override val connection: SQLiteConnection = sqliteInMemory()
override lazy val db: Databases = Databases.SqliteDatabases(connection, connection, connection)
override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = SqliteUtils.getVersion(statement, db_name, currentVersion)
override def close(): Unit = ()
@@ -62,7 +67,7 @@ object TestDatabases {
implicit val system: ActorSystem = ActorSystem()

// @formatter:off
override val connection: Connection = pg.getPostgresDatabase.getConnection
override val connection: PgConnection = pg.getPostgresDatabase.getConnection.asInstanceOf[PgConnection]
override lazy val db: Databases = Databases.PostgresDatabases(hikariConfig, UUID.randomUUID(), lock, jdbcUrlFile_opt = Some(jdbcUrlFile))
override def getVersion(statement: Statement, db_name: String, currentVersion: Int): Int = PgUtils.getVersion(statement, db_name, currentVersion)
override def close(): Unit = pg.close()
54 changes: 39 additions & 15 deletions eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala
@@ -26,11 +26,14 @@ import fr.acinq.eclair.db.sqlite.SqliteChannelsDb
import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec
import fr.acinq.eclair.{CltvExpiry, randomBytes32}
import fr.acinq.eclair.{CltvExpiry, ShortChannelId, randomBytes32}
import org.scalatest.funsuite.AnyFunSuite
import scodec.bits.ByteVector

import java.sql.SQLException
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.concurrent.duration._

class ChannelsDbSpec extends AnyFunSuite {

@@ -52,30 +55,51 @@ class ChannelsDbSpec extends AnyFunSuite {
val db = dbs.channels
dbs.pendingRelay // needed by db.removeChannel

val channel = ChannelCodecsSpec.normal
val channel1 = ChannelCodecsSpec.normal
val channel2a = ChannelCodecsSpec.normal.modify(_.commitments.channelId).setTo(randomBytes32)
val channel2b = channel2a.modify(_.shortChannelId).setTo(ShortChannelId(189371))

val commitNumber = 42
val paymentHash1 = ByteVector32.Zeroes
val cltvExpiry1 = CltvExpiry(123)
val paymentHash2 = ByteVector32(ByteVector.fill(32)(1))
val cltvExpiry2 = CltvExpiry(656)

intercept[SQLException](db.addHtlcInfo(channel.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel
intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel

assert(db.listLocalChannels().toSet === Set.empty)
db.addOrUpdateChannel(channel)
db.addOrUpdateChannel(channel)
assert(db.listLocalChannels() === List(channel))

assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
db.addHtlcInfo(channel.channelId, commitNumber, paymentHash1, cltvExpiry1)
db.addHtlcInfo(channel.channelId, commitNumber, paymentHash2, cltvExpiry2)
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2)))
assert(db.listHtlcInfos(channel.channelId, 43).toList == Nil)

db.removeChannel(channel.channelId)
db.addOrUpdateChannel(channel1)
db.addOrUpdateChannel(channel1)
assert(db.listLocalChannels() === List(channel1))
db.addOrUpdateChannel(channel2a)
assert(db.listLocalChannels() === List(channel1, channel2a))
db.addOrUpdateChannel(channel2b)
assert(db.listLocalChannels() === List(channel1, channel2b))

assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil)
db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)
db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash2, cltvExpiry2)
assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2)))
assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil)

db.removeChannel(channel1.channelId)
assert(db.listLocalChannels() === List(channel2b))
assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil)
db.removeChannel(channel2b.channelId)
assert(db.listLocalChannels() === Nil)
assert(db.listHtlcInfos(channel.channelId, commitNumber).toList == Nil)
}
}

test("concurrent channel updates") {
forAllDbs { dbs =>
val db = dbs.channels
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
val channel = ChannelCodecsSpec.normal
val futures = for (_ <- 0 until 10000) yield {
Future(db.addOrUpdateChannel(channel.modify(_.commitments.channelId).setTo(randomBytes32)))
}
val res = Future.sequence(futures)
Await.result(res, 60 seconds)
}
}

17 changes: 17 additions & 0 deletions eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala
@@ -24,6 +24,10 @@ import fr.acinq.eclair.randomKey
import fr.acinq.eclair.wire.protocol.{NodeAddress, Tor2, Tor3}
import org.scalatest.funsuite.AnyFunSuite

import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
import scala.util.Success

class PeersDbSpec extends AnyFunSuite {

@@ -68,4 +72,17 @@ }
}
}

test("concurrent peer updates") {
forAllDbs { dbs =>
val db = dbs.peers
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
val Success(peerAddress) = NodeAddress.fromParts("127.0.0.1", 42000)
val futures = for (_ <- 0 until 10000) yield {
Future(db.addOrUpdatePeer(randomKey.publicKey, peerAddress))
}
val res = Future.sequence(futures)
Await.result(res, 60 seconds)
}
}

}
