Skip to content

Commit

Permalink
Refactor Postgres code (#1743)
Browse files Browse the repository at this point in the history
More symmetry between postgres and sqlite init.

* define SqliteDatabases and PostgresDatabase

Those classes implement traits `Databases`, `FileBackup` and
`ExclusiveLock`.

The goal is to have access to backend-specific attributes, particularly
in tests. It arguably makes the `Databases` cleaner and simpler, with a
nice symmetry between the `apply methods`.

* replace 5s lock timeout by NOLOCK

* use chaindir instead of datadir for jdbcurl file

It is more consistent with sqlite, and makes sense because we don't want
to mix up testnet and mainnet databases.

* add tests on locks and jdbc url check
  • Loading branch information
pm47 authored Mar 31, 2021
1 parent 75cb777 commit 936f36b
Show file tree
Hide file tree
Showing 22 changed files with 649 additions and 505 deletions.
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class Setup(datadir: File,

logger.info(s"instanceid=$instanceId")

val databases = Databases.init(config.getConfig("db"), instanceId, datadir, chaindir, db)
val databases = Databases.init(config.getConfig("db"), instanceId, chaindir, db)

/**
* This counter holds the current blockchain height.
Expand Down
300 changes: 141 additions & 159 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,89 +16,146 @@

package fr.acinq.eclair.db

import java.io.File
import java.nio.file._
import java.sql.{Connection, DriverManager}
import java.util.UUID

import akka.actor.ActorSystem
import com.typesafe.config.Config
import fr.acinq.eclair.db.pg.PgUtils.LockType.LockType
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler
import fr.acinq.eclair.db.pg.PgUtils._
import fr.acinq.eclair.db.pg._
import fr.acinq.eclair.db.sqlite._
import grizzled.slf4j.Logging
import javax.sql.DataSource

import java.io.File
import java.nio.file._
import java.sql.{Connection, DriverManager}
import java.util.UUID
import scala.concurrent.duration._

trait Databases {
//@formatter:off
def network: NetworkDb
def audit: AuditDb
def channels: ChannelsDb
def peers: PeersDb
def payments: PaymentsDb
def pendingRelay: PendingRelayDb
//@formatter:on
}

val network: NetworkDb
object Databases extends Logging {

val audit: AuditDb
trait FileBackup {
this: Databases =>
def backup(backupFile: File): Unit
}

val channels: ChannelsDb
trait ExclusiveLock {
this: Databases =>
def obtainExclusiveLock(): Unit
}

val peers: PeersDb
case class SqliteDatabases private (network: SqliteNetworkDb,
audit: SqliteAuditDb,
channels: SqliteChannelsDb,
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
pendingRelay: SqlitePendingRelayDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
statement => {
statement.executeUpdate(s"backup to ${backupFile.getAbsolutePath}")
}
}
}

val payments: PaymentsDb
object SqliteDatabases {
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = SqliteDatabases(
network = new SqliteNetworkDb(networkJdbc),
audit = new SqliteAuditDb(auditJdbc),
channels = new SqliteChannelsDb(eclairJdbc),
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
pendingRelay = new SqlitePendingRelayDb(eclairJdbc),
backupConnection = eclairJdbc
)
}

val pendingRelay: PendingRelayDb
}
case class PostgresDatabases private (network: PgNetworkDb,
audit: PgAuditDb,
channels: PgChannelsDb,
peers: PgPeersDb,
payments: PgPaymentsDb,
pendingRelay: PgPendingRelayDb,
dataSource: HikariDataSource,
lock: PgLock) extends Databases with ExclusiveLock {
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
}

object Databases extends Logging {
object PostgresDatabases {
def apply(hikariConfig: HikariConfig,
instanceId: UUID,
lock: PgLock = PgLock.NoLock,
jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = {

jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig)
implicit val implicitLock: PgLock = lock

val databases = PostgresDatabases(
network = new PgNetworkDb,
audit = new PgAuditDb,
channels = new PgChannelsDb,
peers = new PgPeersDb,
payments = new PgPaymentsDb,
pendingRelay = new PgPendingRelayDb,
dataSource = ds,
lock = lock)

lock match {
case PgLock.NoLock => ()
case l: PgLock.LeaseLock =>
// we obtain a lock right now...
databases.obtainExclusiveLock()
// ...and renew the lease regularly
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock())
}

trait FileBackup { this: Databases =>
def backup(backupFile: File): Unit
}
databases
}

trait ExclusiveLock { this: Databases =>
def obtainExclusiveLock(): Unit
private def checkIfDatabaseUrlIsUnchanged(url: String, urlFile: File): Unit = {
def readString(path: Path): String = Files.readAllLines(path).get(0)

def writeString(path: Path, string: String): Unit = Files.write(path, java.util.Arrays.asList(string))

if (urlFile.exists()) {
val oldUrl = readString(urlFile.toPath)
if (oldUrl != url)
throw JdbcUrlChanged(oldUrl, url)
} else {
writeString(urlFile.toPath, url)
}
}
}

def init(dbConfig: Config, instanceId: UUID, datadir: File, chaindir: File, db: Option[Databases] = None)(implicit system: ActorSystem): Databases = {
def init(dbConfig: Config, instanceId: UUID, chaindir: File, db: Option[Databases] = None)(implicit system: ActorSystem): Databases = {
db match {
case Some(d) => d
case None =>
dbConfig.getString("driver") match {
case "sqlite" => Databases.sqliteJDBC(chaindir)
case "postgres" =>
val pg = Databases.setupPgDatabases(dbConfig, instanceId, datadir, { ex =>
logger.error("fatal error: Cannot obtain lock on the database.\n", ex)
sys.exit(-2)
})
if (LockType(dbConfig.getString("postgres.lock-type")) == LockType.LEASE) {
val dbLockLeaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
val dbLockLeaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
if (dbLockLeaseInterval <= dbLockLeaseRenewInterval)
throw new RuntimeException("Invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
import system.dispatcher
system.scheduler.scheduleWithFixedDelay(dbLockLeaseRenewInterval, dbLockLeaseRenewInterval)(new Runnable {
override def run(): Unit = {
try {
pg.obtainExclusiveLock()
} catch {
case e: Throwable =>
logger.error("fatal error: Cannot obtain the database lease.\n", e)
sys.exit(-1)
}
}
})
}
pg
case driver => throw new RuntimeException(s"Unknown database driver `$driver`")
case "sqlite" => Databases.sqlite(chaindir)
case "postgres" => Databases.postgres(dbConfig, instanceId, chaindir)
case driver => throw new RuntimeException(s"unknown database driver `$driver`")
}
}
}

/**
* Given a parent folder it creates or loads all the databases from a JDBC connection
*
* @param dbdir
* @return
*/
def sqliteJDBC(dbdir: File): Databases = {
* Given a parent folder it creates or loads all the databases from a JDBC connection
*/
def sqlite(dbdir: File): Databases = {
dbdir.mkdir()
var sqliteEclair: Connection = null
var sqliteNetwork: Connection = null
Expand All @@ -109,127 +166,52 @@ object Databases extends Logging {
sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
logger.info("successful lock on eclair.sqlite")
sqliteDatabaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
SqliteDatabases(sqliteAudit, sqliteNetwork, sqliteEclair)
} catch {
case t: Throwable => {
case t: Throwable =>
logger.error("could not create connection to sqlite databases: ", t)
if (sqliteEclair != null) sqliteEclair.close()
if (sqliteNetwork != null) sqliteNetwork.close()
if (sqliteAudit != null) sqliteAudit.close()
throw t
}
}
}

def postgresJDBC(database: String, host: String, port: Int,
username: Option[String], password: Option[String],
poolProperties: Map[String, Long],
instanceId: UUID,
databaseLeaseInterval: FiniteDuration,
lockExceptionHandler: LockExceptionHandler = { _ => () },
lockType: LockType = LockType.NONE, datadir: File): Databases with ExclusiveLock = {
val url = s"jdbc:postgresql://${host}:${port}/${database}"

checkIfDatabaseUrlIsUnchanged(url, datadir)

implicit val lock: DatabaseLock = lockType match {
case LockType.NONE => NoLock
case LockType.LEASE => LeaseLock(instanceId, databaseLeaseInterval, lockExceptionHandler)
case _ => throw new RuntimeException(s"Unknown postgres lock type: `$lockType`")
}

import com.zaxxer.hikari.{HikariConfig, HikariDataSource}

val config = new HikariConfig()
config.setJdbcUrl(url)
username.foreach(config.setUsername)
password.foreach(config.setPassword)
poolProperties.get("max-size").foreach(x => config.setMaximumPoolSize(x.toInt))
poolProperties.get("connection-timeout").foreach(config.setConnectionTimeout)
poolProperties.get("idle-timeout").foreach(config.setIdleTimeout)
poolProperties.get("max-life-time").foreach(config.setMaxLifetime)

implicit val ds: DataSource = new HikariDataSource(config)

val databases = new Databases with ExclusiveLock {
override val network = new PgNetworkDb
override val audit = new PgAuditDb
override val channels = new PgChannelsDb
override val peers = new PgPeersDb
override val payments = new PgPaymentsDb
override val pendingRelay = new PgPendingRelayDb
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock
}
databases.obtainExclusiveLock()
databases
}

def sqliteDatabaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = new Databases with FileBackup {
override val network = new SqliteNetworkDb(networkJdbc)
override val audit = new SqliteAuditDb(auditJdbc)
override val channels = new SqliteChannelsDb(eclairJdbc)
override val peers = new SqlitePeersDb(eclairJdbc)
override val payments = new SqlitePaymentsDb(eclairJdbc)
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
override def backup(backupFile: File): Unit = {

SqliteUtils.using(eclairJdbc.createStatement()) {
statement => {
statement.executeUpdate(s"backup to ${backupFile.getAbsolutePath}")
}
}

}
}

def setupPgDatabases(dbConfig: Config, instanceId: UUID, datadir: File, lockExceptionHandler: LockExceptionHandler): Databases with ExclusiveLock = {
def postgres(dbConfig: Config, instanceId: UUID, dbdir: File, lockExceptionHandler: LockFailureHandler = LockFailureHandler.logAndStop)(implicit system: ActorSystem): PostgresDatabases = {
val database = dbConfig.getString("postgres.database")
val host = dbConfig.getString("postgres.host")
val port = dbConfig.getInt("postgres.port")
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty)
None
else
Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty)
None
else
Some(dbConfig.getString("postgres.password"))
val properties = {
val poolConfig = dbConfig.getConfig("postgres.pool")
Map.empty
.updated("max-size", poolConfig.getInt("max-size").toLong)
.updated("connection-timeout", poolConfig.getDuration("connection-timeout").toMillis)
.updated("idle-timeout", poolConfig.getDuration("idle-timeout").toMillis)
.updated("max-life-time", poolConfig.getDuration("max-life-time").toMillis)

val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))

val hikariConfig = new HikariConfig()
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
username.foreach(hikariConfig.setUsername)
password.foreach(hikariConfig.setPassword)
val poolConfig = dbConfig.getConfig("postgres.pool")
hikariConfig.setMaximumPoolSize(poolConfig.getInt("max-size"))
hikariConfig.setConnectionTimeout(poolConfig.getDuration("connection-timeout").toMillis)
hikariConfig.setIdleTimeout(poolConfig.getDuration("idle-timeout").toMillis)
hikariConfig.setMaxLifetime(poolConfig.getDuration("max-life-time").toMillis)

val lock = dbConfig.getString("postgres.lock-type") match {
case "none" => PgLock.NoLock
case "lease" =>
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler)
case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`")
}
val lockType = LockType(dbConfig.getString("postgres.lock-type"))
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds

Databases.postgresJDBC(
database = database, host = host, port = port,
username = username, password = password,
poolProperties = properties,
val jdbcUrlFile = new File(dbdir, "last_jdbcurl")

Databases.PostgresDatabases(
hikariConfig = hikariConfig,
instanceId = instanceId,
databaseLeaseInterval = leaseInterval,
lockExceptionHandler = lockExceptionHandler, lockType = lockType, datadir = datadir
lock = lock,
jdbcUrlFile_opt = Some(jdbcUrlFile)
)
}

private def checkIfDatabaseUrlIsUnchanged(url: String, datadir: File ): Unit = {
val urlFile = new File(datadir, "last_jdbcurl")

def readString(path: Path): String = Files.readAllLines(path).get(0)

def writeString(path: Path, string: String): Unit = Files.write(path, java.util.Arrays.asList(string))

if (urlFile.exists()) {
val oldUrl = readString(urlFile.toPath)
if (oldUrl != url)
throw new RuntimeException(s"The database URL has changed since the last start. It was `$oldUrl`, now it's `$url`")
} else {
writeString(urlFile.toPath, url)
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.pg.PgUtils.DatabaseLock
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging

import java.sql.Statement
import javax.sql.DataSource
import scala.collection.immutable.Queue

class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends ChannelsDb with Logging {
class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {

import PgUtils.ExtendedResultSet._
import PgUtils._
Expand Down
Loading

0 comments on commit 936f36b

Please sign in to comment.