Skip to content

Commit

Permalink
Auto merge of #5274 - str4d:zip-239-prep-3, r=str4d
Browse files Browse the repository at this point in the history
ZIP 239 preparations 3

Cherry-picked from the following upstream PRs:
- bitcoin/bitcoin#8080
- bitcoin/bitcoin#8082
- bitcoin/bitcoin#8126
- bitcoin/bitcoin#7910
  - This is the unsquashed version of bitcoin/bitcoin#8149
  - We take three cleanup commits to the protocol / `CInv` code.
- bitcoin/bitcoin#8822
- bitcoin/bitcoin#8880
  - Excluding the first commit (we don't have the comment it fixes yet).
- bitcoin/bitcoin#19322
  • Loading branch information
zkbot committed Aug 17, 2021
2 parents 88d8867 + 8469683 commit 56b5f95
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 179 deletions.
118 changes: 62 additions & 56 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ int64_t nMaxTipAge = DEFAULT_MAX_TIP_AGE;

std::optional<unsigned int> expiryDeltaArg = std::nullopt;


CFeeRate minRelayTxFee = CFeeRate(DEFAULT_MIN_RELAY_TX_FEE);
CAmount maxTxFee = DEFAULT_TRANSACTION_MAXFEE;

Expand Down Expand Up @@ -265,6 +266,12 @@ namespace {

/** Dirty block file entries. */
set<int> setDirtyFileInfo;

/** Relay map, protected by cs_main. */
typedef std::map<uint256, std::shared_ptr<const CTransaction>> MapRelay;
MapRelay mapRelay;
/** Expiration-time ordered list of (expire time, relay map entry) pairs, protected by cs_main). */
std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration;
} // anon namespace

//////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -2062,8 +2069,10 @@ bool GetTransaction(const uint256& hash, CTransaction& txOut, const Consensus::P
LOCK(cs_main);

if (!blockIndex) {
if (mempool.lookup(hash, txOut))
std::shared_ptr<const CTransaction> ptx = mempool.get(hash);
if (ptx)
{
txOut = *ptx;
return true;
}

Expand Down Expand Up @@ -4146,7 +4155,7 @@ bool ActivateBestChain(CValidationState& state, const CChainParams& chainparams,
LOCK(cs_vNodes);
for (CNode* pnode : vNodes)
if (nNewHeight > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : nBlockEstimate))
pnode->PushInventory(CInv(MSG_BLOCK, hashNewTip));
pnode->PushBlockInventory(hashNewTip);
}
// Notify external listeners about the new tip.
GetMainSignals().UpdatedBlockTip(pindexNewTip);
Expand Down Expand Up @@ -5709,11 +5718,11 @@ bool LoadExternalBlockFile(const CChainParams& chainparams, FILE* fileIn, CDiskB
unsigned int nSize = 0;
try {
// locate a header
unsigned char buf[MESSAGE_START_SIZE];
unsigned char buf[CMessageHeader::MESSAGE_START_SIZE];
blkdat.FindByte(chainparams.MessageStart()[0]);
nRewind = blkdat.GetPos()+1;
blkdat >> FLATDATA(buf);
if (memcmp(buf, chainparams.MessageStart(), MESSAGE_START_SIZE))
if (memcmp(buf, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE))
continue;
// read size
blkdat >> nSize;
Expand Down Expand Up @@ -6120,7 +6129,7 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
// Trigger the peer node to send a getblocks request for the next batch of inventory
if (inv.hash == pfrom->hashContinue)
{
// Bypass PushInventory, this must send even if redundant,
// Bypass PushBlockInventory, this must send even if redundant,
// and we want it right after the last block so they don't
// wait for other stuff first.
vector<CInv> vInv;
Expand All @@ -6130,38 +6139,24 @@ void static ProcessGetData(CNode* pfrom, const Consensus::Params& consensusParam
}
}
}
else if (inv.IsKnownType())
else if (inv.type == MSG_TX)
{
// Check the mempool to see if a transaction is expiring soon. If so, do not send to peer.
// Note that a transaction enters the mempool first, before the serialized form is cached
// in mapRelay after a successful relay.
bool isExpiringSoon = false;
bool pushed = false;
CTransaction tx;
bool isInMempool = mempool.lookup(inv.hash, tx);
if (isInMempool) {
isExpiringSoon = IsExpiringSoonTx(tx, currentHeight + 1);
}

if (!isExpiringSoon) {
// Send stream from relay memory
{
LOCK(cs_mapRelay);
map<uint256, CTransaction>::iterator mi = mapRelay.find(inv.hash);
if (mi != mapRelay.end()) {
pfrom->PushMessage(inv.GetCommand(), (*mi).second);
pushed = true;
}
}
if (!pushed && inv.type == MSG_TX) {
if (isInMempool) {
pfrom->PushMessage("tx", tx);
pushed = true;
}
// Send stream from relay memory
bool push = false;
auto mi = mapRelay.find(inv.hash);
if (mi != mapRelay.end() && !IsExpiringSoonTx(*mi->second, currentHeight + 1)) {
pfrom->PushMessage("tx", *mi->second);
push = true;
} else if (pfrom->timeLastMempoolReq) {
auto txinfo = mempool.info(inv.hash);
// To protect privacy, do not answer getdata using the mempool when
// that TX couldn't have been INVed in reply to a MEMPOOL request.
if (txinfo.tx && txinfo.nTime <= pfrom->timeLastMempoolReq && !IsExpiringSoonTx(*txinfo.tx, currentHeight + 1)) {
pfrom->PushMessage("tx", *txinfo.tx);
push = true;
}
}

if (!pushed) {
if (!push) {
vNotFound.push_back(inv);
}
}
Expand Down Expand Up @@ -6591,7 +6586,7 @@ bool static ProcessMessage(const CChainParams& chainparams, CNode* pfrom, string
LogPrint("net", " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString());
break;
}
pfrom->PushInventory(CInv(MSG_BLOCK, pindex->GetBlockHash()));
pfrom->PushBlockInventory(pindex->GetBlockHash());
if (--nLimit <= 0)
{
// When this block is requested, we'll send an inv that'll
Expand Down Expand Up @@ -7199,7 +7194,7 @@ bool ProcessMessages(const CChainParams& chainparams, CNode* pfrom)
it++;

// Scan for message start
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), MESSAGE_START_SIZE) != 0) {
if (memcmp(msg.hdr.pchMessageStart, chainparams.MessageStart(), CMessageHeader::MESSAGE_START_SIZE) != 0) {
LogPrintf("PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.hdr.GetCommand()), pfrom->id);
fOk = false;
break;
Expand All @@ -7220,11 +7215,12 @@ bool ProcessMessages(const CChainParams& chainparams, CNode* pfrom)
// Checksum
CDataStream& vRecv = msg.vRecv;
uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize);
unsigned int nChecksum = ReadLE32((unsigned char*)&hash);
if (nChecksum != hdr.nChecksum)
if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0)
{
LogPrintf("%s(%s, %u bytes): CHECKSUM ERROR nChecksum=%08x hdr.nChecksum=%08x\n", __func__,
SanitizeString(strCommand), nMessageSize, nChecksum, hdr.nChecksum);
LogPrintf("%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", __func__,
SanitizeString(strCommand), nMessageSize,
HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE),
HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE));
continue;
}

Expand Down Expand Up @@ -7449,27 +7445,22 @@ bool SendMessages(const Consensus::Params& params, CNode* pto)
if (!pto->fRelayTxes) pto->setInventoryTxToSend.clear();
}

int currentHeight = GetHeight();

// Respond to BIP35 mempool requests
if (fSendTrickle && pto->fSendMempool) {
std::vector<uint256> vtxid;
mempool.queryHashes(vtxid);
auto vtxinfo = mempool.infoAll();
pto->fSendMempool = false;

LOCK(pto->cs_filter);

int currentHeight = GetHeight();
for (const uint256& hash : vtxid) {
CTransaction tx;
bool fInMemPool = mempool.lookup(hash, tx);
if (fInMemPool && IsExpiringSoonTx(tx, currentHeight + 1)) {
continue;
}

for (const auto& txinfo : vtxinfo) {
const uint256& hash = txinfo.tx->GetHash();
CInv inv(MSG_TX, hash);
pto->setInventoryTxToSend.erase(hash);
if (IsExpiringSoonTx(*txinfo.tx, currentHeight + 1)) continue;
if (pto->pfilter) {
if (!fInMemPool) continue; // another thread removed since queryHashes, maybe...
if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
if (!pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
}
pto->filterInventoryKnown.insert(hash);
vInv.push_back(inv);
Expand All @@ -7478,6 +7469,7 @@ bool SendMessages(const Consensus::Params& params, CNode* pto)
vInv.clear();
}
}
pto->timeLastMempoolReq = GetTime();
}

// Determine transactions to relay
Expand Down Expand Up @@ -7509,14 +7501,28 @@ bool SendMessages(const Consensus::Params& params, CNode* pto)
continue;
}
// Not in the mempool anymore? don't bother sending it.
if (pto->pfilter) {
CTransaction tx;
if (!mempool.lookup(hash, tx)) continue;
if (!pto->pfilter->IsRelevantAndUpdate(tx)) continue;
auto txinfo = mempool.info(hash);
if (!txinfo.tx) {
continue;
}
if (IsExpiringSoonTx(*txinfo.tx, currentHeight + 1)) continue;
if (pto->pfilter && !pto->pfilter->IsRelevantAndUpdate(*txinfo.tx)) continue;
// Send
vInv.push_back(CInv(MSG_TX, hash));
nRelayedTransactions++;
{
// Expire old relay messages
while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow)
{
mapRelay.erase(vRelayExpiration.front().second);
vRelayExpiration.pop_front();
}

auto ret = mapRelay.insert(std::make_pair(hash, std::move(txinfo.tx)));
if (ret.second) {
vRelayExpiration.push_back(std::make_pair(nNow + 15 * 60 * 1000000, ret.first));
}
}
if (vInv.size() == MAX_INV_SZ) {
pto->PushMessage("inv", vInv);
vInv.clear();
Expand Down
24 changes: 24 additions & 0 deletions src/memusage.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ struct stl_tree_node
X x;
};

struct stl_shared_counter
{
/* Various platforms use different sized counters here.
* Conservatively assume that they won't be larger than size_t. */
void* class_type;
size_t use_count;
size_t weak_count;
};

template<typename X>
static inline size_t DynamicUsage(const std::vector<X>& v)
{
Expand All @@ -93,6 +102,21 @@ static inline size_t DynamicUsage(const std::map<X, Y, C>& m)
return MallocUsage(sizeof(stl_tree_node<std::pair<const X, Y> >)) * m.size();
}

template<typename X>
static inline size_t DynamicUsage(const std::unique_ptr<X>& p)
{
return p ? MallocUsage(sizeof(X)) : 0;
}

template<typename X>
static inline size_t DynamicUsage(const std::shared_ptr<X>& p)
{
// A shared_ptr can either use a single continuous memory block for both
// the counter and the storage (when using std::make_shared), or separate.
// We can't observe the difference, however, so assume the worst.
return p ? MallocUsage(sizeof(X)) + MallocUsage(sizeof(stl_shared_counter)) : 0;
}

// Boost data structures

template<typename X>
Expand Down
25 changes: 4 additions & 21 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ std::string strSubVersion;

vector<CNode*> vNodes;
CCriticalSection cs_vNodes;
map<uint256, CTransaction> mapRelay;
deque<pair<int64_t, uint256> > vRelayExpiration;
CCriticalSection cs_mapRelay;
limitedmap<uint256, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);

static deque<string> vOneShots;
Expand Down Expand Up @@ -2034,23 +2031,10 @@ instance_of_cnetcleanup;

void RelayTransaction(const CTransaction& tx)
{
CInv inv(MSG_TX, tx.GetHash());
{
LOCK(cs_mapRelay);
// Expire old relay messages
while (!vRelayExpiration.empty() && vRelayExpiration.front().first < GetTime())
{
mapRelay.erase(vRelayExpiration.front().second);
vRelayExpiration.pop_front();
}

mapRelay.insert(std::make_pair(inv.hash, tx));
vRelayExpiration.push_back(std::make_pair(GetTime() + 15 * 60, inv.hash));
}
LOCK(cs_vNodes);
for (CNode* pnode : vNodes)
{
pnode->PushInventory(inv);
pnode->PushTxInventory(tx.GetHash());
}
}

Expand Down Expand Up @@ -2244,6 +2228,7 @@ CNode::CNode(SOCKET hSocketIn, const CAddress& addrIn, const std::string& addrNa
fRelayTxes = false;
fSentAddr = false;
pfilter = new CBloomFilter();
timeLastMempoolReq = 0;
nPingNonceSent = 0;
nPingUsecStart = 0;
nPingUsecTime = 0;
Expand Down Expand Up @@ -2370,10 +2355,8 @@ void CNode::EndMessage() UNLOCK_FUNCTION(cs_vSend)

// Set the checksum
uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end());
unsigned int nChecksum = 0;
memcpy(&nChecksum, &hash, sizeof(nChecksum));
assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + sizeof(nChecksum));
memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], &nChecksum, sizeof(nChecksum));
assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + CMessageHeader::CHECKSUM_SIZE);
memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE);

