Skip to content

Commit de0249a

Browse files
committed
initial queue
1 parent ffd5955 commit de0249a

8 files changed

Lines changed: 317 additions & 0 deletions

File tree

.clang-format

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
BasedOnStyle: Google
2+
AccessModifierOffset: '-2'
3+
AlignTrailingComments: 'true'
4+
AllowAllParametersOfDeclarationOnNextLine: 'false'
5+
AlwaysBreakTemplateDeclarations: 'No'
6+
BreakBeforeBraces: Attach
7+
ColumnLimit: '100'
8+
ConstructorInitializerAllOnOneLineOrOnePerLine: 'true'
9+
IncludeBlocks: Regroup
10+
IndentPPDirectives: AfterHash
11+
IndentWidth: '4'
12+
BreakBeforeBinaryOperators: All
13+
BreakBeforeTernaryOperators: 'true'
14+
15+
BinPackArguments: false
16+
BinPackParameters: false

.cmake-format

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
format:
2+
tab_size: 4
3+
line_width: 100
4+
dangle_parens: true
5+
6+
parse:
7+
additional_commands:
8+
cpmaddpackage:
9+
pargs:
10+
nargs: '*'
11+
flags: []
12+
spelling: CPMAddPackage
13+
kwargs: &cpmaddpackagekwargs
14+
NAME: 1
15+
FORCE: 1
16+
VERSION: 1
17+
GIT_TAG: 1
18+
DOWNLOAD_ONLY: 1
19+
GITHUB_REPOSITORY: 1
20+
GITLAB_REPOSITORY: 1
21+
GIT_REPOSITORY: 1
22+
SVN_REPOSITORY: 1
23+
SVN_REVISION: 1
24+
SOURCE_DIR: 1
25+
DOWNLOAD_COMMAND: 1
26+
FIND_PACKAGE_ARGUMENTS: 1
27+
NO_CACHE: 1
28+
GIT_SHALLOW: 1
29+
URL: 1
30+
URL_HASH: 1
31+
URL_MD5: 1
32+
DOWNLOAD_NAME: 1
33+
DOWNLOAD_NO_EXTRACT: 1
34+
HTTP_USERNAME: 1
35+
HTTP_PASSWORD: 1
36+
OPTIONS: +
37+
cpmfindpackage:
38+
pargs:
39+
nargs: '*'
40+
flags: []
41+
spelling: CPMFindPackage
42+
kwargs: *cpmaddpackagekwargs
43+
packageproject:
44+
pargs:
45+
nargs: '*'
46+
flags: []
47+
spelling: packageProject
48+
kwargs:
49+
NAME: 1
50+
VERSION: 1
51+
NAMESPACE: 1
52+
INCLUDE_DIR: 1
53+
INCLUDE_DESTINATION: 1
54+
BINARY_DIR: 1
55+
COMPATIBILITY: 1
56+
VERSION_HEADER: 1
57+
DEPENDENCIES: +

