Skip to content

Commit

Permalink
Support printing srt statistics in json format (#62)
Browse files Browse the repository at this point in the history
  • Loading branch information
IvanPysmenni authored Nov 23, 2022
1 parent 4053355 commit c197de3
Show file tree
Hide file tree
Showing 21 changed files with 262 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@
[submodule "submodule/spdlog"]
path = submodule/spdlog
url = https://github.com/gabime/spdlog.git
[submodule "submodule/nlohmann_json"]
path = submodule/nlohmann_json
url = https://github.com/nlohmann/json.git
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ SET(ENABLE_SHARED OFF)
SET(ENABLE_ENCRYPTION ON)
add_subdirectory(submodule/srt)

set(JSON_BuildTests OFF CACHE INTERNAL "")
add_subdirectory(submodule/nlohmann_json)

#set_target_properties(srt-live-transmit
# PROPERTIES
# ARCHIVE_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib"
Expand Down
1 change: 1 addition & 0 deletions submodule/nlohmann_json
Submodule nlohmann_json added at bc889a
3 changes: 2 additions & 1 deletion xtransmit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ set (VIRTUAL_srtsupport $<TARGET_OBJECTS:srtsupport_virtual>)
target_link_libraries(srt-xtransmit
PRIVATE CLI11::CLI11
PRIVATE spdlog::spdlog
PRIVATE ${TARGET_srt}_static ${VIRTUAL_srtsupport} ${LINKSTDCPP_FS})
PRIVATE ${TARGET_srt}_static ${VIRTUAL_srtsupport} ${LINKSTDCPP_FS}
PRIVATE nlohmann_json::nlohmann_json)



Expand Down
3 changes: 2 additions & 1 deletion xtransmit/file-receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ void start_filereceiver(future<shared_srt> connection, const config& cfg,
{
this_thread::sleep_for(interval);

logfile_stats << sock->statistics_csv(print_header) << flush;
logfile_stats << sock->get_statistics(cfg.stats_format, print_header) << flush;
print_header = false;
}
};
Expand Down Expand Up @@ -223,6 +223,7 @@ CLI::App* xtransmit::file::receive::add_subcommand(CLI::App& app, config& cfg, s
sc_file_recv->add_option("dst", cfg.dst_path, "Destination path to file/folder");
sc_file_recv->add_option("--segment", cfg.segment_size, "Size of the transmission segment");
sc_file_recv->add_option("--statsfile", cfg.stats_file, "output stats report filename");
sc_file_recv->add_option("--statsformat", cfg.stats_format, "output stats report format (json, csv)");
sc_file_recv->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)")
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));

Expand Down
1 change: 1 addition & 0 deletions xtransmit/file-receive.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace xtransmit::file::receive
size_t segment_size = 1456 * 1000;
int stats_freq_ms = 0;
std::string stats_file;
std::string stats_format = "csv";
};


Expand Down
3 changes: 2 additions & 1 deletion xtransmit/file-send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ void start_filesender(future<shared_srt> connection, const config& cfg,
{
this_thread::sleep_for(interval);

logfile_stats << sock->statistics_csv(print_header) << flush;
logfile_stats << sock->get_statistics(cfg.stats_format, print_header) << flush;
print_header = false;
}
};
Expand Down Expand Up @@ -297,6 +297,7 @@ CLI::App* xtransmit::file::send::add_subcommand(CLI::App& app, config& cfg, stri
sc_file_send->add_flag("--printout", cfg.only_print, "Print files found in a folder ad subfolders. No transfer.");
sc_file_send->add_option("--segment", cfg.segment_size, "Size of the transmission segment");
sc_file_send->add_option("--statsfile", cfg.stats_file, "output stats report filename");
sc_file_send->add_option("--statsformat", cfg.stats_format, "output stats report format (json, csv)");
sc_file_send->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)")
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));

Expand Down
1 change: 1 addition & 0 deletions xtransmit/file-send.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace xtransmit::file::send
bool only_print = false; // Do not transfer, just enumerate files and print to stdout
int stats_freq_ms = 0;
std::string stats_file;
std::string stats_format = "csv";
};


