From 0953813c91ab83902cc2d4b2942c2f14158c4f55 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 5 Feb 2025 05:22:37 +0000 Subject: [PATCH] pipe readsome Signed-off-by: dentiny --- src/ray/util/pipe_logger.cc | 71 +++++++++++++++++++++----- src/ray/util/pipe_logger.h | 6 +++ src/ray/util/tests/pipe_logger_test.cc | 25 +++++++-- 3 files changed, 86 insertions(+), 16 deletions(-) diff --git a/src/ray/util/pipe_logger.cc b/src/ray/util/pipe_logger.cc index 886b066216ea..422d7d2a9e5f 100644 --- a/src/ray/util/pipe_logger.cc +++ b/src/ray/util/pipe_logger.cc @@ -34,6 +34,21 @@ namespace ray { namespace { +// Default pipe log read buffer size (in bytes), used when the env variable is unset or invalid. +constexpr size_t kDefaultPipeLogReadBufSize = 1024; + +size_t GetPipeLogReadSizeOrDefault() { + // TODO(hjiang): Write a util function `GetEnvOrDefault`. + const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data()); + if (var_value != nullptr) { + size_t read_buf_size = 0; + if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) { + return read_buf_size; + } + } + return kDefaultPipeLogReadBufSize; +} + struct StreamDumper { absl::Mutex mu; bool stopped ABSL_GUARDED_BY(mu) = false; @@ -56,24 +71,56 @@ void StartStreamDump( stream_dumper = stream_dumper]() { SetThreadName("PipeReaderThd"); - std::string newline; + const size_t buf_size = GetPipeLogReadSizeOrDefault(); + // Pre-allocate stream buffer to avoid excessive syscall. + // TODO(hjiang): Should resize without initialization. + std::string readsome_buffer(buf_size, '\0'); + // Logs are written in lines; `last_line` holds the trailing partial line left over + // from the previous read. + std::string last_line; + + std::string cur_new_line{"a"}; + while (pipe_instream->read(cur_new_line.data(), /*count=*/1)) { + // Read available bytes in non-blocking style. 
+ while (true) { + auto bytes_read = + pipe_instream->readsome(readsome_buffer.data(), readsome_buffer.length()); + if (bytes_read == 0) { + break; + } + std::string_view cur_readsome_buffer{readsome_buffer.data(), + static_cast<size_t>(bytes_read)}; + cur_new_line += cur_readsome_buffer; + } + + // After we read a chunk of bytes continuously, split into lines, send to write + // thread and multiple sinks. + std::vector<std::string_view> newlines = absl::StrSplit(cur_new_line, '\n'); + for (size_t idx = 0; !newlines.empty() && idx < newlines.size() - 1; ++idx) { + std::string cur_segment = std::move(last_line); + last_line.clear(); + cur_segment += newlines[idx]; + cur_segment += '\n'; + + absl::MutexLock lock(&stream_dumper->mu); + stream_dumper->content.emplace_back(std::move(cur_segment)); + } - // Exit at pipe read EOF. - while (std::getline(*pipe_instream, newline)) { - // Backfill newliner for current segment. - if (!pipe_instream->eof()) { - newline += '\n'; + // Carry over the trailing partial line; append because it may span several reads. + if (!newlines.empty()) { + last_line += newlines.back(); } - absl::MutexLock lock(&stream_dumper->mu); - stream_dumper->content.emplace_back(std::move(newline)); + // Read later bytes in blocking style. + cur_new_line = "a"; } - RAY_CHECK(pipe_instream->eof()); - { - absl::MutexLock lock(&stream_dumper->mu); - stream_dumper->stopped = true; + // Reached EOF. + absl::MutexLock lock(&stream_dumper->mu); + if (!last_line.empty()) { + stream_dumper->content.emplace_back(std::move(last_line)); } + stream_dumper->stopped = true; }).detach(); std::thread([stream_dumper = stream_dumper, diff --git a/src/ray/util/pipe_logger.h b/src/ray/util/pipe_logger.h index 4d75742d3f76..e1e3d5c694cb 100644 --- a/src/ray/util/pipe_logger.h +++ b/src/ray/util/pipe_logger.h @@ -32,6 +32,12 @@ namespace ray { +// Environment variable, which indicates the pipe log read buffer size. +// +// TODO(hjiang): Should document the env variable after end-to-end integration has +// finished. 
+inline constexpr std::string_view kPipeLogReadBufSizeEnv = "RAY_PIPE_LOG_READ_BUF_SIZE"; + // File handle requires active destruction via owner calling [Close]. class RedirectionFileHandle { public: diff --git a/src/ray/util/tests/pipe_logger_test.cc b/src/ray/util/tests/pipe_logger_test.cc index f8bf32248c5c..ff35ba32b1dd 100644 --- a/src/ray/util/tests/pipe_logger_test.cc +++ b/src/ray/util/tests/pipe_logger_test.cc @@ -35,7 +35,18 @@ namespace { constexpr std::string_view kLogLine1 = "hello\n"; constexpr std::string_view kLogLine2 = "world\n"; -TEST(PipeLoggerTest, RedirectionTest) { +class PipeReadBufferSizeSetter { + public: + explicit PipeReadBufferSizeSetter(size_t pipe_buffer_size) { + setEnv(kPipeLogReadBufSizeEnv.data(), absl::StrFormat("%d", pipe_buffer_size).data()); + } + ~PipeReadBufferSizeSetter() { unsetEnv(kPipeLogReadBufSizeEnv.data()); } +}; + +class PipeLoggerTest : public ::testing::TestWithParam<size_t> {}; + +TEST_P(PipeLoggerTest, RedirectionTest) { + PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()}; ScopedTemporaryDirectory scoped_directory; const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4(); @@ -59,7 +70,8 @@ TEST(PipeLoggerTest, RedirectionTest) { EXPECT_EQ(*actual_content, expected_content); } -TEST(PipeLoggerTestWithTee, RedirectionWithTee) { +TEST_P(PipeLoggerTest, RedirectionWithTee) { + PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()}; ScopedTemporaryDirectory scoped_directory; const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4(); @@ -90,7 +102,8 @@ TEST(PipeLoggerTestWithTee, RedirectionWithTee) { EXPECT_EQ(*actual_content, absl::StrFormat("%s%s", kLogLine1, kLogLine2)); } -TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) { +TEST_P(PipeLoggerTest, RotatedRedirectionWithTee) { + PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()}; ScopedTemporaryDirectory scoped_directory; const auto uuid = GenerateUUIDV4(); const auto test_file_path = 
scoped_directory.GetDirectory() / uuid; @@ -134,7 +147,9 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) { // Testing senario: log to stdout and file; check whether these two sinks generate // expected output. -TEST(PipeLoggerCompatTest, CompatibilityTest) { +TEST_P(PipeLoggerTest, CompatibilityTest) { + PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()}; + // Testing-1: No newliner in the middle nor at the end. { constexpr std::string_view kContent = "hello"; @@ -315,6 +330,8 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) { } } +INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3)); + } // namespace } // namespace ray