Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger latencies + monitoring (v5) #342

Merged
merged 24 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
ab5812e
latency test in RTCM
MRiganSUSX Aug 28, 2024
e6ef2a3
new latency class and first implementation
MRiganSUSX Aug 29, 2024
210456d
Merge branch 'mrigan/new_opmon' into mrigan/new_latency
MRiganSUSX Sep 10, 2024
54c6a8a
fixing merge mistake
MRiganSUSX Sep 10, 2024
8e270a6
add tcproc proto
MRiganSUSX Sep 10, 2024
84fe1fd
Merge branch 'develop' into mrigan/new_latency
MRiganSUSX Sep 10, 2024
a004b17
fixing running_flag globally
MRiganSUSX Sep 10, 2024
3a51e70
adding latency to tcproc, mlt
MRiganSUSX Sep 10, 2024
9155d64
fix for latency get functions and weird time cases
MRiganSUSX Sep 10, 2024
6bd087c
adding comments to proto files
MRiganSUSX Sep 10, 2024
0dcc5f2
reworking latency messages
MRiganSUSX Sep 11, 2024
64e03f1
making latency monitoring configurable
MRiganSUSX Sep 11, 2024
ec83c08
making latency monitoring configurable
MRiganSUSX Sep 12, 2024
a1cbfc6
trigger latency configuration propagation
MRiganSUSX Sep 13, 2024
8120c0a
adding the option for us precision in latency class
MRiganSUSX Sep 13, 2024
0822cc0
fixing incorrect tlog
MRiganSUSX Sep 13, 2024
dc4ea9c
fix for latency class
MRiganSUSX Sep 19, 2024
4201bdb
merging develop
MRiganSUSX Oct 7, 2024
bbddc50
some latency improvements
MRiganSUSX Oct 7, 2024
6b33efd
small rework of latency class
MRiganSUSX Oct 8, 2024
22f6687
latency: making micros the default
MRiganSUSX Oct 9, 2024
da978b7
simplifying latency monitoring
MRiganSUSX Oct 10, 2024
b05c8f2
Merge branch 'develop' into mrigan/new_latency
MRiganSUSX Oct 10, 2024
45905a7
update for latencies for standalone makers
MRiganSUSX Oct 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions include/trigger/Latency.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* @file Latency.hpp
*
* This is part of the DUNE DAQ Application Framework, copyright 2021.
* Licensing/copyright details are in the COPYING file that you should have
* received with this code.
*/

#ifndef TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_
#define TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_

#include <atomic>
#include <chrono>
#include <cstdint>

namespace dunedaq {
namespace trigger {

/**
 * @brief Records the data time of the most recent incoming / outgoing object and
 *        reports how far that time is from the current system time.
 *
 * Data times are passed in as 62.5 MHz clock ticks (16 ns per tick) and are converted
 * on update to the selected time unit (milliseconds or microseconds since epoch).
 * The getters return the absolute difference between the current system time and the
 * stored data time, in the same unit. The two stored times are std::atomic values.
 */
class Latency {
public:
  // Enumeration for selecting time units
  enum class TimeUnit { Milliseconds, Microseconds };

  /**
   * @brief Construct with an optional time unit (defaults to Milliseconds).
   * @param time_unit Unit used for the stored data times and the reported latencies.
   *
   * The tick conversion factor is derived from the 62.5 MHz clock:
   * 1 / 62500000 s = 16 ns per tick, i.e. 16 * 1e-6 ms per tick or 16 * 1e-3 us per tick.
   */
  Latency(TimeUnit time_unit = TimeUnit::Milliseconds)
    : m_latency_in(0)
    , m_latency_out(0)
    , m_clock_ticks_conversion(time_unit == TimeUnit::Milliseconds ? 16 * 1e-6 : 16 * 1e-3)
    , m_time_unit(time_unit)
  {
  }

  /**
   * @brief Get the current system time since epoch.
   * @return Milliseconds or microseconds since epoch, depending on the selected time unit.
   */
  uint64_t get_current_system_time() const
  {
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    if (m_time_unit == TimeUnit::Milliseconds) {
      return std::chrono::duration_cast<std::chrono::milliseconds>(since_epoch).count();
    }
    return std::chrono::duration_cast<std::chrono::microseconds>(since_epoch).count();
  }

