Skip to content

Commit

Permalink
Parallel run with one proc w/o partition file
Browse files Browse the repository at this point in the history
  • Loading branch information
stcui007 committed Feb 2, 2024
1 parent 52f4354 commit 4a92129
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 38 deletions.
14 changes: 14 additions & 0 deletions include/core/Partition_Data.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef PARTITION_DATA_H
#define PARTITION_DATA_H

using Tuple = std::tuple<int, std::string, std::string, std::string>;

struct PartitionData
{
int mpi_world_rank;
std::unordered_set<std::string> catchment_ids;
std::unordered_set<std::string> nexus_ids;
std::vector<Tuple> remote_connections;
};

#endif //PARTITION_DATA_H
47 changes: 47 additions & 0 deletions include/core/Partition_One.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#ifndef PARTITION_ONE_H
#define PARTITION_ONE_H

#ifdef NGEN_MPI_ACTIVE

#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <unordered_set>
#include <FeatureBuilder.hpp>
#include "features/Features.hpp"
#include <FeatureCollection.hpp>
#include "Partition_Data.hpp"

class Partition_One {

public:
Partition_One() {};

void generate_partition(geojson::GeoJSON& catchment_collection)
{
int counter = 0;
for(auto& feature: *catchment_collection)
{
std::string cat_id = feature->get_id();
catchment_ids.emplace(cat_id);
std::string nex_id = feature->get_property("toid").as_string();
nexus_ids.emplace(nex_id);
counter++;
}
std::cout << "counter = " << counter << std::endl;
}

virtual ~Partition_One(){};

PartitionData partition_data;

private:
int mpi_world_rank;
std::unordered_set<std::string> catchment_ids;
std::unordered_set<std::string> nexus_ids;
std::vector<Tuple> remote_connections;
};

#endif // NGEN_MPI_ACTIVE
#endif // PARTITION_ONE_H
13 changes: 1 addition & 12 deletions include/core/Partition_Parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,7 @@
#include "features/Features.hpp"
#include <FeatureCollection.hpp>
#include "JSONProperty.hpp"

using Tuple = std::tuple<int, std::string, std::string, std::string>;

//This struct is moved from private section to here so that the unit test function can access it
struct PartitionData
{
int mpi_world_rank;
std::unordered_set<std::string> catchment_ids;
std::unordered_set<std::string> nexus_ids;
std::vector<Tuple> remote_connections;
};

#include "Partition_Data.hpp"

