Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/converse.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,11 @@ int CmiGetArgc(char **argv);
int CmiScanf(const char *format, ...);
int CmiError(const char *format, ...);

#define ConverseExit(...) CmiExit(__VA_ARGS__ + 0)
#ifdef __cplusplus
void ConverseExit(int status=0);
#else
void ConverseExit(int status);
#endif
#define CmiMemcpy(dest, src, size) memcpy((dest), (src), (size))

#define setMemoryTypeChare(p) /* empty memory debugging method */
Expand Down
21 changes: 13 additions & 8 deletions src/collectives.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ void CmiReductionsInit(void) {
CpvAccess(_reduction_counter) = 0;

// SETUP NODE-LEVEL REDUCTIONS
CsvInitialize(CmiNodeReduction *, _node_reduction_info);
CsvInitialize(CmiNodeReductionID, _node_reduction_counter);
if(CmiMyRank() == 0)
{
CsvInitialize(CmiNodeReduction *, _node_reduction_info);
CsvInitialize(CmiNodeReductionID, _node_reduction_counter);


auto noderedinfo =
(CmiNodeReduction *)malloc(CmiMaxReductions * sizeof(CmiNodeReduction));
Expand All @@ -239,10 +242,13 @@ void CmiReductionsInit(void) {

// node reduction must be initialized with a valid lock
nodered.lock = CmiCreateLock(); // in non-smp this would just be a nullptr


CsvAccess(_node_reduction_info) = noderedinfo;
CsvAccess(_node_reduction_counter) = 0;
}
CsvAccess(_node_reduction_info) = noderedinfo;
CsvAccess(_node_reduction_counter) = 0;

}

}

// extract reduction ID from message
Expand Down Expand Up @@ -428,16 +434,15 @@ static void CmiClearNodeReduction(CmiReductionID id) {

// lock and unlock are used to support SMP
void CmiNodeReduce(void *msg, int size, CmiReduceMergeFn mergeFn) {
const CmiReductionID id = CmiGetNextNodeReductionID();

CmiNodeReduction nodeRed =
CsvAccess(_node_reduction_info)[CmiGetReductionIndex(CmiGetRedID(msg))];
CsvAccess(_node_reduction_info)[CmiGetReductionIndex(id)];
CmiLock(nodeRed.lock);

const CmiReductionID id = CmiGetNextNodeReductionID();
CmiReduction *red = CmiGetCreateNodeReduction(id);
CmiInternalNodeReduce(msg, size, mergeFn, red);


CmiUnlock(nodeRed.lock);
}

Expand Down
139 changes: 109 additions & 30 deletions src/convcore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ int Cmi_mynodesize; // represents the number of PE's/threads on a single
int Cmi_numnodes; // represents the number of physical nodes/systems machine
int Cmi_nodestart;
std::vector<CmiHandlerInfo> **CmiHandlerTable; // array of handler vectors
std::atomic<int> numPEsReadyForExit {0};
ConverseNodeQueue<void *> *CmiNodeQueue;
CpvDeclare(Queue, CsdSchedQueue);
CsvDeclare(Queue, CsdNodeQueue);
Expand All @@ -49,6 +50,8 @@ int userDrivenMode;
int _replaySystem = 0;
static int CmiMemoryIs_flag=0;
CsvDeclare(CmiIpcManager*, coreIpcManager_);
int Cmi_usched;
int Cmi_initret;

CmiNodeLock CmiMemLock_lock;
CpvDeclare(int, isHelperOn);
Expand Down Expand Up @@ -114,7 +117,7 @@ void CmiCallHandler(int handler, void *msg) {
}
}