  /**
   * @brief Store the data time of the latest incoming object.
   * @param latency Data time in clock ticks; converted to the selected time unit before storing.
   */
  void update_latency_in(uint64_t latency)
  {
    m_latency_in.store(static_cast<uint64_t>(latency * m_clock_ticks_conversion));
  }

  /**
   * @brief Store the data time of the latest outgoing object.
   * @param latency Data time in clock ticks; converted to the selected time unit before storing.
   */
  void update_latency_out(uint64_t latency)
  {
    m_latency_out.store(static_cast<uint64_t>(latency * m_clock_ticks_conversion));
  }

  /**
   * @brief Latency of the latest incoming object.
   * @return |current system time - stored data time| in the selected unit,
   *         or 0 if no incoming time has been stored yet.
   */
  uint64_t get_latency_in() const { return distance_from_now(m_latency_in.load()); }

  /**
   * @brief Latency of the latest outgoing object.
   * @return |current system time - stored data time| in the selected unit,
   *         or 0 if no outgoing time has been stored yet.
   */
  uint64_t get_latency_out() const { return distance_from_now(m_latency_out.load()); }

private:
  // Absolute distance between "now" and a stored data time; 0 means "nothing stored".
  // The caller loads the atomic exactly once, so the zero check and the subtraction see
  // the same value. The difference is computed without abs(): the unqualified integer
  // abs() may resolve to the int overload and truncate large 64-bit differences.
  uint64_t distance_from_now(uint64_t data_time) const
  {
    if (data_time == 0) {
      return 0;
    }
    const uint64_t now = get_current_system_time();
    // in edge cases the data time is more recent than the current system time,
    // hence the symmetric difference
    return now >= data_time ? now - data_time : data_time - now;
  }

  std::atomic<uint64_t> m_latency_in;  // Data time of the latest incoming object (selected unit)
  std::atomic<uint64_t> m_latency_out; // Data time of the latest outgoing object (selected unit)
  double m_clock_ticks_conversion;     // Clock ticks -> selected time unit conversion factor
  TimeUnit m_time_unit;                // Selected time unit (ms or us)
};

} // namespace trigger
} // namespace dunedaq

#endif // TRIGGER_INCLUDE_TRIGGER_LATENCY_HPP_
18 changes: 18 additions & 0 deletions plugins/CustomTCMaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ CustomTCMaker::generate_opmon_data()
info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() );

this->publish(std::move(info));

if ( m_running_flag.load() && m_latency_monitoring.load() ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ( m_running_flag.load() && m_latency_monitoring.load() ) {
if ( m_latency_monitoring.load() && m_running_flag.load() ) {

Not an important comment, I'm only writing this because I learned something new a few weeks back and want to share :) It's an unnecessary micro-optimisation, but in C++ the order of operands inside an if statement can matter for short-circuiting operators like && and || (this is not true for all operators).
So in your case, whilst we're running, we will always load 2 booleans, even if we don't want monitoring. With the suggestion above, if monitoring is off, we will only load 1 boolean. This matters more in task-heavy workflows, so probably not here, but maybe in e.g. TPDataProcessor, where every little counts.

opmon::TriggerLatency lat_info;

lat_info.set_latency_in( m_latency_instance.get_latency_in() );
lat_info.set_latency_out( m_latency_instance.get_latency_out() );
ArturSztuc marked this conversation as resolved.
Show resolved Hide resolved

this->publish(std::move(lat_info));
}
}

void
Expand All @@ -125,13 +134,20 @@ CustomTCMaker::do_configure(const nlohmann::json& /*obj*/)
//// This parameter controls how many new timestamps are calculated when needed
//// Currently precalculates events for the next 60 seconds
//m_sorting_size_limit = 60 * m_conf0>clock_frequency_hz;

m_latency_monitoring.store( m_conf->get_latency_monitoring_conf()->get_enable_latency_monitoring() );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do_configure is deprecated in v5.
All the config should be inside of void init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg);

Technically there's also a new conf, but only in non-DAQModule classes I think.

}

