-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrent-executor.hpp
112 lines (92 loc) · 2.63 KB
/
concurrent-executor.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Author: Ryan Vickramasinghe
#ifndef CONCURRENT_EXECUTOR_H
#define CONCURRENT_EXECUTOR_H
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
// A pool of worker threads that consumes items of type `T` from a shared FIFO
// queue and hands each one to a user-supplied callback.
//
// Worker threads are started by the constructor and joined by the destructor.
// The queue and the shutdown flag are protected by a single mutex; workers
// sleep on a condition variable until an item arrives or shutdown is
// requested.
template <typename T>
class ConcurrentExecutor {
 public:
  // Stores `executor_func` as the per-item callback and starts `num_threads`
  // worker threads.
  explicit ConcurrentExecutor(
      std::function<void(T)> executor_func, int num_threads = 1);

  // Requests shutdown, wakes every worker and joins all worker threads.
  ~ConcurrentExecutor();

  // The object owns a mutex and running threads that hold a pointer to it, so
  // it must not be copied. (Copying was already impossible because of the
  // std::mutex member; this just makes the intent explicit.)
  ConcurrentExecutor(const ConcurrentExecutor&) = delete;
  ConcurrentExecutor& operator=(const ConcurrentExecutor&) = delete;

  // Submits an item to the executor queue for processing and wakes one worker.
  void Submit(T& item);
  void Submit(T&& item);

  // Returns the current size of the buffer (i.e. how many items are pending).
  size_t BufferSize();

 private:
  // Starts `thread_count` threads, each running `Executor` with its own id.
  void LaunchExecutors(int thread_count);

  // The worker loop: waits for items and passes them to `executor_func_`.
  // `id` is only used in the start/exit log lines.
  void Executor(int id);

  // The callback invoked by the worker threads for every dequeued item.
  std::function<void(T)> executor_func_;

  // The thread pool.
  std::vector<std::thread> executor_threads_;

  // Guards `queue_` and `done_`.
  std::mutex mu_;

  // Items waiting to be processed, in submission order.
  std::queue<T> queue_;

  // Signalled when an item is queued or shutdown is requested.
  std::condition_variable cv_;

  // Set once the destructor runs. Must start as `false`: workers read it as
  // soon as they are launched, so leaving it uninitialized is undefined
  // behaviour.
  bool done_ = false;
};
// Stores the per-item callback and starts `num_threads` worker threads.
//
// `done_` is explicitly set to false here *before* any worker is launched:
// the workers read the flag immediately, so it must never be left
// indeterminate. A `num_threads` of zero or less starts no workers, in which
// case submitted items are queued but never processed.
template <typename T>
ConcurrentExecutor<T>::ConcurrentExecutor(
    std::function<void(T)> executor_func, int num_threads)
    : executor_func_(std::move(executor_func)), done_(false) {
  LaunchExecutors(num_threads);
}
// Requests shutdown, wakes every worker and waits for all of them to exit.
template <typename T>
ConcurrentExecutor<T>::~ConcurrentExecutor() {
  {
    // The flag must be changed while holding the mutex. Otherwise a worker
    // could evaluate its wait predicate (seeing `done_ == false`), miss the
    // notification below, and then block forever, hanging the join().
    std::lock_guard<std::mutex> lock(mu_);
    done_ = true;
  }
  cv_.notify_all();
  for (auto& thread : executor_threads_) {
    // join() on a non-joinable thread throws; a destructor must not throw.
    if (thread.joinable()) {
      thread.join();
    }
  }
}
// Copies `item` onto the back of the pending queue and wakes one waiting
// worker. The queue is modified, and the notification is sent, while `mu_`
// is held.
template <typename T>
void ConcurrentExecutor<T>::Submit(T& item) {
  std::unique_lock<std::mutex> guard(mu_);
  const T& pending = item;
  queue_.push(pending);
  cv_.notify_one();
}
// Moves `item` onto the back of the pending queue and wakes one waiting
// worker.
template <typename T>
void ConcurrentExecutor<T>::Submit(T&& item) {
  std::lock_guard<std::mutex> lock(mu_);
  // `item` is a named variable and therefore an lvalue here; without
  // std::move the element would be copy-constructed, defeating the purpose of
  // the rvalue overload (and failing to compile for move-only types).
  queue_.emplace(std::move(item));
  cv_.notify_one();
}
// Reports how many items are currently waiting in the queue. The count is
// read while holding `mu_`.
template <typename T>
size_t ConcurrentExecutor<T>::BufferSize() {
  std::unique_lock<std::mutex> guard(mu_);
  const size_t pending_count = queue_.size();
  return pending_count;
}
// Spawns `thread_count` worker threads and stores them in
// `executor_threads_`. Each worker runs `Executor` with a distinct id counting
// up from zero.
template <typename T>
void ConcurrentExecutor<T>::LaunchExecutors(int thread_count) {
  int worker_id = 0;
  while (worker_id < thread_count) {
    executor_threads_.emplace_back(
        [this, worker_id] { this->Executor(worker_id); });
    ++worker_id;
  }
}
// Worker loop run by every pool thread.
//
// Repeatedly waits for an item, removes it from the queue and passes it to
// `executor_func_`. The callback runs with the mutex released so other workers
// and submitters are not blocked while it executes.
//
// `done_` is only ever read while `mu_` is held; reading it unlocked (as a
// bare loop condition) would be a data race with the destructor's write.
//
// Shutdown behaviour: once `done_` is observed at the top of the loop the
// worker exits, so items still queued at that point are not guaranteed to be
// processed.
//
// `id` is only used to label the start/exit log lines.
template <typename T>
void ConcurrentExecutor<T>::Executor(int id) {
  std::cout << "Started executor " << id << std::endl;

  std::unique_lock<std::mutex> lock(mu_);
  while (!done_) {
    // Sleep until there is work or shutdown has been requested.
    cv_.wait(lock, [this] { return !queue_.empty() || done_; });

    // The predicate held, so an empty queue implies shutdown was requested.
    if (queue_.empty()) break;

    // Take ownership of the next item; moving avoids a needless copy.
    T data = std::move(queue_.front());
    queue_.pop();

    // Do the work outside of the lock.
    lock.unlock();
    executor_func_(std::move(data));
    lock.lock();
  }
  lock.unlock();

  std::cout << "Exited executor " << id << std::endl;
}
#endif // CONCURRENT_EXECUTOR_H