|
| 1 | +// Copyright (c) 2012 The Bitcoin developers |
| 2 | +// Distributed under the MIT/X11 software license, see the accompanying |
| 3 | +// file COPYING or http://www.opensource.org/licenses/mit-license.php. |
| 4 | +#ifndef CHECKQUEUE_H |
| 5 | +#define CHECKQUEUE_H |
| 6 | + |
| 7 | +#include <boost/thread/mutex.hpp> |
| 8 | +#include <boost/thread/locks.hpp> |
| 9 | +#include <boost/thread/condition_variable.hpp> |
| 10 | + |
| 11 | +#include <vector> |
| 12 | +#include <algorithm> |
| 13 | + |
| 14 | +template<typename T> class CCheckQueueControl; |
| 15 | + |
| 16 | +/** Queue for verifications that have to be performed. |
| 17 | + * The verifications are represented by a type T, which must provide an |
| 18 | + * operator(), returning a bool. |
| 19 | + * |
| 20 | + * One thread (the master) is assumed to push batches of verifications |
| 21 | + * onto the queue, where they are processed by N-1 worker threads. When |
| 22 | + * the master is done adding work, it temporarily joins the worker pool |
| 23 | + * as an N'th worker, until all jobs are done. |
| 24 | + */ |
| 25 | +template<typename T> class CCheckQueue { |
| 26 | +private: |
| 27 | + // Mutex to protect the inner state |
| 28 | + boost::mutex mutex; |
| 29 | + |
| 30 | + // Worker threads block on this when out of work |
| 31 | + boost::condition_variable condWorker; |
| 32 | + |
| 33 | + // Master thread blocks on this when out of work |
| 34 | + boost::condition_variable condMaster; |
| 35 | + |
| 36 | + // Quit method blocks on this until all workers are gone |
| 37 | + boost::condition_variable condQuit; |
| 38 | + |
| 39 | + // The queue of elements to be processed. |
| 40 | + // As the order of booleans doesn't matter, it is used as a LIFO (stack) |
| 41 | + std::vector<T> queue; |
| 42 | + |
| 43 | + // The number of workers (including the master) that are idle. |
| 44 | + int nIdle; |
| 45 | + |
| 46 | + // The total number of workers (including the master). |
| 47 | + int nTotal; |
| 48 | + |
| 49 | + // The temporary evaluation result. |
| 50 | + bool fAllOk; |
| 51 | + |
| 52 | + // Number of verifications that haven't completed yet. |
| 53 | + // This includes elements that are not anymore in queue, but still in |
| 54 | + // worker's own batches. |
| 55 | + unsigned int nTodo; |
| 56 | + |
| 57 | + // Whether we're shutting down. |
| 58 | + bool fQuit; |
| 59 | + |
| 60 | + // The maximum number of elements to be processed in one batch |
| 61 | + unsigned int nBatchSize; |
| 62 | + |
| 63 | + // Internal function that does bulk of the verification work. |
| 64 | + bool Loop(bool fMaster = false) { |
| 65 | + boost::condition_variable &cond = fMaster ? condMaster : condWorker; |
| 66 | + std::vector<T> vChecks; |
| 67 | + vChecks.reserve(nBatchSize); |
| 68 | + unsigned int nNow = 0; |
| 69 | + bool fOk = true; |
| 70 | + do { |
| 71 | + { |
| 72 | + boost::unique_lock<boost::mutex> lock(mutex); |
| 73 | + // first do the clean-up of the previous loop run (allowing us to do it in the same critsect) |
| 74 | + if (nNow) { |
| 75 | + fAllOk &= fOk; |
| 76 | + nTodo -= nNow; |
| 77 | + if (nTodo == 0 && !fMaster) |
| 78 | + // We processed the last element; inform the master he can exit and return the result |
| 79 | + condMaster.notify_one(); |
| 80 | + } else { |
| 81 | + // first iteration |
| 82 | + nTotal++; |
| 83 | + } |
| 84 | + // logically, the do loop starts here |
| 85 | + while (queue.empty()) { |
| 86 | + if ((fMaster || fQuit) && nTodo == 0) { |
| 87 | + nTotal--; |
| 88 | + if (nTotal==0) |
| 89 | + condQuit.notify_one(); |
| 90 | + bool fRet = fAllOk; |
| 91 | + // reset the status for new work later |
| 92 | + if (fMaster) |
| 93 | + fAllOk = true; |
| 94 | + // return the current status |
| 95 | + return fRet; |
| 96 | + } |
| 97 | + nIdle++; |
| 98 | + cond.wait(lock); // wait |
| 99 | + nIdle--; |
| 100 | + } |
| 101 | + // Decide how many work units to process now. |
| 102 | + // * Do not try to do everything at once, but aim for increasingly smaller batches so |
| 103 | + // all workers finish approximately simultaneously. |
| 104 | + // * Try to account for idle jobs which will instantly start helping. |
| 105 | + // * Don't do batches smaller than 1 (duh), or larger than nBatchSize. |
| 106 | + nNow = std::max(1U, std::min(nBatchSize, (unsigned int)queue.size() / (nTotal + nIdle + 1))); |
| 107 | + vChecks.resize(nNow); |
| 108 | + for (unsigned int i = 0; i < nNow; i++) { |
| 109 | + // We want the lock on the mutex to be as short as possible, so swap jobs from the global |
| 110 | + // queue to the local batch vector instead of copying. |
| 111 | + vChecks[i].swap(queue.back()); |
| 112 | + queue.pop_back(); |
| 113 | + } |
| 114 | + // Check whether we need to do work at all |
| 115 | + fOk = fAllOk; |
| 116 | + } |
| 117 | + // execute work |
| 118 | + BOOST_FOREACH(T &check, vChecks) |
| 119 | + if (fOk) |
| 120 | + fOk = check(); |
| 121 | + vChecks.clear(); |
| 122 | + } while(true); |
| 123 | + } |
| 124 | + |
| 125 | +public: |
| 126 | + // Create a new check queue |
| 127 | + CCheckQueue(unsigned int nBatchSizeIn) : |
| 128 | + nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {} |
| 129 | + |
| 130 | + // Worker thread |
| 131 | + void Thread() { |
| 132 | + Loop(); |
| 133 | + } |
| 134 | + |
| 135 | + // Wait until execution finishes, and return whether all evaluations where succesful. |
| 136 | + bool Wait() { |
| 137 | + return Loop(true); |
| 138 | + } |
| 139 | + |
| 140 | + // Add a batch of checks to the queue |
| 141 | + void Add(std::vector<T> &vChecks) { |
| 142 | + boost::unique_lock<boost::mutex> lock(mutex); |
| 143 | + BOOST_FOREACH(T &check, vChecks) { |
| 144 | + queue.push_back(T()); |
| 145 | + check.swap(queue.back()); |
| 146 | + } |
| 147 | + nTodo += vChecks.size(); |
| 148 | + if (vChecks.size() == 1) |
| 149 | + condWorker.notify_one(); |
| 150 | + else if (vChecks.size() > 1) |
| 151 | + condWorker.notify_all(); |
| 152 | + } |
| 153 | + |
| 154 | + // Shut the queue down |
| 155 | + void Quit() { |
| 156 | + boost::unique_lock<boost::mutex> lock(mutex); |
| 157 | + fQuit = true; |
| 158 | + // No need to wake the master, as he will quit automatically when all jobs are |
| 159 | + // done. |
| 160 | + condWorker.notify_all(); |
| 161 | + |
| 162 | + while (nTotal > 0) |
| 163 | + condQuit.wait(lock); |
| 164 | + } |
| 165 | + |
| 166 | + friend class CCheckQueueControl<T>; |
| 167 | +}; |
| 168 | + |
| 169 | +/** RAII-style controller object for a CCheckQueue that guarantees the passed |
| 170 | + * queue is finished before continuing. |
| 171 | + */ |
| 172 | +template<typename T> class CCheckQueueControl { |
| 173 | +private: |
| 174 | + CCheckQueue<T> *pqueue; |
| 175 | + bool fDone; |
| 176 | + |
| 177 | +public: |
| 178 | + CCheckQueueControl(CCheckQueue<T> *pqueueIn) : pqueue(pqueueIn), fDone(false) { |
| 179 | + // passed queue is supposed to be unused, or NULL |
| 180 | + if (pqueue != NULL) { |
| 181 | + assert(pqueue->nTotal == pqueue->nIdle); |
| 182 | + assert(pqueue->nTodo == 0); |
| 183 | + assert(pqueue->fAllOk == true); |
| 184 | + } |
| 185 | + } |
| 186 | + |
| 187 | + bool Wait() { |
| 188 | + if (pqueue == NULL) |
| 189 | + return true; |
| 190 | + bool fRet = pqueue->Wait(); |
| 191 | + fDone = true; |
| 192 | + return fRet; |
| 193 | + } |
| 194 | + |
| 195 | + void Add(std::vector<T> &vChecks) { |
| 196 | + if (pqueue != NULL) |
| 197 | + pqueue->Add(vChecks); |
| 198 | + } |
| 199 | + |
| 200 | + ~CCheckQueueControl() { |
| 201 | + if (!fDone) |
| 202 | + Wait(); |
| 203 | + } |
| 204 | +}; |
| 205 | + |
| 206 | +#endif |
0 commit comments