diff --git a/.gitignore b/.gitignore index ad85b4c..e827c54 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ *.a *.d tracer/traceR +bin/traceR # hide backups *~ diff --git a/tracer/Makefile b/tracer/Makefile index 45b73e4..b4d5df0 100644 --- a/tracer/Makefile +++ b/tracer/Makefile @@ -24,7 +24,7 @@ all: traceR PREFIX = .. -SRCS = p2p-events.C coll-events.C tracer-driver.C +SRCS = p2p-events.C coll-events.C tracer-driver.C qos-manager.C OBJS = ${SRCS:.C=.o} traceR: ${OBJS} components diff --git a/tracer/coll-events.C b/tracer/coll-events.C index c9827ba..58a9524 100644 --- a/tracer/coll-events.C +++ b/tracer/coll-events.C @@ -8,6 +8,7 @@ extern "C" { #include "codes/model-net.h" +#include "codes/model-net-sched.h" #include "codes/lp-io.h" #include "codes/codes.h" #include "codes/codes_mapping.h" @@ -16,6 +17,9 @@ extern "C" { } #include "tracer-driver.h" +#include "qos-manager.h" + +extern QoSManager qosManager; // the indexing should match between the #define and the lookUpTable #define TRACER_A2A 0 @@ -155,6 +159,8 @@ void enqueue_coll_msg( m_local.proc_event_type = lookUpTable[index].local_event; m_local.executed.taskid = ns->my_pe->currentCollTask; + int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), dest); + model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio); model_net_event(net_id, "coll", pe_to_lpid(dest, ns->my_job), size, sendOffset + copyTime*(isEager?1:0), sizeof(proc_msg), (const void*)&m_remote, sizeof(proc_msg), &m_local, lp); @@ -241,6 +247,8 @@ void handle_coll_recv_post_event( size = m->msgId.size; m_remote.msgId.size = size; } + int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), m->msgId.pe); + model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio); model_net_event(net_id, "coll", pe_to_lpid(m->msgId.pe, ns->my_job), size, nic_delay, sizeof(proc_msg), (const void*)&m_remote, sizeof(proc_msg), &m_local, lp); diff --git a/tracer/p2p-events.C 
b/tracer/p2p-events.C index 5497107..ea00cc8 100644 --- a/tracer/p2p-events.C +++ b/tracer/p2p-events.C @@ -8,6 +8,7 @@ extern "C" { #include "codes/model-net.h" +#include "codes/model-net-sched.h" #include "codes/lp-io.h" #include "codes/codes.h" #include "codes/codes_mapping.h" @@ -16,6 +17,9 @@ extern "C" { } #include "tracer-driver.h" +#include "qos-manager.h" + +extern QoSManager qosManager; void handle_recv_event( proc_state * ns, @@ -841,6 +845,8 @@ int send_msg( #endif m_remote.iteration = iter; + int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), lpid_to_pe(dest_id)); + model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio); /* model_net_event params: int net_id, char* category, tw_lpid final_dest_lp, uint64_t message_size, tw_stime offset, int remote_event_size, @@ -877,6 +883,8 @@ void enqueue_msg( #endif m_remote.iteration = iter; + int prio = qosManager.getServiceLevel(ns->my_job, lpid_to_pe(lp->id), lpid_to_pe(dest_id)); + model_net_set_msg_param(MN_MSG_PARAM_SCHED, MN_SCHED_PARAM_PRIO, (void*)&prio); model_net_event(net_id, "p2p", dest_id, size, sendOffset, sizeof(proc_msg), (const void*)&m_remote, sizeof(proc_msg), m_local, lp);
diff --git a/tracer/qos-manager.C b/tracer/qos-manager.C
new file mode 100644
index 0000000..c0e0ff0
--- /dev/null
+++ b/tracer/qos-manager.C
@@ -0,0 +1,75 @@
+#include "qos-manager.h"
+
+#include <cstdio>
+#include <fstream>
+
+/**
+ * Load the QoS table of one job from a whitespace-separated text file.
+ *
+ * The first integer in the file is the job's default service level. Every
+ * following pair of integers is "<rank> <service level>" and gives that rank
+ * its own service level.
+ *
+ * @param job       job the entries are stored under
+ * @param filename  path of the QoS file
+ * @return false, after printing to stderr, if the file cannot be opened or the
+ *         default service level cannot be read; true otherwise
+ */
+bool QoSManager::readQoSFileForJob(Job job, const char filename[]) {
+  std::ifstream qosFile(filename);
+  if (!qosFile) {
+    std::fprintf(stderr, "Cannot read QoS file %s\n", filename);
+    return false;
+  }
+
+  ServiceLevel defaultSL;
+  if (!(qosFile >> defaultSL)) {
+    std::fprintf(stderr, "Error reading default service level from QoS file %s\n", filename);
+    return false;
+  }
+
+  // operator[] creates the job's record on first use; keep a reference so the
+  // job is looked up once rather than once per entry.
+  JobQoS &qos = jobs[job];
+  qos.defaultSL = defaultSL;
+
+  bool incomplete = false;
+  Rank rank;
+  while (qosFile >> rank) {
+    ServiceLevel sl;
+    if (!(qosFile >> sl)) {
+      incomplete = true;  // a rank with no service level after it
+      break;
+    }
+    qos.serviceLevels[rank] = sl;
+  }
+
+  // Reaching end-of-file is the normal way out of the loop. Anything else
+  // means part of the file was not loaded, so report it instead of dropping
+  // it silently. The entries read so far are kept.
+  if (incomplete || !qosFile.eof()) {
+    std::fprintf(stderr, "Warning: ignoring malformed content in QoS file %s\n", filename);
+  }
+  return true;
+}
+
+/**
+ * Pick the service level for a message from rank s to rank d of job j.
+ *
+ * Precedence: the entry of the source rank, then the entry of the destination
+ * rank, then the job's default. Jobs that never had a QoS file loaded get the
+ * overall default.
+ */
+QoSManager::ServiceLevel QoSManager::getServiceLevel(Job j, Rank s, Rank d) const {
+  const JobQoSMap::const_iterator job = jobs.find(j);
+  if (job == jobs.end()) {
+    return overallDefaultSL;
+  }
+
+  const ServiceLevelMap &levels = job->second.serviceLevels;
+  ServiceLevelMap::const_iterator entry = levels.find(s);
+  if (entry == levels.end()) {
+    entry = levels.find(d);  // no entry for the source, fall back to the destination
+  }
+  return entry != levels.end() ? entry->second : job->second.defaultSL;
+}
diff --git a/tracer/qos-manager.h b/tracer/qos-manager.h
new file mode 100644
index 0000000..50f01ef
--- /dev/null
+++ b/tracer/qos-manager.h
@@ -0,0 +1,40 @@
+#ifndef _QOS_MANAGER_H_
+#define _QOS_MANAGER_H_
+
+#include <map>
+
+/**
+ * Per-job table of service levels.
+ *
+ * A job's table is loaded with readQoSFileForJob(); getServiceLevel() then
+ * resolves a (job, source rank, destination rank) triple to a service level,
+ * falling back to the overall default for jobs without a table.
+ */
+class QoSManager {
+public:
+  typedef int Job;
+  typedef int Rank;
+  typedef int ServiceLevel;
+
+private:
+  typedef std::map<Rank, ServiceLevel> ServiceLevelMap;
+  // QoS settings of a single job.
+  struct JobQoS {
+    ServiceLevel defaultSL;         // for ranks without an entry of their own
+    ServiceLevelMap serviceLevels;  // per-rank service levels
+  };
+  typedef std::map<Job, JobQoS> JobQoSMap;
+
+  ServiceLevel overallDefaultSL;  // for jobs that have no QoS table
+  JobQoSMap jobs;
+
+public:
+  QoSManager(ServiceLevel overallDefaultSL): overallDefaultSL(overallDefaultSL) {}
+  void setDefaultServiceLevel(ServiceLevel defaultSL) { overallDefaultSL = defaultSL; }
+  // Returns false if the file cannot be opened or has no default service level.
+  bool readQoSFileForJob(Job job, const char filename[]);
+  // Source rank's level, else destination rank's, else the job/overall default.
+  ServiceLevel getServiceLevel(Job job, Rank src, Rank dest) const;
+};
+
+#endif
diff --git a/tracer/tracer-driver.C b/tracer/tracer-driver.C index 314fc94..e629c37 100644 --- a/tracer/tracer-driver.C +++ b/tracer/tracer-driver.C @@ -37,11 +37,13 @@ extern "C" { } #include "tracer-driver.h" +#include "qos-manager.h" char tracer_input[256]; /* filename for tracer input file */ CoreInf *global_rank; /* core to job ID and process ID */ JobInf *jobs; +QoSManager qosManager(0); int default_mapping; int total_ranks; tw_stime *jobTimes; @@ -179,6 +181,14 @@ int main(int argc, char **argv) printf("Eager limit is %f bytes\n", eager_limit); int ret; + int default_sl; + ret = 
configuration_get_value_int(&config, "PARAMS", "default_sl", NULL, + &default_sl); + if (ret == 0) { + qosManager.setDefaultServiceLevel(default_sl); + printf("Default service level is %d\n", default_sl); + } + /* set up the output directory */ if(lp_io_dir[0]) { ret = lp_io_prepare(lp_io_dir, 0, &handle, MPI_COMM_WORLD);
@@ -241,24 +251,45 @@ int main(int argc, char **argv)
 
   total_ranks = 0;
   /* read per job information */
+  #define LINE_SIZE 1024
+  char line[LINE_SIZE];
+  fgets(line, LINE_SIZE, jobIn); // Eat everything to the next newline
   for(int i = 0; i < num_jobs; i++) {
+    // Parse each job from its own line so the optional trailing QoS file name can be detected
+    fgets(line, LINE_SIZE, jobIn);
+    FILE * lineStream = fmemopen(line, strlen(line), "r");
 #if TRACER_BIGSIM_TRACES
     char tempTrace[200];
-    fscanf(jobIn, "%s", tempTrace);
+    fscanf(lineStream, "%s", tempTrace);
     sprintf(jobs[i].traceDir, "%s%s", tempTrace, "/bgTrace");
 #else
-    fscanf(jobIn, "%s", jobs[i].traceDir);
+    fscanf(lineStream, "%s", jobs[i].traceDir);
 #endif
-    fscanf(jobIn, "%s", jobs[i].map_file);
-    fscanf(jobIn, "%d", &jobs[i].numRanks); /* number of processes */
-    fscanf(jobIn, "%d", &jobs[i].numIters); /* number of repetitions */
+    fscanf(lineStream, "%s", jobs[i].map_file);
+    fscanf(lineStream, "%d", &jobs[i].numRanks); /* number of processes */
+    fscanf(lineStream, "%d", &jobs[i].numIters); /* number of repetitions */
+
+    // See if there is a QoS file
+    char qosFile[LINE_SIZE] = "";
+    if (fscanf(lineStream, "%s", qosFile) == 1) {
+      if(!qosManager.readQoSFileForJob(i, qosFile)) {
+        // The QoS Manager already printed an error message, so just quit
+        MPI_Abort(MPI_COMM_WORLD, 1);
+      }
+    }
+    fclose(lineStream);  // one stream is opened per job line; release it after its last use
+
     total_ranks += jobs[i].numRanks;
     jobs[i].rankMap = (int*) malloc(jobs[i].numRanks * sizeof(int));
     jobs[i].skipMsgId = -1;
     jobTimes[i] = 0;
     if(!rank) {
-      printf("Job %d - ranks %d, trace folder %s, rank file %s, iters %d\n",
+      printf("Job %d - ranks %d, trace folder %s, rank file %s, iters %d",
           i, jobs[i].numRanks, jobs[i].traceDir, jobs[i].map_file, jobs[i].numIters);
+      if(qosFile[0] != '\0') {
+        printf(", QoS file %s", qosFile);
+      }
+      printf("\n");
     }
   }
 
@@ -322,6 +353,7 @@ int main(int argc, char **argv)
     next = ' ';
     fscanf(jobIn, "%c", &next);
   }
+  fclose(jobIn);
 
   int ranks_till_now = 0;
   for(int i = 0; i < num_jobs && !dump_topo_only; i++) {