void
CustomTCMaker::do_start(const nlohmann::json& obj)
{
m_running_flag.store(true);

// OpMon.
m_tc_made_count.store(0);
m_tc_sent_count.store(0);
m_tc_failed_sent_count.store(0);

auto start_params = obj.get<rcif::cmd::StartParams>();

std::string timestamp_method = m_conf->get_timestamp_method();
Expand Down Expand Up @@ -237,6 +253,7 @@ CustomTCMaker::send_trigger_candidates()
}

triggeralgs::TriggerCandidate candidate = create_candidate(m_next_trigger_timestamp, m_tc_timestamps.front().first);
if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( candidate.time_candidate );
m_tc_made_count++;

TLOG_DEBUG(1) << get_name() << " at timestamp " << m_timestamp_estimator->get_timestamp_estimate()
Expand All @@ -245,6 +262,7 @@ CustomTCMaker::send_trigger_candidates()
TCWrapper tcw(candidate);
try {
m_trigger_candidate_sink->send(std::move(tcw), std::chrono::milliseconds(10));
if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate );
m_tc_sent_count++;
m_tc_sent_count_type[m_tc_timestamps.front().first] += 1;
} catch (const ers::Issue& e) {
Expand Down
9 changes: 9 additions & 0 deletions plugins/CustomTCMaker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "appmodel/CustomTCMaker.hpp"
#include "appmodel/CustomTCMakerConf.hpp"
#include "appmodel/LatencyMonitoringConf.hpp"
#include "confmodel/Connection.hpp"

#include "appfwk/ModuleConfiguration.hpp"
Expand All @@ -26,7 +27,9 @@
#include "utilities/TimestampEstimator.hpp"
#include "triggeralgs/TriggerCandidate.hpp"
#include "trigger/TCWrapper.hpp"
#include "trigger/Latency.hpp"
#include "trigger/opmon/customtcmaker_info.pb.h"
#include "trigger/opmon/latency_info.pb.h"

#include <memory>
#include <random>
Expand Down Expand Up @@ -116,6 +119,12 @@ class CustomTCMaker : public dunedaq::appfwk::DAQModule
std::atomic<metric_counter_type> m_tc_sent_count{ 0 };
std::atomic<metric_counter_type> m_tc_failed_sent_count{ 0 };
void print_opmon_stats();

// Create an instance of the Latency class
std::atomic<bool> m_latency_monitoring{ false };
dunedaq::trigger::Latency m_latency_instance;
std::atomic<metric_counter_type> m_latency_in{ 0 };
std::atomic<metric_counter_type> m_latency_out{ 0 };
};
} // namespace trigger
} // namespace dunedaq
Expand Down
27 changes: 27 additions & 0 deletions plugins/MLTModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ MLTModule::init(std::shared_ptr<appfwk::ModuleConfiguration> mcfg)
m_decision_output = get_iom_sender<dfmessages::TriggerDecision>(con->UID());
}

// Latency related
m_latency_monitoring.store( mtrg->get_configuration()->get_latency_monitoring_conf()->get_enable_latency_monitoring() );

// Now do the configuration: dummy for now
m_configured_flag.store(true);
}
Expand Down Expand Up @@ -106,6 +109,21 @@ MLTModule::generate_opmon_data()
td_info.set_inhibited(counts.inhibited.exchange(0));
this->publish( std::move(td_info), {{"type", name}} );
}

// latency
if ( m_running_flag.load() && m_latency_monitoring.load() ) {
// TC in, TD out
opmon::TriggerLatency lat_info;
lat_info.set_latency_in( m_latency_instance.get_latency_in() );
lat_info.set_latency_out( m_latency_instance.get_latency_out() );
this->publish(std::move(lat_info));

// vs readout window requests
opmon::ModuleLevelTriggerRequestLatency lat_request_info;
lat_request_info.set_latency_window_start( m_latency_requests_instance.get_latency_in() );
lat_request_info.set_latency_window_end( m_latency_requests_instance.get_latency_out() );
ArturSztuc marked this conversation as resolved.
Show resolved Hide resolved
this->publish(std::move(lat_request_info));
}
}

