Schedule backup at regular interval (#1845)
This is a bit less trigger-happy than previously, and the implementation
is simpler.
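
For illustration, this is roughly what the resulting operator-facing configuration looks like in eclair.conf (a sketch based on the defaults added to reference.conf below; the script path is a placeholder):

    eclair {
      file-backup {
        enabled = true                      // automatic sqlite db backup, on by default
        interval = 10 seconds               // at each tick a backup only runs if channel data changed since the last one
        target-file = "eclair.sqlite.bak"   // placed under the chain directory
        # notify-script = "/absolute/path/to/script.sh"  // placeholder: called after each completed backup
      }
    }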
pm47 authored Jun 23, 2021
1 parent d43d06f commit 45204e2
Showing 6 changed files with 124 additions and 85 deletions.
20 changes: 6 additions & 14 deletions eclair-core/src/main/resources/reference.conf
@@ -16,10 +16,6 @@ eclair {
password = "" // password for basic auth, must be non empty if json-rpc api is enabled
}

enable-db-backup = true // enable the automatic sqlite db backup; do not change this unless you know what you are doing
// override this with a script/exe that will be called everytime a new database backup has been created
# backup-notify-script = "/absolute/path/to/script.sh"

watch-spent-window = 1 minute // at startup watches will be put back within that window to reduce herd effect; must be > 0s

bitcoind {
@@ -242,16 +238,12 @@ eclair {
}
}

// do not edit or move this section
eclair {
backup-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}
backup-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
file-backup {
enabled = true // enable the automatic sqlite db backup; do not change this unless you know what you are doing
interval = 10 seconds // interval between two backups
target-file = "eclair.sqlite.bak" // name of the target backup file; will be placed under the chain directory
// override this with a script/exe that will be called every time a new database backup has been created
# notify-script = "/absolute/path/to/script.sh"
}

akka {
5 changes: 4 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
@@ -192,7 +192,10 @@ object NodeParams extends Logging {
// v0.4.3
"min-feerate" -> "on-chain-fees.min-feerate",
"smooth-feerate-window" -> "on-chain-fees.smoothing-window",
"feerate-provider-timeout" -> "on-chain-fees.provider-timeout"
"feerate-provider-timeout" -> "on-chain-fees.provider-timeout",
// v0.6.1
"enable-db-backup" -> "file-backup.enabled",
"backup-notify-script" -> "file-backup.notify-script"
)
deprecatedKeyPaths.foreach {
case (old, new_) => require(!config.hasPath(old), s"configuration key '$old' has been replaced by '$new_'")
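
To illustrate the deprecation check above: a node still configured with the pre-0.6.1 keys will now fail the require() at startup with a message naming the replacement key, so an existing eclair.conf would have to be renamed along these lines (the script path is a placeholder):

    // before (rejected at startup from this version on)
    eclair.enable-db-backup = true
    eclair.backup-notify-script = "/absolute/path/to/script.sh"

    // after
    eclair.file-backup.enabled = true
    eclair.file-backup.notify-script = "/absolute/path/to/script.sh"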
17 changes: 9 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
@@ -34,6 +34,7 @@ import fr.acinq.eclair.channel.{Channel, Register}
import fr.acinq.eclair.crypto.WeakEntropyPool
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, Server, Switchboard}
import fr.acinq.eclair.payment.receive.PaymentHandler
@@ -248,15 +249,15 @@ class Setup(val datadir: File,
wallet = new BitcoinCoreWallet(bitcoin)
_ = wallet.getReceiveAddress().map(address => logger.info(s"initial wallet address=$address"))

// do not change the name of this actor. it is used in the configuration to specify a custom bounded mailbox
backupHandler = if (config.getBoolean("enable-db-backup")) {
_ = if (config.getBoolean("file-backup.enabled")) {
nodeParams.db match {
case fileBackup: FileBackup => system.actorOf(SimpleSupervisor.props(
FileBackupHandler.props(
fileBackup,
new File(chaindir, "eclair.sqlite.bak"),
if (config.hasPath("backup-notify-script")) Some(config.getString("backup-notify-script")) else None),
"backuphandler", SupervisorStrategy.Resume))
case fileBackup: FileBackup if config.getBoolean("file-backup.enabled") =>
val fileBackupParams = FileBackupParams(
interval = FiniteDuration(config.getDuration("file-backup.interval").getSeconds, TimeUnit.SECONDS),
targetFile = new File(chaindir, config.getString("file-backup.target-file")),
script_opt = if (config.hasPath("file-backup.notify-script")) Some(config.getString("file-backup.notify-script")) else None
)
system.spawn(Behaviors.supervise(FileBackupHandler(fileBackup, fileBackupParams)).onFailure(typed.SupervisorStrategy.restart), name = "backuphandler")
case _ =>
system.deadLetters
}
143 changes: 89 additions & 54 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/FileBackupHandler.scala
@@ -16,80 +16,115 @@

package fr.acinq.eclair.db

import akka.actor.{Actor, ActorLogging, Props}
import akka.dispatch.{BoundedMessageQueueSemantics, RequiresMessageQueue}
import akka.Done
import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import fr.acinq.eclair.KamonExt
import fr.acinq.eclair.channel.ChannelPersisted
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.FileBackupHandler._
import fr.acinq.eclair.db.Monitoring.Metrics

import java.io.File
import java.nio.file.{Files, StandardCopyOption}
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.sys.process.Process
import scala.util.{Failure, Success, Try}


/**
* This actor will synchronously make a backup of the database it was initialized with whenever it receives
* a ChannelPersisted event.
* To avoid piling up messages and entering an endless backup loop, it is supposed to be used with a bounded mailbox
* with a single item:
*
* backup-mailbox {
* mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
* mailbox-capacity = 1
* }
*
* Messages that cannot be processed will be sent to dead letters
*
* NB: Constructor is private so users will have to use BackupHandler.props() which always specific a custom mailbox.
*
* @param databases database to backup
* @param backupFile backup file
* @param backupScript_opt (optional) script to execute after the backup completes
* This actor will make a backup of the database it was initialized with at a scheduled interval. It will only
* perform a backup if a ChannelPersisted event was received since the previous backup.
*/
class FileBackupHandler private(databases: FileBackup, backupFile: File, backupScript_opt: Option[String]) extends Actor with RequiresMessageQueue[BoundedMessageQueueSemantics] with ActorLogging {
object FileBackupHandler {

// we listen to ChannelPersisted events, which will trigger a backup
context.system.eventStream.subscribe(self, classOf[ChannelPersisted])
// @formatter:off

def receive: Receive = {
case persisted: ChannelPersisted =>
KamonExt.time(Metrics.FileBackupDuration.withoutTags()) {
val tmpFile = new File(backupFile.getAbsolutePath.concat(".tmp"))
databases.backup(tmpFile)
/**
* @param targetFile backup file
* @param script_opt (optional) script to execute after the backup completes
* @param interval interval between two backups
*/
case class FileBackupParams(interval: FiniteDuration,
targetFile: File,
script_opt: Option[String])

// this will throw an exception if it fails, which is possible if the backup file is not on the same filesystem
// as the temporary file
Files.move(tmpFile.toPath, backupFile.toPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)
sealed trait Command
case class WrappedChannelPersisted(wrapped: ChannelPersisted) extends Command
private case object TickBackup extends Command
private case class BackupResult(result: Try[Done]) extends Command

// publish a notification that we have updated our backup
context.system.eventStream.publish(BackupCompleted)
Metrics.FileBackupCompleted.withoutTags().increment()
sealed trait BackupEvent
// this notification is sent when we have completed our backup process (our backup file is ready to be used)
case object BackupCompleted extends BackupEvent
// @formatter:on

// the backup task will run in this thread pool
private val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

def apply(databases: FileBackup, backupParams: FileBackupParams): Behavior[Command] =
Behaviors.setup { context =>
// we listen to ChannelPersisted events, which will trigger a backup
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ChannelPersisted](WrappedChannelPersisted))
Behaviors.withTimers { timers =>
timers.startTimerAtFixedRate(TickBackup, backupParams.interval)
new FileBackupHandler(databases, backupParams, context).waiting(willBackupAtNextTick = false)
}
}
}

class FileBackupHandler private(databases: FileBackup,
backupParams: FileBackupParams,
context: ActorContext[Command]) {

backupScript_opt.foreach(backupScript => {
Try {
// run the script in the current thread and wait until it terminates
Process(backupScript).!
} match {
case Success(exitCode) => log.debug(s"backup notify script $backupScript returned $exitCode")
case Failure(cause) => log.warning(s"cannot start backup notify script $backupScript: $cause")
def waiting(willBackupAtNextTick: Boolean): Behavior[Command] =
Behaviors.receiveMessagePartial {
case _: WrappedChannelPersisted =>
context.log.debug("will perform backup at next tick")
waiting(willBackupAtNextTick = true)
case TickBackup => if (willBackupAtNextTick) {
context.log.debug("performing backup")
context.pipeToSelf(doBackup())(BackupResult)
backuping(willBackupAtNextTick = false)
} else {
Behaviors.same
}
}

def backuping(willBackupAtNextTick: Boolean): Behavior[Command] =
Behaviors.receiveMessagePartial {
case _: WrappedChannelPersisted =>
context.log.debug("will perform backup at next tick")
backuping(willBackupAtNextTick = true)
case BackupResult(res) =>
res match {
case Success(Done) => context.log.debug("backup succeeded")
case Failure(cause) => context.log.warn(s"backup failed: $cause")
}
})
}
}
waiting(willBackupAtNextTick)
}

sealed trait BackupEvent
private def doBackup(): Future[Done] = Future {
KamonExt.time(Metrics.FileBackupDuration.withoutTags()) {
val tmpFile = new File(backupParams.targetFile.getAbsolutePath.concat(".tmp"))
databases.backup(tmpFile)

// this notification is sent when we have completed our backup process (our backup file is ready to be used)
case object BackupCompleted extends BackupEvent
// this will throw an exception if it fails, which is possible if the backup file is not on the same filesystem
// as the temporary file
Files.move(tmpFile.toPath, backupParams.targetFile.toPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE)

object FileBackupHandler {
// using this method is the only way to create a BackupHandler actor
// we make sure that it uses a custom bounded mailbox, and a custom pinned dispatcher (i.e our actor will have its own thread pool with 1 single thread)
def props(databases: FileBackup, backupFile: File, backupScript_opt: Option[String]) =
Props(new FileBackupHandler(databases, backupFile, backupScript_opt))
.withMailbox("eclair.backup-mailbox")
.withDispatcher("eclair.backup-dispatcher")
}
// publish a notification that we have updated our backup
context.system.eventStream ! EventStream.Publish(BackupCompleted)
Metrics.FileBackupCompleted.withoutTags().increment()
}

// run the script in the current thread and wait until it terminates
backupParams.script_opt.foreach(backupScript => Process(backupScript).!)

Done
}(ec)

}
eclair-core/src/test/scala/fr/acinq/eclair/db/SqliteFileBackupHandlerSpec.scala
@@ -16,9 +16,11 @@

package fr.acinq.eclair.db

import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.testkit.TestProbe
import fr.acinq.eclair.channel.ChannelPersisted
import fr.acinq.eclair.db.Databases.FileBackup
import fr.acinq.eclair.db.FileBackupHandler.{BackupCompleted, BackupEvent}
import fr.acinq.eclair.db.sqlite.SqliteChannelsDb
import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec
import fr.acinq.eclair.{TestConstants, TestDatabases, TestKitBaseClass, TestUtils, randomBytes32}
@@ -27,27 +29,33 @@ import org.scalatest.funsuite.AnyFunSuiteLike
import java.io.File
import java.sql.DriverManager
import java.util.UUID
import scala.concurrent.duration.DurationInt

class SqliteFileBackupHandlerSpec extends TestKitBaseClass with AnyFunSuiteLike {

test("process backups") {
val db = TestDatabases.inMemoryDb()
val wip = new File(TestUtils.BUILD_DIRECTORY, s"wip-${UUID.randomUUID()}")
val dest = new File(TestUtils.BUILD_DIRECTORY, s"backup-${UUID.randomUUID()}")
wip.deleteOnExit()
dest.deleteOnExit()
val channel = ChannelCodecsSpec.normal
db.channels.addOrUpdateChannel(channel)
assert(db.channels.listLocalChannels() == Seq(channel))

val handler = system.actorOf(FileBackupHandler.props(db.asInstanceOf[FileBackup], dest, None))
val params = FileBackupHandler.FileBackupParams(
interval = 10 seconds,
targetFile = dest,
script_opt = None
)

val handler = system.spawn(FileBackupHandler(db.asInstanceOf[FileBackup], params), name = "filebackup")
val probe = TestProbe()
system.eventStream.subscribe(probe.ref, classOf[BackupEvent])

handler ! ChannelPersisted(null, TestConstants.Alice.nodeParams.nodeId, randomBytes32(), null)
handler ! ChannelPersisted(null, TestConstants.Alice.nodeParams.nodeId, randomBytes32(), null)
handler ! ChannelPersisted(null, TestConstants.Alice.nodeParams.nodeId, randomBytes32(), null)
probe.expectMsg(BackupCompleted)
handler ! FileBackupHandler.WrappedChannelPersisted(ChannelPersisted(null, TestConstants.Alice.nodeParams.nodeId, randomBytes32(), null))
handler ! FileBackupHandler.WrappedChannelPersisted(ChannelPersisted(null, TestConstants.Alice.nodeParams.nodeId, randomBytes32(), null))
handler ! FileBackupHandler.WrappedChannelPersisted(ChannelPersisted(null, TestConstants.Alice.nodeParams.nodeId, randomBytes32(), null))
probe.expectMsg(20 seconds, BackupCompleted)
probe.expectNoMessage()

val db1 = new SqliteChannelsDb(DriverManager.getConnection(s"jdbc:sqlite:$dest"))
val check = db1.listLocalChannels()
eclair-core/src/test/scala/fr/acinq/eclair/integration/IntegrationSpec.scala
@@ -68,7 +68,7 @@ abstract class IntegrationSpec extends TestKitBaseClass with BitcoindService wit

val commonConfig = ConfigFactory.parseMap(Map(
"eclair.chain" -> "regtest",
"eclair.enable-db-backup" -> false,
"eclair.file-backup.enabled" -> false,
"eclair.server.public-ips.1" -> "127.0.0.1",
"eclair.bitcoind.port" -> bitcoindPort,
"eclair.bitcoind.rpcport" -> bitcoindRpcPort,
