Skip to content

Commit

Permalink
Add ZeroMQ support. Notify blocks and transactions via ZeroMQ
Browse files Browse the repository at this point in the history
Continues Johnathan Corgan's work.
Publishing multipart messages

Bugfix: Add missing zmq header includes

Bugfix: Adjust build system to link ZeroMQ code for Qt binaries
  • Loading branch information
Jeff Garzik authored and João Barbosa committed Sep 16, 2015
1 parent 1136879 commit e6a14b6
Show file tree
Hide file tree
Showing 16 changed files with 717 additions and 4 deletions.
22 changes: 22 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ AC_ARG_ENABLE([glibc-back-compat],
[use_glibc_compat=$enableval],
[use_glibc_compat=no])

AC_ARG_ENABLE([zmq],
[AC_HELP_STRING([--disable-zmq],
[Disable ZMQ notifications])],
[use_zmq=$enableval],
[use_zmq=yes])

AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], [])

# Enable debug
Expand Down Expand Up @@ -833,6 +839,22 @@ if test x$bitcoin_enable_qt != xno; then
fi
fi

# conditional search for and use libzmq
AC_MSG_CHECKING([whether to build ZMQ support])
if test "x$use_zmq" = "xyes"; then
AC_MSG_RESULT([yes])
PKG_CHECK_MODULES([ZMQ],[libzmq],
[AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])],
[AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
AC_MSG_WARN([libzmq not found, disabling])
use_zmq=no])
else
AC_MSG_RESULT([no, --disable-zmq used])
AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions])
fi

AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"])

AC_MSG_CHECKING([whether to build test_bitcoin])
if test x$use_tests = xyes; then
AC_MSG_RESULT([yes])
Expand Down
37 changes: 37 additions & 0 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python2

import array
import binascii
import zmq

port = 28332

zmqContext = zmq.Context()
zmqSubSocket = zmqContext.socket(zmq.SUB)
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx")
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)

try:
while True:
msg = zmqSubSocket.recv_multipart()
topic = str(msg[0])
body = msg[1]

if topic == "hashblock":
print "- HASH BLOCK -"
print binascii.hexlify(body)
elif topic == "hashtx":
print '- HASH TX -'
print binascii.hexlify(body)
elif topic == "rawblock":
print "- RAW BLOCK HEADER -"
print binascii.hexlify(body[:80])
elif topic == "rawtx":
print '- RAW TX -'
print binascii.hexlify(body)

except KeyboardInterrupt:
zmqContext.destroy()
98 changes: 98 additions & 0 deletions doc/zmq.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Block and Transaction Broadcasting With ZeroMQ