LogPrint("net", "(%d bytes) peer=%d\n", nSize, id);

Expand Down
22 changes: 12 additions & 10 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "utilstrencodings.h"
#include "chainparams.h"

#include <atomic>
#include <deque>
#include <stdint.h>
#include <atomic>
Expand Down Expand Up @@ -164,9 +165,6 @@ extern int nMaxConnections;

extern std::vector<CNode*> vNodes;
extern CCriticalSection cs_vNodes;
extern std::map<uint256, CTransaction> mapRelay;
extern std::deque<std::pair<int64_t, uint256> > vRelayExpiration;
extern CCriticalSection cs_mapRelay;
extern limitedmap<uint256, int64_t> mapAlreadyAskedFor;

extern std::vector<std::string> vAddedNodes;
Expand Down Expand Up @@ -355,6 +353,8 @@ class CNode
// Used for BIP35 mempool sending, also protected by cs_inventory
bool fSendMempool;

// Last time a "MEMPOOL" request was serviced.
std::atomic<int64_t> timeLastMempoolReq;
// Ping time measurement:
// The pong reply we're expecting, or 0 if no pong expected.
std::atomic<uint64_t> nPingNonceSent;
Expand Down Expand Up @@ -475,18 +475,20 @@ class CNode
}
}

void PushInventory(const CInv& inv)
void PushTxInventory(const uint256& hash)
{
LOCK(cs_inventory);
if (inv.type == MSG_TX) {
if (!filterInventoryKnown.contains(inv.hash)) {
setInventoryTxToSend.insert(inv.hash);
}
} else if (inv.type == MSG_BLOCK) {
vInventoryBlockToSend.push_back(inv.hash);
if (!filterInventoryKnown.contains(hash)) {
setInventoryTxToSend.insert(hash);
}
}

void PushBlockInventory(const uint256& hash)
{
LOCK(cs_inventory);
vInventoryBlockToSend.push_back(hash);
}

void AskFor(const CInv& inv);

// TODO: Document the postcondition of this function. Is cs_vSend locked?
Expand Down
Loading

0 comments on commit 56b5f95

Please sign in to comment.