Skip to content

Commit

Permalink
example
Browse files Browse the repository at this point in the history
  • Loading branch information
kelbon committed Apr 7, 2024
1 parent ce2d62b commit 7b92d4d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
13 changes: 12 additions & 1 deletion include/kelcoro/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ struct worker {
std::thread::id get_id() const noexcept {
return thread.get_id();
}
// precondition: caller should not break thread, e.g. do not use .join, .detach, .swap
// but may use it to set thread name or pinning thread to core
std::thread& get_thread() noexcept {
return thread;
}
const std::thread& get_thread() const noexcept {
return thread;
}
};

// executes tasks on one thread
Expand Down Expand Up @@ -210,7 +218,10 @@ struct thread_pool {
schedule(std::forward<decltype(foo)>(foo), calculate_operation_hash(foo));
}

KELCORO_PURE std::span<const worker> workers_range() noexcept KELCORO_LIFETIMEBOUND {
KELCORO_PURE std::span<worker> workers_range() noexcept KELCORO_LIFETIMEBOUND {
return std::span(workers, workers_size);
}
KELCORO_PURE std::span<const worker> workers_range() const noexcept KELCORO_LIFETIMEBOUND {
return std::span(workers, workers_size);
}

Expand Down
53 changes: 52 additions & 1 deletion tests/test_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@

#include <latch>
#include <iostream>
#include <string>

static_assert(dd::executor<dd::any_executor_ref> && dd::executor<dd::strand> &&
dd::executor<dd::thread_pool> && dd::executor<dd::worker>);

#define error_if(Cond) error_count += static_cast<bool>((Cond));
#define TEST(NAME) size_t test_##NAME(size_t error_count = 0)

bool pin_thread_to_cpu_core(std::thread&, int core_nb) noexcept;
bool set_thread_name(std::thread&, const char* name) noexcept;

TEST(latch) {
auto run_task = [](dd::thread_pool& p, std::atomic_int& i, dd::latch& start,
dd::latch& done) -> dd::task<void> {
Expand Down Expand Up @@ -63,7 +67,12 @@ TEST(thread_pool) {
std::latch l(COUNT);
dd::thread_pool p(16);
dd::latch start(COUNT, p);

std::span workers = p.workers_range();
for (int i = 0; i < workers.size(); ++i) {
(void)pin_thread_to_cpu_core(workers[i].get_thread(), i);
std::string name = "number " + std::to_string(i);
(void)set_thread_name(workers[i].get_thread(), name.c_str());
}
for (int ind = 0; ind < COUNT; ++ind) {
foo(p, start, i, l);
p.schedule([&] { ++i; });
Expand Down Expand Up @@ -92,3 +101,45 @@ int main() {
ec += test_thread_pool();
return ec;
}
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif

bool pin_thread_to_cpu_core(std::thread& t, int core_nb) noexcept {
if (core_nb < 0 || core_nb >= CHAR_BIT * sizeof(void*))
return false;
#ifdef _WIN32
HANDLE handle = t.native_handle();
DWORD_PTR mask = 1ull << core_nb;
return SetThreadAffinityMask(handle, mask);
#elif defined(__unix__)
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_nb, &cpuset);
pthread_t handle = t.native_handle();
return pthread_setaffinity_np(handle, sizeof(cpu_set_t), &cpuset) == 0;
#else
// nothing, not pinned
return false;
#endif
}

bool set_thread_name(std::thread& t, const char* name) noexcept {
if (!name)
return false;
#if defined(_WIN32) && defined(_WIN32_WINNT) && _WIN32_WINNT >= 0x0A00
HANDLE handle = t.native_handle();
int size_needed = MultiByteToWideChar(CP_UTF8, 0, name, strlen(name), NULL, 0);
std::wstring nm(size_needed, 0);
MultiByteToWideChar(CP_UTF8, 0, name, strlen(name), &nm[0], size_needed);
HRESULT r = SetThreadDescription(handle, nm.c_str());
return !(FAILED(r));
#elif defined(__unix__)
return pthread_setname_np(pthread_self(), name) == 0;
#else
// nothing, name not setted
return false;
#endif
}

0 comments on commit 7b92d4d

Please sign in to comment.