forked from CodingHanYa/workspace
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbalanced_pond.h
133 lines (117 loc) · 3.51 KB
/
balanced_pond.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#pragma once
#include "header.h"
namespace hipe {
class OqThread : public ThreadBase
{
HipeTask task;
std::queue<HipeTask> tq;
util::spinlock tq_locker = {};
public:
/**
* @brief try give one task to another thread
* @param other another thread
* @return if succeed —— return true, or return false
*/
bool tryGiveTask(OqThread& another) {
if (tq_locker.try_lock()) {
if (!tq.empty()) {
another.task = std::move(tq.front());
tq.pop();
tq_locker.unlock();
this->task_numb--;
another.task_numb++;
return true;
} else {
tq_locker.unlock();
return false;
}
}
return false;
}
// push task to the task queue
template <typename T>
void enqueue(T&& tar) {
util::spinlock_guard lock(tq_locker);
tq.emplace(std::forward<T>(tar));
task_numb++;
}
// push tasks to the task queue
template <typename Container_>
void enqueue(Container_& cont, size_t size) {
util::spinlock_guard lock(tq_locker);
for (size_t i = 0; i < size; ++i) {
tq.emplace(std::move(cont[i]));
task_numb++;
}
}
// run the task
void runTask() {
util::invoke(task);
task_numb--;
}
// try load task from the task queue
bool tryLoadTask() {
tq_locker.lock();
if (!tq.empty()) {
task = std::move(tq.front());
tq.pop();
tq_locker.unlock();
return true;
} else {
tq_locker.unlock();
return false;
}
}
};
class BalancedThreadPond : public FixedThreadPond<OqThread>
{
public:
/**
* @param thread_numb fixed thread number
* @param task_capacity task capacity of the pond, default: unlimited
*/
explicit BalancedThreadPond(int thread_numb = 0, int task_capacity = HipeUnlimited)
: FixedThreadPond(thread_numb, task_capacity) {
// create
threads.reset(new OqThread[this->thread_numb]);
for (int i = 0; i < this->thread_numb; ++i) {
threads[i].bindHandle(AutoThread(&BalancedThreadPond::worker, this, i));
}
}
~BalancedThreadPond() override = default;
private:
void worker(int index) {
auto& self = threads[index];
while (!stop) {
// yield if no tasks
if (self.notask()) {
if (self.isWaiting()) {
self.notifyTaskDone();
std::this_thread::yield();
continue;
}
// steal tasks from other threads
if (enable_steal_tasks) {
for (int i = index, j = 0; j < max_steal; j++) {
util::recyclePlus(i, 0, thread_numb);
if (threads[i].tryGiveTask(self)) {
self.runTask();
break;
}
}
if (!self.notask() || self.isWaiting()) {
// go to handle the tasks or the waiting signal directly
continue;
}
}
std::this_thread::yield();
} else {
// try load task and run
if (self.tryLoadTask()) {
self.runTask();
}
}
}
}
};
} // namespace hipe