Skip to content

Commit 0e31ae9

Browse files
committed
Merge pull request #2060 from sipa/parallel
Parallel script verification
2 parents 91f70a7 + ef0f422 commit 0e31ae9

File tree

9 files changed

+342
-41
lines changed

9 files changed

+342
-41
lines changed

bitcoin-qt.pro

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ HEADERS += src/qt/bitcoingui.h \
157157
src/irc.h \
158158
src/bloom.h \
159159
src/mruset.h \
160+
src/checkqueue.h \
160161
src/json/json_spirit_writer_template.h \
161162
src/json/json_spirit_writer.h \
162163
src/json/json_spirit_value.h \

src/checkqueue.h

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright (c) 2012 The Bitcoin developers
2+
// Distributed under the MIT/X11 software license, see the accompanying
3+
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
4+
#ifndef CHECKQUEUE_H
5+
#define CHECKQUEUE_H
6+
7+
#include <boost/thread/mutex.hpp>
8+
#include <boost/thread/locks.hpp>
9+
#include <boost/thread/condition_variable.hpp>
10+
11+
#include <vector>
12+
#include <algorithm>
13+
14+
template<typename T> class CCheckQueueControl;
15+
16+
/** Queue for verifications that have to be performed.
17+
* The verifications are represented by a type T, which must provide an
18+
* operator(), returning a bool.
19+
*
20+
* One thread (the master) is assumed to push batches of verifications
21+
* onto the queue, where they are processed by N-1 worker threads. When
22+
* the master is done adding work, it temporarily joins the worker pool
23+
* as an N'th worker, until all jobs are done.
24+
*/
25+
template<typename T> class CCheckQueue {
26+
private:
27+
// Mutex to protect the inner state
28+
boost::mutex mutex;
29+
30+
// Worker threads block on this when out of work
31+
boost::condition_variable condWorker;
32+
33+
// Master thread blocks on this when out of work
34+
boost::condition_variable condMaster;
35+
36+
// Quit method blocks on this until all workers are gone
37+
boost::condition_variable condQuit;
38+
39+
// The queue of elements to be processed.
40+
// As the order of booleans doesn't matter, it is used as a LIFO (stack)
41+
std::vector<T> queue;
42+
43+
// The number of workers (including the master) that are idle.
44+
int nIdle;
45+
46+
// The total number of workers (including the master).
47+
int nTotal;
48+
49+
// The temporary evaluation result.
50+
bool fAllOk;
51+
52+
// Number of verifications that haven't completed yet.
53+
// This includes elements that are not anymore in queue, but still in
54+
// worker's own batches.
55+
unsigned int nTodo;
56+
57+
// Whether we're shutting down.
58+
bool fQuit;
59+
60+
// The maximum number of elements to be processed in one batch
61+
unsigned int nBatchSize;
62+
63+
// Internal function that does bulk of the verification work.
64+
bool Loop(bool fMaster = false) {
65+
boost::condition_variable &cond = fMaster ? condMaster : condWorker;
66+
std::vector<T> vChecks;
67+
vChecks.reserve(nBatchSize);
68+
unsigned int nNow = 0;
69+
bool fOk = true;
70+
do {
71+
{
72+
boost::unique_lock<boost::mutex> lock(mutex);
73+
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
74+
if (nNow) {
75+
fAllOk &= fOk;
76+
nTodo -= nNow;
77+
if (nTodo == 0 && !fMaster)
78+
// We processed the last element; inform the master he can exit and return the result
79+
condMaster.notify_one();
80+
} else {
81+
// first iteration
82+
nTotal++;
83+
}
84+
// logically, the do loop starts here
85+
while (queue.empty()) {
86+
if ((fMaster || fQuit) && nTodo == 0) {
87+
nTotal--;
88+
if (nTotal==0)
89+
condQuit.notify_one();
90+
bool fRet = fAllOk;
91+
// reset the status for new work later
92+
if (fMaster)
93+
fAllOk = true;
94+
// return the current status
95+
return fRet;
96+
}
97+
nIdle++;
98+
cond.wait(lock); // wait
99+
nIdle--;
100+
}
101+
// Decide how many work units to process now.
102+
// * Do not try to do everything at once, but aim for increasingly smaller batches so
103+
// all workers finish approximately simultaneously.
104+
// * Try to account for idle jobs which will instantly start helping.
105+
// * Don't do batches smaller than 1 (duh), or larger than nBatchSize.
106+
nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1)));
107+
vChecks.resize(nNow);
108+
for (unsigned int i = 0; i < nNow; i++) {
109+
// We want the lock on the mutex to be as short as possible, so swap jobs from the global
110+
// queue to the local batch vector instead of copying.
111+
vChecks[i].swap(queue.back());
112+
queue.pop_back();
113+
}
114+
// Check whether we need to do work at all
115+
fOk = fAllOk;
116+
}
117+
// execute work
118+
BOOST_FOREACH(T &check, vChecks)
119+
if (fOk)
120+
fOk = check();
121+
vChecks.clear();
122+
} while(true);
123+
}
124+
125+
public:
126+
// Create a new check queue
127+
CCheckQueue(unsigned int nBatchSizeIn) :
128+
nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
129+
130+
// Worker thread
131+
void Thread() {
132+
Loop();
133+
}
134+
135+
// Wait until execution finishes, and return whether all evaluations where succesful.
136+
bool Wait() {
137+
return Loop(true);
138+
}
139+
140+
// Add a batch of checks to the queue
141+
void Add(std::vector<T> &vChecks) {
142+
boost::unique_lock<boost::mutex> lock(mutex);
143+
BOOST_FOREACH(T &check, vChecks) {
144+
queue.push_back(T());
145+
check.swap(queue.back());
146+
}
147+
nTodo += vChecks.size();
148+
if (vChecks.size() == 1)
149+
condWorker.notify_one();
150+
else if (vChecks.size() > 1)
151+
condWorker.notify_all();
152+
}
153+
154+
// Shut the queue down
155+
void Quit() {
156+
boost::unique_lock<boost::mutex> lock(mutex);
157+
fQuit = true;
158+
// No need to wake the master, as he will quit automatically when all jobs are
159+
// done.
160+
condWorker.notify_all();
161+
162+
while (nTotal > 0)
163+
condQuit.wait(lock);
164+
}
165+
166+
friend class CCheckQueueControl<T>;
167+
};
168+
169+
/** RAII-style controller object for a CCheckQueue that guarantees the passed
170+
* queue is finished before continuing.
171+
*/
172+
template<typename T> class CCheckQueueControl {
173+
private:
174+
CCheckQueue<T> *pqueue;
175+
bool fDone;
176+
177+
public:
178+
CCheckQueueControl(CCheckQueue<T> *pqueueIn) : pqueue(pqueueIn), fDone(false) {
179+
// passed queue is supposed to be unused, or NULL
180+
if (pqueue != NULL) {
181+
assert(pqueue->nTotal == pqueue->nIdle);
182+
assert(pqueue->nTodo == 0);
183+
assert(pqueue->fAllOk == true);
184+
}
185+
}
186+
187+
bool Wait() {
188+
if (pqueue == NULL)
189+
return true;
190+
bool fRet = pqueue->Wait();
191+
fDone = true;
192+
return fRet;
193+
}
194+
195+
void Add(std::vector<T> &vChecks) {
196+
if (pqueue != NULL)
197+
pqueue->Add(vChecks);
198+
}
199+
200+
~CCheckQueueControl() {
201+
if (!fDone)
202+
Wait();
203+
}
204+
};
205+
206+
#endif

