
Commit

Merge pull request #25 from yasahi-hpc/optimization-executors
Optimization executors
yasahi-hpc authored Jul 11, 2023
2 parents 22537e3 + 2635075 commit f2a9279
Showing 15 changed files with 646 additions and 40 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/cmake.yml
@@ -46,7 +46,8 @@ jobs:
# Build your program with the given configuration
run: |
for backend in ${{ env.backends }}; do
docker run -v ${{github.workspace}}:/work ${{ env.container }} cmake --build build_${backend} --config ${{env.BUILD_TYPE}} -j 2
docker run -v ${{github.workspace}}:/work ${{ env.container }} cmake --build build_${backend} --config ${{env.BUILD_TYPE}}
#docker run -v ${{github.workspace}}:/work ${{ env.container }} cmake --build build_${backend} --config ${{env.BUILD_TYPE}} -j 2
#docker run -v ${{github.workspace}}:/work ${{ env.container }} cmake --build build_${backend} --config ${{env.BUILD_TYPE}} -- VERBOSE=1 -j 8
done
61 changes: 61 additions & 0 deletions lib/utils/io_utils.hpp
@@ -2,9 +2,12 @@
#define __IO_UTILS_HPP__

#include <string>
#include <cstring>
#include <fstream>
#include <cassert>

namespace Impl {
// Binary files
template <class ViewType>
void to_binary(const std::string& filename, const ViewType& view) {
std::FILE *fp = std::fopen(filename.c_str(), "wb");
@@ -25,6 +28,64 @@ namespace Impl {
auto* data = view.data_handle();
file.read(reinterpret_cast<char*>(data), sizeof(value_type) * size);
}

// CSV files
template <typename IndexType>
void to_csv(const std::string& filename,
std::map<IndexType, std::vector<std::string>>& dict,
const std::string& separator=",",
const bool header=true,
const bool index=false) {
std::ofstream file(filename);
assert( file.is_open() );
for(auto d: dict) {
if(!header) continue;
auto key = d.first;
auto value = d.second;
if(index) file << key << separator;

for(auto v: value) {
if(v != value.back()) {
file << v << separator;
} else {
file << v << std::endl;
}
}
}
}

template <typename IndexType>
void from_csv(const std::string& filename,
std::map<IndexType, std::vector<std::string>>& dict,
const std::string& separator=",",
const bool header=true,
const bool index=false) {
std::ifstream file(filename, std::ios::in);
assert( file.is_open() );
std::string line, word;
std::vector<std::string> row;
int count = 0;
while(std::getline(file, line)) {
row.clear();
std::stringstream str(line);
char* delim = new char[separator.size() + 1];
std::strcpy(delim, separator.c_str());
while(std::getline(str, word, *delim)) {
row.push_back(word);
}
delete[] delim;

if(index) {
int key = stoi(row.at(0));
row.erase(row.begin());
dict[key] = row;
} else {
int key = count;
dict[key] = row;
}
count += 1;
}
}
};

#endif
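A minimal usage sketch of the new CSV helpers, assuming io_utils.hpp is on the include path; the file name timers.csv and the row contents are illustrative only. Note that <map>, <vector>, and <sstream> are not included by the header itself, so they are included here before it, and the helpers are exercised with their default header/index arguments (with header=false the current to_csv writes nothing).

#include <map>
#include <string>
#include <vector>
#include <sstream>   // needed by from_csv's std::stringstream
#include "io_utils.hpp"

int main() {
  // Each map entry is one CSV row; with the defaults every row is written,
  // keys are dropped, and fields are comma separated.
  std::map<int, std::vector<std::string>> rows;
  rows[0] = {"step", "elapsed"};
  rows[1] = {"0", "0.013"};
  rows[2] = {"10", "0.120"};
  Impl::to_csv("timers.csv", rows);

  // Reading it back with index=false keys the rows by a running counter.
  std::map<int, std::vector<std::string>> parsed;
  Impl::from_csv("timers.csv", parsed);
  return parsed.size() == 3 ? 0 : 1;
}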
1 change: 1 addition & 0 deletions mini-apps/lbm2d-letkf/config.hpp
@@ -69,6 +69,7 @@ struct Settings {
bool is_les_ = true;
bool is_reference_ = true; // false for DA cases
bool is_async_ = false; // enable overlapping of file I/O and compute in the senders/receivers version of letkf
bool is_bcast_on_host_ = false; // if true, broadcast observations on the host; otherwise broadcast on the device
double ly_epsilon_ = 1.e-8;

// data assimilation parameter
15 changes: 14 additions & 1 deletion mini-apps/lbm2d-letkf/executors/da_models.hpp
@@ -17,6 +17,7 @@ class DA_Model {
Config conf_;
IOConfig io_conf_;
std::string base_dir_name_;
bool load_to_device_ = true;

public:
DA_Model(Config& conf, IOConfig& io_conf) : conf_(conf), io_conf_(io_conf) {
@@ -53,13 +54,25 @@ class DA_Model {
from_file(data_vars->v_obs(), it);
}

void load(std::unique_ptr<DataVars>& data_vars, const std::string variable, const int it) {
if(variable == "rho") {
from_file(data_vars->rho_obs(), it);
} else if(variable == "u") {
from_file(data_vars->u_obs(), it);
} else if(variable == "v") {
from_file(data_vars->v_obs(), it);
}
}

private:
template <class ViewType>
void from_file(ViewType& value, const int step) {
auto file_name = base_dir_name_ + "/" + value.name() + "_step" + Impl::zfill(step, 10) + ".dat";
auto mdspan = value.host_mdspan();
Impl::from_binary(file_name, mdspan);
value.updateDevice();
if(load_to_device_) {
value.updateDevice();
}
}

};
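The load_to_device_ flag added above only changes when the host-to-device mirror happens: from_file always fills the host copy, and either mirrors it to the device immediately or leaves that to the caller. The toy sketch below mimics that control flow with a hypothetical DualView type; the real repo uses its own View class with host_mdspan()/updateDevice(), so this is an illustration of the staging order, not the project's API.

#include <cstdio>
#include <vector>

// Hypothetical stand-in for the repo's dual host/device View.
struct DualView {
  std::vector<double> host, device;        // "device" simulated with a second host buffer
  void update_device() { device = host; }  // stands in for value.updateDevice()
};

// Mirrors the control flow of DA_Model::from_file.
void from_file(DualView& v, bool load_to_device) {
  v.host.assign(8, 1.0);                   // stands in for Impl::from_binary into host_mdspan()
  if (load_to_device) {
    v.update_device();                     // copy to device right away
  }
  // else: the caller broadcasts the host buffer first and mirrors it later
}

int main() {
  DualView obs;
  from_file(obs, /*load_to_device=*/false);
  // ... an MPI broadcast of obs.host would happen here (is_bcast_on_host_) ...
  obs.update_device();                     // deferred host-to-device copy
  std::printf("device elements: %zu\n", obs.device.size());
  return 0;
}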
176 changes: 144 additions & 32 deletions mini-apps/lbm2d-letkf/executors/letkf.hpp
@@ -11,13 +11,11 @@

namespace stdex = std::experimental;

#if defined(ENABLE_OPENMP)
#include <exec/static_thread_pool.hpp>
#else
#include "nvexec/stream_context.cuh"
#endif
#include "nvexec/stream_context.cuh"
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>
#include "exec/on.hpp"
#include <exec/async_scope.hpp>
#include <exec/on.hpp>

class LETKF : public DA_Model {
private:
@@ -49,6 +47,13 @@
setFileInfo();

is_async_ = conf_.settings_.is_async_;
// if load_to_device_ is true, data read from file is copied to device memory immediately; otherwise it stays on the host
if(is_async_) {
load_to_device_ = false;
} else {
load_to_device_ = !conf_.settings_.is_bcast_on_host_;
}

auto [nx, ny] = conf_.settings_.n_;
const int n_batch0 = nx * ny;
const int n_stt = conf_.phys_.Q_; // lbm
@@ -85,6 +90,11 @@

void apply(std::unique_ptr<DataVars>& data_vars, const int it, std::vector<Timer*>& timers){
if(it == 0 || it % conf_.settings_.da_interval_ != 0) return;

if(mpi_conf_.is_master()) {
std::cout << __PRETTY_FUNCTION__ << ": t=" << it << std::endl;
}

if(is_async_) {
apply_async(data_vars, it, timers);
} else {
@@ -95,37 +105,123 @@ class LETKF : public DA_Model {
private:
// Asynchronous implementation with senders/receivers
void apply_async(std::unique_ptr<DataVars>& data_vars, const int it, std::vector<Timer*>& timers) {
timers[TimerEnum::DA]->begin();
#if defined(ENABLE_OPENMP)
exec::static_thread_pool pool{std::thread::hardware_concurrency()};
auto scheduler = pool.get_scheduler();
#else
nvexec::stream_context stream_ctx{};
auto scheduler = stream_ctx.get_scheduler();
#endif
if(mpi_conf_.is_master()) {
std::cout << __PRETTY_FUNCTION__ << ": t=" << it << std::endl;
}

if(mpi_conf_.is_master()) {
timers[DA_Load]->begin();
load(data_vars, it);
timers[DA_Load]->end();
}
exec::async_scope scope;
exec::static_thread_pool io_thread_pool{std::thread::hardware_concurrency()};
auto io_scheduler = io_thread_pool.get_scheduler();
auto _load = stdexec::just() |
stdexec::then([&]{
if(mpi_conf_.is_master()) {
timers[DA_Load]->begin();
load(data_vars, it);
timers[DA_Load]->end();
}
});

timers[TimerEnum::DA]->begin();
scope.spawn(stdexec::on(io_scheduler, std::move(_load)));

// set X
const auto f = data_vars->f().mdspan();
auto xk = xk_.mdspan();
auto xk_buffer = xk_buffer_.mdspan();
auto X = letkf_solver_->X().mdspan();

timers[DA_Set_Matrix]->begin();
Impl::transpose(blas_handle_, f, xk, {2, 0, 1}); // (nx, ny, Q) -> (Q, nx*ny)
timers[DA_Set_Matrix]->end();

timers[DA_All2All]->begin();
all2all(xk, xk_buffer); // xk(n_stt, n_batch, n_ens) -> xk_buffer(n_stt, n_batch, n_ens)
timers[DA_All2All]->end();

timers[DA_Set_Matrix]->begin();
Impl::transpose(blas_handle_, xk_buffer, X, {0, 2, 1});
timers[DA_Set_Matrix]->end();

// set Y
auto yk = yk_.mdspan();
auto yk_buffer = yk_buffer_.mdspan();
auto Y = letkf_solver_->Y().mdspan();

auto [nx, ny] = conf_.settings_.n_;
auto rho = data_vars->rho().mdspan();
auto u = data_vars->u().mdspan();
auto v = data_vars->v().mdspan();

const int y_offset0 = 0;
auto _yk = Impl::reshape(yk, std::array<std::size_t, 3>({n_obs_x_*n_obs_x_, 3, nx*ny}));
Iterate_policy<4> yk_pack_policy4d({0, 0, 0, 0}, {n_obs_x_, n_obs_x_, nx, ny});
timers[DA_Set_Matrix]->begin();
Impl::for_each(yk_pack_policy4d, pack_y_functor(conf_, y_offset0, rho, u, v, _yk));
timers[DA_Set_Matrix]->end();

timers[DA_All2All]->begin();
all2all(yk, yk_buffer); // yk(n_obs, n_batch, n_ens) -> yk_buffer(n_obs, n_batch, n_ens)
timers[DA_All2All]->end();

timers[DA_Set_Matrix]->begin();
Impl::transpose(blas_handle_, yk_buffer, Y, {0, 2, 1}); // (n_obs, n_batch, n_ens) -> (n_obs, n_ens, n_batch)
timers[DA_Set_Matrix]->end();

packX(data_vars, timers);
stdexec::sync_wait( scope.on_empty() );

auto _packY = packY_sender(stdexec::just(), scheduler, data_vars);
auto _all2all = all2all_sender(_packY, data_vars);
stdexec::sync_wait( std::move( _all2all ) );
auto _axpy = letkf_solver_->solve_axpy_sender(scheduler);
if(mpi_conf_.is_master()) {
if(!load_to_device_) {
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
}
}

unpackX(data_vars, timers);
unpackY(data_vars, timers);
// broadcast the observations (rho, u, v) that are later packed into yo
auto _broadcast = stdexec::just() |
stdexec::then([&]{
if(load_to_device_) {
auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();
} else {
auto rho_obs = data_vars->rho_obs().host_mdspan();
auto u_obs = data_vars->u_obs().host_mdspan();
auto v_obs = data_vars->v_obs().host_mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

timers[DA_Load_H2D]->begin();
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
timers[DA_Load_H2D]->end();
}
});

auto _axpy_and_broadcast = stdexec::when_all(
std::move(_broadcast),
std::move(_axpy)
);
stdexec::sync_wait( std::move(_axpy_and_broadcast) );

setyo(data_vars, timers);

timers[DA_LETKF]->begin();
letkf_solver_->solve();
letkf_solver_->solve_evd();
timers[DA_LETKF]->end();

timers[DA_Update]->begin();
@@ -172,11 +268,6 @@ class LETKF : public DA_Model {
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
auto y_obs = letkf_solver_->y_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

const int ny_local = ny/mpi_conf_.size();
const int y_offset = ny_local * mpi_conf_.rank();
@@ -313,15 +404,36 @@ class LETKF : public DA_Model {
timers[DA_Set_Matrix]->end();

// set yo
if(load_to_device_) {
auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();
} else {
auto rho_obs = data_vars->rho_obs().host_mdspan();
auto u_obs = data_vars->u_obs().host_mdspan();
auto v_obs = data_vars->v_obs().host_mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

timers[DA_Load_H2D]->begin();
data_vars->rho_obs().updateDevice();
data_vars->u_obs().updateDevice();
data_vars->v_obs().updateDevice();
timers[DA_Load_H2D]->end();
}

auto rho_obs = data_vars->rho_obs().mdspan();
auto u_obs = data_vars->u_obs().mdspan();
auto v_obs = data_vars->v_obs().mdspan();
auto y_obs = letkf_solver_->y_obs().mdspan();
timers[DA_Broadcast]->begin();
broadcast(rho_obs);
broadcast(u_obs);
broadcast(v_obs);
timers[DA_Broadcast]->end();

const int ny_local = ny/mpi_conf_.size();
const int y_offset = ny_local * mpi_conf_.rank();
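The apply_async path in letkf.hpp above overlaps file I/O with matrix packing: the load sender is spawned on a CPU thread pool through an exec::async_scope, the caller keeps packing X and Y, and the two are joined with scope.on_empty() before the loaded data is needed (the broadcast and the solver's axpy are joined the same way with stdexec::when_all). The sketch below strips that pattern down to placeholder work functions (do_file_io, do_compute are assumptions, not repo code) and uses the stdexec spellings that appear in this diff; exact names may differ in other stdexec releases.

#include <cstdio>
#include <utility>

#include <stdexec/execution.hpp>
#include <exec/static_thread_pool.hpp>
#include <exec/async_scope.hpp>

void do_file_io() { std::puts("load observation files"); } // stands in for load(data_vars, it)
void do_compute() { std::puts("pack X and Y matrices"); }  // stands in for the transpose/all2all work

int main() {
  exec::static_thread_pool io_pool{4};
  auto io_scheduler = io_pool.get_scheduler();
  exec::async_scope scope;

  // Spawn the I/O sender on the thread pool without blocking the caller.
  auto _load = stdexec::just() | stdexec::then([] { do_file_io(); });
  scope.spawn(stdexec::on(io_scheduler, std::move(_load)));

  // Meanwhile, run work that does not depend on the loaded data.
  do_compute();

  // Join: block until every sender spawned on the scope has completed.
  stdexec::sync_wait(scope.on_empty());
  return 0;
}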
(diffs for the remaining 10 changed files not shown)