void
Expand Down Expand Up @@ -202,6 +220,7 @@ void
MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
{
m_td_msg_received_count++;
if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( decision.trigger_timestamp );

auto trigger_types = unpack_types(decision.trigger_type);
for ( const auto t : trigger_types ) {
Expand All @@ -223,6 +242,13 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )

try {
m_decision_output->send(std::move(decision), std::chrono::milliseconds(1));

// readout window latency update
if (m_latency_monitoring.load()) {
m_latency_requests_instance.update_latency_in( decision.components.front().window_begin );
m_latency_requests_instance.update_latency_out( decision.components.front().window_end );
}

m_td_sent_count++;

for ( const auto t : trigger_types ) {
Expand Down Expand Up @@ -260,6 +286,7 @@ MLTModule::trigger_decisions_callback(dfmessages::TriggerDecision& decision )
}

}
if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( decision.trigger_timestamp );
m_td_total_count++;
}

Expand Down
12 changes: 12 additions & 0 deletions plugins/MLTModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#include "trigger/Issues.hpp"
#include "trigger/LivetimeCounter.hpp"
#include "trigger/TokenManager.hpp"
#include "trigger/Latency.hpp"
#include "trigger/opmon/moduleleveltrigger_info.pb.h"
#include "trigger/opmon/latency_info.pb.h"

#include "appfwk/DAQModule.hpp"

Expand All @@ -26,6 +28,7 @@
#include "appmodel/TCReadoutMap.hpp"
#include "appmodel/ROIGroupConf.hpp"
#include "appmodel/SourceIDConf.hpp"
#include "appmodel/LatencyMonitoringConf.hpp"

#include "confmodel/Connection.hpp"

Expand Down Expand Up @@ -251,6 +254,15 @@ class MLTModule : public dunedaq::appfwk::DAQModule
return m_trigger_counters[type];
}

// Create an instance of the Latency class
std::atomic<bool> m_latency_monitoring{ false };
dunedaq::trigger::Latency m_latency_instance;
dunedaq::trigger::Latency m_latency_requests_instance{dunedaq::trigger::Latency::TimeUnit::Microseconds};
std::atomic<metric_counter_type> m_latency_in{ 0 };
std::atomic<metric_counter_type> m_latency_out{ 0 };
std::atomic<metric_counter_type> m_latency_window_start{ 0 };
std::atomic<metric_counter_type> m_latency_window_end{ 0 };

void print_opmon_stats();
};
} // namespace trigger
Expand Down
18 changes: 18 additions & 0 deletions plugins/RandomTCMakerModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,22 @@ RandomTCMakerModule::generate_opmon_data()
info.set_tc_failed_sent_count( m_tc_failed_sent_count.load() );

this->publish(std::move(info));

if ( m_running_flag.load() && m_latency_monitoring.load() ) {
opmon::TriggerLatency lat_info;

lat_info.set_latency_in( m_latency_instance.get_latency_in() );
lat_info.set_latency_out( m_latency_instance.get_latency_out() );
ArturSztuc marked this conversation as resolved.
Show resolved Hide resolved

this->publish(std::move(lat_info));
}
}

void
RandomTCMakerModule::do_configure(const nlohmann::json& /*obj*/)
{
//m_conf = obj.get<randomtriggercandidatemaker::Conf>();
m_latency_monitoring.store( m_conf->get_latency_monitoring_conf()->get_enable_latency_monitoring() );
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto re. do_configure being deprecated in v5, should move to init.

}

void
Expand All @@ -91,6 +101,11 @@ RandomTCMakerModule::do_start(const nlohmann::json& obj)

m_running_flag.store(true);

// OpMon.
m_tc_made_count.store(0);
m_tc_sent_count.store(0);
m_tc_failed_sent_count.store(0);

std::string timestamp_method = m_conf->get_timestamp_method();
if (timestamp_method == "kTimeSync") {
TLOG_DEBUG(0) << "Creating TimestampEstimator";
Expand Down Expand Up @@ -210,13 +225,16 @@ RandomTCMakerModule::send_trigger_candidates()
}
next_trigger_timestamp = m_timestamp_estimator->get_timestamp_estimate();
triggeralgs::TriggerCandidate candidate = create_candidate(next_trigger_timestamp);

if (m_latency_monitoring.load()) m_latency_instance.update_latency_in( candidate.time_candidate );
m_tc_made_count++;