src/init.cpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ void Shutdown(void* parg)
8484
fShutdown = true;
8585
nTransactionsUpdated++;
8686
bitdb.Flush(false);
87+
{
88+
LOCK(cs_main);
89+
ThreadScriptCheckQuit();
90+
}
8791
StopNode();
8892
{
8993
LOCK(cs_main);
@@ -303,6 +307,7 @@ std::string HelpMessage()
303307
" -checklevel=<n> " + _("How thorough the block verification is (0-4, default: 3)") + "\n" +
304308
" -loadblock=<file> " + _("Imports blocks from external blk000??.dat file") + "\n" +
305309
" -reindex " + _("Rebuild blockchain index from current blk000??.dat files") + "\n" +
310+
" -par=N " + _("Set the number of script verification threads (1-16, 0=auto, default: 0)") + "\n" +
306311

307312
"\n" + _("Block creation options:") + "\n" +
308313
" -blockminsize=<n> " + _("Set minimum block size in bytes (default: 0)") + "\n" +
@@ -484,6 +489,15 @@ bool AppInit2()
484489
fDebug = GetBoolArg("-debug");
485490
fBenchmark = GetBoolArg("-benchmark");
486491

492+
// -par=0 means autodetect, but nScriptCheckThreads==0 means no concurrency
493+
nScriptCheckThreads = GetArg("-par", 0);
494+
if (nScriptCheckThreads == 0)
495+
nScriptCheckThreads = boost::thread::hardware_concurrency();
496+
if (nScriptCheckThreads <= 1)
497+
nScriptCheckThreads = 0;
498+
else if (nScriptCheckThreads > MAX_SCRIPTCHECK_THREADS)
499+
nScriptCheckThreads = MAX_SCRIPTCHECK_THREADS;
500+
487501
// -debug implies fDebug*
488502
if (fDebug)
489503
fDebugNet = true;
@@ -579,6 +593,12 @@ bool AppInit2()
579593
if (fDaemon)
580594
fprintf(stdout, "Bitcoin server starting\n");
581595

596+
if (nScriptCheckThreads) {
597+
printf("Using %u threads for script verification\n", nScriptCheckThreads);
598+
for (int i=0; i<nScriptCheckThreads-1; i++)
599+
NewThread(ThreadScriptCheck, NULL);
600+
}
601+
582602
int64 nStart;
583603

584604
// ********************************************************* Step 5: verify wallet database integrity

0 commit comments

Comments
 (0)