Skip to content

Commit

Permalink
Add TCP keep-alive on ZMQ socket (#1807)
Browse files Browse the repository at this point in the history
One of ZMQ's drawbacks is that subscribers on an unreliable network may
silently disconnect from publishers in case of network failures.

In our case, we want to reconnect immediately when that happens, so we set
a tcp keep-alive to ensure this.

Fixes #1789
  • Loading branch information
t-bast authored May 17, 2021
1 parent 9141998 commit 1fbede7
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
2 changes: 1 addition & 1 deletion eclair-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.0</version>
<version>0.5.2</version>
</dependency>
<!-- SERIALIZATION -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]]
subscriber.monitor("inproc://events", ZMQ.EVENT_CONNECTED | ZMQ.EVENT_DISCONNECTED)
subscriber.connect(address)
subscriber.subscribe(topic.getBytes(ZMQ.CHARSET))
subscriber.setTCPKeepAlive(1)

val monitor = ctx.createSocket(SocketType.PAIR)
monitor.connect("inproc://events")
Expand All @@ -49,18 +50,18 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]]

// we check messages in a non-blocking manner with an interval, making sure to retrieve all messages before waiting again
@tailrec
final def checkEvent: Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match {
final def checkEvent(): Unit = Option(Event.recv(monitor, ZMQ.DONTWAIT)) match {
case Some(event) =>
self ! event
checkEvent
checkEvent()
case None => ()
}

@tailrec
final def checkMsg: Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match {
final def checkMsg(): Unit = Option(ZMsg.recvMsg(subscriber, ZMQ.DONTWAIT)) match {
case Some(msg) =>
self ! msg
checkMsg
checkMsg()
case None => ()
}

Expand All @@ -69,11 +70,11 @@ class ZMQActor(address: String, topic: String, connected: Option[Promise[Done]]

override def receive: Receive = {
case Symbol("checkEvent") =>
checkEvent
checkEvent()
context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkEvent"))

case Symbol("checkMsg") =>
checkMsg
checkMsg()
context.system.scheduler.scheduleOnce(1 second, self, Symbol("checkMsg"))

case event: Event => event.getEvent match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind
}
}

test("reconnect ZMQ automatically") {
withWatcher(f => {
import f._

// When the watcher starts, it broadcasts the current height.
val block1 = listener.expectMsgType[CurrentBlockCount]
listener.expectNoMessage(100 millis)

restartBitcoind(probe)
generateBlocks(1)
val block2 = listener.expectMsgType[CurrentBlockCount]
assert(block2.blockCount === block1.blockCount + 1)
listener.expectNoMessage(100 millis)
})
}

test("add/remove watches from/to utxo map") {
val m0 = Map.empty[OutPoint, Set[Watch[_ <: WatchTriggered]]]
val txid = randomBytes32
Expand Down

0 comments on commit 1fbede7

Please sign in to comment.