From 4b63bbd5019cf7cde63dad233f8c37faa16cf086 Mon Sep 17 00:00:00 2001 From: Hossein Moein Date: Wed, 13 Dec 2023 13:23:37 -0500 Subject: [PATCH 1/7] Added parallel computing logic to the following visitors: Sum, Mean, Prod, Extremum --- docs/HTML/DataFrame.html | 6 +- include/DataFrame/DataFrameStatsVisitors.h | 168 ++++++++++++++++++- include/DataFrame/Utils/Threads/ThreadPool.h | 2 +- test/dataframe_tester.cc | 4 +- 4 files changed, 166 insertions(+), 14 deletions(-) diff --git a/docs/HTML/DataFrame.html b/docs/HTML/DataFrame.html index 14a1670f..9019310e 100644 --- a/docs/HTML/DataFrame.html +++ b/docs/HTML/DataFrame.html @@ -1441,16 +1441,16 @@

API Reference with code samples

Multithreading

In general, multithreading can be very tricky. Often you think that by using multithreading you are enhancing the performance of your program when, in fact, you are hindering it. It requires measurement and careful adjustment. It is recommended to start with a single-threaded version; once that works correctly, take measurements and adjust as you move to a multithreaded version.
-DataFrame uses multithreading extensively and provides granular tools to adjust your program. Let’s divide the multithreading subject in DataFrame into two categories:
+DataFrame uses multithreading extensively and provides granular tools to adjust your environment. Let’s divide the multithreading subject in DataFrame into two categories:
-

1. User Utilizing Multithreading

+

1. User Multithreading

So, in general, if you as the user of DataFrame utilize multithreading, you must protect the DataFrame with a synchronization tool (e.g., SpinLock)

2. DataFrame Internal Multithreading

- Whether or not you, as the user, use multithreading, DataFrame utilizes a versatile thread-pool to employ parallel computing extensively in almost all its functionalities, when appropriate. DataFrame also gives you the interface to control and tweak that. You do not need to worry about synchronization for DataFrame internal multithreading.
+ Whether or not you, as the user, use multithreading, DataFrame utilizes a versatile thread-pool to employ parallel computing extensively in almost all its functionalities, when appropriate -- currently, most parallel algorithms trigger when the number of items exceeds 150k and the number of threads exceeds 2. DataFrame also gives you the interface to control and tweak that. You do not need to worry about synchronization for DataFrame internal multithreading.
So, in general, if you as the user of DataFrame utilize multithreading, you must protect the DataFrame with a synchronization tool (e.g., SpinLock)

2. DataFrame Internal Multithreading

- Whether or not you, as the user, use multithreading, DataFrame utilizes a versatile thread-pool to employ parallel computing extensively in almost all its functionalities, when appropriate -- currently, most parallel algorithms trigger when the number of items exceeds 150k and the number of threads exceeds 2. DataFrame also gives you the interface to control and tweak that. You do not need to worry about synchronization for DataFrame internal multithreading.
+ Whether or not you, as the user, use multithreading, DataFrame utilizes a versatile thread-pool to employ parallel computing extensively in almost all its functionalities, when appropriate -- currently, most parallel algorithms trigger when the number of items exceeds 250k and the number of threads exceeds 2. DataFrame also gives you the interface to control and tweak that. You do not need to worry about synchronization for DataFrame internal multithreading.
  • There are asynchronous versions of some methods. For example, you have sort()/sort_async(), visit()/visit_async(), ... The latter versions return a std::future and execute in parallel.
    If you choose to use DataFrame async interfaces, it is highly recommended to call ThreadGranularity::set_optimum_thread_level() first, so your thread-pool is populated with the optimal number of threads. Otherwise, if the thread-pool is empty, async interfaces will add one thread to it. Having only one thread in the thread-pool could be suboptimal and hinder performance.
  • As mentioned above, DataFrame uses parallel computing extensively. But by default, DataFrame is single-threaded, because by default its thread-pool is empty. If you want to take full advantage of DataFrame parallel computing, it is recommended to call ThreadGranularity::set_optimum_thread_level() at the beginning of your program (see the sketch after this list). Alternatively, you could call ThreadGranularity::set_thread_level(n) to add a custom number of threads to the thread-pool, but you should have a good reason for doing so.
    Thread-pool and thread level are static properties of DataFrame. Once the thread level is set, it applies to all DataFrame instances.
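    For instance, here is a minimal setup sketch -- the index range, column name, and data below are made up for illustration, so check the exact method signatures against your version of the library:

#include <DataFrame/DataFrame.h>

#include <iostream>
#include <vector>

using namespace hmdf;
using MyDataFrame = StdDataFrame<unsigned long>;

int main(int, char *[]) {

    // Populate the thread-pool once, at program start, so the parallel
    // code paths described above can actually run in parallel
    //
    ThreadGranularity::set_optimum_thread_level();

    MyDataFrame df;

    df.load_data(MyDataFrame::gen_sequence_index(0, 1000000, 1),
                 std::make_pair("col_1", std::vector<double>(1000000, 1.0)));

    // The async variant returns a std::future and runs on the thread-pool
    //
    MeanVisitor<double>  mean_v;
    auto                 fut = df.visit_async<double>("col_1", mean_v);

    fut.get();
    std::cout << mean_v.get_result() << std::endl;
    return (0);
}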
  • diff --git a/include/DataFrame/DataFrameStatsVisitors.h b/include/DataFrame/DataFrameStatsVisitors.h index de89a776..fe28a4c9 100644 --- a/include/DataFrame/DataFrameStatsVisitors.h +++ b/include/DataFrame/DataFrameStatsVisitors.h @@ -3123,8 +3123,8 @@ struct KthValueVisitor { const bool skip_nan_; template - inline size_type - parttition_ (V &vec, size_type begin, size_type end) const { + inline static size_type + parttition_(V &vec, size_type begin, size_type end) { const value_type x = vec[end]; size_type i = begin; @@ -3141,11 +3141,8 @@ struct KthValueVisitor { } template - inline value_type - find_kth_element_ (V &vec, - size_type begin, - size_type end, - size_type k) const { + inline static value_type + find_kth_element_(V &vec, size_type begin, size_type end, size_type k) { // If k is smaller than number of elements in array // @@ -3431,10 +3428,20 @@ struct ModeVisitor { val_vec.push_back(map_pair.second); }); - std::sort(val_vec.begin(), val_vec.end(), - [](const DataItem &lhs, const DataItem &rhs) -> bool { - return (lhs.repeat_count() > rhs.repeat_count()); // dec - }); + if (ThreadGranularity::get_thread_level() > 2 && + val_vec.size() >= ThreadPool::MUL_THR_THHOLD) { + ThreadGranularity::thr_pool_.parallel_sort( + val_vec.begin(), val_vec.end(), + [](const DataItem &lhs, const DataItem &rhs) -> bool { + return (lhs.repeat_count() > rhs.repeat_count()); + }); // Descending + } + else { + std::sort(val_vec.begin(), val_vec.end(), + [](const DataItem &lhs, const DataItem &rhs) -> bool { + return (lhs.repeat_count() > rhs.repeat_count()); + }); // Descending + } for (size_type i = 0; i < N && i < val_vec.size(); ++i) result_[i] = val_vec[i]; } @@ -3454,17 +3461,33 @@ struct ModeVisitor { inline void sort_by_repeat_count() { - std::sort(result_.begin(), result_.end(), - [](const DataItem &lhs, const DataItem &rhs) -> bool { - return (lhs.repeat_count() < rhs.repeat_count()); - }); + if (ThreadGranularity::get_thread_level() > 2 && + result_.size() >= ThreadPool::MUL_THR_THHOLD) + ThreadGranularity::thr_pool_.parallel_sort( + result_.begin(), result_.end(), + [](const DataItem &lhs, const DataItem &rhs) -> bool { + return (lhs.repeat_count() < rhs.repeat_count()); + }); + else + std::sort(result_.begin(), result_.end(), + [](const DataItem &lhs, const DataItem &rhs) -> bool { + return (lhs.repeat_count() < rhs.repeat_count()); + }); } inline void sort_by_value() { - std::sort(result_.begin(), result_.end(), - [](const DataItem &lhs, const DataItem &rhs) -> bool { - return (*(lhs.value) < *(rhs.value)); - }); + if (ThreadGranularity::get_thread_level() > 2 && + result_.size() >= ThreadPool::MUL_THR_THHOLD) + ThreadGranularity::thr_pool_.parallel_sort( + result_.begin(), result_.end(), + [](const DataItem &lhs, const DataItem &rhs) -> bool { + return (*(lhs.value) < *(rhs.value)); + }); + else + std::sort(result_.begin(), result_.end(), + [](const DataItem &lhs, const DataItem &rhs) -> bool { + return (*(lhs.value) < *(rhs.value)); + }); } private: @@ -3493,19 +3516,17 @@ struct MADVisitor { template inline void - calc_mean_abs_dev_around_mean_(const K &, - const K &, + calc_mean_abs_dev_around_mean_(const K &idx_begin, + const K &idx_end, const H &column_begin, const H &column_end) { GET_COL_SIZE2 MeanVisitor mean_visitor(skip_nan_); - const index_type idx_value { }; mean_visitor.pre(); - for (std::size_t i = 0; i < col_s; ++i) [[likely]] - mean_visitor(idx_value, *(column_begin + i)); + mean_visitor(idx_begin, idx_end, column_begin, column_end); mean_visitor.post(); 
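// The parallel/serial forks above all follow one gating rule: take the
// parallel path only when the pool has more than two threads and the data
// holds at least MUL_THR_THHOLD elements; otherwise the thread-management
// overhead outweighs any gain. A standalone sketch of that rule, with
// std::execution::par standing in for the library's parallel_sort() and an
// illustrative threshold value:

#include <algorithm>
#include <execution>
#include <thread>
#include <vector>

template<typename T, typename C>
void gated_sort(std::vector<T> &vec, C compare,
                std::size_t threshold = 250'000)  {

    if (std::thread::hardware_concurrency() > 2 && vec.size() >= threshold)
        std::sort(std::execution::par, vec.begin(), vec.end(), compare);
    else  // Not worth the thread-management overhead
        std::sort(vec.begin(), vec.end(), compare);
}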
MeanVisitor mean_mean_visitor(skip_nan_); @@ -3515,8 +3536,9 @@ struct MADVisitor { const value_type value = *(column_begin + i); if (! is_nan__(value) || ! skip_nan_) [[likely]] - mean_mean_visitor(idx_value, - std::fabs(value - mean_visitor.get_result())); + mean_mean_visitor( + *idx_begin, + std::fabs(value - mean_visitor.get_result())); } mean_mean_visitor.post(); @@ -3539,15 +3561,15 @@ struct MADVisitor { GET_COL_SIZE2 MeanVisitor mean_median_visitor(skip_nan_); - const index_type idx_value { }; mean_median_visitor.pre(); for (std::size_t i = 0; i < col_s; ++i) [[likely]] { const value_type value = *(column_begin + i); if (skip_nan_ && is_nan__(value)) [[unlikely]] continue; - mean_median_visitor(idx_value, - std::fabs(value - median_visitor.get_result())); + mean_median_visitor( + *idx_begin, + std::fabs(value - median_visitor.get_result())); } mean_median_visitor.post(); @@ -3561,18 +3583,18 @@ struct MADVisitor { const H &column_begin, const H &column_end) { + using vec_t = std::vector::type>; + GET_COL_SIZE2 MeanVisitor mean_visitor(skip_nan_); - const index_type idx_value { }; mean_visitor.pre(); - for (std::size_t i = 0; i < col_s; ++i) [[likely]] - mean_visitor(idx_value, *(column_begin + i)); + mean_visitor(idx_begin, idx_end, column_begin, column_end); mean_visitor.post(); - MedianVisitor median_mean_visitor; - std::vector::type> mean_dists; + MedianVisitor median_mean_visitor; + vec_t mean_dists; mean_dists.reserve(col_s); for (std::size_t i = 0; i < col_s; ++i) [[likely]] @@ -3593,6 +3615,8 @@ struct MADVisitor { const H &column_begin, const H &column_end) { + using vec_t = std::vector::type>; + MedianVisitor median_visitor; median_visitor.pre(); @@ -3601,8 +3625,8 @@ struct MADVisitor { GET_COL_SIZE2 - MedianVisitor median_median_visitor; - std::vector::type> median_dists; + MedianVisitor median_median_visitor; + vec_t median_dists; median_dists.reserve(col_s); for (std::size_t i = 0; i < col_s; ++i) [[likely]] @@ -3789,27 +3813,42 @@ struct ZScoreVisitor { template inline void - operator() (const K &, const K &, + operator() (const K &idx_begin, const K &idx_end, const H &column_begin, const H &column_end) { GET_COL_SIZE2 - MeanVisitor mvisit; + MeanVisitor mvisit { skip_nan_ }; StdVisitor svisit; - - // None of these visitors look at the index value - // - const index_type idx_value { }; + const auto thread_level { + ThreadGranularity::get_thread_level() }; mvisit.pre(); svisit.pre(); - for (size_type i = 0; i < col_s; ++i) [[likely]] { - const value_type value = *(column_begin + i); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + auto fut1 = + ThreadGranularity::thr_pool_.dispatch( + false, + [&svisit, + &idx_begin, &idx_end, + &column_begin, &column_end]() -> void { + svisit(idx_begin, idx_end, column_begin, column_end); + }); + auto fut2 = + ThreadGranularity::thr_pool_.dispatch( + false, + [&mvisit, + &idx_begin, &idx_end, + &column_begin, &column_end]() -> void { + mvisit(idx_begin, idx_end, column_begin, column_end); + }); - if (! skip_nan_ || ! 
is_nan__(value)) [[likely]] { - mvisit(idx_value, value); - svisit(idx_value, value); - } + fut1.get(); + fut2.get(); + } + else { + mvisit(idx_begin, idx_end, column_begin, column_end); + svisit(idx_begin, idx_end, column_begin, column_end); } mvisit.post(); svisit.post(); @@ -3818,12 +3857,29 @@ struct ZScoreVisitor { const value_type s = svisit.get_result(); result_type result; - result.reserve(col_s); - std::transform(column_begin, column_end, - std::back_inserter(result), - [m, s](const auto &val) -> value_type { - return ((val - m) / s); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [m, s, &result, &column_begin] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + result[i] = (*(column_begin + i) - m) / s; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result), + [m, s](const auto &val) -> value_type { + return ((val - m) / s); + }); + } result_.swap(result); } @@ -3860,46 +3916,61 @@ struct SampleZScoreVisitor { template inline void - operator() (const K &, const K &, + operator() (const K &idx_begin, const K &idx_end, const H &population_begin, const H &population_end, const H &sample_begin, const H &sample_end) { - MeanVisitor p_mvisit; + MeanVisitor p_mvisit { skip_nan_ }; StdVisitor p_svisit; - MeanVisitor s_mvisit; - const size_type p_col_s = - std::distance(population_begin, population_end); - const size_type s_col_s = std::distance(sample_begin, sample_end); - const size_type max_s = std::max(p_col_s, s_col_s); - - // None of these visitors look at the index value - // - const index_type idx_value { }; + MeanVisitor s_mvisit { skip_nan_ }; p_mvisit.pre(); p_svisit.pre(); s_mvisit.pre(); - for (size_type i = 0; i < max_s; ++i) [[likely]] { - if (i < p_col_s) { - const value_type value = *(population_begin + i); - - if (! skip_nan_ || ! is_nan__(value)) [[likely]] { - p_mvisit(idx_value, value); - p_svisit(idx_value, value); - } - } - if (i < s_col_s) { - const value_type value = *(sample_begin + i); + if (ThreadGranularity::get_thread_level() > 3) { + auto fut1 = + ThreadGranularity::thr_pool_.dispatch( + false, + [&p_svisit, + &idx_begin, &idx_end, + &population_begin, &population_end]() -> void { + p_svisit(idx_begin, idx_end, + population_begin, population_end); + }); + auto fut2 = + ThreadGranularity::thr_pool_.dispatch( + false, + [&p_mvisit, + &idx_begin, &idx_end, + &population_begin, &population_end]() -> void { + p_mvisit(idx_begin, idx_end, + population_begin, population_end); + }); + auto fut3 = + ThreadGranularity::thr_pool_.dispatch( + false, + [&s_mvisit, + &idx_begin, &idx_end, + &sample_begin, &sample_end]() -> void { + s_mvisit(idx_begin, idx_end, + sample_begin, sample_end); + }); - if (! skip_nan_ || ! 
is_nan__(value)) { - s_mvisit(idx_value, value); - } - } + fut1.get(); + fut2.get(); + fut3.get(); + } + else { + p_mvisit(idx_begin, idx_end, population_begin, population_end); + p_svisit(idx_begin, idx_end, population_begin, population_end); + s_mvisit(idx_begin, idx_end, sample_begin, sample_end); } p_mvisit.post(); p_svisit.post(); s_mvisit.post(); + const size_type s_col_s = std::distance(sample_begin, sample_end); + result_ = (s_mvisit.get_result() - p_mvisit.get_result()) / (p_svisit.get_result() / ::sqrt(s_col_s)); } @@ -3929,74 +4000,209 @@ struct BoxCoxVisitor { private: template - inline void modulus_(const H &column_begin, const H &column_end) { + inline void modulus_(const H &column_begin, const H &column_end, + size_type col_s, size_type thread_level) { if (lambda_ != 0) { - std::transform( - column_begin, column_end, - std::back_inserter(result_), - [this](const auto &val) -> value_type { - const value_type sign = std::signbit(val) ? -1 : 1; - const value_type v = - (std::pow(std::fabs(val) + (1), this->lambda_) - - T(1)) / lambda_; - - return (sign * v); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + const value_type sign = + std::signbit(val) ? -1 : 1; + const value_type v = + (std::pow(std::fabs(val) + (1), + this->lambda_) - + T(1)) / this->lambda_; + + this->result_[i] = sign * v; + } + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform( + column_begin, column_end, + std::back_inserter(result_), + [this](const auto &val) -> value_type { + const value_type sign = std::signbit(val) ? -1 : 1; + const value_type v = + (std::pow(std::fabs(val) + (1), this->lambda_) - + T(1)) / this->lambda_; + + return (sign * v); + }); + } } else { - std::transform( - column_begin, column_end, - std::back_inserter(result_), - [](const auto &val) -> value_type { - const value_type sign = std::signbit(val) ? -1 : 1; + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + const value_type sign = + std::signbit(val) ? -1 : 1; + + result_[i] = + sign * std::log(std::fabs(val) + T(1)); + } + }); - return (sign * std::log(std::fabs(val) + T(1))); - }); + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform( + column_begin, column_end, + std::back_inserter(result_), + [](const auto &val) -> value_type { + const value_type sign = std::signbit(val) ? 
-1 : 1; + + return (sign * std::log(std::fabs(val) + T(1))); + }); + } } } template - inline void exponential_(const H &column_begin, const H &column_end) { + inline void exponential_(const H &column_begin, const H &column_end, + size_type col_s, size_type thread_level) { if (lambda_ != 0) { - std::transform( - column_begin, column_end, - std::back_inserter(result_), - [this](const auto &val) -> value_type { - return ((std::exp(this->lambda_ * val) - T(1)) / - this->lambda_); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + + this->result_[i] = + (std::exp(this->lambda_ * val) - T(1)) / + this->lambda_; + } + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform( + column_begin, column_end, + std::back_inserter(result_), + [this](const auto &val) -> value_type { + return ((std::exp(this->lambda_ * val) - T(1)) / + this->lambda_); + }); + } } else { - std::transform(column_begin, column_end, - std::back_inserter(result_), - [](const auto &val) -> value_type { - return (val); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + this->result_[i] = *(column_begin + i); + } + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [](const auto &val) -> value_type { + return (val); + }); + } } } template inline void original_(const H &column_begin, const H &column_end, - value_type shift) { + value_type shift, + size_type col_s, + size_type thread_level) { if (lambda_ != 0) { - std::transform( - column_begin, column_end, - std::back_inserter(result_), - [this, shift](const auto &val) -> value_type { - return ((std::pow(val + shift, this->lambda_) - - T(1)) / this->lambda_); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, shift, &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + + this->result_[i] = + (std::pow(val + shift, this->lambda_) - + T(1)) / this->lambda_; + } + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform( + column_begin, column_end, + std::back_inserter(result_), + [this, shift](const auto &val) -> value_type { + return ((std::pow(val + shift, this->lambda_) - + T(1)) / this->lambda_); + }); + } } else { - std::transform(column_begin, column_end, - std::back_inserter(result_), - [shift](const auto &val) -> value_type { - return (std::log(val + shift)); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, shift, &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + + this->result_[i] = std::log(val + shift); + } + }); + + for (auto &fut : futures) 
fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [shift](const auto &val) -> value_type { + return (std::log(val + shift)); + }); + } } } @@ -4004,7 +4210,8 @@ struct BoxCoxVisitor { inline void geometric_mean_(const K &dummy, const H &column_begin, const H &column_end, - value_type shift) { + value_type shift, + size_type col_s, size_type thread_level) { H citer = column_begin; GeometricMeanVisitor gm; @@ -4015,29 +4222,74 @@ struct BoxCoxVisitor { gm(dummy, *citer++ + shift); gm.post(); - std::transform( - column_begin, column_end, - std::back_inserter(result_), - [this, shift, gm = gm.get_result()] - (const auto &val) -> value_type { - const value_type raw_v = val + shift; + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, shift, gm = gm.get_result(), &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + const value_type raw_v = val + shift; + + this->result_[i] = + (std::pow(raw_v, this->lambda_) - T(1)) / + (this->lambda_ * + std::pow(gm, this->lambda_ - T(1))); + + } + }); - return ((std::pow(raw_v, this->lambda_) - T(1)) / - (this->lambda_ * - std::pow(gm, this->lambda_ - T(1)))); - }); + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform( + column_begin, column_end, + std::back_inserter(result_), + [this, shift, gm = gm.get_result()] + (const auto &val) -> value_type { + const value_type raw_v = val + shift; + + return ((std::pow(raw_v, this->lambda_) - T(1)) / + (this->lambda_ * + std::pow(gm, this->lambda_ - T(1)))); + }); + } } else { while (citer < column_end) [[likely]] gm(dummy, std::log(*citer++ + shift)); gm.post(); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [shift, gm = gm.get_result()] - (const auto &val) -> value_type { - return ((val + shift) * gm); - }); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [this, shift, gm = gm.get_result(), &column_begin] + (auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) { + const auto val = *(column_begin + i); + + this->result_[i] = (val + shift) * gm; + } + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [shift, gm = gm.get_result()] + (const auto &val) -> value_type { + return ((val + shift) * gm); + }); + } } } @@ -4062,15 +4314,18 @@ struct BoxCoxVisitor { shift = std::fabs(mv.get_result()) + value_type(0.0000001); } - result_.reserve(std::distance(column_begin, column_end)); + const size_type col_s = std::distance(column_begin, column_end); + const size_type thread_level = ThreadGranularity::get_thread_level(); + if (box_cox_type_ == box_cox_type::original) - original_(column_begin, column_end, shift); + original_(column_begin, column_end, shift, col_s, thread_level); else if (box_cox_type_ == box_cox_type::geometric_mean) - geometric_mean_(*idx_begin, column_begin, column_end, shift); + geometric_mean_(*idx_begin, column_begin, column_end, + shift, col_s, thread_level); else if (box_cox_type_ == box_cox_type::modulus) - modulus_(column_begin, column_end); + modulus_(column_begin, column_end, col_s, thread_level); 
else if (box_cox_type_ == box_cox_type::exponential) - exponential_(column_begin, column_end); + exponential_(column_begin, column_end, col_s, thread_level); } DEFINE_PRE_POST @@ -4109,54 +4364,200 @@ struct ProbabilityDistVisitor { result_type result; value_type sum { 0 }; - result.reserve(col_s); - if (pdtype_ == prob_dist_type::arithmetic) { - std::for_each(column_begin, column_end, - [&sum](const auto &v) -> void { sum += v; }); - std::for_each(column_begin, column_end, - [&sum, &result](const auto &v) -> void { - result.push_back(v / sum); - }); - } - else if (pdtype_ == prob_dist_type::log) { - std::for_each(column_begin, column_end, - [&sum](const auto &v) -> void { - sum += std::log(v); - }); - std::for_each(column_begin, column_end, - [&sum, &result](const auto &v) -> void { - result.push_back(std::log(v) / sum); - }); - } - else if (pdtype_ == prob_dist_type::softmax) { - std::for_each(column_begin, column_end, - [&sum](const auto &v) -> void { - sum += std::exp(v); - }); - std::for_each(column_begin, column_end, - [&sum, &result](const auto &v) -> void { - result.push_back(std::exp(v) / sum); - }); - } - else if (pdtype_ == prob_dist_type::pow2) { - std::for_each(column_begin, column_end, - [&sum](const auto &v) -> void { - sum += std::pow(T(2), v); - }); - std::for_each(column_begin, column_end, - [&sum, &result](const auto &v) -> void { - result.push_back(std::pow(T(2), v) / sum); - }); + if (ThreadGranularity::get_thread_level() > 2 && + std::distance(column_begin, column_end) >= + ThreadPool::MUL_THR_THHOLD) { + result.resize(col_s); + if (pdtype_ == prob_dist_type::arithmetic) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + column_begin, + column_end, + [] + (const auto &begin, const auto &end) -> value_type { + value_type sum { 0 }; + + for (auto citer = begin; citer < end; ++citer) + sum += *citer; + return (sum); + }); + + for (auto &fut : futures) sum += fut.get(); + futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, &result, sum] + (auto begin, auto end) -> value_type { + for (auto i = begin; i < end; ++i) + result[i] = *(column_begin + i) / sum; + return (0); + }); + for (auto &fut : futures) fut.get(); + } + else if (pdtype_ == prob_dist_type::log) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + column_begin, + column_end, + [] + (const auto &begin, const auto &end) -> value_type { + value_type sum { 0 }; + + for (auto citer = begin; citer < end; ++citer) + sum += std::log(*citer); + return (sum); + }); + + for (auto &fut : futures) sum += fut.get(); + futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, &result, sum] + (auto begin, auto end) -> value_type { + for (auto i = begin; i < end; ++i) + result[i] = + std::log(*(column_begin + i)) / sum; + return (0); + }); + for (auto &fut : futures) fut.get(); + } + else if (pdtype_ == prob_dist_type::softmax) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + column_begin, + column_end, + [] + (const auto &begin, const auto &end) -> value_type { + value_type sum { 0 }; + + for (auto citer = begin; citer < end; ++citer) + sum += std::exp(*citer); + return (sum); + }); + + for (auto &fut : futures) sum += fut.get(); + futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, &result, sum] + (auto begin, auto end) -> value_type { + for (auto i = begin; i < end; ++i) + result[i] = + std::exp(*(column_begin + i)) / sum; + return (0); + }); + 
for (auto &fut : futures) fut.get(); + } + else if (pdtype_ == prob_dist_type::pow2) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + column_begin, + column_end, + [] + (const auto &begin, const auto &end) -> value_type { + value_type sum { 0 }; + + for (auto citer = begin; citer < end; ++citer) + sum += std::pow(T(2), *citer); + return (sum); + }); + + for (auto &fut : futures) sum += fut.get(); + futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, &result, sum] + (auto begin, auto end) -> value_type { + for (auto i = begin; i < end; ++i) + result[i] = + std::pow(T(2), *(column_begin + i)) / sum; + return (0); + }); + for (auto &fut : futures) fut.get(); + } + else if (pdtype_ == prob_dist_type::pow10) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + column_begin, + column_end, + [] + (const auto &begin, const auto &end) -> value_type { + value_type sum { 0 }; + + for (auto citer = begin; citer < end; ++citer) + sum += std::pow(T(10), *citer); + return (sum); + }); + + for (auto &fut : futures) sum += fut.get(); + futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, &result, sum] + (auto begin, auto end) -> value_type { + for (auto i = begin; i < end; ++i) + result[i] = + std::pow(T(10), *(column_begin + i)) / sum; + return (0); + }); + for (auto &fut : futures) fut.get(); + } } - else if (pdtype_ == prob_dist_type::pow10) { - std::for_each(column_begin, column_end, - [&sum](const auto &v) -> void { - sum += std::pow(T(10), v); - }); - std::for_each(column_begin, column_end, - [&sum, &result](const auto &v) -> void { - result.push_back(std::pow(T(10), v) / sum); - }); + else { + result.reserve(col_s); + if (pdtype_ == prob_dist_type::arithmetic) { + std::for_each(column_begin, column_end, + [&sum](const auto &v) -> void { sum += v; }); + std::for_each(column_begin, column_end, + [&sum, &result](const auto &v) -> void { + result.push_back(v / sum); + }); + } + else if (pdtype_ == prob_dist_type::log) { + std::for_each(column_begin, column_end, + [&sum](const auto &v) -> void { + sum += std::log(v); + }); + std::for_each(column_begin, column_end, + [&sum, &result](const auto &v) -> void { + result.push_back(std::log(v) / sum); + }); + } + else if (pdtype_ == prob_dist_type::softmax) { + std::for_each(column_begin, column_end, + [&sum](const auto &v) -> void { + sum += std::exp(v); + }); + std::for_each(column_begin, column_end, + [&sum, &result](const auto &v) -> void { + result.push_back(std::exp(v) / sum); + }); + } + else if (pdtype_ == prob_dist_type::pow2) { + std::for_each(column_begin, column_end, + [&sum](const auto &v) -> void { + sum += std::pow(T(2), v); + }); + std::for_each(column_begin, column_end, + [&sum, &result](const auto &v) -> void { + result.push_back(std::pow(T(2), v) / sum); + }); + } + else if (pdtype_ == prob_dist_type::pow10) { + std::for_each(column_begin, column_end, + [&sum](const auto &v) -> void { + sum += std::pow(T(10), v); + }); + std::for_each(column_begin, column_end, + [&sum, &result](const auto &v) -> void { + result.push_back(std::pow(T(10), v) / sum); + }); + } } result_.swap(result); @@ -4232,14 +4633,34 @@ struct NormalizeVisitor { maxv.post(); const value_type diff = maxv.get_result() - minv.get_result(); + const size_type col_s = std::distance(column_begin, column_end); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [minv = 
minv.get_result(), diff] - (const auto &val) -> value_type { - return ((val - minv) / diff); - }); + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [minv = minv.get_result(), &column_begin, diff, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = + (*(column_begin + i) - minv) / diff; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [minv = minv.get_result(), diff] + (const auto &val) -> value_type { + return ((val - minv) / diff); + }); + } } template inline void @@ -4252,13 +4673,33 @@ struct NormalizeVisitor { sumv(idx_begin, idx_end, column_begin, column_end); sumv.post(); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [sumv = sumv.get_result()] - (const auto &val) -> value_type { - return (val / sumv); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [sumv = sumv.get_result(), &column_begin, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = *(column_begin + i) / sumv; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [sumv = sumv.get_result()] + (const auto &val) -> value_type { + return (val / sumv); + }); + } } template inline void @@ -4271,13 +4712,33 @@ struct NormalizeVisitor { eucliv(idx_begin, idx_end, column_begin, column_end); eucliv.post(); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [eucli = eucliv.get_euclidean_norm()] - (const auto &val) -> value_type { - return (val / eucli); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [eucli = eucliv.get_euclidean_norm(), &column_begin, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = *(column_begin + i) / eucli; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [eucli = eucliv.get_euclidean_norm()] + (const auto &val) -> value_type { + return (val / eucli); + }); + } } template inline void @@ -4290,13 +4751,33 @@ struct NormalizeVisitor { maxv(idx_begin, idx_end, column_begin, column_end); maxv.post(); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [maxv = maxv.get_result()] - (const auto &val) -> value_type { - return (val / maxv); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + 
ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [maxv = maxv.get_result(), &column_begin, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = *(column_begin + i) / maxv; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [maxv = maxv.get_result()] + (const auto &val) -> value_type { + return (val / maxv); + }); + } } template inline void @@ -4313,13 +4794,36 @@ struct NormalizeVisitor { meanv.post(); stdv.post(); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [meanv = meanv.get_result(), stdv = stdv.get_result()] - (const auto &val) -> value_type { - return ((val - meanv) / stdv); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [meanv = meanv.get_result(), stdv = stdv.get_result(), + &column_begin, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = + (*(column_begin + i) - meanv) / stdv; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [meanv = meanv.get_result(), + stdv = stdv.get_result()] + (const auto &val) -> value_type { + return ((val - meanv) / stdv); + }); + } } template inline void @@ -4336,35 +4840,92 @@ struct NormalizeVisitor { // const value_type scale = std::pow(10, std::log10(maxv.get_result()) + 1); + const size_type col_s = std::distance(column_begin, column_end); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [scale](const auto &val) -> value_type { - return (val / scale); - }); + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [scale, &column_begin, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = *(column_begin + i) / scale; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [scale](const auto &val) -> value_type { + return (val / scale); + }); + } } template inline void log_transform_(const H &column_begin, const H &column_end) { - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [](const auto &val) -> value_type { - return (std::log(val)); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, this](auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = std::log(*(column_begin + i)); + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(std::distance(column_begin, column_end)); + std::transform(column_begin, 
column_end, + std::back_inserter(result_), + [](const auto &val) -> value_type { + return (std::log(val)); + }); + } } template inline void root_transform_(const H &column_begin, const H &column_end) { - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [](const auto &val) -> value_type { - return (std::sqrt(val)); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&column_begin, this](auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = std::sqrt(*(column_begin + i)); + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [](const auto &val) -> value_type { + return (std::sqrt(val)); + }); + } } result_type result_ { }; // Normalized @@ -4396,13 +4957,34 @@ struct StandardizeVisitor { mv.post(); sv.post(); - result_.reserve(std::distance(column_begin, column_end)); - std::transform(column_begin, column_end, - std::back_inserter(result_), - [mv = mv.get_result(), sv = sv.get_result()] - (const auto &val) -> value_type { - return ((val - mv) / sv); - }); + const size_type col_s = std::distance(column_begin, column_end); + + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + result_.resize(col_s); + + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [mv = mv.get_result(), sv = sv.get_result(), + &column_begin, this] + (auto begin, auto end) -> void { + for (size_type i = begin; i < end; ++i) + this->result_[i] = (*(column_begin + i) - mv) / sv; + }); + + for (auto &fut : futures) fut.get(); + } + else { + result_.reserve(col_s); + std::transform(column_begin, column_end, + std::back_inserter(result_), + [mv = mv.get_result(), sv = sv.get_result()] + (const auto &val) -> value_type { + return ((val - mv) / sv); + }); + } } DEFINE_PRE_POST diff --git a/include/DataFrame/Utils/Threads/SharedQueue.h b/include/DataFrame/Utils/Threads/SharedQueue.h index 814e8c85..17f51dab 100644 --- a/include/DataFrame/Utils/Threads/SharedQueue.h +++ b/include/DataFrame/Utils/Threads/SharedQueue.h @@ -40,10 +40,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. namespace hmdf { -class SQEmpty { public: inline SQEmpty () noexcept { } }; - -// ---------------------------------------------------------------------------- - template class SharedQueue { @@ -61,11 +57,6 @@ class SharedQueue { inline void push(const value_type &element) noexcept; - inline const value_type & - front(bool wait_on_front = true) const; // throw (SQEmpty); - inline value_type & - front(bool wait_on_front = true); // throw (SQEmpty); - // NOTE: The following method returns the data by value. // Therefore it is not as efficient as front(). // Use it only if you have to. 
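// The queue interface above is being reduced to a single pop_front() that
// returns the element by value wrapped in an optional: one locked
// operation, so no consumer can race between peeking at front() and
// popping, and the SQEmpty exception becomes unnecessary. A simplified
// stand-in for that shape (not the library's actual SharedQueue):

#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <utility>

template<typename T>
class MiniSharedQueue  {

public:

    void push(const T &element)  {

        {
            const std::lock_guard<std::mutex> guard { mutex_ };

            queue_.push(element);
        }
        cvx_.notify_one();
    }

    std::optional<T> pop_front(bool wait_on_front = true)  {

        std::unique_lock<std::mutex> ul { mutex_ };

        if (wait_on_front)
            cvx_.wait(ul, [this]() { return (! queue_.empty()); });
        if (queue_.empty()) return (std::nullopt);

        T value = std::move(queue_.front());

        queue_.pop();
        return (value);
    }

private:

    std::queue<T>           queue_ { };
    std::mutex              mutex_ { };
    std::condition_variable cvx_ { };
};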
@@ -73,7 +64,6 @@ class SharedQueue { inline optional_ret pop_front(bool wait_on_front = true) noexcept; - void pop() noexcept; bool empty() const noexcept; size_type size() const noexcept; diff --git a/include/DataFrame/Utils/Threads/SharedQueue.tcc b/include/DataFrame/Utils/Threads/SharedQueue.tcc index 9813af8c..556e8209 100644 --- a/include/DataFrame/Utils/Threads/SharedQueue.tcc +++ b/include/DataFrame/Utils/Threads/SharedQueue.tcc @@ -51,24 +51,6 @@ SharedQueue::push(const value_type &element) noexcept { // ---------------------------------------------------------------------------- -template -inline const typename SharedQueue::value_type & -SharedQueue::front(bool wait_on_front) const { // throw (SQEmpty) - - std::unique_lock ul { mutex_ }; - - if (queue_.empty()) { - if (wait_on_front) - while (queue_.empty()) cvx_.wait(ul); - else - throw SQEmpty { }; - } - - return (queue_.front()); -} - -// ---------------------------------------------------------------------------- - template inline typename SharedQueue::optional_ret SharedQueue::pop_front(bool wait_on_front) noexcept { @@ -88,34 +70,6 @@ SharedQueue::pop_front(bool wait_on_front) noexcept { // ---------------------------------------------------------------------------- -template -inline typename SharedQueue::value_type & -SharedQueue::front(bool wait_on_front) { // throw (SQEmpty) - - std::unique_lock ul { mutex_ }; - - if (queue_.empty()) { - if (wait_on_front) - while (queue_.empty()) cvx_.wait(ul); - else - throw SQEmpty { }; - } - - return (queue_.front()); -} - -// ---------------------------------------------------------------------------- - -template -void SharedQueue::pop() noexcept { - - const AutoLockable lock { mutex_ }; - - queue_.pop(); -} - -// ---------------------------------------------------------------------------- - template bool SharedQueue::empty() const noexcept { diff --git a/include/DataFrame/Utils/Threads/ThreadPool.h b/include/DataFrame/Utils/Threads/ThreadPool.h index ab8f367a..aacb828f 100644 --- a/include/DataFrame/Utils/Threads/ThreadPool.h +++ b/include/DataFrame/Utils/Threads/ThreadPool.h @@ -50,53 +50,22 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. namespace hmdf { -struct Conditioner { - - template - requires std::invocable - explicit Conditioner(F &&routine, As && ... args); - - Conditioner() = default; - Conditioner(const Conditioner &) = default; - Conditioner(Conditioner &&) = default; - ~Conditioner() = default; - - void execute(); - -private: - - using routine_type = std::function; - - routine_type func_ { [] () -> void { } }; -}; - -// ---------------------------------------------------------------------------- - class ThreadPool { public: using size_type = long; - using time_type = time_t; using thread_type = std::thread; - inline static constexpr size_type MUL_THR_THHOLD = 150'000L; + inline static constexpr size_type MUL_THR_THHOLD = 250'000L; ThreadPool(const ThreadPool &) = delete; ThreadPool &operator = (const ThreadPool &) = delete; - // Conditioner(s) are a handy interface, if threads need to be initialized - // before doing anything. And/or they need a clean up before exiting. 
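// The pool's dispatch() hands a callable to the pool and returns a
// std::future for its result. A condensed sketch of the heart of that
// mechanism using std::packaged_task -- the real pool also routes work
// units through per-thread local queues and a global queue, omitted here:

#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <type_traits>
#include <utility>

template<typename F>
std::future<std::invoke_result_t<F>>
enqueue_task(std::queue<std::function<void()>> &work_q, F &&routine)  {

    using result_t = std::invoke_result_t<F>;

    auto task =
        std::make_shared<std::packaged_task<result_t()>>(
            std::forward<F>(routine));
    auto ret_fut = task->get_future();

    // A worker thread later pops this wrapper and invokes it
    //
    work_q.push([task]() { (*task)(); });
    return (ret_fut);
}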
- // For example, see Windows CoInitializeEx function in COM library - // explicit - ThreadPool(size_type thr_num = std::thread::hardware_concurrency(), - Conditioner pre_conditioner = Conditioner { }, - Conditioner post_conditioner = Conditioner { }); + ThreadPool(size_type thr_num = std::thread::hardware_concurrency()); ~ThreadPool(); - void set_timeout(bool timeout_flag, time_type timeout_time = 30 * 60); - template requires std::invocable using dispatch_res_t = @@ -145,14 +114,6 @@ class ThreadPool { long TH = MUL_THR_THHOLD> void parallel_sort(const I begin, const I end, P compare); - - // It attaches the current thread to the pool so that it may be used for - // executing submitted tasks. It blocks the calling thread until the pool - // is shutdown or the thread is timed-out. - // This is handy, if you already have thread(s), and want to repurpose them - // - void attach(thread_type &&this_thr); - // If the pool is not shutdown and there is a pending task, run the one // task on the calling thread. // Return true, if a task was executed, otherwise false. @@ -176,7 +137,6 @@ class ThreadPool { _undefined_ = 0, _client_service_ = 1, _terminate_ = 2, - _timeout_ = 3, }; struct WorkUnit { @@ -197,7 +157,6 @@ class ThreadPool { }; bool thread_routine_(size_type local_q_idx) noexcept; // Engine routine - void queue_timed_outs_() noexcept; WorkUnit get_one_local_task_() noexcept; using guard_type = std::lock_guard; @@ -216,12 +175,7 @@ class ThreadPool { std::atomic available_threads_ { 0 }; std::atomic capacity_threads_ { 0 }; std::atomic_bool shutdown_flag_ { false }; - time_type timeout_time_ { 30 * 60 }; mutable std::mutex state_ { }; - bool timeout_flag_ { false }; - - Conditioner pre_conditioner_ { }; - Conditioner post_conditioner_ { }; }; } // namespace hmdf diff --git a/include/DataFrame/Utils/Threads/ThreadPool.tcc b/include/DataFrame/Utils/Threads/ThreadPool.tcc index 46120fa9..0532540f 100644 --- a/include/DataFrame/Utils/Threads/ThreadPool.tcc +++ b/include/DataFrame/Utils/Threads/ThreadPool.tcc @@ -31,7 +31,6 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include -#include #include #include #include @@ -42,21 +41,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. namespace hmdf { -template -requires std::invocable -Conditioner::Conditioner(F &&routine, As && ... 
args) - : func_([&] () -> void { routine(std::forward(args) ...); }) { } - -// ---------------------------------------------------------------------------- - -void Conditioner::execute() { func_(); } - -// ---------------------------------------------------------------------------- - -ThreadPool::ThreadPool(size_type thr_num, - Conditioner pre_conditioner, - Conditioner post_conditioner) - : pre_conditioner_(pre_conditioner), post_conditioner_(post_conditioner) { +ThreadPool::ThreadPool(size_type thr_num) { threads_.reserve(thr_num * 2); for (size_type i = 0; i < thr_num; ++i) { @@ -82,31 +67,6 @@ ThreadPool::~ThreadPool() { // ---------------------------------------------------------------------------- -void -ThreadPool::set_timeout(bool timeout_flag, time_type timeout_time) { - - timeout_flag_ = timeout_flag; - timeout_time_ = timeout_time; -} - -// ---------------------------------------------------------------------------- - -void -ThreadPool::queue_timed_outs_() noexcept { - - const size_type timeys { capacity_threads() }; - - for (size_type i = 0; i < timeys; ++i) { - const WorkUnit work_unit { WORK_TYPE::_timeout_ }; - - global_queue_.push(work_unit); - } - - return; -} - -// ---------------------------------------------------------------------------- - bool ThreadPool::add_thread(size_type thr_num) { @@ -183,8 +143,6 @@ ThreadPool::dispatch(bool immediately, F &&routine, As && ... args) { else global_queue_.push(work_unit); - if (timeout_flag_) - queue_timed_outs_(); return (return_fut); } @@ -388,27 +346,6 @@ ThreadPool::parallel_sort(const I begin, const I end, P compare) { // ---------------------------------------------------------------------------- -void -ThreadPool::attach(thread_type &&this_thr) { - - if (is_shutdown()) - throw std::runtime_error("ThreadPool::attach(): " - "Thread pool is shutdown."); - - size_type local_size { 0 }; - - { - const guard_type guard { state_ }; - - local_size = size_type(threads_.size()); - local_queues_.push_back(LocalQueueType { }); - threads_.push_back(std::move(this_thr)); - } - thread_routine_(local_size); -} - -// ---------------------------------------------------------------------------- - ThreadPool::size_type ThreadPool::available_threads() const noexcept { @@ -514,10 +451,7 @@ ThreadPool::thread_routine_(size_type local_q_idx) noexcept { if (is_shutdown()) return (false); - pre_conditioner_.execute(); - - time_type last_busy_time { timeout_flag_ ? 
::time(nullptr) : 0 }; - auto iter = local_queues_.begin(); + auto iter = local_queues_.begin(); std::advance(iter, local_q_idx); local_queue_ = &(*iter); @@ -537,23 +471,13 @@ ThreadPool::thread_routine_(size_type local_q_idx) noexcept { --available_threads_; - if (work_unit.work_type == WORK_TYPE::_terminate_) { - break; - } - else if (work_unit.work_type == WORK_TYPE::_timeout_) { - if (timeout_flag_ && - ((::time(nullptr) - last_busy_time) >= timeout_time_)) - break; - } - else if (work_unit.work_type == WORK_TYPE::_client_service_) { - if (timeout_flag_) - last_busy_time = ::time(nullptr); + if (work_unit.work_type == WORK_TYPE::_client_service_) (work_unit.func)(); // Execute the callable - } + else if (work_unit.work_type == WORK_TYPE::_terminate_) + break; } --capacity_threads_; local_queue_ = nullptr; - post_conditioner_.execute(); return (true); } From 678120d4ebbbe1e977e3fa7c102483b62ba82b8e Mon Sep 17 00:00:00 2001 From: Hossein Moein Date: Tue, 19 Dec 2023 11:35:16 -0500 Subject: [PATCH 7/7] Added parallel computing logic to the following visitors: PolyFit, LogFit, ExponentialFit, LinearFit, CubicSplineFit, Lowess, Decompose, Bias, NonZeroRange --- include/DataFrame/DataFrameStatsVisitors.h | 753 +++++++++++++++++---- 1 file changed, 615 insertions(+), 138 deletions(-) diff --git a/include/DataFrame/DataFrameStatsVisitors.h b/include/DataFrame/DataFrameStatsVisitors.h index fe28a4c9..3d3adf2f 100644 --- a/include/DataFrame/DataFrameStatsVisitors.h +++ b/include/DataFrame/DataFrameStatsVisitors.h @@ -54,6 +54,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include #include @@ -5025,6 +5026,7 @@ struct PolyFitVisitor { const Hy &y_begin, const Hy &y_end) { const size_type col_s = std::distance(x_begin, x_end); + const size_type thread_level = ThreadGranularity::get_thread_level(); assert((col_s == size_type(std::distance(y_begin, y_end)))); @@ -5042,10 +5044,32 @@ struct PolyFitVisitor { // consecutive positions of the array will store // col_s, sigma(xi), sigma(xi^2), sigma(xi^3) ... sigma(xi^2n) // - for (size_type j = 0; j < col_s; ++j) [[likely]] { - const value_type w = weights_(*(idx_begin + j), j); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&x_begin, &idx_begin, i, this] + (auto begin, auto end) -> value_type { + value_type sum { 0 }; - sigma_x[i] += std::pow(*(x_begin + j), i) * w; + for (auto j = begin; j < end; ++j) { + const value_type w = + this->weights_(*(idx_begin + j), j); + + sum += std::pow(*(x_begin + j), i) * w; + } + return (sum); + }); + + for (auto &fut : futures) sigma_x[i] += fut.get(); + } + else { + for (size_type j = 0; j < col_s; ++j) [[likely]] { + const value_type w = weights_(*(idx_begin + j), j); + + sigma_x[i] += std::pow(*(x_begin + j), i) * w; + } } } @@ -5072,10 +5096,34 @@ struct PolyFitVisitor { // consecutive positions will store // sigma(yi), sigma(xi * yi), sigma(xi^2 * yi) ...
sigma(xi^n * yi) // - for (size_type j = 0; j < col_s; ++j) { - const value_type w = weights_(*(idx_begin + j), j); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&x_begin, &y_begin, &idx_begin, i, this] + (auto begin, auto end) -> value_type { + value_type sum { 0 }; + + for (auto j = begin; j < end; ++j) { + const value_type w = + this->weights_(*(idx_begin + j), j); + + sum += std::pow(*(x_begin + j), i) * + *(y_begin + j) * w; + } + return (sum); + }); - sigma_y[i] += std::pow(*(x_begin + j), i) * *(y_begin + j) * w; + for (auto &fut : futures) sigma_y[i] += fut.get(); + } + else { + for (size_type j = 0; j < col_s; ++j) { + const value_type w = weights_(*(idx_begin + j), j); + + sigma_y[i] += + std::pow(*(x_begin + j), i) * *(y_begin + j) * w; + } } } @@ -5151,12 +5199,12 @@ struct PolyFitVisitor { for (size_type j = 0; j < deg; ++j) pred += coeffs_[j] * std::pow(*(x_begin + i), j); + y_fits_.push_back(pred); const value_type w = weights_(*(idx_begin + i), i); // y fits at given x points // - y_fits_.push_back(pred); residual_ += ((*(y_begin + i) - pred) * w) * ((*(y_begin + i) - pred) * w); } @@ -5205,26 +5253,74 @@ struct LogFitVisitor { const H &x_begin, const H &x_end, const H &y_begin, const H &y_end) { + const size_type col_s = std::distance(x_begin, x_end); + const size_type thread_level = ThreadGranularity::get_thread_level(); + result_type logx (x_begin, x_end); - std::transform(logx.begin(), logx.end(), logx.begin(), - (value_type(*)(value_type)) std::log); - poly_fit_(idx_begin, idx_end, logx.begin(), logx.end(), y_begin, y_end); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&logx](auto begin, auto end) -> void { + for (auto i = begin; i < end; ++i) + logx[i] = std::log(logx[i]); + }); - const size_type col_s = std::distance(x_begin, x_end); + for (auto &fut : futures) fut.get(); + } + else { + std::transform(logx.begin(), logx.end(), logx.begin(), + (value_type(*)(value_type)) std::log); + } - y_fits_.reserve(col_s); - for (size_type i = 0; i < col_s; ++i) [[likely]] { - const value_type pred = - poly_fit_.get_result()[0] + - poly_fit_.get_result()[1] * std::log(*(x_begin + i)); - const value_type w = weights_(*(idx_begin + i), i); + poly_fit_(idx_begin, idx_end, + logx.begin(), logx.end(), + y_begin, y_end); - // y fits at given x points - // - y_fits_.push_back(pred); - residual_ += ((*(y_begin + i) - pred) * w) * - ((*(y_begin + i) - pred) * w); + y_fits_.resize(col_s); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&x_begin, &y_begin, &idx_begin, this] + (auto begin, auto end) -> value_type { + value_type residual { 0 }; + + for (auto i = begin; i < end; ++i) { + const value_type pred = + this->poly_fit_.get_result()[0] + + this->poly_fit_.get_result()[1] * + std::log(*(x_begin + i)); + const value_type w = + this->weights_(*(idx_begin + i), i); + + // y fits at given x points + // + this->y_fits_[i] = pred; + residual += ((*(y_begin + i) - pred) * w) * + ((*(y_begin + i) - pred) * w); + } + return (residual); + }); + + for (auto &fut : futures) residual_ += fut.get(); + } + else { + for (size_type i = 0; i < col_s; ++i) [[likely]] { + const value_type pred = + poly_fit_.get_result()[0] + + poly_fit_.get_result()[1] * std::log(*(x_begin + i)); + 
const value_type w = weights_(*(idx_begin + i), i); + + // y fits at given x points + // + y_fits_[i] = pred; + residual_ += ((*(y_begin + i) - pred) * w) * + ((*(y_begin + i) - pred) * w); + } } } @@ -5268,28 +5364,67 @@ struct ExponentialFitVisitor { const H &y_begin, const H &y_end) { const size_type col_s = std::distance(x_begin, x_end); + const size_type thread_level = ThreadGranularity::get_thread_level(); assert((col_s == size_type(std::distance(y_begin, y_end)))); - value_type sum_x = 0; // Sum of all observed x - value_type sum_y = 0; // Sum of all observed y - value_type sum_x2 = 0; // Sum of all observed x squared - value_type sum_xy = 0; // Sum of all x times sum of all observed y + value_type sum_x { 0 }; // Sum of all observed x + value_type sum_y { 0 }; // Sum of all observed y + value_type sum_x2 { 0 }; // Sum of all observed x squared + value_type sum_xy { 0 }; // Sum of all x times sum of all observed y - for (size_type i = 0; i < col_s; ++i) [[likely]] { - const value_type x = *(x_begin + i); - const value_type log_y = std::log(*(y_begin + i)); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + using sum_t = + std::tuple; - sum_x += x; - sum_y += log_y; - sum_xy += x * log_y; - sum_x2 += x * x; + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&x_begin, &y_begin](auto begin, auto end) -> sum_t { + value_type sum_x { 0 }; + value_type sum_y { 0 }; + value_type sum_x2 { 0 }; + value_type sum_xy { 0 }; + + for (auto i = begin; i < end; ++i) { + const value_type x = *(x_begin + i); + const value_type log_y = + std::log(*(y_begin + i)); + + sum_x += x; + sum_y += log_y; + sum_xy += x * log_y; + sum_x2 += x * x; + } + return (std::make_tuple(sum_x, sum_y, sum_xy, sum_x2)); + }); + + for (auto &fut : futures) { + const auto &sums = fut.get(); + + sum_x += std::get<0>(sums); + sum_y += std::get<1>(sums); + sum_xy += std::get<2>(sums); + sum_x2 += std::get<3>(sums); + } + } + else { + for (size_type i = 0; i < col_s; ++i) [[likely]] { + const value_type x = *(x_begin + i); + const value_type log_y = std::log(*(y_begin + i)); + + sum_x += x; + sum_y += log_y; + sum_xy += x * log_y; + sum_x2 += x * x; + } } // The slope (the the power of exp) of best fit line // - slope_ = - (col_s * sum_xy - sum_x * sum_y) / (col_s * sum_x2 - sum_x * sum_x); + slope_ = (col_s * sum_xy - sum_x * sum_y) / + (col_s * sum_x2 - sum_x * sum_x); // The intercept of best fit line // @@ -5297,18 +5432,47 @@ struct ExponentialFitVisitor { const value_type prefactor = std::exp(intercept_); - y_fits_.reserve(col_s); - for (size_type i = 0; i < col_s; ++i) [[likely]] { - const value_type x = *(x_begin + i); - const value_type pred = prefactor * std::exp(x * slope_); + y_fits_.resize(col_s); + if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) { + auto futures = + ThreadGranularity::thr_pool_.parallel_loop( + size_type(0), + col_s, + [&x_begin, &y_begin, prefactor, this] + (auto begin, auto end) -> value_type { + value_type residual { 0 }; - // y fits at given x points - // - y_fits_.push_back(pred); + for (auto i = begin; i < end; ++i) { + const value_type x = *(x_begin + i); + const value_type pred = + prefactor * std::exp(x * this->slope_); + + // y fits at given x points + // + this->y_fits_[i] = pred; + + const value_type r = *(y_begin + i) - pred; + + residual += r * r; + } + return (residual); + }); - const value_type r = *(y_begin + i) - pred; + for (auto &fut : futures) residual_ += fut.get(); + } + else { + for (size_type i = 0; i < 
+                const value_type x = *(x_begin + i);
+                const value_type pred = prefactor * std::exp(x * slope_);
+
+                // y fits at given x points
+                //
+                y_fits_[i] = pred;
 
-            residual_ += r * r;
+                const value_type r = *(y_begin + i) - pred;
+
+                residual_ += r * r;
+            }
         }
     }
 
@@ -5353,22 +5517,60 @@ struct LinearFitVisitor {
                             const H &y_begin, const H &y_end) {
 
         const size_type col_s = std::distance(x_begin, x_end);
+        const size_type thread_level = ThreadGranularity::get_thread_level();
 
         assert((col_s == size_type(std::distance(y_begin, y_end))));
 
-        value_type sum_x = 0;   // Sum of all observed x
-        value_type sum_y = 0;   // Sum of all observed y
-        value_type sum_x2 = 0;  // Sum of all observed x squared
-        value_type sum_xy = 0;  // Sum of all x times sum of all observed y
+        value_type sum_x { 0 };   // Sum of all observed x
+        value_type sum_y { 0 };   // Sum of all observed y
+        value_type sum_x2 { 0 };  // Sum of all observed x squared
+        value_type sum_xy { 0 };  // Sum of all observed x times y
 
-        for (size_type i = 0; i < col_s; ++i) {
-            const value_type x = *(x_begin + i);
-            const value_type y = *(y_begin + i);
+        if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) {
+            using sum_t = std::tuple<value_type, value_type,
+                                     value_type, value_type>;
 
-            sum_x += x;
-            sum_y += y;
-            sum_xy += x * y;
-            sum_x2 += x * x;
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop(
+                    size_type(0),
+                    col_s,
+                    [&x_begin, &y_begin](auto begin, auto end) -> sum_t {
+                        value_type sum_x { 0 };
+                        value_type sum_y { 0 };
+                        value_type sum_x2 { 0 };
+                        value_type sum_xy { 0 };
+
+                        for (auto i = begin; i < end; ++i) {
+                            const value_type x = *(x_begin + i);
+                            const value_type y = *(y_begin + i);
+
+                            sum_x += x;
+                            sum_y += y;
+                            sum_xy += x * y;
+                            sum_x2 += x * x;
+                        }
+                        return (std::make_tuple(sum_x, sum_y, sum_xy, sum_x2));
+                    });
+
+            for (auto &fut : futures) {
+                const auto &sums = fut.get();
+
+                sum_x += std::get<0>(sums);
+                sum_y += std::get<1>(sums);
+                sum_xy += std::get<2>(sums);
+                sum_x2 += std::get<3>(sums);
+            }
+        }
+        else {
+            for (size_type i = 0; i < col_s; ++i) {
+                const value_type x = *(x_begin + i);
+                const value_type y = *(y_begin + i);
+
+                sum_x += x;
+                sum_y += y;
+                sum_xy += x * y;
+                sum_x2 += x * x;
+            }
         }
 
         const value_type divisor = sum_x2 * col_s - sum_x * sum_x;
@@ -5381,18 +5583,49 @@ struct LinearFitVisitor {
         //
         intercept_ = (sum_x2 * sum_y - sum_x * sum_xy) / divisor;
 
-        y_fits_.reserve(col_s);
-        std::transform(x_begin, x_end,
-                       y_begin,
-                       std::back_inserter(y_fits_),
-                       [this](const auto &x, const auto &y) -> value_type {
-                           const value_type pred =
-                               this->slope_ * x + this->intercept_;
-                           const value_type r = y - pred;
-
-                           this->residual_ += r * r;
-                           return (pred);  // y fits at given x points
-                       });
+        if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) {
+            y_fits_.resize(col_s);
+
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop(
+                    size_type(0),
+                    col_s,
+                    [&x_begin, &y_begin, this]
+                    (auto begin, auto end) -> value_type {
+                        value_type residual { 0 };
+
+                        for (auto i = begin; i < end; ++i) {
+                            const value_type x = *(x_begin + i);
+                            const value_type y = *(y_begin + i);
+                            const value_type pred =
+                                this->slope_ * x + this->intercept_;
+                            const value_type r = y - pred;
+
+                            // y fits at given x points
+                            //
+                            this->y_fits_[i] = pred;
+                            residual += r * r;
+                        }
+                        return (residual);
+                    });
+
+            for (auto &fut : futures) residual_ += fut.get();
+        }
+        else {
+            y_fits_.reserve(col_s);
+            std::transform(x_begin, x_end,
+                           y_begin,
+                           std::back_inserter(y_fits_),
+                           [this]
+                           (const auto &x, const auto &y) -> value_type {
+                               const value_type pred =
+                                   this->slope_ * x + this->intercept_;
+                               const value_type r = y - pred;
+
+                               this->residual_ += r * r;
+                               return (pred);  // y fits at given x points
+                           });
+        }
     }
 
     inline void pre () {
@@ -5441,15 +5674,32 @@ struct CubicSplineFitVisitor {
                              const H &y_begin, const H &y_end) {
 
         const size_type col_s = std::distance(x_begin, x_end);
+        const size_type thread_level = ThreadGranularity::get_thread_level();
 
         assert(col_s > 3);
         assert((col_s == size_type(std::distance(y_begin, y_end))));
 
         result_type h;
 
-        h.reserve(col_s - 1);
-        for (size_type i = 0; i < col_s - 1; ++i) [[likely]]
-            h.push_back (*(x_begin + (i + 1)) - *(x_begin + i));
+        if (thread_level > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) {
+            h.resize(col_s - 1);
+
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop(
+                    size_type(0),
+                    col_s - 1,
+                    [&x_begin, &h](auto begin, auto end) -> void {
+                        for (auto i = begin; i < end; ++i)
+                            h[i] = *(x_begin + (i + 1)) - *(x_begin + i);
+                    });
+
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            h.reserve(col_s - 1);
+            for (size_type i = 0; i < col_s - 1; ++i) [[likely]]
+                h.push_back (*(x_begin + (i + 1)) - *(x_begin + i));
+        }
 
         result_type mu (col_s, 0);
         result_type z (col_s, 0);
@@ -5552,28 +5802,68 @@ struct LowessVisitor {
     // function.
     //
     template<typename X>
-    inline static void bi_square_(X x_begin, X x_end) {
+    inline static
+    void bi_square_(X x_begin, X x_end, long thread_level) {
 
-        std::for_each(x_begin, x_end,
-                      [](auto &x) -> void {
-                          const value_type val = T(1) - x * x;
+        if (thread_level > 2 &&
+            std::distance(x_begin, x_end) >= ThreadPool::MUL_THR_THHOLD) {
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop(
+                    x_begin,
+                    x_end,
+                    [](const auto &begin, const auto &end) -> void {
+                        for (auto citer = begin; citer < end; ++citer) {
+                            value_type &x = *citer;
+                            const value_type val = T(1) - x * x;
+
+                            x = val * val;
+                        }
+                    });
 
-                          x = val * val;
-                      });
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            std::for_each(x_begin, x_end,
+                          [](auto &x) -> void {
+                              const value_type val = T(1) - x * x;
+
+                              x = val * val;
+                          });
+        }
     }
 
     // The tri-cubic function (1 - x^3)^3. Used to weight neighboring points
     // along the x-axis based on their distance to the current point.
     //
    template<typename X>
-    inline static void tri_cube_(X x_begin, X x_end) {
+    inline static
+    void tri_cube_(X x_begin, X x_end, long thread_level) {
 
-        std::for_each(x_begin, x_end,
-                      [](auto &x) -> void {
-                          const value_type val = T(1) - x * x * x;
+        if (thread_level > 2 &&
+            std::distance(x_begin, x_end) >= ThreadPool::MUL_THR_THHOLD) {
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop(
+                    x_begin,
+                    x_end,
+                    [](const auto &begin, const auto &end) -> void {
+                        for (auto citer = begin; citer < end; ++citer) {
+                            value_type &x = *citer;
+                            const value_type val = T(1) - x * x * x;
+
+                            x = val * val * val;
+                        }
+                    });
 
-                          x = val * val * val;
-                      });
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            std::for_each(x_begin, x_end,
+                          [](auto &x) -> void {
+                              const value_type val = T(1) - x * x * x;
+
+                              x = val * val * val;
+                          });
+        }
     }
 
     // Calculate residual weights for the next robustifying iteration.
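Every parallel branch added in this patch has the same gated map-reduce shape: if the thread level and the input size clear the gate, hand chunk lambdas to ThreadGranularity::thr_pool_.parallel_loop, each returning a partial result, then reduce over the futures; otherwise fall back to the serial loop. The sketch below shows that shape in isolation using only the standard library. std::async stands in for the pool's parallel_loop, and the gate value and chunking are illustrative assumptions, not the library's actual tuning.

// Standalone sketch of the gated map-reduce used by the visitors above.
// std::async stands in for ThreadGranularity::thr_pool_.parallel_loop;
// min_size is a hypothetical gate, not the library's constant.
#include <algorithm>
#include <cstddef>
#include <future>
#include <numeric>
#include <thread>
#include <vector>

template<typename T>
T parallel_sum(const std::vector<T> &data, std::size_t min_size = 250000) {

    const std::size_t n = data.size();
    const std::size_t n_thr = std::thread::hardware_concurrency();

    // Small inputs or few cores: the serial loop wins, exactly like the
    // else branches in the visitors.
    if (n_thr <= 2 || n < min_size)
        return (std::accumulate(data.begin(), data.end(), T(0)));

    const std::size_t chunk = n / n_thr + 1;
    std::vector<std::future<T>> futures;

    for (std::size_t b = 0; b < n; b += chunk) {
        const std::size_t e = std::min(b + chunk, n);

        // Map step: each task returns its local sum, like the lambdas
        // handed to parallel_loop.
        futures.push_back(
            std::async(std::launch::async,
                       [&data, b, e]() -> T {
                           T sum { 0 };

                           for (std::size_t i = b; i < e; ++i)
                               sum += data[i];
                           return (sum);
                       }));
    }

    T total { 0 };

    // Reduce step: combine the partial results.
    for (auto &fut : futures) total += fut.get();
    return (total);
}

The same shape covers the tuple-returning sums in ExponentialFitVisitor and LinearFitVisitor; only the per-chunk accumulator type changes.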
@@ -5582,14 +5872,35 @@ struct LowessVisitor {
     inline void
     calc_residual_weights_(const IDX &idx_begin, const IDX &idx_end,
                            const Y &y_begin, const Y &y_end,
-                           const K &y_fits_begin, const K & /*y_fits_end*/) {
+                           const K &y_fits_begin, const K &y_fits_end) {
 
-        std::transform(y_begin, y_end,
-                       y_fits_begin,
-                       resid_weights_.begin(),
-                       [](auto y, auto y_fit) -> value_type {
-                           return (std::fabs(y - y_fit));
-                       });
+        const size_type col_s = std::distance(y_begin, y_end);
+
+        if (thread_level_ > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) {
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop2(
+                    size_type(0),
+                    col_s,
+                    size_type(0),
+                    size_type(std::distance(y_fits_begin, y_fits_end)),
+                    [&y_begin, &y_fits_begin, this]
+                    (auto begin, auto end, auto) -> void {
+                        for (size_type i = begin; i < end; ++i) [[likely]]
+                            this->resid_weights_[i] =
+                                std::fabs(*(y_begin + i) -
                                          *(y_fits_begin + i));
+                    });
+
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            std::transform(y_begin, y_end,
+                           y_fits_begin,
+                           resid_weights_.begin(),
+                           [](auto y, auto y_fit) -> value_type {
+                               return (std::fabs(y - y_fit));
+                           });
+        }
 
         MedianVisitor<T, I> median_v;
 
@@ -5607,9 +5918,25 @@ struct LowessVisitor {
         else {
             const value_type val = T(6) * median_v.get_result();
 
-            std::transform(resid_weights_.begin(), resid_weights_.end(),
-                           resid_weights_.begin(),
-                           [val](auto c) -> value_type { return (c / val); });
+            if (thread_level_ > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) {
+                auto futures =
+                    ThreadGranularity::thr_pool_.parallel_loop(
+                        resid_weights_.begin(),
+                        resid_weights_.end(),
+                        [val](const auto &begin, const auto &end) -> void {
+                            for (auto citer = begin; citer < end; ++citer)
+                                *citer /= val;
+                        });
+
+                for (auto &fut : futures) fut.get();
+            }
+            else {
+                std::transform(resid_weights_.begin(), resid_weights_.end(),
+                               resid_weights_.begin(),
+                               [val](auto c) -> value_type {
+                                   return (c / val);
+                               });
+            }
         }
 
         // Some trimming of outlier residuals.
@@ -5628,7 +5955,8 @@ struct LowessVisitor {
         //                  std::placeholders::_1, value_type(0.001)),
         //        0);
 
-        bi_square_(resid_weights_.begin(), resid_weights_.end());
+        bi_square_(resid_weights_.begin(), resid_weights_.end(),
+                   thread_level_);
     }
 
     // Update the counters of the local regression.
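Taken together, calc_residual_weights_ and bi_square_ implement one robustifying pass of lowess: take absolute residuals, scale them by six times their median, and push them through the bi-square kernel. Below is a minimal serial sketch of that pass, assuming a floating-point T; std::nth_element stands in for the library's MedianVisitor, and the near-zero-median case that the visitor handles in its own if branch is reduced to an early return. Names are illustrative, not the library's API.

// Serial sketch of one lowess robustifying pass:
// |residual| -> scale by 6 * median -> bi-square weight.
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <vector>

template<typename T>  // T assumed floating-point
void residual_weights(const std::vector<T> &y,
                      const std::vector<T> &y_fit,
                      std::vector<T> &weights) {

    const std::size_t n = y.size();

    if (n == 0) return;

    weights.resize(n);
    for (std::size_t i = 0; i < n; ++i)  // absolute residuals
        weights[i] = std::fabs(y[i] - y_fit[i]);

    std::vector<T> tmp = weights;  // rough median via nth_element

    std::nth_element(tmp.begin(), tmp.begin() + n / 2, tmp.end());

    // The visitor branches on a near-zero median; reduced to a bail-out.
    const T scale = T(6) * tmp[n / 2];

    if (scale <= T(0)) return;

    for (auto &w : weights) {
        w /= scale;                  // normalize residual

        const T val = T(1) - w * w;  // bi-square: (1 - x^2)^2

        w = val * val;
    }
}

The source also carries a commented-out trimming step for outlier residuals, which this sketch omits just as the visitor currently does.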
@@ -5803,7 +6131,7 @@ struct LowessVisitor {
                 *(w_begin + j) = dist_i_j_.back();
             }
 
-            tri_cube_(w_begin + left_end, w_begin + right_end);
+            tri_cube_(w_begin + left_end, w_begin + right_end, thread_level_);
 
             for (size_type j = left_end; j < right_end; ++j) [[likely]]
                 *(w_begin + j) *= resid_weights_[j];
@@ -6030,7 +6358,8 @@ struct LowessVisitor {
         : frac_(frac),
           loop_n_(loop_n + 1),
           delta_(delta),
-          sorted_(sorted) { }
+          sorted_(sorted),
+          thread_level_(ThreadGranularity::get_thread_level()) { }
 
 private:
 
@@ -6049,6 +6378,8 @@ struct LowessVisitor {
     //
     const bool sorted_;
 
+    const long thread_level_;
+
     result_type y_fits_ { };
     result_type resid_weights_ { };
 
@@ -6078,7 +6409,10 @@ struct DecomposeVisitor {
 
         std::iota(xvals.begin(), xvals.end(), 0);
 
-        LowessVisitor<T, I> l_v (3, frac_, delta_ * value_type(col_s), true);
+        LowessVisitor<T, I> l_v (3,
+                                 frac_,
+                                 delta_ * value_type(col_s),
+                                 true);
 
         // Calculate trend
         //
@@ -6138,16 +6472,51 @@ struct DecomposeVisitor {
         // What is left is residual
         //
         residual_.resize(col_s, 0);
-        if (type_ == decompose_type::additive)
-            std::transform(detrended.begin(), detrended.end(),
-                           seasonal_.begin(),
-                           residual_.begin(),
-                           std::minus<T>());
-        else
-            std::transform(detrended.begin(), detrended.end(),
-                           seasonal_.begin(),
-                           residual_.begin(),
-                           std::divides<T>());
+        if (thread_level_ > 2 &&
+            detrended.size() >= ThreadPool::MUL_THR_THHOLD) {
+            std::vector<std::future<void>> futures;
+
+            if (type_ == decompose_type::additive)
+                futures =
+                    ThreadGranularity::thr_pool_.parallel_loop2(
+                        size_type(0),
+                        detrended.size(),
+                        size_type(0),
+                        seasonal_.size(),
+                        [this, &detrended]
+                        (auto begin, auto end, auto) -> void {
+                            for (size_type i = begin; i < end; ++i) [[likely]]
+                                this->residual_[i] =
+                                    detrended[i] - this->seasonal_[i];
+                        });
+            else
+                futures =
+                    ThreadGranularity::thr_pool_.parallel_loop2(
+                        size_type(0),
+                        detrended.size(),
+                        size_type(0),
+                        seasonal_.size(),
+                        [this, &detrended]
+                        (auto begin, auto end, auto) -> void {
+                            for (size_type i = begin; i < end; ++i) [[likely]]
+                                this->residual_[i] =
+                                    detrended[i] / this->seasonal_[i];
+                        });
+
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            if (type_ == decompose_type::additive)
+                std::transform(detrended.begin(), detrended.end(),
+                               seasonal_.begin(),
+                               residual_.begin(),
+                               std::minus<T>());
+            else
+                std::transform(detrended.begin(), detrended.end(),
+                               seasonal_.begin(),
+                               residual_.begin(),
+                               std::divides<T>());
+        }
     }
 
 public:
@@ -6172,16 +6541,50 @@ struct DecomposeVisitor {
 
         // Remove trend from observations in y
         //
-        if (type_ == decompose_type::additive)
-            std::transform(y_begin, y_end,
-                           trend_.begin(),
-                           detrended.begin(),
-                           std::minus<T>());
-        else
-            std::transform(y_begin, y_end,
-                           trend_.begin(),
-                           detrended.begin(),
-                           std::divides<T>());
+        if (thread_level_ > 2 && col_s >= ThreadPool::MUL_THR_THHOLD) {
+            std::vector<std::future<void>> futures;
+
+            if (type_ == decompose_type::additive)
+                futures =
+                    ThreadGranularity::thr_pool_.parallel_loop2(
+                        size_type(0),
+                        col_s,
+                        size_type(0),
+                        trend_.size(),
+                        [this, &detrended, &y_begin]
+                        (auto begin, auto end, auto) -> void {
+                            for (size_type i = begin; i < end; ++i) [[likely]]
+                                detrended[i] =
+                                    *(y_begin + i) - this->trend_[i];
+                        });
+            else
+                futures =
+                    ThreadGranularity::thr_pool_.parallel_loop2(
+                        size_type(0),
+                        col_s,
+                        size_type(0),
+                        trend_.size(),
+                        [this, &detrended, &y_begin]
+                        (auto begin, auto end, auto) -> void {
+                            for (size_type i = begin; i < end; ++i) [[likely]]
+                                detrended[i] =
+                                    *(y_begin + i) / this->trend_[i];
+                        });
+
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            if (type_ == decompose_type::additive)
+                std::transform(y_begin, y_end,
+                               trend_.begin(),
+                               detrended.begin(),
+                               std::minus<T>());
+            else
+                std::transform(y_begin, y_end,
+                               trend_.begin(),
+                               detrended.begin(),
+                               std::divides<T>());
+        }
 
         if (type_ == decompose_type::additive)
             do_seasonal_<std::plus<T>>
@@ -6213,7 +6616,11 @@ struct DecomposeVisitor {
                      value_type frac,
                      value_type delta,
                      decompose_type t = decompose_type::additive)
-        : frac_(frac), s_period_(s_period), delta_(delta), type_(t) { }
+        : frac_(frac),
+          s_period_(s_period),
+          delta_(delta),
+          type_(t),
+          thread_level_(ThreadGranularity::get_thread_level()) { }
 
 private:
 
@@ -6231,6 +6638,8 @@ struct DecomposeVisitor {
     const value_type delta_;
     const decompose_type type_;
 
+    const long thread_level_;
+
     result_type trend_ { };
     result_type seasonal_ { };
     result_type residual_ { };
@@ -6260,10 +6669,12 @@
 is_normal(const V &column, double epsl, bool check_for_standard) {
 
     const value_type high_band_1 = static_cast<value_type>(mean + std);
     const value_type low_band_1 = static_cast<value_type>(mean - std);
     double count_1 = 0.0;
-    const value_type high_band_2 = static_cast<value_type>(mean + std * 2.0);
+    const value_type high_band_2 =
+        static_cast<value_type>(mean + std * 2.0);
     const value_type low_band_2 = static_cast<value_type>(mean - std * 2.0);
     double count_2 = 0.0;
-    const value_type high_band_3 = static_cast<value_type>(mean + std * 3.0);
+    const value_type high_band_3 =
+        static_cast<value_type>(mean + std * 3.0);
     const value_type low_band_3 = static_cast<value_type>(mean - std * 3.0);
     double count_3 = 0.0;
 
@@ -6319,10 +6730,12 @@
 is_lognormal(const V &column, double epsl) {
 
     const value_type high_band_1 = static_cast<value_type>(mean + std);
     const value_type low_band_1 = static_cast<value_type>(mean - std);
     double count_1 = 0.0;
-    const value_type high_band_2 = static_cast<value_type>(mean + std * 2.0);
+    const value_type high_band_2 =
+        static_cast<value_type>(mean + std * 2.0);
    const value_type low_band_2 = static_cast<value_type>(mean - std * 2.0);
     double count_2 = 0.0;
-    const value_type high_band_3 = static_cast<value_type>(mean + std * 3.0);
+    const value_type high_band_3 =
+        static_cast<value_type>(mean + std * 3.0);
     const value_type low_band_3 = static_cast<value_type>(mean - std * 3.0);
     double count_3 = 0.0;
 
@@ -6380,12 +6793,33 @@ struct BiasVisitor {
 
         avger.post();
         result_ = std::move(avger.get_result());
 
-        std::transform(column_begin + (roll_period_ - 1), column_end,
-                       result_.begin() + (roll_period_ - 1),
-                       result_.begin() + (roll_period_ - 1),
-                       [](auto val, auto result) -> value_type {
-                           return (val / result - T(1));
-                       });
+        if (ThreadGranularity::get_thread_level() > 2 &&
+            col_s >= ThreadPool::MUL_THR_THHOLD) {
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop2(
+                    roll_period_ - 1,
+                    col_s,
+                    roll_period_ - 1,
+                    col_s,
+                    [this, &column_begin]
+                    (auto begin, auto end, auto) -> void {
+                        for (size_type i = begin; i < end; ++i) [[likely]] {
+                            value_type &re = this->result_[i];
+
+                            re = *(column_begin + i) / re - T(1);
+                        }
+                    });
+
+            for (auto &fut : futures) fut.get();
+        }
+        else {
+            std::transform(column_begin + (roll_period_ - 1), column_end,
+                           result_.begin() + (roll_period_ - 1),
+                           result_.begin() + (roll_period_ - 1),
+                           [](auto val, auto result) -> value_type {
+                               return (val / result - T(1));
+                           });
+        }
     }
 
     DEFINE_PRE_POST
@@ -6402,7 +6836,8 @@ struct BiasVisitor {
     result_type result_ { };
 };
 
-template<typename T, typename I = unsigned long>
+template<typename T,
+         typename I = unsigned long>
 using bias_v = BiasVisitor<T, I>;
 
 // ----------------------------------------------------------------------------
 
@@ -6426,18 +6861,60 @@ struct NonZeroRangeVisitor {
 
         bool there_is_zero = false;
         result_type result;
 
-        result.reserve(col_s);
-        for (size_type i = 0; i < col_s; ++i) [[likely]] {
-            const value_type v = *(column1_begin + i) - *(column2_begin + i);
+        if (ThreadGranularity::get_thread_level() > 2 &&
+            col_s >= ThreadPool::MUL_THR_THHOLD) {
+            result.resize(col_s);
+
+            auto futures =
+                ThreadGranularity::thr_pool_.parallel_loop(
+                    size_type(0),
+                    col_s,
+                    [&result, &column1_begin, &column2_begin]
+                    (auto begin, auto end) -> bool {
+                        bool there_is_zero = false;
+
+                        for (size_type i = begin; i < end; ++i) {
+                            const value_type v =
+                                *(column1_begin + i) - *(column2_begin + i);
+
+                            result[i] = v;
+                            if (v == 0) there_is_zero = true;
+                        }
+                        return (there_is_zero);
+                    });
+
+            for (auto &fut : futures) there_is_zero |= fut.get();
+            if (there_is_zero) {
+                auto futures =
+                    ThreadGranularity::thr_pool_.parallel_loop(
+                        size_type(0),
+                        col_s,
+                        [&result]
+                        (auto begin, auto end) -> void {
+                            for (size_type i = begin; i < end; ++i)
+                                result[i] +=
+                                    std::numeric_limits<value_type>::epsilon();
+                        });
 
-            result.push_back(v);
-            if (v == 0) there_is_zero = true;
+                for (auto &fut : futures) fut.get();
+            }
+        }
+        else {
+            result.reserve(col_s);
+            for (size_type i = 0; i < col_s; ++i) [[likely]] {
+                const value_type v =
+                    *(column1_begin + i) - *(column2_begin + i);
+
+                result.push_back(v);
+                if (v == 0) there_is_zero = true;
+            }
+            if (there_is_zero)
+                std::for_each(result.begin(), result.end(),
+                              [](value_type &v) -> void {
+                                  v += std::numeric_limits<value_type>
+                                           ::epsilon();
+                              });
         }
-        if (there_is_zero)
-            std::for_each(result.begin(), result.end(),
-                          [](value_type &v) -> void {
-                              v += std::numeric_limits<value_type>::epsilon();
-                          });
 
         result_.swap(result);
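For reference, here is the serial core of NonZeroRangeVisitor distilled from the else branch above: difference two columns, flag any exact zero, and nudge every entry by machine epsilon when one is found, so downstream divisions stay safe. In the parallel path the flag is OR-reduced across the chunk futures before the second, fix-up loop runs. A minimal sketch with illustrative names:

// Serial distillation of NonZeroRangeVisitor's logic.
#include <cstddef>
#include <limits>
#include <vector>

template<typename T>
std::vector<T> non_zero_range(const std::vector<T> &col1,
                              const std::vector<T> &col2) {

    const std::size_t n = col1.size();
    std::vector<T> result;
    bool there_is_zero = false;

    result.reserve(n);
    for (std::size_t i = 0; i < n; ++i) {
        const T v = col1[i] - col2[i];

        result.push_back(v);
        if (v == 0) there_is_zero = true;
    }

    // Fix-up pass only when needed; the parallel version above runs this
    // as its own parallel_loop, gated on the OR-reduced flags.
    if (there_is_zero)
        for (auto &v : result)
            v += std::numeric_limits<T>::epsilon();
    return (result);
}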