Skip to content

Commit 74a0d65

Browse files
committed
Added parallel computing logic to the following visitors: DetrendPriceOsci, AccerelationBands, PriceDistance, EldersThermometer, EldersForceIndex, EaseOfMovement
1 parent 1800134 commit 74a0d65

File tree

1 file changed

+243
-72
lines changed

1 file changed

+243
-72
lines changed

include/DataFrame/DataFrameFinancialVisitors.h

+243-72
Original file line numberDiff line numberDiff line change
@@ -7184,12 +7184,29 @@ struct DetrendPriceOsciVisitor {
71847184
const size_type shift { roll_period_ / 2 + 1 };
71857185
result_type result(col_s, std::numeric_limits<T>::quiet_NaN());
71867186

7187-
std::transform(column_begin + shift, column_begin + col_s,
7188-
savg.get_result().begin(), // Offset by (i - shift)
7189-
result.begin() + shift,
7190-
[] (const auto &c, const auto &s) -> value_type {
7191-
return (c - s);
7192-
});
7187+
if (col_s >= ThreadPool::MUL_THR_THHOLD &&
7188+
ThreadGranularity::get_thread_level() > 2) {
7189+
auto futures =
7190+
ThreadGranularity::thr_pool_.parallel_loop(
7191+
shift,
7192+
col_s,
7193+
[&savg = std::as_const(savg.get_result()),
7194+
&result, &column_begin, shift]
7195+
(auto begin, auto end) mutable -> void {
7196+
for (size_type i = begin; i < end; ++i) [[likely]]
7197+
result[i] = *(column_begin + i) - savg[i - shift];
7198+
});
7199+
7200+
for (auto &fut : futures) fut.get();
7201+
}
7202+
else {
7203+
std::transform(column_begin + shift, column_begin + col_s,
7204+
savg.get_result().begin(), // Offset by (i - shift)
7205+
result.begin() + shift,
7206+
[] (const auto &c, const auto &s) -> value_type {
7207+
return (c - s);
7208+
});
7209+
}
71937210
result_.swap(result);
71947211
}
71957212

