Skip to content

Commit

Permalink
threads: fix some errors
Browse files Browse the repository at this point in the history
  • Loading branch information
xanimo committed Feb 1, 2025
1 parent 0d0a122 commit f0d2d37
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 131 deletions.
24 changes: 11 additions & 13 deletions src/checkqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@
#include <algorithm>
#include <vector>

#include <boost/foreach.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <condition_variable>
#include <mutex>

template <typename T>
class CCheckQueueControl;
Expand All @@ -31,13 +29,13 @@ class CCheckQueue
{
private:
//! Mutex to protect the inner state
boost::mutex mutex;
std::mutex mutex;

//! Worker threads block on this when out of work
boost::condition_variable condWorker;
std::condition_variable condWorker;

//! Master thread blocks on this when out of work
boost::condition_variable condMaster;
std::condition_variable condMaster;

//! The queue of elements to be processed.
//! As the order of booleans doesn't matter, it is used as a LIFO (stack)
Expand Down Expand Up @@ -68,14 +66,14 @@ class CCheckQueue
/** Internal function that does bulk of the verification work. */
bool Loop(bool fMaster = false)
{
boost::condition_variable& cond = fMaster ? condMaster : condWorker;
std::condition_variable& cond = fMaster ? condMaster : condWorker;
std::vector<T> vChecks;
vChecks.reserve(nBatchSize);
unsigned int nNow = 0;
bool fOk = true;
do {
{
boost::unique_lock<boost::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(mutex);
// first do the clean-up of the previous loop run (allowing us to do it in the same critsect)
if (nNow) {
fAllOk &= fOk;
Expand Down Expand Up @@ -119,7 +117,7 @@ class CCheckQueue
fOk = fAllOk;
}
// execute work
BOOST_FOREACH (T& check, vChecks)
for (T& check : vChecks)
if (fOk)
fOk = check();
vChecks.clear();
Expand All @@ -128,7 +126,7 @@ class CCheckQueue

public:
//! Mutex to ensure only one concurrent CCheckQueueControl
boost::mutex ControlMutex;
std::mutex ControlMutex;

//! Create a new check queue
CCheckQueue(unsigned int nBatchSizeIn) : nIdle(0), nTotal(0), fAllOk(true), nTodo(0), fQuit(false), nBatchSize(nBatchSizeIn) {}
Expand All @@ -148,8 +146,8 @@ class CCheckQueue
//! Add a batch of checks to the queue
void Add(std::vector<T>& vChecks)
{
boost::unique_lock<boost::mutex> lock(mutex);
BOOST_FOREACH (T& check, vChecks) {
std::unique_lock<std::mutex> lock(mutex);
for (T& check : vChecks) {
queue.push_back(T());
check.swap(queue.back());
}
Expand Down
2 changes: 1 addition & 1 deletion src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
LogPrintf("mapBlockIndex.size() = %u\n", mapBlockIndex.size());
LogPrintf("nBestHeight = %d\n", chainActive.Height());
if (GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION))
StartTorControl(threadGroup, scheduler);
StartTorControl();

Discover(threadGroup);

Expand Down
52 changes: 19 additions & 33 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

#include "scheduler.h"

#include <sync.h>
#include "reverselock.h"

#include <assert.h>
#include <boost/bind/bind.hpp>
#include <cassert>
#include <functional>
#include <utility>

CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stopWhenEmpty(false)
Expand All @@ -18,19 +19,12 @@ CScheduler::CScheduler() : nThreadsServicingQueue(0), stopRequested(false), stop
CScheduler::~CScheduler()
{
assert(nThreadsServicingQueue == 0);
if (stopWhenEmpty) assert(taskQueue.empty());
}


#if BOOST_VERSION < 105000
static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t)
{
return boost::posix_time::from_time_t(boost::chrono::system_clock::to_time_t(t));
}
#endif

void CScheduler::serviceQueue()
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
std::unique_lock<std::mutex> lock(newTaskMutex);
++nThreadsServicingQueue;

// newTaskMutex is locked throughout this loop EXCEPT
Expand All @@ -46,21 +40,13 @@ void CScheduler::serviceQueue()
// Wait until either there is a new task, or until
// the time of the first item on the queue:

// wait_until needs boost 1.50 or later; older versions have timed_wait:
#if BOOST_VERSION < 105000
while (!shouldStop() && !taskQueue.empty() &&
newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) {
// Keep waiting until timeout
}
#else
// Some boost versions have a conflicting overload of wait_until that returns void.
// Explicitly use a template here to avoid hitting that overload.
while (!shouldStop() && !taskQueue.empty()) {
boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first;
if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout)
std::chrono::steady_clock::time_point timeToWaitFor = taskQueue.begin()->first;
if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) {
break; // Exit loop after timeout, it means we reached the time of the event
}
}
#endif

// If there are multiple threads, the queue can empty while we're waiting (another
// thread may service the task we were waiting on).
if (shouldStop() || taskQueue.empty())
Expand All @@ -72,7 +58,7 @@ void CScheduler::serviceQueue()
{
// Unlock before calling f, so it can reschedule itself or another task
// without deadlocking:
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
reverse_lock<std::unique_lock<std::mutex> > rlock(lock);
f();
}
} catch (...) {
Expand All @@ -87,7 +73,7 @@ void CScheduler::serviceQueue()
void CScheduler::stop(bool drain)
{
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
std::unique_lock<std::mutex> lock(newTaskMutex);
if (drain)
stopWhenEmpty = true;
else
Expand All @@ -96,35 +82,35 @@ void CScheduler::stop(bool drain)
newTaskScheduled.notify_all();
}

void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t)
void CScheduler::schedule(CScheduler::Function f, std::chrono::steady_clock::time_point t)
{
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
std::unique_lock<std::mutex> lock(newTaskMutex);
taskQueue.insert(std::make_pair(t, f));
}
newTaskScheduled.notify_one();
}