void converseRunPe(int rank) {
void converseRunPe(int rank, int everReturn) {
char **CmiMyArgv;
CmiMyArgv = CmiCopyArgs(Cmi_argv);

Expand Down Expand Up @@ -146,50 +149,110 @@ void converseRunPe(int rank) {
if (CmiTraceFn)
CmiTraceFn(Cmi_argv);

// call initial function and start scheduler
Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
CsdScheduler();
/*Converse modes:
* usched=0, initret/everReturn=0: normal mode, converse starts scheduler for you
* usched=1, initret/everReturn=0: user-driven scheduling, user calls scheduler
* in startfn, then exits
* usched=1, initret/everReturn=1: ConverseInit returns without calling startfn
* on rank 0, still calls startfn on other ranks (used by namd)
*/

if(!everReturn)
{
Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
if (Cmi_usched == 0) {
CsdScheduler();
}
CmiFreeArgs(CmiMyArgv);
//todo: add interoperate condition
ConverseExit();
}
else CmiFreeArgs(CmiMyArgv);

/*
// Wait for all PEs on this node to finish
CmiNodeBarrier();

// Finalize comm_backend
comm_backend::exitThread();
// free args
CmiFreeArgs(CmiMyArgv);
*/
}

void CmiStartThreads() {
// allocate global arrayss
Cmi_queues = new ConverseQueue<void *> *[Cmi_mynodesize];
CmiHandlerTable = new std::vector<CmiHandlerInfo> *[Cmi_mynodesize];
CmiNodeQueue = new ConverseNodeQueue<void *>();
//function to exit converse and terminate the program
//waits for all threads to call, then does cleanup on rank 0
void ConverseExit(int exitcode)
{
// increment number of PEs ready for exit
std::atomic_fetch_add_explicit(&numPEsReadyForExit, 1, std::memory_order_release);
// we need everyone to spin unlike old converse to be able to exit threads
while (std::atomic_load_explicit(&numPEsReadyForExit, std::memory_order_acquire) != CmiMyNodeSize()) {
// make progress while waiting so network progress can continue
comm_backend::progress();
}

_smp_mutex = CmiCreateLock();
CmiMemLock_lock = CmiCreateLock();
// At this point all threads on the node are ready to exit. We must perform
// the inter-node barrier while per-thread communication contexts are still
// alive so that progress() can drive completion. To coordinate that,
// rank 0 performs the inter-node barrier and then notifies other threads
// (via an atomic) that the barrier is complete. After the notification,
// every thread performs its per-thread cleanup (exitThread). Finally,
// rank 0 waits for all threads to finish exitThread before tearing down
// the global comm backend and exiting the process.

// make sure the queues are allocated before PEs start sending messages around
comm_backend::barrier();
static std::atomic<int> barrier_done{0};
static std::atomic<int> exitThread_done{0};

std::vector<std::thread> threads;
for (int i = 0; i < Cmi_mynodesize; i++) {
std::thread t(converseRunPe, i);
threads.push_back(std::move(t));
if (CmiMyRank() == 0) {
// participate in the global barrier (blocks until other nodes arrive)
comm_backend::barrier();
// let other local threads know barrier has completed
barrier_done.store(1, std::memory_order_release);
} else {
// other threads help make progress until rank 0 finishes the barrier
while (std::atomic_load_explicit(&barrier_done, std::memory_order_acquire) == 0) {
comm_backend::progress();
}
}

for (auto &thread : threads) {
thread.join();
// Now every thread can clean up its thread-local comm state.
comm_backend::exitThread();

// signal we've finished exitThread()
std::atomic_fetch_add_explicit(&exitThread_done, 1, std::memory_order_release);

if (CmiMyRank() == 0) {
// wait for all local threads to complete their per-thread cleanup. Use
// progress() to avoid deadlock if any backend progress is needed.
while (std::atomic_load_explicit(&exitThread_done, std::memory_order_acquire) != CmiMyNodeSize()) {
comm_backend::progress();
}

// safe to tear down global comm backend and process-wide structures now
comm_backend::exit();
delete[] Cmi_queues;
delete CmiNodeQueue;
delete[] CmiHandlerTable;
Cmi_queues = nullptr;
CmiNodeQueue = nullptr;
CmiHandlerTable = nullptr;
exit(exitcode);
} else {
// Non-rank-0 threads block here indefinitely; rank 0 will terminate the
// process once cleanup is done.
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
}

void CmiStartThreads() {

// Create threads for ranks 1 and up, run rank 0 on main thread (like original Converse)
for (int i = 1; i < Cmi_mynodesize; i++) {
std::thread t(converseRunPe, i, 0); // everReturn is 0 for ranks > 0, meaning these ranks will call the start function and not return from ConverseInit
t.detach();
}

// make sure all PEs are done before we free the queues.
comm_backend::barrier();
comm_backend::exit();
delete[] Cmi_queues;
delete CmiNodeQueue;
delete[] CmiHandlerTable;
Cmi_queues = nullptr;
CmiNodeQueue = nullptr;
CmiHandlerTable = nullptr;
}

// argument form: ./prog +pe <N>
Expand Down Expand Up @@ -253,6 +316,8 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched,
Cmi_argv = argv;
Cmi_startfn = fn;
CharmLibInterOperate = 0;
Cmi_usched = usched;
Cmi_initret = initret;

#ifdef CMK_HAS_PARTITION
CmiCreatePartitions(argv);
Expand All @@ -266,7 +331,21 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usched,
Cmi_nodestartGlobal = _Cmi_mynode_global * Cmi_mynodesize;
#endif

// allocate global arrays
Cmi_queues = new ConverseQueue<void *> *[Cmi_mynodesize];
CmiHandlerTable = new std::vector<CmiHandlerInfo> *[Cmi_mynodesize];
CmiNodeQueue = new ConverseNodeQueue<void *>();

_smp_mutex = CmiCreateLock();
CmiMemLock_lock = CmiCreateLock();

// make sure the queues are allocated before PEs start sending messages around
comm_backend::barrier();

//launch threads on rank 1+
CmiStartThreads();
//run rank 0 on main thread
converseRunPe(0, Cmi_initret);
}

// CMI STATE
Expand Down
2 changes: 1 addition & 1 deletion src/converse_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ typedef struct GroupDef_s {
#define DEBUGF(...) //CmiPrintf(__VA_ARGS__)

void CmiStartThreads(char **argv);
void converseRunPe(int rank);
void converseRunPe(int rank, int everReturn);
void collectiveInit(void);

// HANDLERS
Expand Down
2 changes: 2 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,8 @@ void CsdSchedulePoll() {

int CsdScheduler(int maxmsgs){
if (maxmsgs < 0) {
//reset stop flag
CmiGetState()->stopFlag = 0;
CsdScheduler(); //equivalent to CsdScheduleForever in old converse
}
else CsdSchedulePoll(); //not implementing CsdScheduleCount
Expand Down
2 changes: 1 addition & 1 deletion tests/reduction_node/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
add_reconverse_executable(nodereduction reduction.cpp)
add_test(NAME node-reduction-singlenode COMMAND nodereduction +pe 7)
add_test(NAME node-reduction-multinode COMMAND ${RECONVERSE_TEST_LAUNCHER} -n 4 $<TARGET_FILE:reduction> +pe 8)
add_test(NAME node-reduction-multinode COMMAND ${RECONVERSE_TEST_LAUNCHER} -n 4 $<TARGET_FILE:nodereduction> +pe 8)