Merge pull request #26 from yasahi-hpc/measure_performance

Timestamp analyses to investigate the impact of overlaps with executors

yasahi-hpc authored Jul 12, 2023
2 parents f2a9279 + 99a63a4 commit 2d4f228

Showing 12 changed files with 422 additions and 205 deletions.
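
The pattern used throughout this commit brackets each phase of the data assimilation step with timers[...]->begin() and timers[...]->end() calls. The Timer class itself is not part of this diff; a minimal sketch of an accumulating timer consistent with that usage (illustrative only, assuming a plain std::chrono implementation; the repository's actual class may differ):

    #include <chrono>
    #include <string>

    // Accumulates wall-clock seconds over repeated begin()/end() pairs.
    class Timer {
      using clock = std::chrono::high_resolution_clock;
      clock::time_point start_;
      double accum_ = 0.0;
      std::string label_;

    public:
      explicit Timer(std::string label) : label_(std::move(label)) {}
      void begin() { start_ = clock::now(); }
      void end() {
        accum_ += std::chrono::duration<double>(clock::now() - start_).count();
      }
      double seconds() const { return accum_; }
      const std::string& label() const { return label_; }
    };
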
12 changes: 11 additions & 1 deletion lib/utils/io_utils.hpp
@@ -43,14 +43,24 @@ namespace Impl {
       auto key = d.first;
       auto value = d.second;
       if(index) file << key << separator;
 
+      for(std::size_t i=0; i<value.size(); i++) {
+        if(i != value.size()-1) {
+          file << value[i] << separator;
+        } else {
+          file << value[i] << std::endl;
+        }
+      }
+
+      /*
       for(auto v: value) {
         if(v != value.back()) {
           file << v << separator;
         } else {
           file << v << std::endl;
         }
       }
+      */
     }
   }
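
The replaced range-based loop, kept above as a comment, decided where the row ends by comparing each element's value against value.back(); a row whose last value also occurs earlier would therefore be broken too early. The new loop keys off the index instead. A self-contained demonstration of the difference:

    #include <iostream>
    #include <vector>

    int main() {
      // Row whose last value (1.0) also appears earlier.
      std::vector<double> value = {1.0, 2.0, 1.0};
      const char separator = ',';

      // Old pattern: the value comparison fires at the first 1.0 -> "1\n2,1\n"
      for(auto v: value) {
        if(v != value.back()) {
          std::cout << v << separator;
        } else {
          std::cout << v << std::endl;
        }
      }

      // New pattern: the position decides -> "1,2,1\n"
      for(std::size_t i=0; i<value.size(); i++) {
        if(i != value.size()-1) {
          std::cout << value[i] << separator;
        } else {
          std::cout << value[i] << std::endl;
        }
      }
      return 0;
    }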

1 change: 1 addition & 0 deletions mini-apps/lbm2d-letkf/config.hpp
@@ -70,6 +70,7 @@ struct Settings {
   bool is_reference_ = true; // false for DA cases
   bool is_async_ = false; // in order to enable overlapping in the senders/receivers version of LETKF
   bool is_bcast_on_host_ = false; // broadcast on device or host
+  bool use_time_stamps_ = false; // for detailed analysis
   double ly_epsilon_ = 1.e-8;
 
   // data assimilation parameter
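
The new flag only appears here as a Settings member; this diff does not show where it is consumed. One plausible use, sketched under that assumption (the stamp() helper and the stamps vector are illustrative, not the mini-app's actual code):

    #include <chrono>
    #include <vector>

    using time_point = std::chrono::high_resolution_clock::time_point;

    // Record a time stamp only when detailed analysis is requested.
    inline void stamp(bool use_time_stamps, std::vector<time_point>& stamps) {
      if(use_time_stamps) {
        stamps.push_back(std::chrono::high_resolution_clock::now());
      }
    }
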
221 changes: 56 additions & 165 deletions mini-apps/lbm2d-letkf/executors/letkf.hpp
@@ -118,11 +118,11 @@ class LETKF : public DA_Model {
     auto io_scheduler = io_thread_pool.get_scheduler();
     auto _load = stdexec::just() |
       stdexec::then([&]{
+        timers[DA_Load]->begin();
         if(mpi_conf_.is_master()) {
-          timers[DA_Load]->begin();
           load(data_vars, it);
-          timers[DA_Load]->end();
         }
+        timers[DA_Load]->end();
       });
 
     timers[TimerEnum::DA]->begin();
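
The change above moves begin()/end() outside the is_master() branch, so every rank records a DA_Load interval (near zero on non-master ranks) and the timer rows stay aligned across ranks when the per-rank measurements are written out side by side. A minimal standalone version of the timed sender stage, assuming NVIDIA's stdexec reference implementation of P2300 (the work inside the branch is a placeholder):

    #include <chrono>
    #include <cstdio>
    #include <stdexec/execution.hpp>

    int main() {
      double elapsed = 0.0;
      const bool is_master = true; // stands in for mpi_conf_.is_master()

      auto _load = stdexec::just() |
        stdexec::then([&]{
          const auto begin = std::chrono::high_resolution_clock::now();
          if(is_master) {
            // master-only work, e.g. load(data_vars, it)
          }
          // every rank records the span, master or not
          elapsed = std::chrono::duration<double>(
              std::chrono::high_resolution_clock::now() - begin).count();
        });

      stdexec::sync_wait(std::move(_load));
      std::printf("DA_Load: %g s\n", elapsed);
    }
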
@@ -134,17 +134,17 @@
     auto xk_buffer = xk_buffer_.mdspan();
     auto X = letkf_solver_->X().mdspan();
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Pack_X]->begin();
     Impl::transpose(blas_handle_, f, xk, {2, 0, 1}); // (nx, ny, Q) -> (Q, nx*ny)
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Pack_X]->end();
 
-    timers[DA_All2All]->begin();
+    timers[DA_All2All_X]->begin();
     all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
-    timers[DA_All2All]->end();
+    timers[DA_All2All_X]->end();
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Unpack_X]->begin();
     Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Unpack_X]->end();
 
     // set Y
     auto yk = yk_.mdspan();
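
The hunk above splits the old coarse timers (DA_Set_Matrix, DA_All2All) into per-stage ones (DA_Pack_X, DA_All2All_X, DA_Unpack_X), so pack, exchange, and unpack costs can be attributed separately. The pack itself is the {2, 0, 1} transpose that reorders f(nx, ny, Q) into xk(Q, nx*ny); a plain reference loop with the same semantics, assuming layout_left (column-major) storage, as a stand-in for the tuned Impl::transpose:

    #include <cstddef>
    #include <vector>

    // f(ix, iy, q) -> xk(q, ix + nx*iy), i.e. the {2, 0, 1} permutation.
    void pack_x_reference(const std::vector<double>& f, std::vector<double>& xk,
                          std::size_t nx, std::size_t ny, std::size_t Q) {
      for(std::size_t q = 0; q < Q; q++) {
        for(std::size_t iy = 0; iy < ny; iy++) {
          for(std::size_t ix = 0; ix < nx; ix++) {
            const std::size_t src = ix + nx * (iy + ny * q); // f(ix, iy, q)
            const std::size_t dst = q + Q * (ix + nx * iy);  // xk(q, ix + nx*iy)
            xk[dst] = f[src];
          }
        }
      }
    }
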
@@ -159,57 +159,42 @@
     const int y_offset0 = 0;
     auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
     Iterate_policy<4> yk_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny});
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Pack_Y]->begin();
     Impl::for_each(yk_pack_policy4d, pack_y_functor(conf_, y_offset0, rho, u, v, _yk));
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Pack_Y]->end();
 
-    timers[DA_All2All]->begin();
+    timers[DA_All2All_Y]->begin();
     all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
-    timers[DA_All2All]->end();
+    timers[DA_All2All_Y]->end();
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Unpack_Y]->begin();
     Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Unpack_Y]->end();
 
     stdexec::sync_wait( scope.on_empty() );
 
     auto _axpy = letkf_solver_->solve_axpy_sender(scheduler);
-    if(mpi_conf_.is_master()) {
-      if(!load_to_device_) {
+    if(!load_to_device_) {
       timers[DA_Load_H2D]->begin();
+      if(mpi_conf_.is_master()) {
        data_vars->rho_obs().updateDevice();
        data_vars->u_obs().updateDevice();
        data_vars->v_obs().updateDevice();
       }
       timers[DA_Load_H2D]->end();
     }
 
     // set yo
     auto _broadcast = stdexec::just() |
       stdexec::then([&]{
-        if(load_to_device_) {
-          auto rho_obs = data_vars->rho_obs().mdspan();
-          auto u_obs = data_vars->u_obs().mdspan();
-          auto v_obs = data_vars->v_obs().mdspan();
-          timers[DA_Broadcast]->begin();
-          broadcast(rho_obs);
-          broadcast(u_obs);
-          broadcast(v_obs);
-          timers[DA_Broadcast]->end();
-        } else {
-          auto rho_obs = data_vars->rho_obs().host_mdspan();
-          auto u_obs = data_vars->u_obs().host_mdspan();
-          auto v_obs = data_vars->v_obs().host_mdspan();
-          timers[DA_Broadcast]->begin();
-          broadcast(rho_obs);
-          broadcast(u_obs);
-          broadcast(v_obs);
-          timers[DA_Broadcast]->end();
-
-          timers[DA_Load_H2D]->begin();
-          data_vars->rho_obs().updateDevice();
-          data_vars->u_obs().updateDevice();
-          data_vars->v_obs().updateDevice();
-          timers[DA_Load_H2D]->end();
-        }
+        auto rho_obs = data_vars->rho_obs().mdspan();
+        auto u_obs = data_vars->u_obs().mdspan();
+        auto v_obs = data_vars->v_obs().mdspan();
+        timers[DA_Broadcast]->begin();
+        broadcast(rho_obs);
+        broadcast(u_obs);
+        broadcast(v_obs);
+        timers[DA_Broadcast]->end();
       });
 
     auto _axpy_and_braodcast = stdexec::when_all(
@@ -231,36 +216,6 @@
     timers[TimerEnum::DA]->end();
   }
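
The sender _axpy_and_braodcast (sic) composed above with stdexec::when_all is the overlap this commit instruments: the LETKF axpy solve and the observation broadcast may run concurrently, and the per-stage time stamps are meant to reveal whether they actually do. A minimal self-contained sketch of the composition, assuming stdexec's reference implementation (the sleeps stand in for real work):

    #include <chrono>
    #include <cstdio>
    #include <thread>
    #include <exec/static_thread_pool.hpp>
    #include <stdexec/execution.hpp>

    int main() {
      exec::static_thread_pool pool{2};
      auto sched = pool.get_scheduler();

      auto stage = [](const char* name) {
        return stdexec::then([name]{
          std::this_thread::sleep_for(std::chrono::milliseconds(50)); // fake work
          std::printf("%s done\n", name);
        });
      };

      auto _axpy      = stdexec::schedule(sched) | stage("axpy");
      auto _broadcast = stdexec::schedule(sched) | stage("broadcast");

      const auto t0 = std::chrono::steady_clock::now();
      stdexec::sync_wait(stdexec::when_all(std::move(_axpy), std::move(_broadcast)));
      const double ms = std::chrono::duration<double, std::milli>(
          std::chrono::steady_clock::now() - t0).count();
      // ~50 ms if the two stages overlapped, ~100 ms if they serialized
      std::printf("wall time: %.1f ms\n", ms);
    }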

-  void packX(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
-    // Pack X
-    const auto f = data_vars->f().mdspan();
-    auto xk = xk_.mdspan();
-
-    timers[DA_Set_Matrix]->begin();
-    Impl::transpose(blas_handle_, f, xk, {2, 0, 1});
-    timers[DA_Set_Matrix]->end();
-  }
-
-  void unpackX(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
-    // set X
-    auto xk_buffer = xk_buffer_.mdspan();
-    auto X = letkf_solver_->X().mdspan();
-
-    timers[DA_Set_Matrix]->begin();
-    Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
-    timers[DA_Set_Matrix]->end();
-  }
-
-  void unpackY(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
-    // set Y
-    auto yk_buffer = yk_buffer_.mdspan();
-    auto Y = letkf_solver_->Y().mdspan();
-
-    timers[DA_Set_Matrix]->begin();
-    Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
-    timers[DA_Set_Matrix]->end();
-  }
-
   void setyo(std::unique_ptr<DataVars>& data_vars, std::vector<Timer*>& timers) {
     // set yo
     auto [nx, ny] = conf_.settings_.n_;
@@ -274,73 +229,20 @@
     auto _y_obs = Impl::reshape(y_obs, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny_local}));
     Iterate_policy<4> yo_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny_local});
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Pack_Obs]->begin();
     Impl::for_each(yo_pack_policy4d, pack_y_functor(conf_, y_offset, rho_obs, u_obs, v_obs, _y_obs));
-    timers[DA_Set_Matrix]->end();
-  }
-
-  template <class Sender, class Scheduler>
-  stdexec::sender auto packY_sender(Sender&& sender, Scheduler&& scheduler, std::unique_ptr<DataVars>& data_vars) {
-    // Pack Y
-    auto yk = yk_.mdspan();
-
-    auto [nx, ny] = conf_.settings_.n_;
-    auto rho = data_vars->rho().mdspan();
-    auto u = data_vars->u().mdspan();
-    auto v = data_vars->v().mdspan();
-
-    const int y_offset0 = 0;
-    const std::size_t size = n_obs_x_ * n_obs_x_ * nx * ny;
-    auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
-    auto f = pack_y_functor(conf_, y_offset0, rho, u, v, _yk);
-    int n0 = n_obs_x_, n1 = n_obs_x_, n2 = nx, n3 = ny;
-    auto functor_1d = [=] MDSPAN_FORCE_INLINE_FUNCTION (const int idx) {
-      if(std::is_same_v<default_iterate_layout, stdex::layout_left>) {
-        const int i0 = idx % n0;
-        const int i123 = idx / n0;
-        const int i1 = i123%n1;
-        const int i23 = i123/n1;
-        const int i2 = i23%n2;
-        const int i3 = i23/n2;
-        f(i0, i1, i2, i3);
-      } else {
-        const int i3 = idx % n3;
-        const int i012 = idx / n3;
-        const int i2 = i012%n2;
-        const int i01 = i012/n2;
-        const int i1 = i01%n1;
-        const int i0 = i01/n1;
-        f(i0, i1, i2, i3);
-      }
-    };
-    return sender | exec::on(scheduler, stdexec::bulk(size, functor_1d));
-  }
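
The removed packY_sender flattened the 4-D pack iteration into a single stdexec::bulk over size = n0*n1*n2*n3, rebuilding (i0, i1, i2, i3) from the flat index with division and modulo; the branch picks the unrolling order that matches the memory layout. A round-trip check of the layout_left arm (same n0..n3 ordering as above):

    #include <cassert>

    // layout_left flattening: idx = i0 + n0*(i1 + n1*(i2 + n2*i3)).
    int main() {
      const int n0 = 3, n1 = 4, n2 = 5, n3 = 6;
      for(int idx = 0; idx < n0*n1*n2*n3; idx++) {
        const int i0   = idx % n0;
        const int i123 = idx / n0;
        const int i1   = i123 % n1;
        const int i23  = i123 / n1;
        const int i2   = i23 % n2;
        const int i3   = i23 / n2;
        assert(idx == i0 + n0*(i1 + n1*(i2 + n2*i3))); // inverse mapping holds
      }
      return 0;
    }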

-  template <class Sender>
-  stdexec::sender auto all2all_sender(Sender&& sender, std::unique_ptr<DataVars>& data_vars) {
-    auto xk = xk_.mdspan();
-    auto xk_buffer = xk_buffer_.mdspan();
-
-    auto yk = yk_.mdspan();
-    auto yk_buffer = yk_buffer_.mdspan();
-
-    return sender | stdexec::then( [&] {
-      all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
-      all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
-    });
+    timers[DA_Pack_Obs]->end();
   }

  private:
   // Conventional implementation with thrust
   void apply_sync(std::unique_ptr<DataVars>& data_vars, const int it, std::vector<Timer*>& timers) {
     timers[TimerEnum::DA]->begin();
+    timers[DA_Load]->begin();
     if(mpi_conf_.is_master()) {
       std::cout << __PRETTY_FUNCTION__ << ": t=" << it << std::endl;
-
-      timers[DA_Load]->begin();
       load(data_vars, it);
-      timers[DA_Load]->end();
     }
+    timers[DA_Load]->end();
     setXandY(data_vars, timers);
 
     timers[DA_LETKF]->begin();
@@ -366,17 +268,17 @@
     auto xk_buffer = xk_buffer_.mdspan();
     auto X = letkf_solver_->X().mdspan();
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Pack_X]->begin();
     Impl::transpose(blas_handle_, f, xk, {2, 0, 1}); // (nx, ny, Q) -> (Q, nx*ny)
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Pack_X]->end();
 
-    timers[DA_All2All]->begin();
+    timers[DA_All2All_X]->begin();
     all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
-    timers[DA_All2All]->end();
+    timers[DA_All2All_X]->end();
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Unpack_X]->begin();
     Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Unpack_X]->end();
 
     // set Y
     auto yk = yk_.mdspan();
@@ -391,58 +293,47 @@
     const int y_offset0 = 0;
     auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
     Iterate_policy<4> yk_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny});
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Pack_Y]->begin();
     Impl::for_each(yk_pack_policy4d, pack_y_functor(conf_, y_offset0, rho, u, v, _yk));
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Pack_Y]->end();
 
-    timers[DA_All2All]->begin();
+    timers[DA_All2All_Y]->begin();
     all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
-    timers[DA_All2All]->end();
+    timers[DA_All2All_Y]->end();
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Unpack_Y]->begin();
     Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Unpack_Y]->end();
 
     // set yo
-    if(load_to_device_) {
-      auto rho_obs = data_vars->rho_obs().mdspan();
-      auto u_obs = data_vars->u_obs().mdspan();
-      auto v_obs = data_vars->v_obs().mdspan();
-      timers[DA_Broadcast]->begin();
-      broadcast(rho_obs);
-      broadcast(u_obs);
-      broadcast(v_obs);
-      timers[DA_Broadcast]->end();
-    } else {
-      auto rho_obs = data_vars->rho_obs().host_mdspan();
-      auto u_obs = data_vars->u_obs().host_mdspan();
-      auto v_obs = data_vars->v_obs().host_mdspan();
-      timers[DA_Broadcast]->begin();
-      broadcast(rho_obs);
-      broadcast(u_obs);
-      broadcast(v_obs);
-      timers[DA_Broadcast]->end();
-
+    if(!load_to_device_) {
       timers[DA_Load_H2D]->begin();
-      data_vars->rho_obs().updateDevice();
-      data_vars->u_obs().updateDevice();
-      data_vars->v_obs().updateDevice();
+      if(mpi_conf_.is_master()) {
+        data_vars->rho_obs().updateDevice();
+        data_vars->u_obs().updateDevice();
+        data_vars->v_obs().updateDevice();
+      }
       timers[DA_Load_H2D]->end();
     }
 
+    auto rho_obs = data_vars->rho_obs().mdspan();
+    auto u_obs = data_vars->u_obs().mdspan();
+    auto v_obs = data_vars->v_obs().mdspan();
+    auto y_obs = letkf_solver_->y_obs().mdspan();
+    timers[DA_Broadcast]->begin();
+    broadcast(rho_obs);
+    broadcast(u_obs);
+    broadcast(v_obs);
+    timers[DA_Broadcast]->end();
 
     const int ny_local = ny/mpi_conf_.size();
     const int y_offset = ny_local * mpi_conf_.rank();
-    auto y_obs = letkf_solver_->y_obs().mdspan();
     auto _y_obs = Impl::reshape(y_obs, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny_local}));
     Iterate_policy<4> yo_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny_local});
 
-    timers[DA_Set_Matrix]->begin();
+    timers[DA_Pack_Obs]->begin();
     Impl::for_each(yo_pack_policy4d, pack_y_functor(conf_, y_offset, rho_obs, u_obs, v_obs, _y_obs));
-    timers[DA_Set_Matrix]->end();
+    timers[DA_Pack_Obs]->end();
   }
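
After this change the sync path, like the async one, always broadcasts device-resident observation fields, with the file read and the host-to-device upload confined to the master rank. The same pattern in plain MPI, assuming a device-aware MPI build as the broadcast of device mdspans implies (buffer and function names are illustrative):

    #include <mpi.h>
    #include <vector>

    // Master fills the buffer (load + updateDevice in the mini-app); all
    // ranks then receive it via broadcast. A host vector stands in for the
    // device buffer to keep the sketch self-contained.
    void load_and_broadcast(std::vector<double>& obs, int root, MPI_Comm comm) {
      int rank;
      MPI_Comm_rank(comm, &rank);
      if(rank == root) {
        for(auto& v : obs) v = 1.0; // placeholder for real observation data
      }
      MPI_Bcast(obs.data(), static_cast<int>(obs.size()), MPI_DOUBLE, root, comm);
    }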

   void update(std::unique_ptr<DataVars>& data_vars) {