Skip to content

Commit

Permalink
net: add a new message queue for the message processor
Browse files Browse the repository at this point in the history
This separates the storage of messages from the net and queued messages
for processing, allowing the locks to be split.
  • Loading branch information
Fuzzbawls committed Sep 2, 2020
1 parent 701b578 commit 5581b47
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 16 deletions.
25 changes: 10 additions & 15 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6099,21 +6099,16 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
if (pfrom->nSendSize >= nMaxSendBufferSize)
return false;

auto it = pfrom->vRecvMsg.begin();
if (it == pfrom->vRecvMsg.end())
return false;

// end, if an incomplete message is found
if (!it->complete())
return false;

// get next message
CNetMessage msg = std::move(*it);

// at this point, any failure means we can delete the current message
pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin());

fMoreWork = !pfrom->vRecvMsg.empty() && pfrom->vRecvMsg.front().complete();
std::list<CNetMessage> msgs;
{
LOCK(pfrom->cs_vProcessMsg);
if (pfrom->vProcessMsg.empty())
return false;
// Just take one message
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
fMoreWork = !pfrom->vProcessMsg.empty();
}
CNetMessage& msg(msgs.front());

msg.SetVersion(pfrom->GetRecvVersion());
// Scan for message start
Expand Down
12 changes: 11 additions & 1 deletion src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1244,8 +1244,18 @@ void CConnman::ThreadSocketHandler()
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify)
if (notify) {
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
}
WakeMessageHandler();
}
} else if (nBytes == 0) {
// socket closed gracefully
if (!pnode->fDisconnect)
Expand Down
3 changes: 3 additions & 0 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ class CNode
std::deque<CSerializeData> vSendMsg;
RecursiveMutex cs_vSend;

RecursiveMutex cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;

RecursiveMutex cs_sendProcessing;

std::deque<CInv> vRecvGetData;
Expand Down

0 comments on commit 5581b47

Please sign in to comment.