#include "AsyncWorkQueue.h"
#include "server.h"
AsyncWorkQueue::AsyncWorkQueue(int nthreads)
{
for (int i = 0; i < nthreads; ++i)
{
m_vecthreads.emplace_back([&]{
WorkerThreadMain();
});
}
}
void AsyncWorkQueue::WorkerThreadMain()
{
redisServerThreadVars vars;
serverTL = &vars;
vars.clients_pending_asyncwrite = listCreate();
m_mutex.lock();
m_vecpthreadVars.push_back(&vars);
m_mutex.unlock();
while (!m_fQuitting)
{
std::unique_lock<:mutex> lock(m_mutex);
if (m_workqueue.empty())
m_cvWakeup.wait(lock);
aeThreadOnline();
while (!m_workqueue.empty())
{
WorkItem task = std::move(m_workqueue.front());
m_workqueue.pop_front();
lock.unlock();
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
task.fnAsync();
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
lock.lock();
}
lock.unlock();
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
if (listLength(serverTL->clients_pending_asyncwrite)) {
aeAcquireLock();
ProcessPendingAsyncWrites();
aeReleaseLock();
}
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch.reset();
aeThreadOffline();
}
listRelease(vars.clients_pending_asyncwrite);
std::unique_lock