CMakeLists.txt

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
cmake_minimum_required(VERSION 3.14 FATAL_ERROR)
2+
3+
# ---- Project ----
4+
5+
# Note: update this to your new project's name and version
6+
project(
7+
Threadpool
8+
VERSION 0.1
9+
LANGUAGES CXX
10+
)
11+
12+
# ---- Include guards ----
13+
14+
if(PROJECT_SOURCE_DIR STREQUAL PROJECT_BINARY_DIR)
15+
message(
16+
FATAL_ERROR
17+
"In-source builds not allowed. Please make a new directory (called a build directory) and run CMake from there."
18+
)
19+
endif()
20+
21+
# ---- Add dependencies ----
22+
23+
find_package( Threads )
24+
25+
include(cmake/CPM.cmake)
26+
27+
CPMAddPackage("gh:TheLartians/PackageProject.cmake@1.4.1")
28+
CPMAddPackage("gh:Naios/function2#4.1.0")
29+
30+
31+
32+
33+
# ---- Create library ----
34+
35+
set(sources "include/singlequeue.cpp")
36+
37+
# Note: for header-only libraries change all PUBLIC flags to INTERFACE and create an interface
38+
# target: add_library(Threadpool INTERFACE)
39+
add_library(Threadpool ${sources})
40+
41+
target_compile_features(Threadpool PUBLIC cxx_std_20)
42+
43+
target_compile_options(Threadpool PRIVATE -Wall -Wextra -Wpedantic)
44+
45+
# Being a cross-platform target, we enforce standards conformance on MSVC
46+
target_compile_options(Threadpool PUBLIC "$<$<COMPILE_LANG_AND_ID:CXX,MSVC>:/permissive>")
47+
48+
# Link dependencies
49+
target_link_libraries(Threadpool PUBLIC function2::function2 ${CMAKE_THREAD_LIBS_INIT})
50+
51+
target_include_directories(
52+
Threadpool PUBLIC $<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/include>
53+
$<INSTALL_INTERFACE:include/${PROJECT_NAME}-${PROJECT_VERSION}>
54+
)
55+
56+
# ---- Create an installable target ----
57+
# this allows users to install and find the library via `find_package()`.
58+
59+
# the location where the project's version header will be placed should match the project's regular
60+
# header paths
61+
string(TOLOWER ${PROJECT_NAME}/version.h VERSION_HEADER_LOCATION)
62+
63+
packageProject(
64+
NAME ${PROJECT_NAME}
65+
VERSION ${PROJECT_VERSION}
66+
NAMESPACE ${PROJECT_NAME}
67+
BINARY_DIR ${PROJECT_BINARY_DIR}
68+
INCLUDE_DIR ${PROJECT_SOURCE_DIR}/include
69+
INCLUDE_DESTINATION include/${PROJECT_NAME}-${PROJECT_VERSION}
70+
VERSION_HEADER "${VERSION_HEADER_LOCATION}"
71+
COMPATIBILITY SameMajorVersion
72+
DEPENDENCIES ""
73+
)
74+
75+
76+
# ---- Conditionally build examples ----
77+
78+
option(BUILD_EXAMPLES "Build the examples" OFF)
79+
80+
if(BUILD_EXAMPLES)
81+
add_executable(example "include/main.cpp")
82+
target_link_libraries(example PUBLIC Threadpool::Threadpool)
83+
endif()

cmake/CPM.cmake

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
set(CPM_DOWNLOAD_VERSION 0.31.1)
2+
3+
if(CPM_SOURCE_CACHE)
4+
# Expand relative path. This is important if the provided path contains a tilde (~)
5+
get_filename_component(CPM_SOURCE_CACHE ${CPM_SOURCE_CACHE} ABSOLUTE)
6+
set(CPM_DOWNLOAD_LOCATION "${CPM_SOURCE_CACHE}/cpm/CPM_${CPM_DOWNLOAD_VERSION}.cmake")
7+
elseif(DEFINED ENV{CPM_SOURCE_CACHE})
8+
set(CPM_DOWNLOAD_LOCATION "$ENV{CPM_SOURCE_CACHE}/cpm/CPM_${CPM_DOWNLOAD_VERSION}.cmake")
9+
else()
10+
set(CPM_DOWNLOAD_LOCATION "${CMAKE_BINARY_DIR}/cmake/CPM_${CPM_DOWNLOAD_VERSION}.cmake")
11+
endif()
12+
13+
if(NOT (EXISTS ${CPM_DOWNLOAD_LOCATION}))
14+
message(STATUS "Downloading CPM.cmake to ${CPM_DOWNLOAD_LOCATION}")
15+
file(DOWNLOAD
16+
https://github.com/cpm-cmake/CPM.cmake/releases/download/v${CPM_DOWNLOAD_VERSION}/CPM.cmake
17+
${CPM_DOWNLOAD_LOCATION}
18+
)
19+
endif()
20+
21+
include(${CPM_DOWNLOAD_LOCATION})

include/lockfree.hpp

Whitespace-only changes.

