Skip to content

Commit 41bd17f

Browse files
committed
completed the integration of thread-pool
1 parent 7fbfd81 commit 41bd17f

10 files changed

+55
-46
lines changed

include/DataFrame/DataFrame.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -4059,7 +4059,7 @@ class DataFrame : public ThreadGranularity {
40594059
//
40604060
ColNameList column_list_ { }; // Vector of column names and indices
40614061

4062-
inline static SpinLock *lock_ { nullptr }; // No lock safety by default
4062+
inline static SpinLock *lock_ { nullptr }; // No lock safety by default
40634063

40644064
// Private methods
40654065
//

include/DataFrame/DataFrameFinancialVisitors.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ struct DoubleCrossOver {
188188
const H &prices_begin, const H &prices_end) {
189189

190190
const size_type thread_level =
191-
ThreadGranularity::get_sensible_thread_level();
191+
ThreadGranularity::get_thread_level();
192192
size_type re_count1 = 0;
193193
size_type re_count2 = 0;
194194

@@ -319,7 +319,7 @@ struct BollingerBand {
319319
const H &prices_begin, const H &prices_end) {
320320

321321
const size_type thread_level =
322-
ThreadGranularity::get_sensible_thread_level();
322+
ThreadGranularity::get_thread_level();
323323

324324
if (thread_level >= 2) {
325325
std::future<void> fut1 =

include/DataFrame/DataFrameStatsVisitors.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1795,7 +1795,7 @@ struct AutoCorrVisitor {
17951795
vec_type<value_type> tmp_result(col_s - 4);
17961796
size_type lag = 1;
17971797
const size_type thread_level =
1798-
ThreadGranularity::get_sensible_thread_level();
1798+
ThreadGranularity::get_thread_level();
17991799
vec_type<std::future<CorrResult>> futures(thread_level);
18001800
size_type thread_count = 0;
18011801

include/DataFrame/Internals/DataFrame.tcc

+12-9
Original file line numberDiff line numberDiff line change
@@ -903,15 +903,18 @@ sort(const char *name1, sort_spec dir1,
903903
std::ranges::views::zip(*vec1, *vec2, indices_, sorting_idxs);
904904

905905
if (dir1 == sort_spec::ascen && dir2 == sort_spec::ascen) {
906-
// if (! ignore_index)
907-
// std::sort(std::execution::par_unseq,
908-
// zip_idx.begin(), zip_idx.end(), a_a);
909-
// else
910-
// std::sort(std::execution::par_unseq, zip.begin(), zip.end(), a_a);
911-
if (! ignore_index)
912-
std::ranges::sort(zip_idx, a_a);
913-
else
914-
std::ranges::sort(zip, a_a);
906+
if (get_thread_level() > 0) {
907+
if (! ignore_index)
908+
thr_pool_.parallel_sort(zip_idx.begin(), zip_idx.end(), a_a);
909+
else
910+
thr_pool_.parallel_sort(zip.begin(), zip.end(), a_a);
911+
}
912+
else {
913+
if (! ignore_index)
914+
std::ranges::sort(zip_idx, a_a);
915+
else
916+
std::ranges::sort(zip, a_a);
917+
}
915918
}
916919
else if (dir1 == sort_spec::desce && dir2 == sort_spec::desce) {
917920
if (! ignore_index)

include/DataFrame/Utils/Threads/SharedQueue.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class SharedQueue {
9292
// ----------------------------------------------------------------------------
9393

9494
#ifndef HMDF_DO_NOT_INCLUDE_TCC_FILES
95-
# include <DataFrame/Threads/SharedQueue.tcc>
95+
# include <DataFrame/Utils/Threads/SharedQueue.tcc>
9696
#endif // HMDF_DO_NOT_INCLUDE_TCC_FILES
9797

9898
// ----------------------------------------------------------------------------

include/DataFrame/Utils/Threads/SharedQueue.tcc

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
2727
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828
*/
2929

30-
#include <DataFrame/Threads/SharedQueue.h>
30+
#include <DataFrame/Utils/Threads/SharedQueue.h>
3131

3232
// ----------------------------------------------------------------------------
3333

include/DataFrame/Utils/Threads/ThreadGranularity.h

+16-16
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2929

3030
#pragma once
3131

32+
#include <DataFrame/Utils/Threads/ThreadPool.h>
33+
3234
#include <atomic>
3335
#include <cassert>
3436
#include <thread>
@@ -40,30 +42,28 @@ namespace hmdf
4042

4143
struct ThreadGranularity {
4244

43-
static inline void
44-
set_thread_level(unsigned int n) { num_of_threads_ = n; }
45-
static inline unsigned int
46-
get_thread_level() { return (num_of_threads_); }
47-
static inline unsigned int
48-
get_supported_thread() { return (supported_threads_); }
45+
using size_type = ThreadPool::size_type;
46+
47+
static inline void set_thread_level(size_type n) {
48+
49+
thr_pool_.add_thread(n - thr_pool_.capacity_threads());
50+
}
51+
static inline void set_optimum_thread_level() {
4952

50-
static inline unsigned int
51-
get_sensible_thread_level() {
53+
set_thread_level(std::thread::hardware_concurrency());
54+
}
55+
static inline size_type get_thread_level() {
5256

53-
return (supported_threads_ != 0
54-
? std::min(supported_threads_, num_of_threads_)
55-
: num_of_threads_);
57+
return (thr_pool_.capacity_threads());
5658
}
5759

5860
protected:
5961

6062
ThreadGranularity() = default;
6163

62-
private:
63-
64-
inline static unsigned int num_of_threads_ { 0 };
65-
inline static const unsigned int supported_threads_ {
66-
std::thread::hardware_concurrency() };
64+
// By defaut, there are no threads
65+
//
66+
inline static ThreadPool thr_pool_ { 0 };
6767
};
6868

6969
// ----------------------------------------------------------------------------

include/DataFrame/Utils/Threads/ThreadPool.h

+8-6
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2929

3030
#pragma once
3131

32-
#include <DataFrame/Threads/SharedQueue.h>
32+
#include <DataFrame/Utils/Threads/SharedQueue.h>
3333

3434
#include <atomic>
3535
#include <concepts>
@@ -80,6 +80,8 @@ class ThreadPool {
8080
using time_type = time_t;
8181
using thread_type = std::thread;
8282

83+
inline static constexpr size_type MUL_THR_THHOLD = 250'000L;
84+
8385
ThreadPool(const ThreadPool &) = delete;
8486
ThreadPool &operator = (const ThreadPool &) = delete;
8587

@@ -122,11 +124,11 @@ class ThreadPool {
122124
loop_res_t<F, I, As ...>
123125
parallel_loop(I begin, I end, F &&routine, As && ... args);
124126

125-
template<std::random_access_iterator I, std::size_t TH = 500'000>
126-
void parallel_sort(I begin, I end);
127+
template<std::random_access_iterator I, long TH = MUL_THR_THHOLD>
128+
void parallel_sort(const I begin, const I end);
127129
template<std::random_access_iterator I, typename P,
128-
std::size_t TH = 500'000>
129-
void parallel_sort(I begin, I end, P compare);
130+
long TH = MUL_THR_THHOLD>
131+
void parallel_sort(const I begin, const I end, P compare);
130132

131133

132134
// It attaches the current thread to the pool so that it may be used for
@@ -212,7 +214,7 @@ class ThreadPool {
212214
// ----------------------------------------------------------------------------
213215

214216
#ifndef HMDF_DO_NOT_INCLUDE_TCC_FILES
215-
# include <DataFrame/Threads/ThreadPool.tcc>
217+
# include <DataFrame/Utils/Threads/ThreadPool.tcc>
216218
#endif // HMDF_DO_NOT_INCLUDE_TCC_FILES
217219

218220
// ----------------------------------------------------------------------------

include/DataFrame/Utils/Threads/ThreadPool.tcc

+9-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
2727
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2828
*/
2929

30-
#include <DataFrame/Threads/ThreadPool.h>
30+
#include <DataFrame/Utils/Threads/ThreadPool.h>
3131

3232
#include <chrono>
3333
#include <cstdlib>
@@ -185,9 +185,9 @@ ThreadPool::parallel_loop(I begin, I end, F &&routine, As && ... args) {
185185

186186
// ----------------------------------------------------------------------------
187187

188-
template<std::random_access_iterator I, std::size_t TH>
188+
template<std::random_access_iterator I, long TH>
189189
void
190-
ThreadPool::parallel_sort(I begin, I end) {
190+
ThreadPool::parallel_sort(const I begin, const I end) {
191191

192192
using value_type = typename std::iterator_traits<I>::value_type;
193193

@@ -198,20 +198,20 @@ ThreadPool::parallel_sort(I begin, I end) {
198198

199199
// ----------------------------------------------------------------------------
200200

201-
template<std::random_access_iterator I, typename P, std::size_t TH>
201+
template<std::random_access_iterator I, typename P, long TH>
202202
void
203-
ThreadPool::parallel_sort(I begin, I end, P compare) {
203+
ThreadPool::parallel_sort(const I begin, const I end, P compare) {
204204

205205
using value_type = typename std::iterator_traits<I>::value_type;
206206
using fut_type = std::future<void>;
207207

208208
if (begin >= end) return;
209209

210-
const std::size_t data_size = std::distance(begin, end);
210+
const size_type data_size = std::distance(begin, end);
211211

212212
if (data_size > 0) {
213213
auto left_iter = begin;
214-
auto right_iter = end;
214+
auto right_iter = end - 1;
215215
bool is_swapped_left = false;
216216
bool is_swapped_right = false;
217217
const value_type pivot = *begin;
@@ -246,7 +246,7 @@ ThreadPool::parallel_sort(I begin, I end, P compare) {
246246
&ThreadPool::parallel_sort<I, P, TH>,
247247
this,
248248
begin,
249-
left_iter - 1,
249+
left_iter,
250250
compare);
251251
if (do_right)
252252
right_fut = dispatch(false,
@@ -267,7 +267,7 @@ ThreadPool::parallel_sort(I begin, I end, P compare) {
267267
}
268268
else {
269269
if (do_left)
270-
parallel_sort<I, P, TH>(begin, left_iter - 1, compare);
270+
parallel_sort<I, P, TH>(begin, left_iter, compare);
271271

272272
if (do_right)
273273
parallel_sort<I, P, TH>(right_iter + 1, end, compare);

src/CommonMakefile.mk

+4
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ HEADERS = $(LOCAL_INCLUDE_DIR)/DataFrame/Vectors/HeteroVector.h \
6262
$(LOCAL_INCLUDE_DIR)/DataFrame/Vectors/VectorView.h \
6363
$(LOCAL_INCLUDE_DIR)/DataFrame/Vectors/VectorPtrView.h \
6464
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/Threads/ThreadGranularity.h \
65+
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/Threads/SharedQueue.h \
66+
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/Threads/SharedQueue.tcc \
67+
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/Threads/ThreadPool.h \
68+
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/Threads/ThreadPool.tcc \
6569
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/DateTime.h \
6670
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/Utils.h \
6771
$(LOCAL_INCLUDE_DIR)/DataFrame/Utils/MetaProg.h \

0 commit comments

Comments
 (0)