Skip to content

Commit

Permalink
multithreading support
Browse files Browse the repository at this point in the history
  • Loading branch information
hariharan-devarajan committed Sep 30, 2024
1 parent a9cf909 commit 7538d23
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 50 deletions.
7 changes: 6 additions & 1 deletion src/dftracer/df_logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,9 @@
template <>
std::shared_ptr<DFTLogger> dftracer::Singleton<DFTLogger>::instance = nullptr;
template <>
bool dftracer::Singleton<DFTLogger>::stop_creating_instances = false;
bool dftracer::Singleton<DFTLogger>::stop_creating_instances = false;

thread_local uint32_t DFTLogger::level = 0;
thread_local std::vector<int> DFTLogger::index_stack = std::vector<int>();
thread_local std::unordered_map<std::string, uint16_t>
DFTLogger::computed_hash = std::unordered_map<std::string, uint16_t>();
78 changes: 43 additions & 35 deletions src/dftracer/df_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <dftracer/writer/chrome_writer.h>
#include <libgen.h>
#include <sys/time.h>
#include <threads.h>
#include <time.h>
#include <unistd.h>

Expand Down Expand Up @@ -40,11 +41,11 @@ class DFTLogger {
bool is_init, dftracer_tid;
ProcessID process_id;
std::shared_ptr<dftracer::ChromeWriter> writer;
uint32_t level;
std::vector<int> index_stack;
std::unordered_map<std::string, uint16_t> computed_hash;
std::atomic_int index;
bool has_entry;
thread_local static uint32_t level;
thread_local static std::vector<int> index_stack;
thread_local static std::unordered_map<std::string, uint16_t> computed_hash;
#ifdef DFTRACER_MPI_ENABLE
bool mpi_event;
#endif
Expand Down Expand Up @@ -74,9 +75,6 @@ class DFTLogger {
DFTLogger(bool init_log = false)
: is_init(false),
dftracer_tid(false),
level(0),
index_stack(),
computed_hash(),
index(0),
has_entry(false),
#ifdef DFTRACER_MPI_ENABLE
Expand All @@ -99,7 +97,7 @@ class DFTLogger {
}
this->is_init = true;
}
~DFTLogger() { index_stack.clear(); }
~DFTLogger() {}
inline void update_log_file(std::string log_file, std::string exec_name,
std::string cmd, ProcessID process_id = -1) {
DFTRACER_LOG_DEBUG("DFTLogger.update_log_file %s", log_file.c_str());
Expand All @@ -122,18 +120,16 @@ class DFTLogger {
char thread_name[128];
auto size = sprintf(thread_name, "%lu", this->process_id);
thread_name[size] = '\0';
this->enter_event();
auto current_index = this->enter_event();
this->writer->log_metadata(
index_stack[level - 1], thread_name, METADATA_NAME_THREAD_NAME,
current_index, thread_name, METADATA_NAME_THREAD_NAME,
METADATA_NAME_THREAD_NAME, this->process_id, tid);
this->exit_event();
std::unordered_map<std::string, std::any>* meta = nullptr;
std::unordered_map<std::string, std::any> *meta = nullptr;
if (include_metadata) {
meta = new std::unordered_map<std::string, std::any>();
cmd_hash =
hash_and_store(cmd.data(), METADATA_NAME_STRING_HASH);
exec_hash =
hash_and_store(exec_name.data(), METADATA_NAME_STRING_HASH);
cmd_hash = hash_and_store(cmd.data(), METADATA_NAME_STRING_HASH);
exec_hash = hash_and_store(exec_name.data(), METADATA_NAME_STRING_HASH);

meta->insert_or_assign("version", DFTRACER_VERSION);
meta->insert_or_assign("exec_hash", exec_hash);
Expand All @@ -150,7 +146,7 @@ class DFTLogger {
this->log("start", "dftracer", this->get_time(), 0, meta);
this->exit_event();
if (include_metadata) {
delete(meta);
delete (meta);
}
if (enable_core_affinity) {
#ifdef DFTRACER_HWLOC_ENABLE
Expand All @@ -169,11 +165,11 @@ class DFTLogger {
if (dftracer_tid) {
tid = df_gettid() + this->process_id;
}
this->enter_event();
this->writer->log_metadata(index_stack[level - 1], "core_affinity",
all_stream.str().c_str(),
METADATA_NAME_PROCESS, this->process_id,
tid, false);
auto current_index = this->enter_event();

this->writer->log_metadata(
current_index, "core_affinity", all_stream.str().c_str(),
METADATA_NAME_PROCESS, this->process_id, tid, false);
this->exit_event();
}
}
Expand All @@ -184,17 +180,33 @@ class DFTLogger {
DFTRACER_LOG_INFO("Writing trace to %s", log_file.c_str());
}

inline void enter_event() {
inline int enter_event() {
index++;
level++;
index_stack.push_back(index.load());
int current_index = index.load();
index_stack.push_back(current_index);
return current_index;
}

inline void exit_event() {
level--;
index_stack.pop_back();
}

inline int get_parent() {
if (level > 1 && index_stack.size() > 1) {
return index_stack[level - 2];
}
return -1;
}

inline int get_current() {
if (level > 0 && index_stack.size() > 0) {
return index_stack[level - 1];
}
return -1;
}

inline TimeResolution get_time() {
DFTRACER_LOG_DEBUG("DFTLogger.get_time", "");
struct timeval tv {};
Expand All @@ -217,10 +229,7 @@ class DFTLogger {
}
if (metadata != nullptr) {
metadata->insert_or_assign("level", level);
int parent_index_value = -1;
if (level > 1) {
parent_index_value = index_stack[level - 2];
}
int parent_index_value = get_parent();
metadata->insert_or_assign("p_idx", parent_index_value);
}
#ifdef DFTRACER_MPI_ENABLE
Expand All @@ -231,17 +240,17 @@ class DFTLogger {
this->writer != nullptr) {
int rank = 0;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
this->enter_event();
auto current_index = this->enter_event();
this->writer->log_metadata(
index_stack[level - 1], "rank", std::to_string(rank).c_str(),
current_index, "rank", std::to_string(rank).c_str(),
METADATA_NAME_PROCESS, this->process_id, tid);
this->exit_event();
char process_name[1024];
auto size = sprintf(process_name, "Rank %d", rank);
process_name[size] = '\0';
this->enter_event();
current_index = this->enter_event();
this->writer->log_metadata(
index_stack[level - 1], process_name, METADATA_NAME_PROCESS_NAME,
current_index, process_name, METADATA_NAME_PROCESS_NAME,
METADATA_NAME_PROCESS_NAME, this->process_id, tid);
this->exit_event();

Expand All @@ -252,9 +261,8 @@ class DFTLogger {

if (this->writer != nullptr) {
if (include_metadata) {
this->writer->log(index_stack[level - 1], event_name, category,
start_time, duration, metadata, this->process_id,
tid);
this->writer->log(get_current(), event_name, category, start_time,
duration, metadata, this->process_id, tid);
} else {
this->writer->log(local_index, event_name, category, start_time,
duration, metadata, this->process_id, tid);
Expand Down Expand Up @@ -286,8 +294,8 @@ class DFTLogger {
if (dftracer_tid) {
tid = df_gettid();
}
this->enter_event();
this->writer->log_metadata(index_stack[level - 1], file,
auto current_index = this->enter_event();
this->writer->log_metadata(current_index, file,
std::to_string(hash).c_str(), name,
this->process_id, tid, false);
this->exit_event();
Expand Down
7 changes: 5 additions & 2 deletions src/dftracer/writer/chrome_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void dftracer::ChromeWriter::convert_json(
TimeResolution start_time, TimeResolution duration,
std::unordered_map<std::string, std::any> *metadata, ProcessID process_id,
ThreadID thread_id) {
auto previous_index = current_index;
size_t previous_index;

(void)previous_index;
char is_first_char[3] = " ";
Expand Down Expand Up @@ -218,6 +218,7 @@ void dftracer::ChromeWriter::convert_json(
}
{
std::unique_lock<std::shared_mutex> lock(mtx);
previous_index = current_index;
auto written_size = sprintf(
buffer.data() + current_index,
R"(%s{"id":%d,"name":"%s","cat":"%s","pid":%lu,"tid":%lu,"ts":%llu,"dur":%llu,"ph":"X","args":{"hhash":%d%s}})",
Expand All @@ -230,6 +231,7 @@ void dftracer::ChromeWriter::convert_json(
} else {
{
std::unique_lock<std::shared_mutex> lock(mtx);
previous_index = current_index;
auto written_size = sprintf(
buffer.data() + current_index,
R"(%s{"id":%d,"name":"%s","cat":"%s","pid":%lu,"tid":%lu,"ts":%llu,"dur":%llu,"ph":"X"})",
Expand All @@ -248,13 +250,14 @@ void dftracer::ChromeWriter::convert_json_metadata(
int index, ConstEventNameType name, ConstEventNameType value,
ConstEventNameType ph, ProcessID process_id, ThreadID thread_id,
bool is_string) {
auto previous_index = current_index;
size_t previous_index = 0;

(void)previous_index;
char is_first_char[3] = " ";
if (!is_first_write) is_first_char[0] = '\0';
{
std::unique_lock<std::shared_mutex> lock(mtx);
previous_index = current_index;
auto written_size = 0;
if (is_string) {
written_size = sprintf(
Expand Down
22 changes: 10 additions & 12 deletions src/dftracer/writer/chrome_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ChromeWriter {

FILE *fh;
uint16_t hostname_hash;
static const int MAX_LINE_SIZE = 16*1024L;
static const int MAX_LINE_SIZE = 16 * 1024L;
size_t write_buffer_size;

size_t current_index;
Expand All @@ -55,22 +55,20 @@ class ChromeWriter {

bool is_first_write;
inline size_t write_buffer_op(bool force = false) {
std::unique_lock<std::shared_mutex> lock(mtx);
if (current_index == 0 || (!force && current_index < write_buffer_size))
return 0;
DFTRACER_LOG_DEBUG("ChromeWriter.write_buffer_op %s",
this->filename.c_str());
size_t written_elements = 0;
{
std::unique_lock<std::shared_mutex> lock(mtx);
flockfile(fh);
written_elements = fwrite(buffer.data(), current_index, sizeof(char), fh);
current_index = 0;
funlockfile(fh);
}

flockfile(fh);
written_elements = fwrite(buffer.data(), current_index, sizeof(char), fh);
current_index = 0;
funlockfile(fh);
if (written_elements != 1) { // GCOVR_EXCL_START
DFTRACER_LOG_ERROR(
"unable to log write only %ld of %d trying to write %d with error code "
"unable to log write only %ld of %d trying to write %zu with error "
"code "
"%d",
written_elements, 1, current_index, errno);
} // GCOVR_EXCL_STOP
Expand Down Expand Up @@ -111,8 +109,8 @@ class ChromeWriter {
ProcessID process_id, ThreadID tid);

void log_metadata(int index, ConstEventNameType name,
ConstEventNameType value, ConstEventNameType ph, ProcessID process_id,
ThreadID tid, bool is_string = true);
ConstEventNameType value, ConstEventNameType ph,
ProcessID process_id, ThreadID tid, bool is_string = true);

void finalize(bool has_entry);
};
Expand Down

0 comments on commit 7538d23

Please sign in to comment.