From 6e1f0f425748912ba080f73ce0b15ad2edc5e415 Mon Sep 17 00:00:00 2001 From: Hossein Moein Date: Thu, 14 Dec 2023 12:30:32 -0500 Subject: [PATCH] Added parallel computing logic to the following visitors: Covariance, Variance, Correlation, Standard Deviation, Bets, SEM --- benchmarks/dataframe_performance.cc | 12 +- benchmarks/polars_performance.py | 4 +- include/DataFrame/DataFrameStatsVisitors.h | 273 ++++++++++++++---- include/DataFrame/Utils/Threads/ThreadPool.h | 15 + .../DataFrame/Utils/Threads/ThreadPool.tcc | 55 ++++ 5 files changed, 290 insertions(+), 69 deletions(-) diff --git a/benchmarks/dataframe_performance.cc b/benchmarks/dataframe_performance.cc index f617a91c..a202ec17 100644 --- a/benchmarks/dataframe_performance.cc +++ b/benchmarks/dataframe_performance.cc @@ -36,8 +36,8 @@ using namespace hmdf; using namespace std::chrono; constexpr std::size_t ALIGNMENT = 64; -// constexpr std::size_t SIZE = 300000000; -constexpr std::size_t SIZE = 10000000; +constexpr std::size_t SIZE = 300000000; +// constexpr std::size_t SIZE = 10000000; typedef StdDataFrame64 MyDataFrame; @@ -45,6 +45,8 @@ typedef StdDataFrame64 MyDataFrame; int main(int, char *[]) { + MyDataFrame::set_optimum_thread_level(); + const auto first = high_resolution_clock::now(); MyDataFrame df; @@ -64,9 +66,9 @@ int main(int, char *[]) { VarVisitor ln_vv; CorrVisitor e_ln_cv; - auto mean = df.visit_async("normal", n_mv); - auto var = df.visit_async("log_normal", ln_vv); - auto corr = df.visit_async("exponential", "log_normal", e_ln_cv); + auto mean = df.single_act_visit_async("normal", n_mv); + auto var = df.single_act_visit_async("log_normal", ln_vv); + auto corr = df.single_act_visit_async("exponential", "log_normal", e_ln_cv); std::cout << mean.get().get_result() << ", " << var.get().get_result() << ", " diff --git a/benchmarks/polars_performance.py b/benchmarks/polars_performance.py index f6b9463f..49e84c80 100644 --- a/benchmarks/polars_performance.py +++ b/benchmarks/polars_performance.py @@ -4,8 +4,8 @@ # ------------------------------------------------------------------------------ -# SIZE: int = 300000000 -SIZE: int = 10000000 +SIZE: int = 300000000 +# SIZE: int = 10000000 first = datetime.datetime.now() df = pl.DataFrame({"normal": np.random.normal(size=SIZE), diff --git a/include/DataFrame/DataFrameStatsVisitors.h b/include/DataFrame/DataFrameStatsVisitors.h index eee7121b..988564a5 100644 --- a/include/DataFrame/DataFrameStatsVisitors.h +++ b/include/DataFrame/DataFrameStatsVisitors.h @@ -698,8 +698,9 @@ using MinVisitor = ExtremumVisitor>; // than O(MlogM) vs. O(N*M) for majority of usage. 
// By default, this is a NLargestVisitor // -template> +template> struct NExtremumVisitor { DEFINE_VISIT_BASIC_TYPES @@ -709,7 +710,7 @@ struct NExtremumVisitor { index_type index { }; }; - using compare_type = Cmp; + using compare_type = C; using result_type = std::array; inline void operator() (const index_type &idx, const value_type &val) { @@ -720,14 +721,14 @@ struct NExtremumVisitor { result_[counter_] = { val, idx }; if (extremum_index_ < 0 || cmp_(val, result_[extremum_index_].value)) - extremum_index_ = static_cast(counter_); + extremum_index_ = static_cast(counter_); } else if (cmp_(result_[extremum_index_].value, val)) { result_[extremum_index_] = { val, idx }; extremum_index_ = 0; for (size_type i = 1; i < N; ++i) if (cmp_(result_[i].value, result_[extremum_index_].value)) - extremum_index_ = static_cast(i); + extremum_index_ = static_cast(i); } counter_ += 1; @@ -760,7 +761,7 @@ struct NExtremumVisitor { result_type result_ { }; size_type counter_ { 0 }; - int extremum_index_ { -1 }; + long extremum_index_ { -1 }; compare_type cmp_ { }; const bool skip_nan_; }; @@ -777,66 +778,150 @@ struct CovVisitor { DEFINE_VISIT_BASIC_TYPES_2 + struct InterResults { + value_type total1 { 0 }; + value_type total2 { 0 }; + value_type dot_prod { 0 }; + value_type dot_prod1 { 0 }; + value_type dot_prod2 { 0 }; + size_type cnt { 0 }; + + inline void clear() { + total1 = total2 = dot_prod = dot_prod1 = dot_prod2 = 0; + cnt = 0; + } + }; + inline void operator() (const index_type &, const value_type &val1, const value_type &val2) { if (skip_nan_ && (is_nan__(val1) || is_nan__(val2))) [[unlikely]] return; - total1_ += val1; - total2_ += val2; - dot_prod_ += (val1 * val2); - dot_prod1_ += (val1 * val1); - dot_prod2_ += (val2 * val2); - cnt_ += 1; + inter_result_.total1 += val1; + inter_result_.total2 += val2; + inter_result_.dot_prod += (val1 * val2); + inter_result_.dot_prod1 += (val1 * val1); + inter_result_.dot_prod2 += (val2 * val2); + inter_result_.cnt += 1; + } + template + inline void + operator() (K /*idx_begin*/, K /*idx_end*/, + H column_begin1, H column_end1, + H column_begin2, H column_end2) { + + if (ThreadGranularity::get_thread_level() > 2 && + std::distance(column_begin1, column_end1) >= + ThreadPool::MUL_THR_THHOLD) { + auto lbd = + [this] + (const auto &begin1, const auto &end1, + const auto &begin2) -> InterResults { + InterResults result { }; + auto iter2 = begin2; + + for (auto iter1 = begin1; iter1 < end1; ++iter1, ++iter2) { + const value_type &val1 = *iter1; + const value_type &val2 = *iter2; + + if (! this->skip_nan_ || + (! is_nan__(val1) && ! is_nan__(val2))) { + result.total1 += val1; + result.total2 += val2; + result.dot_prod += (val1 * val2); + result.dot_prod1 += (val1 * val1); + result.dot_prod2 += (val2 * val2); + result.cnt += 1; + } + } + return (result); + }; + auto futures = + ThreadGranularity::thr_pool_.parallel_loop2(column_begin1, + column_end1, + column_begin2, + column_end2, + std::move(lbd)); + + for (auto &fut : futures) { + const auto &result = fut.get(); + + inter_result_.total1 += result.total1; + inter_result_.total2 += result.total2; + inter_result_.dot_prod += result.dot_prod; + inter_result_.dot_prod1 += result.dot_prod1; + inter_result_.dot_prod2 += result.dot_prod2; + inter_result_.cnt += result.cnt; + } + } + else { + for (; column_begin1 < column_end1 && column_begin2 < column_end2; + ++column_begin1, ++column_begin2) { + const value_type &val1 = *column_begin1; + const value_type &val2 = *column_begin2; + + if (! skip_nan_ || (! 
is_nan__(val1) && ! is_nan__(val2))) { + inter_result_.total1 += val1; + inter_result_.total2 += val2; + inter_result_.dot_prod += (val1 * val2); + inter_result_.dot_prod1 += (val1 * val1); + inter_result_.dot_prod2 += (val2 * val2); + inter_result_.cnt += 1; + } + } + } } - PASS_DATA_ONE_BY_ONE_2 inline void pre () { - total1_ = total2_ = dot_prod_ = dot_prod1_ = dot_prod2_ = result_ = 0; - cnt_ = 0; + inter_result_.clear(); + result_ = 0; } inline void post () { - const value_type d = value_type(cnt_) - b_; + const value_type d = value_type(inter_result_.cnt) - b_; if (d != 0) [[likely]] - result_ = (dot_prod_ - (total1_ * total2_) / value_type(cnt_)) / d; + result_ = (inter_result_.dot_prod - + (inter_result_.total1 * inter_result_.total2) / + value_type(inter_result_.cnt)) / + d; else result_ = std::numeric_limits::quiet_NaN(); } inline result_type get_result () const { return (result_); } inline value_type get_var1 () const { - const value_type d = value_type(cnt_) - b_; + const value_type d = value_type(inter_result_.cnt) - b_; if (d != 0) [[likely]] - return ((dot_prod1_ - (total1_ * total1_) / value_type(cnt_)) / d); + return ((inter_result_.dot_prod1 - + (inter_result_.total1 * inter_result_.total1) / + value_type(inter_result_.cnt)) / + d); else return (std::numeric_limits::quiet_NaN()); } inline value_type get_var2 () const { - const value_type d = value_type(cnt_) - b_; + const value_type d = value_type(inter_result_.cnt) - b_; if (d != 0) [[likely]] - return ((dot_prod2_ - (total2_ * total2_) / value_type(cnt_)) / d); + return ((inter_result_.dot_prod2 - + (inter_result_.total2 * inter_result_.total2) / + value_type(inter_result_.cnt)) / + d); else return (std::numeric_limits::quiet_NaN()); } - inline size_type get_count() const { return (cnt_); } + inline size_type get_count() const { return (inter_result_.cnt); } explicit CovVisitor (bool biased = false, bool skipnan = true) : b_ (biased ? 
0 : 1), skip_nan_(skipnan) { } private: - value_type total1_ { 0 }; - value_type total2_ { 0 }; - value_type dot_prod_ { 0 }; - value_type dot_prod1_ { 0 }; - value_type dot_prod2_ { 0 }; + InterResults inter_result_ { }; result_type result_ { 0 }; - size_type cnt_ { 0 }; const value_type b_; const bool skip_nan_; }; @@ -852,7 +937,13 @@ struct VarVisitor { cov_ (idx, val, val); } - PASS_DATA_ONE_BY_ONE + template + inline void + operator() (K idx_begin, K idx_end, H column_begin, H column_end) { + + cov_ (idx_begin, idx_end, + column_begin, column_end, column_begin, column_end); + } inline void pre () { cov_.pre(); } inline void post () { cov_.post(); } @@ -878,7 +969,15 @@ struct BetaVisitor { cov_ (idx, val, bmark); } - PASS_DATA_ONE_BY_ONE_2 + template + inline void + operator() (K idx_begin, K idx_end, + H data_begin, H data_end, + H benchmark_begin, H benchmark_end) { + + cov_ (idx_begin, idx_end, + data_begin, data_end, benchmark_begin, benchmark_end); + } inline void pre () { cov_.pre(); result_ = 0; } inline void post () { @@ -913,7 +1012,12 @@ struct StdVisitor { var_ (idx, val); } - PASS_DATA_ONE_BY_ONE + template + inline void + operator() (K idx_begin, K idx_end, H column_begin, H column_end) { + + var_ (idx_begin, idx_end, column_begin, column_end); + } inline void pre () { var_.pre(); result_ = 0; } inline void post () { var_.post(); result_ = ::sqrt(var_.get_result()); } @@ -941,7 +1045,12 @@ struct SEMVisitor { std_ (idx, val); } - PASS_DATA_ONE_BY_ONE + template + inline void + operator() (K idx_begin, K idx_end, H column_begin, H column_end) { + + std_ (idx_begin, idx_end, column_begin, column_end); + } inline void pre () { std_.pre(); result_ = 0; } inline void post () { @@ -1011,8 +1120,12 @@ struct RankVisitor { const H &column_begin, const H &column_end) { GET_COL_SIZE - std::vector::type> - rank_vec(col_s); + + using vec_t = + std::vector::type>; + + vec_t rank_vec(col_s); std::iota(rank_vec.begin(), rank_vec.end(), 0); std::stable_sort( @@ -1103,41 +1216,77 @@ struct CorrVisitor { H column_begin1, H column_end1, H column_begin2, H column_end2) { - const auto &idx = *idx_begin; const size_type col_s = std::min ({ std::distance(idx_begin, idx_end), std::distance(column_begin1, column_end1), std::distance(column_begin2, column_end2) }); if (type_ == correlation_type::pearson) { - while (column_begin1 < column_end1 && column_begin2 < column_end2) - (*this)(idx, *column_begin1++, *column_begin2++); + cov_ (idx_begin, idx_end, + column_begin1, column_end1, column_begin2, column_end2); } else { // correlation_type::spearman - RankVisitor rank; + auto calc_lbd = + [col_s, this](const auto &rank1, const auto &rank2) -> void { + value_type diff_sum { 0 }; - rank.pre(); - rank(idx_begin, idx_end, column_begin1, column_end1); - rank.post(); + for (size_type i = 0; i < col_s; ++i) { + const value_type diff = rank1[i] - rank2[i]; - const auto rank1 = rank.get_result(); + diff_sum += diff * diff; + } - rank.pre(); - rank(idx_begin, idx_end, column_begin2, column_end2); - rank.post(); + this->result_ = value_type(1) - + ((value_type(6) * diff_sum) / + (value_type(col_s * (col_s * col_s - 1)))); + }; - const auto rank2 = std::move(rank.get_result()); - value_type diff_sum { 0 }; + if (ThreadGranularity::get_thread_level() > 2 && + col_s >= ThreadPool::MUL_THR_THHOLD) { + auto lbd = + [](const K &ib, const K &ie, + const H &cb, const H &ce) -> RankVisitor { + RankVisitor rank; + + rank.pre(); + rank(ib, ie, cb, ce); + rank.post(); + return (rank); + }; + auto fut1 = + 
ThreadGranularity::thr_pool_.dispatch( + false, + lbd, + std::cref(idx_begin), + std::cref(idx_end), + std::cref(column_begin1), + std::cref(column_end1)); + auto fut2 = + ThreadGranularity::thr_pool_.dispatch( + false, + lbd, + std::cref(idx_begin), + std::cref(idx_end), + std::cref(column_begin2), + std::cref(column_end2)); + + calc_lbd(fut1.get().get_result(), fut2.get().get_result()); + } + else { + RankVisitor rank; - for (size_type i = 0; i < col_s; ++i) { - const value_type diff = rank1[i] - rank2[i]; + rank.pre(); + rank(idx_begin, idx_end, column_begin1, column_end1); + rank.post(); - diff_sum += diff * diff; - } + const auto rank1 = rank.get_result(); - result_ = value_type(1) - - ((value_type(6) * diff_sum) / - (value_type(col_s * (col_s * col_s - 1)))); + rank.pre(); + rank(idx_begin, idx_end, column_begin2, column_end2); + rank.post(); + + calc_lbd(rank1, rank.get_result()); + } } } @@ -1187,12 +1336,12 @@ struct DotProdVisitor { // ---------------------------------------------------------------------------- -template> +template> struct ExtremumSubArrayVisitor { DEFINE_VISIT_BASIC_TYPES_2 - using compare_type = Cmp; + using compare_type = C; inline void operator() (const index_type &, const value_type &val) { @@ -1275,7 +1424,7 @@ using MinSubArrayVisitor = ExtremumSubArrayVisitor>; template, + typename C = std::less, std::size_t A = 0> struct NExtremumSubArrayVisitor { @@ -1302,7 +1451,7 @@ struct NExtremumSubArrayVisitor { using result_type = std::vector::type>; - using compare_type = Cmp; + using compare_type = C; inline void operator() (const index_type &idx, const value_type &val) { @@ -1337,10 +1486,10 @@ struct NExtremumSubArrayVisitor { private: - ExtremumSubArrayVisitor extremum_sub_array_; + ExtremumSubArrayVisitor extremum_sub_array_; FixedSizePriorityQueue< SubArrayInfo, N, - typename template_switch::type> q_ { }; + typename template_switch::type> q_ { }; result_type result_ { }; compare_type cmp_ { }; }; @@ -1757,12 +1906,12 @@ struct CumProdVisitor { // ---------------------------------------------------------------------------- template, std::size_t A = 0> + typename C = std::less, std::size_t A = 0> struct CumExtremumVisitor { DEFINE_VISIT_BASIC_TYPES_3 - using compare_type = Cmp; + using compare_type = C; template inline void diff --git a/include/DataFrame/Utils/Threads/ThreadPool.h b/include/DataFrame/Utils/Threads/ThreadPool.h index 3d164900..ab8f367a 100644 --- a/include/DataFrame/Utils/Threads/ThreadPool.h +++ b/include/DataFrame/Utils/Threads/ThreadPool.h @@ -110,6 +110,14 @@ class ThreadPool { std::decay_t, std::decay_t, std::decay_t ...>>>; + template + requires std::invocable + using loop2_res_t = + std::vector, + std::decay_t, + std::decay_t, + std::decay_t, + std::decay_t ...>>>; // The return type of dispatch is std::future of return type of routine // @@ -124,6 +132,13 @@ class ThreadPool { loop_res_t parallel_loop(I begin, I end, F &&routine, As && ... args); + // Parallel loop operating with two ranges + // + template + loop2_res_t + parallel_loop2(I1 begin1, I1 end1, I2 begin2, I2 end2, + F &&routine, As && ... args); + template void parallel_sort(const I begin, const I end); template +ThreadPool::loop2_res_t +ThreadPool::parallel_loop2(I1 begin1, I1 end1, I2 begin2, I2 end2, + F &&routine, As && ... 
args) {
+
+    using task_return_t =
+        std::invoke_result_t<std::decay_t<F>,
+                             std::decay_t<I1>,
+                             std::decay_t<I1>,
+                             std::decay_t<I2>,
+                             std::decay_t<As> ...>;
+    using future_t = std::future<task_return_t>;
+
+    size_type   n { 0 };
+
+    if constexpr (std::is_integral<I1>::value)
+        n = std::min(end1 - begin1, end2 - begin2);
+    else
+        n = std::min(std::distance(begin1, end1), std::distance(begin2, end2));
+
+    const size_type        cap_thrs { capacity_threads() };
+    const size_type        block_size { (n > cap_thrs) ? n / cap_thrs : n };
+    std::vector<future_t>  ret;
+
+    if (block_size == n) {
+        ret.reserve(n);
+        for (size_type i = 0; i < n; ++i)
+            ret.emplace_back(dispatch(false,
+                                      routine,
+                                      begin1 + i,
+                                      begin1 + (i + 1),
+                                      begin2 + i,
+                                      std::forward<As>(args) ...));
+    }
+    else {
+        ret.reserve(cap_thrs + 1);
+        for (size_type i = 0; i < n; i += block_size) {
+            const size_type block_end {
+                ((i + block_size) > n) ? n : i + block_size
+            };
+
+            ret.emplace_back(dispatch(false,
+                                      routine,
+                                      begin1 + i,
+                                      begin1 + block_end,
+                                      begin2 + i,
+                                      std::forward<As>(args) ...));
+        }
+    }
+
+    return (ret);
+}
+
+// ----------------------------------------------------------------------------
+
 template<typename I>
 void
 ThreadPool::parallel_sort(const I begin, const I end) {
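// A standalone sketch (editor's illustration, not part of the patch) of the
// chunk-and-merge technique the new CovVisitor code relies on: each parallel
// task accumulates running sums, dot products and a count for its sub-range
// (CovVisitor::InterResults above), and the partial results are summed before
// the covariance formula is applied once at the end.  std::async stands in
// for the library's ThreadPool/parallel_loop2, NaN skipping is omitted, and
// the names PartialSums, accumulate_chunk and parallel_covariance are
// hypothetical.

#include <algorithm>
#include <cstddef>
#include <future>
#include <vector>

struct PartialSums {
    double      total1 {0}, total2 {0}, dot_prod {0};
    std::size_t cnt {0};
};

static PartialSums
accumulate_chunk(const double *x, const double *y, std::size_t n) {
    PartialSums p;

    for (std::size_t i = 0; i < n; ++i) {
        p.total1 += x[i];
        p.total2 += y[i];
        p.dot_prod += x[i] * y[i];
        p.cnt += 1;
    }
    return (p);
}

static double
parallel_covariance(const std::vector<double> &x, const std::vector<double> &y,
                    std::size_t n_chunks = 4) {
    const std::size_t n = std::min(x.size(), y.size());
    const std::size_t block = (n + n_chunks - 1) / n_chunks;
    std::vector<std::future<PartialSums>> futs;

    // Dispatch one task per sub-range, mirroring parallel_loop2
    for (std::size_t i = 0; i < n; i += block) {
        const std::size_t len = std::min(block, n - i);

        futs.push_back(std::async(std::launch::async, accumulate_chunk,
                                  x.data() + i, y.data() + i, len));
    }

    // Merge the per-chunk partial results, as the futures loop in
    // CovVisitor::operator() does
    PartialSums total;

    for (auto &f : futs) {
        const PartialSums p = f.get();

        total.total1 += p.total1;
        total.total2 += p.total2;
        total.dot_prod += p.dot_prod;
        total.cnt += p.cnt;
    }

    const double cnt = double(total.cnt);

    // Unbiased (n - 1) divisor, i.e. the biased == false case of the visitor
    return ((total.dot_prod - total.total1 * total.total2 / cnt) / (cnt - 1.0));
}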
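// A second sketch (also an editor's illustration; rank_of and spearman are
// hypothetical names) of the Spearman path added to CorrVisitor: rank both
// columns -- here concurrently via std::async, where the patch dispatches two
// RankVisitor tasks -- then apply rho = 1 - 6 * sum(d^2) / (n * (n^2 - 1)).
// Tie handling (average ranks) is omitted, unlike the library's RankVisitor.

#include <algorithm>
#include <cstddef>
#include <future>
#include <numeric>
#include <vector>

static std::vector<double>
rank_of(const std::vector<double> &col) {
    std::vector<std::size_t> order(col.size());

    std::iota(order.begin(), order.end(), 0);
    std::stable_sort(order.begin(), order.end(),
                     [&col](std::size_t a, std::size_t b) {
                         return (col[a] < col[b]);
                     });

    std::vector<double> rank(col.size());

    for (std::size_t r = 0; r < order.size(); ++r)
        rank[order[r]] = double(r);  // 0-based rank; the constant offset
                                     // cancels in the rank differences
    return (rank);
}

static double
spearman(const std::vector<double> &c1, const std::vector<double> &c2) {
    auto fut1 = std::async(std::launch::async, rank_of, std::cref(c1));
    auto fut2 = std::async(std::launch::async, rank_of, std::cref(c2));

    const std::vector<double> r1 = fut1.get();
    const std::vector<double> r2 = fut2.get();
    const std::size_t         n = std::min(r1.size(), r2.size());
    double                    diff_sum = 0;

    for (std::size_t i = 0; i < n; ++i) {
        const double d = r1[i] - r2[i];

        diff_sum += d * d;
    }
    return (1.0 -
            (6.0 * diff_sum) / (double(n) * (double(n) * double(n) - 1.0)));
}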