Skip to content

Commit

Permalink
Use proper data type for timestamps in Postgres 2 (#1862)
Browse files Browse the repository at this point in the history
For some reason, the payments database was forgotten by #1778.
  • Loading branch information
pm47 authored Jul 8, 2021
1 parent f8feb19 commit cea3fc0
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 138 deletions.
73 changes: 43 additions & 30 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import scodec.Attempt
import scodec.bits.BitVector
import scodec.codecs._

import java.sql.{ResultSet, Statement}
import java.sql.{ResultSet, Statement, Timestamp}
import java.time.Instant
import java.util.UUID
import javax.sql.DataSource
import scala.concurrent.duration._

class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb with Logging {

Expand All @@ -42,7 +42,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
import lock._

val DB_NAME = "payments"
val CURRENT_VERSION = 5
val CURRENT_VERSION = 6

private val hopSummaryCodec = (("node_id" | CommonCodecs.publicKey) :: ("next_node_id" | CommonCodecs.publicKey) :: ("short_channel_id" | optional(bool, CommonCodecs.shortchannelid))).as[HopSummary]
private val paymentRouteCodec = discriminated[List[HopSummary]].by(byte)
Expand All @@ -62,12 +62,21 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.executeUpdate("ALTER TABLE sent SET SCHEMA payments")
}

def migration56(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN created_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + created_at * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN expire_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + expire_at * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE payments.received ALTER COLUMN received_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + received_at * interval '1 millisecond'")

statement.executeUpdate("ALTER TABLE payments.sent ALTER COLUMN created_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + created_at * interval '1 millisecond'")
statement.executeUpdate("ALTER TABLE payments.sent ALTER COLUMN completed_at SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + completed_at * interval '1 millisecond'")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA payments")

statement.executeUpdate("CREATE TABLE payments.received (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at BIGINT NOT NULL, expire_at BIGINT NOT NULL, received_at BIGINT)")
statement.executeUpdate("CREATE TABLE payments.sent (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at BIGINT NOT NULL, completed_at BIGINT)")
statement.executeUpdate("CREATE TABLE payments.received (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, expire_at TIMESTAMP WITH TIME ZONE NOT NULL, received_at TIMESTAMP WITH TIME ZONE)")
statement.executeUpdate("CREATE TABLE payments.sent (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at TIMESTAMP WITH TIME ZONE NOT NULL, completed_at TIMESTAMP WITH TIME ZONE)")

statement.executeUpdate("CREATE INDEX sent_parent_id_idx ON payments.sent(parent_id)")
statement.executeUpdate("CREATE INDEX sent_payment_hash_idx ON payments.sent(payment_hash)")
Expand All @@ -76,6 +85,10 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand All @@ -95,7 +108,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.setLong(6, sent.amount.toLong)
statement.setLong(7, sent.recipientAmount.toLong)
statement.setString(8, sent.recipientNodeId.value.toHex)
statement.setLong(9, sent.createdAt)
statement.setTimestamp(9, Timestamp.from(Instant.ofEpochMilli(sent.createdAt)))
statement.setString(10, sent.paymentRequest.map(PaymentRequest.write).orNull)
statement.executeUpdate()
}
Expand All @@ -106,7 +119,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, payment_preimage, fees_msat, payment_route) = (?, ?, ?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
paymentResult.parts.foreach(p => {
statement.setLong(1, p.timestamp)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(p.timestamp)))
statement.setString(2, paymentResult.paymentPreimage.toHex)
statement.setLong(3, p.feesPaid.toLong)
statement.setBytes(4, paymentRouteCodec.encode(p.route.getOrElse(Nil).map(h => HopSummary(h)).toList).require.toByteArray)
Expand All @@ -121,7 +134,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = withMetrics("payments/update-outgoing-failed", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.sent SET (completed_at, failures) = (?, ?) WHERE id = ? AND completed_at IS NULL")) { statement =>
statement.setLong(1, paymentResult.timestamp)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(paymentResult.timestamp)))
statement.setBytes(2, paymentFailuresCodec.encode(paymentResult.failures.map(f => FailureSummary(f)).toList).require.toByteArray)
statement.setString(3, paymentResult.id.toString)
if (statement.executeUpdate() == 0) throw new IllegalArgumentException(s"Tried to mark an outgoing payment as failed but already in final status (id=${paymentResult.id})")
Expand All @@ -134,7 +147,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
rs.getByteVector32FromHexNullable("payment_preimage"),
rs.getMilliSatoshiNullable("fees_msat"),
rs.getBitVectorOpt("payment_route"),
rs.getLongNullable("completed_at"),
rs.getTimestampNullable("completed_at").map(_.getTime),
rs.getBitVectorOpt("failures"))