[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP
connections, inter-process communications, and shared-memory,
providing various message-oriented semantics such as publish/subcribe,
request/reply, and push/pull.

The Bitcoin Core daemon can be configured to act as a trusted "border
router", implementing the bitcoin wire protocol and relay, making
consensus decisions, maintaining the local blockchain database,
broadcasting locally generated transactions into the network, and
providing a queryable RPC interface to interact on a polled basis for
requesting blockchain related data. However, there exists only a
limited service to notify external software of events like the arrival
of new blocks or transactions.

The ZeroMQ facility implements a notification interface through a
set of specific notifiers. Currently there are notifiers that publish
blocks and transactions. This read-only facility requires only the
connection of a corresponding ZeroMQ subscriber port in receiving
software; it is not authenticated nor is there any two-way protocol
involvement. Therefore, subscribers should validate the received data
since it may be out of date, incomplete or even invalid.

ZeroMQ sockets are self-connecting and self-healing; that is, connects
made between two endpoints will be automatically restored after an
outage, and either end may be freely started or stopped in any order.

Because ZeroMQ is message oriented, subscribers receive transactions
and blocks all-at-once and do not need to implement any sort of
buffering or reassembly.

## Prerequisites

The ZeroMQ feature in Bitcoin Core uses only a very small part of the
ZeroMQ C API, and is thus compatible with any version of ZeroMQ
from 2.1 onward, including all versions in the 3.x and 4.x release
series. Typically, it is packaged by distributions as something like
*libzmq-dev*.

The C++ wrapper for ZeroMQ is *not* needed.

## Enabling

By default, the ZeroMQ port functionality is enabled. Two steps are
required to enable--compiling in the ZeroMQ code, and configuring
runtime operation on the command-line or configuration file.

$ ./configure --enable-zmq (other options)

This will produce a binary that is capable of providing the ZeroMQ
facility, but will not do so until also configured properly.

## Usage

Currently, the following notifications are supported:

-zmqpubhashtx=address
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address

The socket type is PUB and the address must be a valid ZeroMQ
socket address. The same address can be used in more than one notification.

For instance:

$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw

Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the notification
`-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the
hexadecimal transaction hash (32 bytes).

These options can also be provided in bitcoin.conf.

ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org).

Client side, then, the ZeroMQ subscriber socket must have the
ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without
doing so will result in no messages arriving. Please see `contrib/zmq/zmq_sub.py`
for a working example.

## Remarks

From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB
sockets don't even have a read function. Thus, there is no state
introduced into bitcoind directly. Furthermore, no information is
broadcast that wasn't already received from the public P2P network.

No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.

Note that when the block chain tip changes, a reorganisation may occur and just
the tip will be notified. It is up to the subscriber to retrieve the chain
from the last known block to the new tip.
26 changes: 24 additions & 2 deletions src/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ if ENABLE_WALLET
BITCOIN_INCLUDES += $(BDB_CPPFLAGS)
EXTRA_LIBRARIES += libbitcoin_wallet.a
endif
if ENABLE_ZMQ
EXTRA_LIBRARIES += libbitcoin_zmq.a
endif

if BUILD_BITCOIN_LIBS
lib_LTLIBRARIES = libbitcoinconsensus.la
Expand Down Expand Up @@ -157,7 +160,12 @@ BITCOIN_CORE_H = \
wallet/db.h \
wallet/wallet.h \
wallet/wallet_ismine.h \
wallet/walletdb.h
wallet/walletdb.h \
zmq/zmqabstractnotifier.h \
zmq/zmqconfig.h\
zmq/zmqnotificationinterface.h \
zmq/zmqpublishnotifier.h


obj/build.h: FORCE
@$(MKDIR_P) $(builddir)/obj
Expand Down Expand Up @@ -199,6 +207,17 @@ libbitcoin_server_a_SOURCES = \
validationinterface.cpp \
$(BITCOIN_CORE_H)

if ENABLE_ZMQ
LIBBITCOIN_ZMQ=libbitcoin_zmq.a

libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES)
libbitcoin_zmq_a_SOURCES = \
zmq/zmqabstractnotifier.cpp \
zmq/zmqnotificationinterface.cpp \
zmq/zmqpublishnotifier.cpp
endif


# wallet: shared between bitcoind and bitcoin-qt, but only linked
# when wallet enabled
libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
Expand Down Expand Up @@ -320,12 +339,15 @@ bitcoind_LDADD = \
$(LIBMEMENV) \
$(LIBSECP256K1)

if ENABLE_ZMQ
bitcoind_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif

if ENABLE_WALLET
bitcoind_LDADD += libbitcoin_wallet.a
endif

bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
#

# bitcoin-cli binary #
bitcoin_cli_SOURCES = bitcoin-cli.cpp
Expand Down
3 changes: 3 additions & 0 deletions src/Makefile.qt.include
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER)
if ENABLE_WALLET
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
endif
if ENABLE_ZMQ
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \
$(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
$(EVENT_PTHREADS_LIBS) $(EVENT_LIBS)
Expand Down
3 changes: 3 additions & 0 deletions src/Makefile.qttest.include
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER)
if ENABLE_WALLET
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET)
endif
if ENABLE_ZMQ
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS)
endif
qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \
$(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \
$(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \
Expand Down
4 changes: 4 additions & 0 deletions src/Makefile.test.include
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ endif
test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS)
test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static

if ENABLE_ZMQ
test_test_bitcoin_LDADD += $(ZMQ_LIBS)
endif

nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES)

$(BITCOIN_TESTS): $(GENERATED_TEST_FILES)
Expand Down
36 changes: 35 additions & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include "wallet/wallet.h"
#include "wallet/walletdb.h"
#endif

#include <stdint.h>
#include <stdio.h>

Expand All @@ -55,13 +54,21 @@
#include <boost/thread.hpp>
#include <openssl/crypto.h>

#if ENABLE_ZMQ
#include "zmq/zmqnotificationinterface.h"
#endif

using namespace std;

#ifdef ENABLE_WALLET
CWallet* pwalletMain = NULL;
#endif
bool fFeeEstimatesInitialized = false;

#if ENABLE_ZMQ
static CZMQNotificationInterface* pzmqNotificationInterface = NULL;
#endif

#ifdef WIN32
// Win32 LevelDB doesn't use filedescriptors, and the ones used for
// accessing block files don't count towards the fd_set size limit
Expand Down Expand Up @@ -211,6 +218,16 @@ void Shutdown()
if (pwalletMain)
pwalletMain->Flush(true);
#endif

#if ENABLE_ZMQ
if (pzmqNotificationInterface) {
UnregisterValidationInterface(pzmqNotificationInterface);
pzmqNotificationInterface->Shutdown();
delete pzmqNotificationInterface;
pzmqNotificationInterface = NULL;
}
#endif

#ifndef WIN32
try {
boost::filesystem::remove(GetPidFile());
Expand Down Expand Up @@ -375,6 +392,14 @@ std::string HelpMessage(HelpMessageMode mode)
" " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)"));
#endif

#if ENABLE_ZMQ
strUsage += HelpMessageGroup(_("ZeroMQ notification options:"));
strUsage += HelpMessageOpt("-zmqpubhashblock=<address>", _("Enable publish hash block in <address>"));
strUsage += HelpMessageOpt("-zmqpubhashtransaction=<address>", _("Enable publish hash transaction in <address>"));
strUsage += HelpMessageOpt("-zmqpubrawblock=<address>", _("Enable publish raw block in <address>"));
strUsage += HelpMessageOpt("-zmqpubrawtransaction=<address>", _("Enable publish raw transaction in <address>"));
#endif

strUsage += HelpMessageGroup(_("Debugging/Testing options:"));
if (showDebug)
{
Expand Down Expand Up @@ -1125,6 +1150,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"])
AddOneShot(strDest);

#if ENABLE_ZMQ
pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs);

if (pzmqNotificationInterface) {
pzmqNotificationInterface->Initialize();
RegisterValidationInterface(pzmqNotificationInterface);
}
#endif

// ********************************************************* Step 7: load block chain

fReindex = GetBoolArg("-reindex", false);
Expand Down
1 change: 0 additions & 1 deletion src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ void UnregisterAllValidationInterfaces() {
g_signals.SetBestChain.disconnect_all_slots();
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.SyncTransaction.disconnect_all_slots();
g_signals.UpdatedTransaction.disconnect_all_slots();
g_signals.UpdatedBlockTip.disconnect_all_slots();
}

Expand Down
22 changes: 22 additions & 0 deletions src/zmq/zmqabstractnotifier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include "zmqabstractnotifier.h"
#include "util.h"


CZMQAbstractNotifier::~CZMQAbstractNotifier()
{
assert(!psocket);
}

bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/)
{
return true;
}

bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;
}
Loading

0 comments on commit e6a14b6

Please sign in to comment.