include/main.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
2+
3+
#include <chrono>
4+
#include <future>
5+
#include <iostream>
6+
#include <ratio>
7+
#include <thread>
8+
9+
#include "singlequeue.hpp"
10+
11+
int main() {
12+
ThreadPool pool(12);
13+
14+
std::vector<std::future<int>> _futures;
15+
16+
for (size_t i = 0; i < 10; i++) {
17+
pool.execute([time = 100]() {
18+
std::this_thread::sleep_for(std::chrono::milliseconds(time));
19+
std::cout << "Waited :" << time << std::endl;
20+
return 3;
21+
});
22+
}
23+
24+
std::cout << "Working\n";
25+
return 0;
26+
}

include/singlequeue.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include "singlequeue.hpp"
2+
3+
ThreadPool::ThreadPool(size_t threads) {
4+
for (size_t i = 0; i < threads; ++i) {
5+
_threads.emplace_back(std::thread([&]() {
6+
std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);
7+
8+
while (true) {
9+
queue_lock.lock();
10+
11+
_task_cv.wait(queue_lock, [&]() { return !_tasks.empty() || _stop_threads; });
12+
13+
if (_stop_threads && _tasks.empty()) {
14+
return;
15+
}
16+
17+
auto one_shot = std::move(_tasks.front());
18+
19+
_tasks.pop();
20+
21+
queue_lock.unlock();
22+
23+
std::invoke(std::move(one_shot));
24+
}
25+
}));
26+
}
27+
}
28+
29+
ThreadPool::~ThreadPool() {
30+
_stop_threads = true;
31+
_task_cv.notify_all();
32+
33+
for (auto &thread : _threads) {
34+
thread.join();
35+
}
36+
}

include/singlequeue.hpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#pragma once
2+
3+
#include <concepts>
4+
#include <condition_variable>
5+
#include <future>
6+
#include <mutex>
7+
#include <queue>
8+
#include <thread>
9+
#include <type_traits> // invoke_result
10+
#include <utility>
11+
#include <vector>
12+
13+
#include "function2/function2.hpp"
14+
15+
// Bind F and args... into nullary lambda
16+
template <typename F, typename... Args> auto bind(F &&f, Args &&...arg) {
17+
return [f = std::forward<F>(f), ... arg = std::forward<Args>(arg)]() mutable -> decltype(auto) {
18+
return std::invoke(std::forward<F>(f), std::forward<Args>(arg)...);
19+
};
20+
}
21+
22+
// Like std::packaged_task<R() &&>, but garantees no type erasure.
23+
template <std::invocable F> class NullaryOneShot {
24+
public:
25+
using invoke_result_t = std::invoke_result_t<F>;
26+
27+
NullaryOneShot(F &&fn) : _fn(std::forward<F>(fn)) {}
28+
29+
void operator()() && {
30+
if constexpr (!std::is_same_v<void, invoke_result_t>) {
31+
_promise.set_value(std::invoke(std::forward<F>(_fn)));
32+
} else {
33+
std::invoke(std::forward<F>(_fn));
34+
_promise.set_value();
35+
}
36+
}
37+
38+
std::future<invoke_result_t> get_future() { return _promise.get_future(); }
39+
40+
private:
41+
F _fn;
42+
std::promise<invoke_result_t> _promise;
43+
};
44+
45+
template <typename F> NullaryOneShot(F &&) -> NullaryOneShot<F>;
46+
47+
class ThreadPool {
48+
public:
49+
explicit ThreadPool(size_t threads = std::thread::hardware_concurrency());
50+
51+
~ThreadPool();
52+
53+
template <typename F, typename... Args> auto execute(F &&f, Args &&...args) {
54+
//
55+
auto task = NullaryOneShot(bind(std::forward<F>(f), std::forward<Args>(args)...));
56+
57+
auto future = task.get_future();
58+
//
59+
std::unique_lock<std::mutex> queue_lock(_task_mutex, std::defer_lock);
60+
61+
queue_lock.lock();
62+
63+
_tasks.emplace(std::move(task));
64+
65+
queue_lock.unlock();
66+
67+
_task_cv.notify_one();
68+
69+
return future;
70+
}
71+
72+
private:
73+
std::vector<std::thread> _threads;
74+
std::queue<fu2::unique_function<void() &&>> _tasks;
75+
std::mutex _task_mutex;
76+
std::condition_variable _task_cv;
77+
bool _stop_threads = false;
78+
};

0 commit comments

Comments
 (0)