OutgoingPayment(
Expand All @@ -146,7 +159,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
MilliSatoshi(rs.getLong("amount_msat")),
MilliSatoshi(rs.getLong("recipient_amount_msat")),
PublicKey(rs.getByteVectorFromHex("recipient_node_id")),
rs.getLong("created_at"),
rs.getTimestamp("created_at").getTime,
rs.getStringNullable("payment_request").map(PaymentRequest.read),
status
)
Expand Down Expand Up @@ -207,8 +220,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = withMetrics("payments/list-outgoing-by-timestamp", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.sent WHERE created_at >= ? AND created_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map { rs =>
parseOutgoingPayment(rs)
}.toSeq
Expand All @@ -223,8 +236,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.setString(2, preimage.toHex)
statement.setString(3, paymentType)
statement.setString(4, PaymentRequest.write(pr))
statement.setLong(5, pr.timestamp.seconds.toMillis) // BOLT11 timestamp is in seconds
statement.setLong(6, (pr.timestamp + pr.expiry.getOrElse(PaymentRequest.DEFAULT_EXPIRY_SECONDS.toLong)).seconds.toMillis)
statement.setTimestamp(5, Timestamp.from(Instant.ofEpochSecond(pr.timestamp))) // BOLT11 timestamp is in seconds
statement.setTimestamp(6, Timestamp.from(Instant.ofEpochSecond(pr.timestamp + pr.expiry.getOrElse(PaymentRequest.DEFAULT_EXPIRY_SECONDS.toLong))))
statement.executeUpdate()
}
}
Expand All @@ -234,7 +247,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.received SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update =>
update.setLong(1, amount.toLong)
update.setLong(2, receivedAt)
update.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(receivedAt)))
update.setString(3, paymentHash.toHex)
val updated = update.executeUpdate()
if (updated == 0) {
Expand All @@ -250,8 +263,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
PaymentRequest.read(paymentRequest),
rs.getByteVector32FromHex("payment_preimage"),
rs.getString("payment_type"),
rs.getLong("created_at"),
buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), Some(paymentRequest), rs.getLongNullable("received_at")))
rs.getTimestamp("created_at").getTime,
buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), Some(paymentRequest), rs.getTimestampNullable("received_at").map(_.getTime)))
}

private def buildIncomingPaymentStatus(amount_opt: Option[MilliSatoshi], serializedPaymentRequest_opt: Option[String], receivedAt_opt: Option[Long]): IncomingPaymentStatus = {
Expand All @@ -274,8 +287,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE created_at > ? AND created_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand All @@ -284,8 +297,8 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-received", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat > 0 AND created_at > ? AND created_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand All @@ -294,9 +307,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-pending", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at > ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(3, System.currentTimeMillis)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.setTimestamp(3, Timestamp.from(Instant.now()))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand All @@ -305,9 +318,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = withMetrics("payments/list-incoming-expired", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("SELECT * FROM payments.received WHERE received_msat IS NULL AND created_at > ? AND created_at < ? AND expire_at < ? ORDER BY created_at")) { statement =>
statement.setLong(1, from)
statement.setLong(2, to)
statement.setLong(3, System.currentTimeMillis)
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
statement.setTimestamp(3, Timestamp.from(Instant.now()))
statement.executeQuery().map(parseIncomingPayment).toSeq
}
}
Expand Down Expand Up @@ -366,9 +379,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
val paymentType = rs.getString("payment_type")
val paymentRequest_opt = rs.getStringNullable("payment_request")
val amount_opt = rs.getMilliSatoshiNullable("final_amount")
val createdAt = rs.getLong("created_at")
val completedAt_opt = rs.getLongNullable("completed_at")
val expireAt_opt = rs.getLongNullable("expire_at")
val createdAt = rs.getTimestamp("created_at").getTime
val completedAt_opt = rs.getTimestampNullable("completed_at").map(_.getTime)
val expireAt_opt = rs.getTimestampNullable("expire_at").map(_.getTime)

if (rs.getString("type") == "received") {
val status: IncomingPaymentStatus = buildIncomingPaymentStatus(amount_opt, paymentRequest_opt, completedAt_opt)
Expand Down
Loading

0 comments on commit cea3fc0

Please sign in to comment.