Add json columns in Postgres (#1865)

A json column has been added to the few tables that contain an
opaque serialized blob:
- `local_channels.data`
- `nodes.data`
- `channels.channel_announcement`, `channels.channel_update_x`

We can now access all the individual data fields from SQL.
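
For example, we can filter channels on fields of the json document. Below
is a minimal sketch (not part of this commit; the jdbc url and credentials
are placeholders) that reads a couple of fields from `local_channels.json`
using the same json paths as the indexes created by this change:

    import java.sql.DriverManager

    object LocalChannelsJsonQuery extends App {
      // Placeholder connection settings: adjust to your own Postgres setup.
      val connection = DriverManager.getConnection("jdbc:postgresql://localhost:5432/eclair", "eclair", "password")
      try {
        val statement = connection.createStatement()
        // ->> extracts a field as TEXT, -> keeps it as json for further traversal.
        val rs = statement.executeQuery(
          """SELECT channel_id,
            |       json->>'type' AS state,
            |       json->'commitments'->'remoteParams'->>'nodeId' AS remote_node_id
            |FROM local_channels
            |WHERE NOT is_closed""".stripMargin)
        while (rs.next()) {
          println(s"${rs.getString("channel_id")} ${rs.getString("state")} ${rs.getString("remote_node_id")}")
        }
      } finally {
        connection.close()
      }
    }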

For the serialization, we use the same serializers as the ones
that were previously used by the API. They have been moved to the
`eclair-core` module and simplified a bit.

There are two json data types in Postgres: `JSON` and `JSONB`. We use
the latter, which is more recent and allows indexing.
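
For instance (a sketch only: this index is not created by this change and
its name is hypothetical), a `JSONB` column can carry a GIN index over the
whole document, on top of the per-field expression indexes used here:

    import java.sql.Connection

    // Sketch: add a GIN index over the whole json document (index name is made up).
    // A GIN index lets Postgres answer containment queries of the form
    // "WHERE json @> '<sub-document>'::JSONB" without a sequential scan.
    def addGinIndex(connection: Connection): Unit = {
      val statement = connection.createStatement()
      try {
        statement.executeUpdate("CREATE INDEX IF NOT EXISTS local_channels_json_gin_idx ON local_channels USING GIN (json)")
      } finally {
        statement.close()
      }
    }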

An alternative to this PR would have been to use columns, but:
- there would have been a *lot* of columns for the channel data
- every modification of our types would have required a db migration

NB: to handle non-backwards-compatible changes in the json serializers,
all the json columns can be recomputed on restart by setting
`eclair.db.reset-json-columns=true`.

Change in ChannelCodecsSpec:

The goal of this test is to make sure that, in addition to successfully
decoding data that was encoded with an older codec, we actually read the
correct data. Just because there is no error doesn't mean that we
interpreted the data properly. For example, we could invert a
`payment_hash` and a `payment_preimage`.

We can't compare objects directly, because the current version of the
class has probably changed too. That's why we compare the json
representation of the data, which we amend to ignore new or modified
fields.
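
That json-based comparison looks roughly like the following sketch
(hypothetical helper and parameter names, not the actual ChannelCodecsSpec
code):

    import fr.acinq.eclair.json.JsonSerializers.{formats, serialization}
    import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse
    import scodec.bits.BitVector

    // Decode a blob written by an old codec, re-serialize it with the current
    // json serializers, and compare it to a reference json string, ignoring
    // fields that were added or modified since the reference was recorded.
    def checkOldChannelData(oldBinary: BitVector, referenceJson: String, ignoredFields: Set[String]): Unit = {
      val decoded = stateDataCodec.decode(oldBinary).require.value
      val strip = (json: JValue) => json.removeField { case (name, _) => ignoredFields.contains(name) }
      val actual = strip(parse(serialization.write(decoded)))
      val expected = strip(parse(referenceJson))
      assert(actual == expected, s"decoded data does not match reference (ignored fields: $ignoredFields)")
    }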

After doing a manual comparison, I updated the test to use the current
json serializers, and replaced the test data with the latest json
serialization. This allows us to remove all the tweaks that we added
over time to take into account new and updated fields.
pm47 authored Jul 8, 2021
1 parent bd57d41 commit 08faf3b
Showing 13 changed files with 651 additions and 529 deletions.
1 change: 1 addition & 0 deletions eclair-core/src/main/resources/reference.conf
@@ -224,6 +224,7 @@ eclair {
username = ""
password = ""
readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database
reset-json-columns = false // in case of a json format change, this allows a full re-serialization of json data
pool {
max-size = 10 // recommended value = number_of_cpu_cores * 2
connection-timeout = 30 seconds
17 changes: 14 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
@@ -96,7 +96,8 @@ object Databases extends Logging {
instanceId: UUID,
lock: PgLock = PgLock.NoLock,
jdbcUrlFile_opt: Option[File],
readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = {
readOnlyUser_opt: Option[String],
resetJsonColumns: Boolean)(implicit system: ActorSystem): PostgresDatabases = {

jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

@@ -132,6 +133,14 @@
}
}

if (resetJsonColumns) {
logger.warn("resetting json columns...")
PgUtils.inTransaction { connection =>
databases.channels.resetJsonColumns(connection)
databases.network.resetJsonColumns(connection)
}
}

databases
}

@@ -197,7 +206,8 @@
val port = dbConfig.getInt("postgres.port")
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))
val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user"))
val resetJsonColumns = dbConfig.getBoolean("postgres.reset-json-columns")

val hikariConfig = new HikariConfig()
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
@@ -230,7 +240,8 @@
instanceId = instanceId,
lock = lock,
jdbcUrlFile_opt = Some(jdbcUrlFile),
readOnlyUser_opt = readOnlyUser_opt
readOnlyUser_opt = readOnlyUser_opt,
resetJsonColumns = resetJsonColumns
)
}

eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala
@@ -18,11 +18,12 @@ package fr.acinq.eclair.db.jdbc

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.MilliSatoshi
import grizzled.slf4j.Logger
import org.sqlite.SQLiteConnection
import scodec.Decoder
import scodec.bits.{BitVector, ByteVector}

import java.sql.{Connection, ResultSet, Statement, Timestamp}
import java.sql.{Connection, PreparedStatement, ResultSet, Statement, Timestamp}
import java.util.UUID
import javax.sql.DataSource

@@ -98,6 +99,35 @@ trait JdbcUtils {
}
}

/**
* A utility method that efficiently migrates a table using a provided migration method
*/
def migrateTable(source: Connection,
destination: Connection,
sourceTable: String,
migrateSql: String,
migrate: (ResultSet, PreparedStatement) => Unit)(implicit logger: Logger): Int = {
val insertStatement = destination.prepareStatement(migrateSql)
val batchSize = 50
JdbcUtils.using(source.prepareStatement(s"SELECT * FROM $sourceTable")) { queryStatement =>
val rs = queryStatement.executeQuery()
var inserted = 0
var batchCount = 0
while (rs.next()) {
migrate(rs, insertStatement)
insertStatement.addBatch()
batchCount = batchCount + 1
if (batchCount % batchSize == 0) {
inserted = inserted + insertStatement.executeBatch().sum
batchCount = 0
}
}
inserted = inserted + insertStatement.executeBatch().sum
logger.info(s"migrated $inserted rows from table $sourceTable")
inserted
}
}

case class ExtendedResultSet(rs: ResultSet) extends Iterable[ResultSet] {

/**
eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
@@ -27,19 +27,21 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
import grizzled.slf4j.Logging
import scodec.bits.BitVector

import java.sql.{Statement, Timestamp}
import java.sql.{Connection, Statement, Timestamp}
import java.time.Instant
import javax.sql.DataSource

class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {

import PgUtils.ExtendedResultSet._
import PgUtils._
import fr.acinq.eclair.json.JsonSerializers.{formats, serialization}
import lock._

val DB_NAME = "channels"
val CURRENT_VERSION = 4
val CURRENT_VERSION = 5

inTransaction { pg =>
using(pg.createStatement()) { statement =>
@@ -62,37 +64,68 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT")
}

def migration45(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN json JSONB")
resetJsonColumns(pg)
statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN json SET NOT NULL")
statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local_channels ((json->>'type'))")
statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local_channels ((json->'commitments'->'remoteParams'->>'nodeId'))")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)")
statement.executeUpdate("CREATE TABLE local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)")
statement.executeUpdate("CREATE TABLE htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")

statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local_channels ((json->>'type'))")
statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local_channels ((json->'commitments'->'remoteParams'->>'nodeId'))")
statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
case Some(v@2) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
migration34(statement)
migration45(statement)
case Some(v@3) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration34(statement)
migration45(statement)
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

/** Sometimes we may want to do a full reset when we update the json format */
def resetJsonColumns(connection: Connection): Unit = {
migrateTable(connection, connection,
"local_channels",
"UPDATE local_channels SET json=?::JSONB WHERE channel_id=?",
(rs, statement) => {
val state = stateDataCodec.decode(BitVector(rs.getBytes("data"))).require.value
val json = serialization.writePretty(state)
statement.setString(1, json)
statement.setString(2, state.channelId.toHex)
}
)(logger)
}

override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) {
withLock { pg =>
val data = stateDataCodec.encode(state).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local_channels (channel_id, data, is_closed)
| VALUES (?, ?, FALSE)
| INSERT INTO local_channels (channel_id, data, json, is_closed)
| VALUES (?, ?, ?::JSONB, FALSE)
| ON CONFLICT (channel_id)
| DO UPDATE SET data = EXCLUDED.data ;
| DO UPDATE SET data = EXCLUDED.data, json = EXCLUDED.json ;
| """.stripMargin)) { statement =>
statement.setString(1, state.channelId.toHex)
statement.setBytes(2, data)
statement.setString(3, serialization.writePretty(state))
statement.executeUpdate()
}
}