void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds)
{
schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds));
schedule(f, std::chrono::steady_clock::now() + std::chrono::seconds(deltaSeconds));
}

static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds)
{
f();
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
s->scheduleFromNow(std::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
}

void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
{
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
scheduleFromNow(std::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
}

size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const
size_t CScheduler::getQueueInfo(std::chrono::steady_clock::time_point &first,
std::chrono::steady_clock::time_point &last) const
{
boost::unique_lock<boost::mutex> lock(newTaskMutex);
std::unique_lock<std::mutex> lock(newTaskMutex);
size_t result = taskQueue.size();
if (!taskQueue.empty()) {
first = taskQueue.begin()->first;
Expand Down
38 changes: 20 additions & 18 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
#ifndef BITCOIN_SCHEDULER_H
#define BITCOIN_SCHEDULER_H

//
// NOTE:
// boost::thread / boost::function / boost::chrono should be ported to
// std::thread / std::function / std::chrono when we support C++11.
//
#include <boost/function.hpp>
#include <boost/chrono/chrono.hpp>
#include <boost/thread.hpp>
#include <sync.h>
#include <threadsafety.h>

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <list>
#include <map>
#include <thread>
#include <utility>

//
// Simple class for background tasks that should be run
Expand All @@ -23,8 +25,8 @@
//
// CScheduler* s = new CScheduler();
// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { }
// s->scheduleFromNow(boost::bind(Class::func, this, argument), 3);
// boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s));
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
// std::thread* t = new std::thread(std::bind(CScheduler::serviceQueue, s));
//
// ... then at program shutdown, clean up the thread running serviceQueue:
// t->interrupt();
Expand All @@ -39,10 +41,10 @@ class CScheduler
CScheduler();
~CScheduler();

typedef boost::function<void(void)> Function;
typedef std::function<void(void)> Function;

// Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());
void schedule(Function f, std::chrono::steady_clock::time_point t=std::chrono::steady_clock::now());

// Convenience method: call f once deltaSeconds from now
void scheduleFromNow(Function f, int64_t deltaSeconds);
Expand All @@ -57,7 +59,7 @@ class CScheduler
// To keep things as simple as possible, there is no unschedule.

// Services the queue 'forever'. Should be run in a thread,
// and interrupted using boost::interrupt_thread
// and interrupted using std::interrupt_thread
void serviceQueue();

// Tell any threads running serviceQueue to stop as soon as they're
Expand All @@ -67,13 +69,13 @@ class CScheduler

// Returns number of tasks waiting to be serviced,
// and first and last task times
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const;
size_t getQueueInfo(std::chrono::steady_clock::time_point &first,
std::chrono::steady_clock::time_point &last) const;

private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
mutable boost::mutex newTaskMutex;
std::multimap<std::chrono::steady_clock::time_point, Function> taskQueue;
std::condition_variable newTaskScheduled;
mutable std::mutex newTaskMutex;
int nThreadsServicingQueue;
bool stopRequested;
bool stopWhenEmpty;
Expand Down
9 changes: 5 additions & 4 deletions src/script/sigcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
#include "util.h"

#include "cuckoocache.h"
#include <boost/thread.hpp>
#include <shared_mutex>
#include <thread>

namespace {

Expand Down Expand Up @@ -49,7 +50,7 @@ class CSignatureCache
uint256 nonce;
typedef CuckooCache::cache<uint256, SignatureCacheHasher> map_type;
map_type setValid;
boost::shared_mutex cs_sigcache;
std::shared_mutex cs_sigcache;

public:
CSignatureCache()
Expand All @@ -66,13 +67,13 @@ class CSignatureCache
bool
Get(const uint256& entry, const bool erase)
{
boost::shared_lock<boost::shared_mutex> lock(cs_sigcache);
std::shared_lock<std::shared_mutex> lock(cs_sigcache);
return setValid.contains(entry, erase);
}

void Set(uint256& entry)
{
boost::unique_lock<boost::shared_mutex> lock(cs_sigcache);
std::unique_lock<std::shared_mutex> lock(cs_sigcache);
setValid.insert(entry);
}
uint32_t setup_bytes(size_t n)
Expand Down
3 changes: 0 additions & 3 deletions src/sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ using RecursiveMutex = AnnotatedMixin<std::recursive_mutex>;
/** Wrapped mutex: supports waiting but not recursive locking */
using Mutex = AnnotatedMixin<std::mutex>;

// typedef RecursiveMutex CDynamicCriticalSection;
// /** Wrapped boost mutex: supports waiting but not recursive locking */
// typedef AnnotatedMixin<boost::mutex> Mutex;
#define AssertLockHeld(cs) AssertLockHeldInternal(#cs, __FILE__, __LINE__, &cs)

inline void AssertLockNotHeldInline(const char* name, const char* file, int line, Mutex* cs) EXCLUSIVE_LOCKS_REQUIRED(!cs) { AssertLockNotHeldInternal(name, file, line, cs); }
Expand Down
4 changes: 2 additions & 2 deletions src/test/cuckoocache_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ void test_cache_erase_parallel(size_t megabytes)

{
/** Grab lock to make sure we release inserts */
boost::unique_lock<boost::shared_mutex> l(mtx);
std::unique_lock<boost::shared_mutex> l(mtx);
/** Insert the first half */
for (uint32_t i = 0; i < (n_insert / 2); ++i)
set.insert(hashes_insert_copy[i]);
Expand All @@ -262,7 +262,7 @@ void test_cache_erase_parallel(size_t megabytes)
for (std::thread& t : threads)
t.join();
/** Grab lock to make sure we observe erases */
boost::unique_lock<boost::shared_mutex> l(mtx);
std::unique_lock<boost::shared_mutex> l(mtx);
/** Insert the second half */
for (uint32_t i = (n_insert / 2); i < n_insert; ++i)
set.insert(hashes_insert_copy[i]);
Expand Down
14 changes: 7 additions & 7 deletions src/test/reverselock_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ BOOST_FIXTURE_TEST_SUITE(reverselock_tests, BasicTestingSetup)

BOOST_AUTO_TEST_CASE(reverselock_basics)
{
boost::mutex mutex;
boost::unique_lock<boost::mutex> lock(mutex);
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);

BOOST_CHECK(lock.owns_lock());
{
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
reverse_lock<std::unique_lock<std::mutex> > rlock(lock);
BOOST_CHECK(!lock.owns_lock());
}
BOOST_CHECK(lock.owns_lock());
}

BOOST_AUTO_TEST_CASE(reverselock_errors)
{
boost::mutex mutex;
boost::unique_lock<boost::mutex> lock(mutex);
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);

// Make sure trying to reverse lock an unlocked lock fails
lock.unlock();
Expand All @@ -34,7 +34,7 @@ BOOST_AUTO_TEST_CASE(reverselock_errors)

bool failed = false;
try {
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
reverse_lock<std::unique_lock<std::mutex> > rlock(lock);
} catch(...) {
failed = true;
}
Expand All @@ -49,7 +49,7 @@ BOOST_AUTO_TEST_CASE(reverselock_errors)
lock.lock();
BOOST_CHECK(lock.owns_lock());
{
reverse_lock<boost::unique_lock<boost::mutex> > rlock(lock);
reverse_lock<std::unique_lock<std::mutex> > rlock(lock);
BOOST_CHECK(!lock.owns_lock());
}

Expand Down
Loading

0 comments on commit f0d2d37

Please sign in to comment.