Skip to content

Commit

Permalink
Feat[bmqtool]: collect ACK latencies
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeny Malygin <[email protected]>
  • Loading branch information
678098 committed Jan 30, 2025
1 parent c9ee151 commit 0d063fe
Show file tree
Hide file tree
Showing 9 changed files with 275 additions and 219 deletions.
2 changes: 1 addition & 1 deletion src/applications/bmqtool/bmqtool.m.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ int main(int argc, const char* argv[])
? &ta
: bslma::Default::allocator();

Application app(&parameters, &shutdownSemaphore, allocator);
Application app(parameters, &shutdownSemaphore, allocator);
if (app.start() != 0) {
return 2; // RETURN
}
Expand Down
323 changes: 185 additions & 138 deletions src/applications/bmqtool/m_bmqtool_application.cpp

Large diffs are not rendered by default.

47 changes: 34 additions & 13 deletions src/applications/bmqtool/m_bmqtool_application.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <bdlbb_blob.h>
#include <bdlbb_pooledblobbufferfactory.h>
#include <bdlmt_eventscheduler.h>
#include <bdlmt_throttle.h>
#include <bsl_list.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
Expand Down Expand Up @@ -83,9 +84,10 @@ class Application : public bmqa::SessionEventHandler {
bslma::Allocator* d_allocator_p;
// Held, not owned

Parameters* d_parameters_p;
// Command-line parameters. Held, not
// owned
/// Run parameters
/// Copy is made to ensure lifetime of the Parameters object used by this
/// Application
const Parameters d_parameters;

bslmt::Semaphore* d_shutdownSemaphore_p;
// Semaphore holding the main thread
Expand Down Expand Up @@ -132,11 +134,26 @@ class Application : public bmqa::SessionEventHandler {
Interactive d_interactive;
// CLI handler.

bsl::list<bsls::Types::Int64> d_latencies;
// List of all message latencies (in
// ns). Only populated when requested
// to generate a latency report (with
// --latency-report).
/// A throttle object used to control confirm latency logging on a consumer
bdlmt::Throttle d_confirmLatencyThrottle;

/// List of all confirm message latencies (in ns).
/// Confirm message latency is the end-to-end time to deliver a message,
/// starting from producer post and ending on a consumer.
/// Only populated when requested to generate a latency report (with
/// --latency-report).
bsl::list<bsls::Types::Int64> d_confirmLatencies;

/// A throttle object used to control ack latency logging on a consumer
bdlmt::Throttle d_ackLatencyThrottle;

/// List of all ack message latencies (in ns).
/// Ack message latency is the time between posting a message and getting
/// an ACK for it, meaning that the message was at least replicated with
/// a needed quorum (delivery might not have happened yet).
/// Only populated when requested to generate a latency report (with
/// --latency-report).
bsl::list<bsls::Types::Int64> d_ackLatencies;

bsls::AtomicBool d_autoReadInProgress;
// Auto-consume mode only. True if a
Expand Down Expand Up @@ -198,8 +215,12 @@ class Application : public bmqa::SessionEventHandler {
/// Print the final stats to the standard output, at exit time.
void printFinalStats();

/// Generate the latency report.
void generateLatencyReport();
/// Generate the latency report from the specified `latencies`.
/// The specified `name` represents the origin of the latencies, it is
/// either end-to-end latency (producer->consumer) or ack latency
/// (producer->cluster->producer-ack).
void generateLatencyReport(const bsl::list<bsls::Types::Int64>& latencies,
const bslstl::StringRef& name);

/// Do any `pre` run initialization, such as connecting to bmqbrkr,
/// opening a queue, preparing the blob to publish, ... Return 0 on
Expand All @@ -219,9 +240,9 @@ class Application : public bmqa::SessionEventHandler {
// CREATORS

/// Constructor
Application(m_bmqtool::Parameters* parameters,
bslmt::Semaphore* shutdownSemaphore,
bslma::Allocator* allocator);
Application(const m_bmqtool::Parameters& parameters,
bslmt::Semaphore* shutdownSemaphore,
bslma::Allocator* allocator);

~Application() BSLS_KEYWORD_OVERRIDE;

Expand Down
19 changes: 9 additions & 10 deletions src/applications/bmqtool/m_bmqtool_interactive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ void Interactive::processCommand(const StartCommand& command)
<< "<-- session.start(5.0) => " << bmqt::GenericResult::Enum(rc)
<< " (" << rc << ")";

if (d_parameters_p->noSessionEventHandler()) {
if (d_parameters.noSessionEventHandler()) {
BALL_LOG_INFO << "Creating processing threads";
for (int i = 0; i < d_parameters_p->numProcessingThreads(); ++i) {
for (int i = 0; i < d_parameters.numProcessingThreads(); ++i) {
bslmt::ThreadUtil::Handle threadHandle;
rc = bslmt::ThreadUtil::create(
&threadHandle,
Expand Down Expand Up @@ -218,7 +218,7 @@ void Interactive::processCommand(const StopCommand& command)

BALL_LOG_INFO << "<-- session.stop()";

if (d_parameters_p->noSessionEventHandler()) {
if (d_parameters.noSessionEventHandler()) {
// Join on all threads
BALL_LOG_INFO << "Joining event handler threads";
for (size_t i = 0; i < d_eventHandlerThreads.size(); ++i) {
Expand Down Expand Up @@ -733,7 +733,7 @@ void Interactive::processCommand(const BatchPostCommand& command)
}

bsl::shared_ptr<PostingContext> postingContext =
d_poster_p->createPostingContext(d_session_p, &parameters, queueId);
d_poster_p->createPostingContext(d_session_p, parameters, queueId);

while (postingContext->pendingPost()) {
postingContext->postNext();
Expand Down Expand Up @@ -887,20 +887,19 @@ void Interactive::eventHandlerThread()
BALL_LOG_INFO << "EventHandlerThread terminated";
}

Interactive::Interactive(Parameters* parameters,
Interactive::Interactive(const Parameters& parameters,
Poster* poster,
bslma::Allocator* allocator)
: d_session_p(0)
, d_sessionEventHandler_p(0)
, d_parameters_p(parameters)
, d_parameters(parameters)
, d_uris(allocator)
, d_eventHandlerThreads(allocator)
, d_producerIdProperty("** NONE **", allocator)
, d_poster_p(poster)
, d_allocator_p(allocator)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(parameters);
BSLS_ASSERT_SAFE(poster);

bdls::ProcessUtil::getProcessName(&d_producerIdProperty); // ignore rc
Expand Down Expand Up @@ -1088,7 +1087,7 @@ void Interactive::onMessage(const bmqa::Message& message)

void Interactive::onOpenQueueStatus(const bmqa::OpenQueueStatus& status)
{
if (d_parameters_p->verbosity() != ParametersVerbosity::e_SILENT) {
if (d_parameters.verbosity() != ParametersVerbosity::e_SILENT) {
BALL_LOG_INFO << "==> OPEN_QUEUE_RESULT received: " << status;
}

Expand All @@ -1112,7 +1111,7 @@ void Interactive::onOpenQueueStatus(const bmqa::OpenQueueStatus& status)
void Interactive::onConfigureQueueStatus(
const bmqa::ConfigureQueueStatus& status)
{
if (d_parameters_p->verbosity() != ParametersVerbosity::e_SILENT) {
if (d_parameters.verbosity() != ParametersVerbosity::e_SILENT) {
BALL_LOG_INFO << "==> CONFIGURE_QUEUE_RESULT received: " << status;
}

Expand All @@ -1126,7 +1125,7 @@ void Interactive::onConfigureQueueStatus(

void Interactive::onCloseQueueStatus(const bmqa::CloseQueueStatus& status)
{
if (d_parameters_p->verbosity() != ParametersVerbosity::e_SILENT) {
if (d_parameters.verbosity() != ParametersVerbosity::e_SILENT) {
BALL_LOG_INFO << "==> CLOSE_QUEUE_RESULT received: " << status;
}

Expand Down
7 changes: 4 additions & 3 deletions src/applications/bmqtool/m_bmqtool_interactive.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ class Interactive {
// Session handler to use for events
// processing, if using custom event handler

Parameters* d_parameters_p;
// Parameters to use
/// Parameters to use, the object under this reference is owned by
/// the Application that uses this Interactive object
const Parameters& d_parameters;

bslmt::Mutex d_mutex;
// Mutex to protect below map
Expand Down Expand Up @@ -171,7 +172,7 @@ class Interactive {

/// Constructor using the specified `parameters`, 'poster',
/// and `allocator`.
Interactive(Parameters* parameters,
Interactive(const Parameters& parameters,
Poster* poster,
bslma::Allocator* allocator);

Expand Down
2 changes: 1 addition & 1 deletion src/applications/bmqtool/m_bmqtool_parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ bool Parameters::from(bsl::ostream& stream,
return true;
}

void Parameters::dump(bsl::ostream& stream)
void Parameters::dump(bsl::ostream& stream) const
{
print(stream, 0, -1);
}
Expand Down
6 changes: 3 additions & 3 deletions src/applications/bmqtool/m_bmqtool_parameters.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,14 @@ class Parameters {
/// error in the specified `stream` and return false on failure.
bool from(bsl::ostream& stream, const CommandLineParameters& params);

/// Do a nicer pretty print of all the parameters aligned.
void dump(bsl::ostream& stream);

/// Validate the consistency of all settings.
bool validate(bsl::string* error);

// ACCESSORS

/// Do a nicer pretty print of all the parameters aligned.
void dump(bsl::ostream& stream) const;

/// Format this object to the specified output `stream` at the (absolute
/// value of) the optionally specified indentation `level` and return a
/// reference to `stream`. If `level` is specified, optionally specify
Expand Down
76 changes: 33 additions & 43 deletions src/applications/bmqtool/m_bmqtool_poster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ BALL_LOG_SET_NAMESPACE_CATEGORY("BMQTOOL.POSTER");

PostingContext::PostingContext(
bmqa::Session* session,
Parameters* parameters,
const Parameters& parameters,
const bmqa::QueueId& queueId,
FileLogger* fileLogger,
bmqst::StatContext* statContext,
Expand All @@ -48,30 +48,28 @@ PostingContext::PostingContext(
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_timeBufferFactory_p(timeBufferFactory)
, d_parameters_p(parameters)
, d_parameters(parameters)
, d_session_p(session)
, d_fileLogger(fileLogger)
, d_statContext_p(statContext)
, d_remainingEvents(d_parameters_p->eventsCount())
, d_remainingEvents(d_parameters.eventsCount())
, d_numMessagesPosted(0)
, d_msgUntilNextTimestamp(0)
, d_blob(bufferFactory, d_allocator_p)
, d_queueId(queueId, d_allocator_p)
, d_properties(d_allocator_p)
, d_autoIncrementedValue(0)
{
BSLS_ASSERT_SAFE(session);
BSLS_ASSERT_SAFE(parameters);
BSLS_ASSERT_SAFE(d_queueId.isValid());

InputUtil::populateProperties(&d_properties,
d_parameters_p->messageProperties());
d_parameters.messageProperties());

// Prepare the blob that we will post over and over again
if (d_parameters_p->sequentialMessagePattern().empty()) {
int msgPayloadSize = d_parameters_p->msgSize();
if (d_parameters.sequentialMessagePattern().empty()) {
int msgPayloadSize = d_parameters.msgSize();

if (d_parameters_p->latency() != ParametersLatency::e_NONE) {
if (d_parameters.latency() != ParametersLatency::e_NONE) {
// To optimize, if asked to insert latency, we put in a
// first blob of 8 bytes that will be swapped out at every
// post with a new timestamp value.
Expand All @@ -97,7 +95,7 @@ bool PostingContext::pendingPost() const
{
// eventsCount() == 0 means endless posting,
// otherwise check if remainingEvents is positive
return d_parameters_p->eventsCount() == 0 || d_remainingEvents > 0;
return d_parameters.eventsCount() == 0 || d_remainingEvents > 0;
}

void PostingContext::postNext()
Expand All @@ -107,9 +105,9 @@ void PostingContext::postNext()
bmqa::MessageEventBuilder eventBuilder;
d_session_p->loadMessageEventBuilder(&eventBuilder);

for (int evtId = 0; evtId < d_parameters_p->postRate() && pendingPost();
for (int evtId = 0; evtId < d_parameters.postRate() && pendingPost();
++evtId) {
if (d_parameters_p->eventSize() == 0) {
if (d_parameters.eventSize() == 0) {
// To get nice stats chart with round numbers in bench mode, we
// usually start with eventSize == 0; however posting Events
// with 0 message in them cause an assert or an error to spew,
Expand All @@ -118,64 +116,56 @@ void PostingContext::postNext()
}

eventBuilder.reset();
for (bsl::uint64_t msgId = 0; msgId < d_parameters_p->eventSize();
for (bsl::uint64_t msgId = 0; msgId < d_parameters.eventSize();
++msgId, ++d_numMessagesPosted) {
bmqa::Message& msg = eventBuilder.startMessage();
int length = 0;

// Set a correlationId if queue is open in ACK mode
if (bmqt::QueueFlagsUtil::isAck(d_parameters_p->queueFlags())) {
msg.setCorrelationId(bmqt::CorrelationId::autoValue());
if (bmqt::QueueFlagsUtil::isAck(d_parameters.queueFlags())) {
bsls::Types::Int64 nowNs =
(d_parameters.latency() != ParametersLatency::e_NONE)
? StatUtil::getNowAsNs(d_parameters.latency())
: 0;

// Correlation Ids might be non-unique, and we use this quality
// to store possibly overlapping send timestamps. It allows us
// to calculate ack latencies.
bmqt::CorrelationId cId(nowNs);
msg.setCorrelationId(cId);
}

if (!d_parameters_p->sequentialMessagePattern().empty()) {
if (!d_parameters.sequentialMessagePattern().empty()) {
char buffer[128];
length = snprintf(
buffer,
sizeof(buffer),
d_parameters_p->sequentialMessagePattern().c_str(),
d_parameters.sequentialMessagePattern().c_str(),
d_numMessagesPosted);
msg.setDataRef(buffer, length);
}
else {
// Insert latency if required...
if (d_parameters_p->latency() != ParametersLatency::e_NONE) {
bdlb::BigEndianInt64 timeNs;

if (d_msgUntilNextTimestamp != 0) {
--d_msgUntilNextTimestamp;
timeNs = bdlb::BigEndianInt64::make(0);
}
else {
// Insert the timestamp
timeNs = bdlb::BigEndianInt64::make(
StatUtil::getNowAsNs(d_parameters_p->latency()));

// Update the number of messages until next
// timestamp:
int nbMsgPerSec = d_parameters_p->eventSize() *
d_parameters_p->postRate() * 1000 /
d_parameters_p->postInterval();
d_msgUntilNextTimestamp = nbMsgPerSec *
k_LATENCY_INTERVAL_MS / 1000;
}
if (d_parameters.latency() != ParametersLatency::e_NONE) {
bdlb::BigEndianInt64 postTime = bdlb::BigEndianInt64::make(
StatUtil::getNowAsNs(d_parameters.latency()));

bdlbb::BlobBuffer buffer;
d_timeBufferFactory_p->allocate(&buffer);
buffer.setSize(sizeof(bdlb::BigEndianInt64));
bsl::memcpy(buffer.buffer().get(),
&timeNs,
sizeof(timeNs));
&postTime,
sizeof(postTime));
d_blob.swapBufferRaw(0, &buffer);
}
msg.setDataRef(&d_blob);

length = d_blob.length();
}

if (!d_parameters_p->autoIncrementedField().empty()) {
if (!d_parameters.autoIncrementedField().empty()) {
d_properties.setPropertyAsInt64(
d_parameters_p->autoIncrementedField(),
d_parameters.autoIncrementedField(),
d_autoIncrementedValue++);
}

Expand Down Expand Up @@ -218,7 +208,7 @@ void PostingContext::postNext()
d_statContext_p->adjustValue(k_STAT_EVT,
eventImpl->rawEvent().blob()->length());

if (d_parameters_p->eventsCount() > 0) {
if (d_parameters.eventsCount() > 0) {
--d_remainingEvents;
}
}
Expand All @@ -240,7 +230,7 @@ Poster::Poster(FileLogger* fileLogger,

bsl::shared_ptr<PostingContext>
Poster::createPostingContext(bmqa::Session* session,
Parameters* parameters,
const Parameters& parameters,
const bmqa::QueueId& queueId)
{
return bsl::make_shared<PostingContext>(session,
Expand Down
Loading

0 comments on commit 0d063fe

Please sign in to comment.