From b97d824b478a5855005a3aa5e6ae1098fb4424b1 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Sat, 19 Oct 2024 12:51:27 +0800 Subject: [PATCH] sender log Signed-off-by: guo-shaoge --- dbms/src/Common/LooseBoundedMPMCQueue.h | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/dbms/src/Common/LooseBoundedMPMCQueue.h b/dbms/src/Common/LooseBoundedMPMCQueue.h index 6611d9bf37e..c526c1ff43d 100644 --- a/dbms/src/Common/LooseBoundedMPMCQueue.h +++ b/dbms/src/Common/LooseBoundedMPMCQueue.h @@ -50,10 +50,20 @@ class LooseBoundedMPMCQueue , push_callback(std::move(push_callback_)) {} + String getAddr() const + { + std::stringstream ss; + ss << this; + return ss.str(); + } + void registerPipeReadTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_reader_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipeReadTask queue size: {}, status: {}, pipe_task_cnt: {}, workqueue: {}", + queue.size(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr()); if (queue.empty() && status == MPMCQueueStatus::NORMAL) { pipe_reader_cv.registerTask(std::move(task)); @@ -66,7 +76,10 @@ class LooseBoundedMPMCQueue void registerPipeWriteTask(TaskPtr && task) { { + const auto pipe_task_cnt = pipe_writer_cv.getTaskCnt(); std::lock_guard lock(mu); + LOG_DEBUG(Logger::get(), "gjt debug registerPipeWriteTask isFullWithoutLock: {}, status: {}, pipe_task_cnt: {}, workqueue: {}", + isFullWithoutLock(), status == MPMCQueueStatus::NORMAL, pipe_task_cnt, getAddr()); if (isFullWithoutLock() && status == MPMCQueueStatus::NORMAL) { pipe_writer_cv.registerTask(std::move(task)); @@ -259,12 +272,16 @@ class LooseBoundedMPMCQueue private: void notifyOneReader() { + LOG_DEBUG(Logger::get(), "gjt debug notifyOneReader: {}", + getAddr()); reader_cv.notify_one(); pipe_reader_cv.notifyOne(); } void notifyOneWriter() { + LOG_DEBUG(Logger::get(), "gjt debug notifyOneWriter: {}", + getAddr()); writer_cv.notify_one(); pipe_writer_cv.notifyOne(); }