Expand Down
1 change: 1 addition & 0 deletions xtransmit/generate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ CLI::App* xtransmit::generate::add_subcommand(CLI::App& app, config& cfg, std::v
sc_generate->add_option("--duration", cfg.duration, "Sending duration in seconds (supresses --num option)")
->transform(CLI::AsNumberWithUnit(to_sec, CLI::AsNumberWithUnit::CASE_SENSITIVE));
sc_generate->add_option("--statsfile", cfg.stats_file, "output stats report filename");
sc_generate->add_option("--statsformat", cfg.stats_format, "output stats report format (json, csv)");
sc_generate->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)")
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));
sc_generate->add_flag("--twoway", cfg.two_way, "Both send and receive data");
Expand Down
2 changes: 1 addition & 1 deletion xtransmit/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void common_run(const vector<string>& urls, const stats_config& cfg, bool reconn
// make_unique is not supported by GCC 4.8, only starting from GCC 4.9 :(
try {
stats = unique_ptr<socket::stats_writer>(
new socket::stats_writer(cfg.stats_file, milliseconds(cfg.stats_freq_ms)));
new socket::stats_writer(cfg.stats_file, cfg.stats_format, milliseconds(cfg.stats_freq_ms)));
}
catch (const socket::exception& e)
{
Expand Down
1 change: 1 addition & 0 deletions xtransmit/misc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ struct stats_config
{
int stats_freq_ms = 0;
std::string stats_file;
std::string stats_format = "csv";
};


Expand Down
1 change: 1 addition & 0 deletions xtransmit/receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ CLI::App* xtransmit::receive::add_subcommand(CLI::App& app, config& cfg, std::ve
sc_receive->add_option("-i,--input,src", src_urls, "Source URI");
sc_receive->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload");
sc_receive->add_option("--statsfile", cfg.stats_file, "output stats report filename");
sc_receive->add_option("--statsformat", cfg.stats_format, "output stats report format (json, csv)");
sc_receive->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)")
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));
sc_receive->add_flag("--printmsg", cfg.print_notifications, "print message into to stdout");
Expand Down
3 changes: 2 additions & 1 deletion xtransmit/route.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void xtransmit::route::run(const vector<string>& src_urls, const vector<string>&
const bool write_stats = cfg.stats_file != "" && cfg.stats_freq_ms > 0;
// make_unique is not supported by GCC 4.8, only starting from GCC 4.9 :(
unique_ptr<socket::stats_writer> stats = write_stats
? unique_ptr<socket::stats_writer>(new socket::stats_writer(cfg.stats_file, milliseconds(cfg.stats_freq_ms)))
? unique_ptr<socket::stats_writer>(new socket::stats_writer(cfg.stats_file, cfg.stats_format, milliseconds(cfg.stats_freq_ms)))
: nullptr;

shared_sock dst = create_connection(parsed_dst_urls);
Expand Down Expand Up @@ -125,6 +125,7 @@ CLI::App* xtransmit::route::add_subcommand(CLI::App& app, config& cfg, vector<st
sc_route->add_option("--msgsize", cfg.message_size, "Size of a buffer to receive message payload");
sc_route->add_flag("--bidir", cfg.bidir, "Enable bidirectional transmission");
sc_route->add_option("--statsfile", cfg.stats_file, "output stats report filename");
sc_route->add_option("--statsformat", cfg.stats_format, "output stats report format (json, csv)");
sc_route->add_option("--statsfreq", cfg.stats_freq_ms, "output stats report frequency (ms)")
->transform(CLI::AsNumberWithUnit(to_ms, CLI::AsNumberWithUnit::CASE_SENSITIVE));

Expand Down
1 change: 1 addition & 0 deletions xtransmit/route.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace xtransmit {
bool bidir = false;
int stats_freq_ms = 0;
std::string stats_file;
std::string stats_format = "csv";
};


Expand Down
2 changes: 1 addition & 1 deletion xtransmit/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class isocket
*
* @throws socket::exception Thrown on failure.
*/
virtual const std::string statistics_csv(bool print_header) const { return std::string(); }
virtual const std::string get_statistics(std::string statistic_format, bool print_header) const { return std::string(); }


virtual SOCKET id() const = 0;
Expand Down
13 changes: 8 additions & 5 deletions xtransmit/socket_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ using namespace std;
using namespace xtransmit;
using namespace std::chrono;

xtransmit::socket::stats_writer::stats_writer(const std::string& filename, const std::chrono::milliseconds& interval)
xtransmit::socket::stats_writer::stats_writer(const std::string& filename, const std::string& format, const std::chrono::milliseconds& interval)
: m_logfile(filename.c_str())
, m_format(format)
, m_interval(interval)
{
if (!m_logfile)
Expand Down Expand Up @@ -77,6 +78,7 @@ future<void> xtransmit::socket::stats_writer::launch()
auto print_stats = [](map<SOCKET, shared_sock>& sock_vector,
ofstream& out,
mutex& stats_lock,
string format,
bool print_header)
{
#ifdef ENABLE_CXX17
Expand All @@ -97,8 +99,8 @@ future<void> xtransmit::socket::stats_writer::launch()
try
{
if (print_header)
out << s->statistics_csv(true);
out << s->statistics_csv(false) << flush;
out << s->get_statistics(format, true);
out << s->get_statistics(format, false) << flush;
print_header = false;
}
catch (const socket::exception& e)
Expand Down Expand Up @@ -128,19 +130,20 @@ future<void> xtransmit::socket::stats_writer::launch()

auto stats_func = [&print_stats](map<SOCKET, shared_sock>& sock_vector,
ofstream& out,
string& format,
const milliseconds interval,
mutex& stats_lock,
const atomic_bool& stop_stats) {
bool print_header = true;

while (!stop_stats)
{
print_header = print_stats(sock_vector, out, stats_lock, print_header);
print_header = print_stats(sock_vector, out, stats_lock, format, print_header);

// No lock on stats_lock while sleeping
this_thread::sleep_for(interval);
}
};

return async(::launch::async, stats_func, ref(m_sock), ref(m_logfile), m_interval, ref(m_lock), ref(m_stop));
return async(::launch::async, stats_func, ref(m_sock), ref(m_logfile), ref(m_format), m_interval, ref(m_lock), ref(m_stop));
}
3 changes: 2 additions & 1 deletion xtransmit/socket_stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace socket
class stats_writer
{
public:
stats_writer(const std::string& filename, const std::chrono::milliseconds& interval);
stats_writer(const std::string& filename, const std::string& format, const std::chrono::milliseconds& interval);
~stats_writer();

public:
Expand All @@ -36,6 +36,7 @@ class stats_writer
using shared_sock = std::shared_ptr<socket::isocket>;
std::atomic<bool> m_stop;
std::ofstream m_logfile;
std::string m_format;
std::map<SOCKET, shared_sock> m_sock;
std::future<void> m_stat_future;
const std::chrono::milliseconds m_interval;
Expand Down
74 changes: 72 additions & 2 deletions xtransmit/srt_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include "socketoptions.hpp"
#include "apputil.hpp"

// nlohmann_json
#include <nlohmann/json.hpp>

using namespace std;
using namespace xtransmit;
using shared_srt = shared_ptr<socket::srt>;
Expand Down Expand Up @@ -560,11 +563,78 @@ const string socket::srt::stats_to_csv(int socketid, const SRT_TRACEBSTATS& stat
#undef HAS_UNIQUE_PKTS
}

const string socket::srt::statistics_csv(bool print_header) const
const nlohmann::json socket::srt::stats_to_json(int socketid, const SRT_TRACEBSTATS& stats)
{
nlohmann::json root;

#define HAS_PKT_REORDER_TOL (SRT_VERSION_MAJOR >= 1) && (SRT_VERSION_MINOR >= 4) && (SRT_VERSION_PATCH > 0)
// pktSentUnique, pktRecvUnique were added in SRT v1.4.2
#define HAS_UNIQUE_PKTS (SRT_VERSION_MAJOR == 1) && ((SRT_VERSION_MINOR > 4) || ((SRT_VERSION_MINOR == 4) && (SRT_VERSION_PATCH >= 2)))

#ifdef HAS_PUT_TIME
root["Timepoint"] = print_timestamp_now();
#endif

root["Time"] = stats.msTimeStamp;
root["SocketID"] = socketid;
root["pktFlowWindow"] = stats.pktFlowWindow;
root["pktCongestionWindow"] = stats.pktCongestionWindow;
root["pktFlightSize"] = stats.pktFlightSize;
root["msRTT"] = stats.msRTT;
root["mbpsBandwidth"] = stats.mbpsBandwidth;
root["mbpsMaxBW"] = stats.mbpsMaxBW;
root["pktSent"] = stats.pktSent;
root["pktSndLoss"] = stats.pktSndLoss;
root["pktSndDrop"] = stats.pktSndDrop;
root["pktRetrans"] = stats.pktRetrans;
root["byteSent"] = stats.byteSent;
root["byteAvailSndBuf"] = stats.byteAvailSndBuf;
root["byteSndDrop"] = stats.byteSndDrop;
root["mbpsSendRate"] = stats.mbpsSendRate;
root["usPktSndPeriod"] = stats.usPktSndPeriod;
root["msSndBuf"] = stats.msSndBuf;
root["pktRecv"] = stats.pktRecv;
root["pktRcvLoss"] = stats.pktRcvLoss;
root["pktRcvDrop"] = stats.pktRcvDrop;
root["mbpsRecvRate"] = stats.mbpsRecvRate;
root["msRcvBuf"] = stats.msRcvBuf;
root["msRcvTsbPdDelay"] = stats.msRcvTsbPdDelay;

#if HAS_PKT_REORDER_TOL
root["pktReorderTolerance"] = stats.pktReorderTolerance;
#endif

#if HAS_UNIQUE_PKTS
root["pktSentUnique"] = stats.pktSentUnique;
root["pktRecvUnique"] = stats.pktRecvUnique;
#endif

return root;
}

const string socket::srt::get_statistics(string stats_format, bool print_header) const
{
SRT_TRACEBSTATS stats;
if (SRT_ERROR == srt_bstats(m_bind_socket, &stats, true))
raise_exception("statistics");

return stats_to_csv(m_bind_socket, stats, print_header);
if(stats_format == "json")
{
if(print_header){
// JSON format doesn't have header. Return empty string
return "";
}
nlohmann::json root;
root["ConnStats"] = stats_to_json(m_bind_socket, stats);
root["LinksStats"] = nullptr; // No need for the array of links because only one link exists.
return root.dump() + "\n";
}
else
{
if(stats_format != "csv")
{
spdlog::warn(LOG_SOCK_SRT "{} format is not supported. csv format will be used instead", stats_format);
}
return stats_to_csv(m_bind_socket, stats, print_header);
}
}
14 changes: 9 additions & 5 deletions xtransmit/srt_socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "uriparser.hpp"
#include "netinet_any.h"

// nlohmann_json
#include <nlohmann/json_fwd.hpp>

namespace xtransmit
{
namespace socket
Expand Down Expand Up @@ -91,11 +94,12 @@ class srt
bool is_caller() const final { return m_mode == CALLER; }

public:
SOCKET id() const final { return m_bind_socket; }
int statistics(SRT_TRACEBSTATS& stats, bool instant = true);
bool supports_statistics() const final { return true; }
const std::string statistics_csv(bool print_header) const final;
static const std::string stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, bool print_header);
SOCKET id() const final { return m_bind_socket; }
int statistics(SRT_TRACEBSTATS& stats, bool instant = true);
bool supports_statistics() const final { return true; }
const std::string get_statistics(std::string stats_format, bool print_header) const final;
static const std::string stats_to_csv(int socketid, const SRT_TRACEBSTATS& stats, bool print_header);
static const nlohmann::json stats_to_json(int socketid, const SRT_TRACEBSTATS& stats);

private:
void raise_exception(const string&& place) const;
Expand Down
Loading

0 comments on commit c197de3

Please sign in to comment.