@@ -7244,19 +7261,42 @@ struct AccelerationBandsVisitor {
72447261
nzr(idx_begin, idx_end, high_begin, high_end, low_begin, low_end);
72457262
nzr.post();
72467263

7247-
result_type lower_band;
7248-
result_type upper_band;
7264+
result_type lower_band(col_s);
7265+
result_type upper_band(col_s);
72497266

7250-
lower_band.reserve(col_s);
7251-
upper_band.reserve(col_s);
7252-
for (size_type i { 0 }; i < col_s; ++i) [[likely]] {
7253-
const value_type low = *(low_begin + i);
7254-
const value_type high = *(high_begin + i);
7255-
const value_type hl_ratio =
7256-
(nzr.get_result()[i] / (high + low)) * mlp_;
7267+
if (col_s >= ThreadPool::MUL_THR_THHOLD &&
7268+
ThreadGranularity::get_thread_level() > 2) {
7269+
auto futures =
7270+
ThreadGranularity::thr_pool_.parallel_loop(
7271+
size_type(0),
7272+
col_s,
7273+
[&nzr = std::as_const(nzr.get_result()),
7274+
&lower_band, &upper_band,
7275+
&low_begin, &high_begin, this]
7276+
(auto begin, auto end) mutable -> void {
7277+
for (size_type i = begin; i < end; ++i) [[likely]] {
7278+
const value_type low = *(low_begin + i);
7279+
const value_type high = *(high_begin + i);
7280+
const value_type hl_ratio =
7281+
(nzr[i] / (high + low)) * this->mlp_;
7282+
7283+
lower_band[i] = low * (T(1) - hl_ratio);
7284+
upper_band[i] = high * (T(1) + hl_ratio);
7285+
}
7286+
});
7287+
7288+
for (auto &fut : futures) fut.get();
7289+
}
7290+
else {
7291+
for (size_type i { 0 }; i < col_s; ++i) [[likely]] {
7292+
const value_type low = *(low_begin + i);
7293+
const value_type high = *(high_begin + i);
7294+
const value_type hl_ratio =
7295+
(nzr.get_result()[i] / (high + low)) * mlp_;
72577296

7258-
lower_band.push_back(low * (T(1) - hl_ratio));
7259-
upper_band.push_back(high * (T(1) + hl_ratio));
7297+
lower_band[i] = low * (T(1) - hl_ratio);
7298+
upper_band[i] = high * (T(1) + hl_ratio);
7299+
}
72607300
}
72617301

72627302
SimpleRollAdopter<MeanVisitor<T, I>, T, I, A> savg
@@ -7327,6 +7367,8 @@ struct PriceDistanceVisitor {
73277367
const H &close_begin, const H &close_end) {
73287368

73297369
const size_type col_s = std::distance(close_begin, close_end);
7370+
const auto thread_level = (col_s < ThreadPool::MUL_THR_THHOLD)
7371+
? 0L : ThreadGranularity::get_thread_level();
73307372

73317373
assert((col_s == size_type(std::distance(low_begin, low_end))));
73327374
assert((col_s == size_type(std::distance(open_begin, open_end))));
@@ -7338,40 +7380,84 @@ struct PriceDistanceVisitor {
73387380
nzr(idx_begin, idx_end, high_begin, high_end, low_begin, low_end);
73397381
nzr.post();
73407382

7341-
result_type result;
7383+
result_type result(col_s);
73427384

7343-
result.reserve(col_s);
7344-
std::transform(nzr.get_result().begin(),
7345-
nzr.get_result().begin() + col_s,
7346-
std::back_inserter(result),
7347-
[](const auto &n) -> value_type {
7348-
return (T(2) * n);
7349-
});
7385+
if (thread_level > 2) {
7386+
auto futures =
7387+
ThreadGranularity::thr_pool_.parallel_loop(
7388+
size_type(0),
7389+
col_s,
7390+
[&nzr = std::as_const(nzr.get_result()), &result]
7391+
(auto begin, auto end) mutable -> void {
7392+
for (size_type i = begin; i < end; ++i) [[likely]]
7393+
result[i] = T(2) * nzr[i];
7394+
});
7395+
7396+
for (auto &fut : futures) fut.get();
7397+
}
7398+
else {
7399+
std::transform(nzr.get_result().begin(),
7400+
nzr.get_result().begin() + col_s,
7401+
result.begin(),
7402+
[](const auto &n) -> value_type {
7403+
return (T(2) * n);
7404+
});
7405+
}
73507406

73517407
nzr.pre();
73527408
nzr(idx_begin, idx_end,
73537409
// FIXME: "close_begin - 1" is not good
73547410
open_begin, open_end, close_begin - 1, close_end);
73557411
nzr.post();
73567412
result[0] = std::numeric_limits<T>::quiet_NaN();
7357-
std::transform(nzr.get_result().begin() + 1,
7358-
nzr.get_result().begin() + col_s,
7359-
result.begin() + 1,
7360-
result.begin() + 1,
7361-
[](const auto &n, const auto &r) -> value_type {
7362-
return (r + abs__(n));
7363-
});
7413+
if (thread_level > 2) {
7414+
auto futures =
7415+
ThreadGranularity::thr_pool_.parallel_loop(
7416+
size_type(1),
7417+
col_s,
7418+
[&nzr = std::as_const(nzr.get_result()), &result]
7419+
(auto begin, auto end) mutable -> void {
7420+
for (size_type i = begin; i < end; ++i) [[likely]]
7421+
result[i] += abs__(nzr[i]);
7422+
});
7423+
7424+
for (auto &fut : futures) fut.get();
7425+
}
7426+
else {
7427+
std::transform(nzr.get_result().begin() + 1,
7428+
nzr.get_result().begin() + col_s,
7429+
result.begin() + 1,
7430+
result.begin() + 1,
7431+
[](const auto &n, const auto &r) -> value_type {
7432+
return (r + abs__(n));
7433+
});
7434+
}
73647435

73657436
nzr.pre();
73667437
nzr(idx_begin, idx_end, close_begin, close_end, open_begin, open_end);
73677438
nzr.post();
7368-
std::transform(nzr.get_result().begin(),
7369-
nzr.get_result().begin() + col_s,
7370-
result.begin(),
7371-
result.begin(),
7372-
[](const auto &n, const auto &r) -> value_type {
7373-
return (r - abs__(n));
7374-
});
7439+
if (thread_level > 2) {
7440+
auto futures =
7441+
ThreadGranularity::thr_pool_.parallel_loop(
7442+
size_type(0),
7443+
col_s,
7444+
[&nzr = std::as_const(nzr.get_result()), &result]
7445+
(auto begin, auto end) mutable -> void {
7446+
for (size_type i = begin; i < end; ++i) [[likely]]
7447+
result[i] -= abs__(nzr[i]);
7448+
});
7449+
7450+
for (auto &fut : futures) fut.get();
7451+
}
7452+
else {
7453+
std::transform(nzr.get_result().begin(),
7454+
nzr.get_result().begin() + col_s,
7455+
result.begin(),
7456+
result.begin(),
7457+
[](const auto &n, const auto &r) -> value_type {
7458+
return (r - abs__(n));
7459+
});
7460+
}
73757461
result_.swap(result);
73767462
}
73777463

@@ -7405,20 +7491,43 @@ struct EldersThermometerVisitor {
74057491
const H &high_begin, const H &high_end) {
74067492

74077493
const size_type col_s = std::distance(low_begin, low_end);
7494+
const auto thread_level = (col_s < ThreadPool::MUL_THR_THHOLD)
7495+
? 0L : ThreadGranularity::get_thread_level();
74087496

74097497
assert((col_s == size_type(std::distance(high_begin, high_end))));
74107498

7411-
result_type result;
7499+
result_type result(col_s, std::numeric_limits<T>::quiet_NaN());
74127500

7413-
result.reserve(col_s);
7414-
result.push_back(std::numeric_limits<T>::quiet_NaN());
7415-
for (size_type i { 1 }; i < col_s; ++i) [[likely]] {
7416-
const value_type low =
7417-
abs__(*(low_begin + (i - 1)) - *(low_begin + i));
7418-
const value_type high =
7419-
abs__(*(high_begin + i) - *(high_begin + (i - 1)));
7501+
if (thread_level > 2) {
7502+
auto futures =
7503+
ThreadGranularity::thr_pool_.parallel_loop(
7504+
size_type(1),
7505+
col_s,
7506+
[&result, &low_begin, &high_begin]
7507+
(auto begin, auto end) mutable -> void {
7508+
for (size_type i = begin; i < end; ++i) [[likely]] {
7509+
const value_type low =
7510+
abs__(*(low_begin + (i - 1)) -
7511+
*(low_begin + i));
7512+
const value_type high =
7513+
abs__(*(high_begin + i) -
7514+
*(high_begin + (i - 1)));
7515+
7516+
result[i] = std::max(high, low);
7517+
}
7518+
});
7519+
7520+
for (auto &fut : futures) fut.get();
7521+
}
7522+
else {
7523+
for (size_type i { 1 }; i < col_s; ++i) [[likely]] {
7524+
const value_type low =
7525+
abs__(*(low_begin + (i - 1)) - *(low_begin + i));
7526+
const value_type high =
7527+
abs__(*(high_begin + i) - *(high_begin + (i - 1)));
74207528

7421-
result.push_back(std::max(high, low));
7529+
result[i] = std::max(high, low);
7530+
}
74227531
}
74237532

74247533
ewm_v<T, I, A> ewm(exponential_decay_spec::span, roll_period_, true);
@@ -7427,17 +7536,37 @@ struct EldersThermometerVisitor {
74277536
ewm (idx_begin, idx_end, result.begin(), result.end());
74287537
ewm.post();
74297538

7430-
bool_vec t_long;
7431-
bool_vec t_short;
7539+
bool_vec t_long(col_s);
7540+
bool_vec t_short(col_s);
74327541

7433-
t_long.reserve(col_s);
7434-
t_short.reserve(col_s);
7435-
for (size_type i { 0 }; i < col_s; ++i) [[likely]] {
7436-
const value_type thermo = result[i];
7437-
const value_type thermo_ma = ewm.get_result()[i];
7542+
if (thread_level > 2) {
7543+
auto futures =
7544+
ThreadGranularity::thr_pool_.parallel_loop(
7545+
size_type(0),
7546+
col_s,
7547+
[&ewm = std::as_const(ewm.get_result()),
7548+
&result = std::as_const(result),
7549+
&t_long, &t_short, this]
7550+
(auto begin, auto end) mutable -> void {
7551+
for (size_type i = begin; i < end; ++i) [[likely]] {
7552+
const value_type thermo = result[i];
7553+
const value_type thermo_ma = ewm[i];
74387554

7439-
t_long.push_back(thermo < (thermo_ma * buy_f_));
7440-
t_short.push_back(thermo > (thermo_ma * sell_f_));
7555+
t_long[i] = thermo < (thermo_ma * this->buy_f_);
7556+
t_short[i] = thermo > (thermo_ma * this->sell_f_);
7557+
}
7558+
});
7559+
7560+
for (auto &fut : futures) fut.get();
7561+
}
7562+
else {
7563+
for (size_type i { 0 }; i < col_s; ++i) [[likely]] {
7564+
const value_type thermo = result[i];
7565+
const value_type thermo_ma = ewm.get_result()[i];
7566+
7567+
t_long[i] = thermo < (thermo_ma * buy_f_);
7568+
t_short[i] = thermo > (thermo_ma * sell_f_);
7569+
}
74417570
}
74427571

74437572
result_.swap(result);
@@ -7512,8 +7641,24 @@ struct EldersForceIndexVisitor {
75127641

75137642
result_type result = std::move(diff.get_result());;
75147643

7515-
for (size_type i { 0 }; i < col_s; ++i) [[likely]]
7516-
result[i] *= *(volume_begin + i);
7644+
if (col_s >= ThreadPool::MUL_THR_THHOLD &&
7645+
ThreadGranularity::get_thread_level() > 2) {
7646+
auto futures =
7647+
ThreadGranularity::thr_pool_.parallel_loop(
7648+
size_type(0),
7649+
col_s,
7650+
[&result, &volume_begin]
7651+
(auto begin, auto end) mutable -> void {
7652+
for (size_type i = begin; i < end; ++i) [[likely]]
7653+
result[i] *= *(volume_begin + i);
7654+
});
7655+
7656+
for (auto &fut : futures) fut.get();
7657+
}
7658+
else {
7659+
for (size_type i { 0 }; i < col_s; ++i) [[likely]]
7660+
result[i] *= *(volume_begin + i);
7661+
}
75177662

75187663
ewm_v<T, I, A> ewm(exponential_decay_spec::span, roll_period_, true);
75197664

@@ -7562,20 +7707,46 @@ struct EaseOfMovementVisitor {
75627707
assert((col_s == size_type(std::distance(volume_begin, volume_end))));
75637708
assert(roll_period_ < col_s);
75647709

7565-
result_type result;
7710+
result_type result(col_s, std::numeric_limits<T>::quiet_NaN());
75667711

7567-
result.reserve(col_s);
7568-
result.push_back(std::numeric_limits<T>::quiet_NaN());
7569-
for (size_type i { 1 }; i < col_s; ++i) {
7570-
const value_type low = *(low_begin + i);
7571-
const value_type high = *(high_begin + i);
7572-
const value_type distance =
7573-
((high + low) / T(2)) -
7574-
((*(high_begin + (i - 1)) + *(low_begin + (i - 1))) / T(2));
7575-
const value_type box_ratio =
7576-
(*(volume_begin + i) / vol_div_) / (high - low);
7577-
7578-
result.push_back(distance / box_ratio);
7712+
if (col_s >= ThreadPool::MUL_THR_THHOLD &&
7713+
ThreadGranularity::get_thread_level() > 2) {
7714+
auto futures =
7715+
ThreadGranularity::thr_pool_.parallel_loop(
7716+
size_type(1),
7717+
col_s,
7718+
[&result, &low_begin, &high_begin, &volume_begin, this]
7719+
(auto begin, auto end) mutable -> void {
7720+
for (size_type i = begin; i < end; ++i) [[likely]] {
7721+
const value_type low = *(low_begin + i);
7722+
const value_type high = *(high_begin + i);
7723+
const value_type distance =
7724+
((high + low) / T(2)) -
7725+
((*(high_begin + (i - 1)) +
7726+
*(low_begin + (i - 1))) / T(2));
7727+
const value_type box_ratio =
7728+
(*(volume_begin + i) / this->vol_div_) /
7729+
(high - low);
7730+
7731+
result[i] = distance / box_ratio;
7732+
}
7733+
});
7734+
7735+
for (auto &fut : futures) fut.get();
7736+
}
7737+
else {
7738+
for (size_type i { 1 }; i < col_s; ++i) {
7739+
const value_type low = *(low_begin + i);
7740+
const value_type high = *(high_begin + i);
7741+
const value_type distance =
7742+
((high + low) / T(2)) -
7743+
((*(high_begin + (i - 1)) +
7744+
*(low_begin + (i - 1))) / T(2));
7745+
const value_type box_ratio =
7746+
(*(volume_begin + i) / vol_div_) / (high - low);
7747+
7748+
result[i] = distance / box_ratio;
7749+
}
75797750
}
75807751

75817752
SimpleRollAdopter<MeanVisitor<T, I>, T, I, A> savg

0 commit comments

Comments
 (0)