TLOG_DEBUG(1) << get_name() << " at timestamp " << m_timestamp_estimator->get_timestamp_estimate()
<< ", pushing a candidate with timestamp " << candidate.time_candidate;
TCWrapper tcw(candidate);
try{
m_trigger_candidate_sink->send(std::move(tcw), std::chrono::milliseconds(10));
if (m_latency_monitoring.load()) m_latency_instance.update_latency_out( candidate.time_candidate );
m_tc_sent_count++;
} catch (const ers::Issue& e) {
ers::error(e);
Expand Down
9 changes: 9 additions & 0 deletions plugins/RandomTCMakerModule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "appmodel/RandomTCMakerConf.hpp"
#include "appmodel/RandomTCMakerModule.hpp"
#include "appmodel/LatencyMonitoringConf.hpp"

#include "daqdataformats/SourceID.hpp"
#include "dfmessages/TimeSync.hpp"
Expand All @@ -27,7 +28,9 @@
#include "utilities/TimestampEstimator.hpp"
#include "triggeralgs/TriggerCandidate.hpp"
#include "trigger/TCWrapper.hpp"
#include "trigger/Latency.hpp"
#include "trigger/opmon/randomtcmaker_info.pb.h"
#include "trigger/opmon/latency_info.pb.h"

#include <memory>
#include <random>
Expand Down Expand Up @@ -102,6 +105,12 @@ class RandomTCMakerModule : public dunedaq::appfwk::DAQModule
std::atomic<metric_counter_type> m_tc_sent_count{ 0 };
std::atomic<metric_counter_type> m_tc_failed_sent_count{ 0 };
void print_opmon_stats();

// Create an instance of the Latency class
std::atomic<bool> m_latency_monitoring{ false };
dunedaq::trigger::Latency m_latency_instance;
std::atomic<metric_counter_type> m_latency_in{ 0 };
std::atomic<metric_counter_type> m_latency_out{ 0 };
};
} // namespace trigger
} // namespace dunedaq
Expand Down
4 changes: 0 additions & 4 deletions plugins/TriggerDataHandlerModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ TriggerDataHandlerModule::create_readout(const appmodel::DataHandlerModule* modc
std::string raw_dt = modconf->get_module_configuration()->get_input_data_type();
TLOG() << "Choosing specializations for DataHandlingModel with data_type:" << raw_dt << ']';

TLOG() << "modconf: " << modconf;
TLOG() << modconf->class_name();
TLOG() << modconf->get_module_configuration();

// IF TriggerPrimitive (TP)
if (raw_dt.find("TriggerPrimitive") != std::string::npos) {
TLOG(TLVL_WORK_STEPS) << "Creating readout for TriggerPrimitive";
Expand Down
13 changes: 13 additions & 0 deletions schema/trigger/opmon/latency_info.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package dunedaq.trigger.opmon;

// Message for latency variables
// Latency represents the difference between current system (clock) time and the data time of particular (TX) data object
// Units are ms
ArturSztuc marked this conversation as resolved.
Show resolved Hide resolved
// Used by many trigger modules
// Published by the trigger modules from Latency::get_latency_in() / get_latency_out().
// NOTE(review): those getters return uint64_t while the fields here are uint32, so the
// value is narrowed when set - confirm the expected range fits.
message TriggerLatency {
uint32 latency_in = 1; // |system time - data time| for the time last stored via update_latency_in()
uint32 latency_out = 2; // |system time - data time| for the time last stored via update_latency_out()
}

8 changes: 8 additions & 0 deletions schema/trigger/opmon/moduleleveltrigger_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@ message TriggerDecisionInfo {
uint32 paused = 4; // Number of paused (triggers are paused)
uint32 inhibited = 5; // Number of inhibited (DFO is busy)
}

// Message for MLT TD requests latency vars
// Latency represents the difference between current system (clock) time and the requested TD readout window (start/end)
// Units are currently us (but use an enum and can be changed)
// Filled by MLTModule from its readout-window Latency instance (constructed with
// TimeUnit::Microseconds).
// NOTE(review): the source getters return uint64_t; a uint32 holding microseconds wraps
// after 2^32 us (~71.6 minutes) - confirm this range is sufficient.
message ModuleLevelTriggerRequestLatency {
uint32 latency_window_start = 1; // |system time - window_begin| of the first component of the last sent TD
uint32 latency_window_end = 2; // |system time - window_end| of the first component of the last sent TD
}
Loading
Loading