class Partitions_Parser {

Expand Down
80 changes: 54 additions & 26 deletions src/NGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ int mpi_rank = 0;
#include "core/Partition_Parser.hpp"
#include <HY_Features_MPI.hpp>

#include "core/Partition_One.hpp"

std::string PARTITION_PATH = "";
int mpi_num_procs;
#endif // NGEN_MPI_ACTIVE
Expand Down Expand Up @@ -249,10 +251,16 @@ int main(int argc, char *argv[]) {
REALIZATION_CONFIG_PATH = argv[5];

#ifdef NGEN_MPI_ACTIVE

// Initalize MPI
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_num_procs);

if (argc >= 7) {
PARTITION_PATH = argv[6];
}
else {
else if (mpi_num_procs > 1) {
std::cout << "Missing required argument for partition file path." << std::endl;
exit(-1);
}
Expand All @@ -261,17 +269,11 @@ int main(int argc, char *argv[]) {
if (strcmp(argv[7], MPI_HF_SUB_CLI_FLAG) == 0) {
is_subdivided_hydrofabric_wanted = true;
}
else {
else if (mpi_num_procs > 1) {
std::cout << "Unexpected arg '" << argv[7] << "'; try " << MPI_HF_SUB_CLI_FLAG << std::endl;
exit(-1);
}
}

// Initalize MPI
MPI_Init(NULL, NULL);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_num_procs);

#endif // NGEN_MPI_ACTIVE

#ifdef WRITE_PID_FILE_FOR_GDB_SERVER
Expand Down Expand Up @@ -336,20 +338,24 @@ int main(int argc, char *argv[]) {
std::cout << "Building Nexus collection" << std::endl;

#ifdef NGEN_MPI_ACTIVE
Partitions_Parser partition_parser(PARTITION_PATH);
// TODO: add something here to make sure this step worked for every rank, and maybe to checksum the file
partition_parser.parse_partition_file();

std::vector<PartitionData> &partitions = partition_parser.partition_ranks;
PartitionData &local_data = partitions[mpi_rank];
if (!nexus_subset_ids.empty()) {
std::cerr << "Warning: CLI provided nexus subset will be ignored when using partition config";
}
if (!catchment_subset_ids.empty()) {
std::cerr << "Warning: CLI provided catchment subset will be ignored when using partition config";
PartitionData local_data_tmp;
if (mpi_num_procs > 1) {
Partitions_Parser partition_parser(PARTITION_PATH);
// TODO: add something here to make sure this step worked for every rank, and maybe to checksum the file
partition_parser.parse_partition_file();

std::vector<PartitionData> &partitions = partition_parser.partition_ranks;
PartitionData local_data_tmp = partitions[mpi_rank];
if (!nexus_subset_ids.empty()) {
std::cerr << "Warning: CLI provided nexus subset will be ignored when using partition config";
}
if (!catchment_subset_ids.empty()) {
std::cerr << "Warning: CLI provided catchment subset will be ignored when using partition config";
}
nexus_subset_ids = std::vector<std::string>(local_data_tmp.nexus_ids.begin(), local_data_tmp.nexus_ids.end());
catchment_subset_ids = std::vector<std::string>(local_data_tmp.catchment_ids.begin(), local_data_tmp.catchment_ids.end());
}
nexus_subset_ids = std::vector<std::string>(local_data.nexus_ids.begin(), local_data.nexus_ids.end());
catchment_subset_ids = std::vector<std::string>(local_data.catchment_ids.begin(), local_data.catchment_ids.end());
PartitionData& local_data = local_data_tmp;
#endif // NGEN_MPI_ACTIVE

// TODO: Instead of iterating through a collection of FeatureBase objects mapping to nexi, we instead want to iterate through HY_HydroLocation objects
Expand All @@ -376,17 +382,20 @@ int main(int argc, char *argv[]) {
} else {
catchment_collection = geojson::read(catchmentDataFile, catchment_subset_ids);
}
std::cout << "Building Nexus collection" << std::endl;

for(auto& feature: *catchment_collection)
{
//feature->set_id(feature->get_property("ID").as_string());
std::cout << "cat feature: " << feature << std::endl;
//feature->set_id(feature->get_property("id").as_string());
nexus_collection->add_feature(feature);
//std::cout<<"Catchment "<<feature->get_id()<<" -> Nexus "<<feature->get_property("toID").as_string()<<std::endl;
std::cout<<"Catchment "<<feature->get_id()<<" -> Nexus "<<feature->get_property("toid").as_string()<<std::endl;
}
//Update the feature ids for the combined collection, using the alternative property 'id'
//to map features to their primary id as well as the alternative property
nexus_collection->update_ids("id");
std::cout<<"Initializing formulations\n";
//std::cout<<"Initializing formulations\n";
std::cout<<"Initializing formulations" << std::endl;
std::shared_ptr<realization::Formulation_Manager> manager = std::make_shared<realization::Formulation_Manager>(REALIZATION_CONFIG_PATH);
manager->read(catchment_collection, utils::getStdOut());

Expand All @@ -406,10 +415,17 @@ int main(int argc, char *argv[]) {
}
}
#endif //NGEN_ROUTING_ACTIVE
std::cout<<"Building Feature Index\n";
std::cout<<"Building Feature Index" <<std::endl;;
std::string link_key = "toid";
nexus_collection->link_features_from_property(nullptr, &link_key);

#ifdef NGEN_MPI_ACTIVE
Partition_One partition_one;
//mpirun with one processor without partition file
if (mpi_num_procs == 1) {
partition_one.generate_partition(catchment_collection);
local_data = partition_one.partition_data;
}
hy_features::HY_Features_MPI features = hy_features::HY_Features_MPI(local_data, nexus_collection, manager, mpi_rank, mpi_num_procs);
#else
hy_features::HY_Features features = hy_features::HY_Features(nexus_collection, manager);
Expand All @@ -425,7 +441,11 @@ int main(int argc, char *argv[]) {
//Still hacking nexus output for the moment
for(const auto& id : features.nexuses()) {
#ifdef NGEN_MPI_ACTIVE
if (!features.is_remote_sender_nexus(id)) {
if (mpi_num_procs > 1) {
if (!features.is_remote_sender_nexus(id)) {
nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc);
}
} else {
nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc);
}
#else
Expand Down Expand Up @@ -472,13 +492,21 @@ int main(int argc, char *argv[]) {
// make a new simulation time object with a different output interval
Simulation_Time sim_time(*manager->Simulation_Time_Object, time_steps[i]);

/*
for ( std::string id : features.catchments(keys[i]) ) { cat_ids.push_back(id); }
if (keys[i] != 0 )
{
layers[i] = std::make_shared<ngen::Layer>(desc, cat_ids, sim_time, features, catchment_collection, 0);
}
*/
for ( std::string id : features.catchments(keys[i]) ) { cat_ids.push_back(id); }
if (keys[i] != 0 )
{
layers[i] = std::make_shared<ngen::Layer>(desc, cat_ids, sim_time, features, catchment_collection, 0);
}
else
{
//layers[i] = std::make_shared<ngen::SurfaceLayer>(desc, cat_ids, sim_time, features, catchment_collection, 0, nexus_subset_ids, nexus_outfiles);
layers[i] = std::make_shared<ngen::SurfaceLayer>(desc, cat_ids, sim_time, features, catchment_collection, 0, nexus_subset_ids, nexus_outfiles);
}

Expand Down

0 comments on commit 4a92129

Please sign in to comment.