Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix std::flush with istream::readsome #50248

Merged
merged 13 commits into from
Feb 12, 2025
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ ray_cc_library(
":stream_redirection_options",
":thread_utils",
":util",
":spdlog_newliner_sink",
"@boost//:iostreams",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/container:inlined_vector",
Expand Down
65 changes: 51 additions & 14 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "absl/container/inlined_vector.h"
#include "absl/strings/str_split.h"
#include "ray/util/spdlog_fd_sink.h"
#include "ray/util/spdlog_newliner_sink.h"
#include "ray/util/thread_utils.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
Expand All @@ -35,6 +36,21 @@ namespace ray {

namespace {

// Default pipe log read buffer size.
constexpr size_t kDefaultPipeLogReadBufSize = 1024;

size_t GetPipeLogReadSizeOrDefault() {
// TODO(hjiang): Write a util function `GetEnvOrDefault`.
dentiny marked this conversation as resolved.
Show resolved Hide resolved
const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data());
if (var_value != nullptr) {
size_t read_buf_size = 0;
if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) {
return read_buf_size;
}
}
return kDefaultPipeLogReadBufSize;
}

struct StreamDumper {
absl::Mutex mu;
bool stopped ABSL_GUARDED_BY(mu) = false;
Expand All @@ -57,24 +73,39 @@ void StartStreamDump(
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

std::string newline;
const size_t buf_size = GetPipeLogReadSizeOrDefault();
// Pre-allocate stream buffer to avoid excessive syscall.
// TODO(hjiang): Should resize without initialization.
std::string readsome_buffer(buf_size, '\0');

std::string cur_new_line{"a"};
dentiny marked this conversation as resolved.
Show resolved Hide resolved
while (pipe_instream->read(cur_new_line.data(), /*count=*/1)) {
// Read available bytes in non-blocking style.
while (true) {
auto bytes_read =
pipe_instream->readsome(readsome_buffer.data(), readsome_buffer.length());
if (bytes_read == 0) {
break;
}
std::string_view cur_readsome_buffer{readsome_buffer.data(),
static_cast<uint64_t>(bytes_read)};
cur_new_line += cur_readsome_buffer;
}

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
// Already read all we have at the moment, stream into logger.
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_new_line));
cur_new_line.clear();
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
// Read later bytes in blocking style.
cur_new_line = "a";
}

RAY_CHECK(pipe_instream->eof());
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}
// Reached EOF.
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}).detach();

std::thread([stream_dumper = stream_dumper,
Expand Down Expand Up @@ -127,7 +158,13 @@ std::shared_ptr<spdlog::logger> CreateLogger(
stream_redirect_opt.file_path);
}
file_sink->set_level(spdlog::level::info);
sinks.emplace_back(std::move(file_sink));
// Spdlog logger's formatter only applies for its sink (which is newliner sink here),
// but not internal sinks recursively (aka, rotation file sink won't be set); so have to
// manually set formatter here.
file_sink->set_formatter(std::make_unique<spdlog::pattern_formatter>(
"%v", spdlog::pattern_time_type::local, std::string("")));
auto newliner_sink = std::make_shared<spdlog_newliner_sink_st>(std::move(file_sink));
sinks.emplace_back(std::move(newliner_sink));

// Setup fd sink for stdout and stderr.
#if defined(__APPLE__) || defined(__linux__)
Expand Down
6 changes: 6 additions & 0 deletions src/ray/util/pipe_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

namespace ray {

// Environment variable, which indicates the buffer size used for pipe reads.
//
// TODO(hjiang): Should document the env variable after end-to-end integration has
// finished.
inline constexpr std::string_view kPipeLogReadBufSizeEnv = "RAY_PIPE_LOG_READ_BUF_SIZE";
dentiny marked this conversation as resolved.
Show resolved Hide resolved

// File handle requires active destruction via owner calling [Close].
class RedirectionFileHandle {
public:
Expand Down
2 changes: 2 additions & 0 deletions src/ray/util/spdlog_newliner_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class spdlog_newliner_sink final : public spdlog::sinks::base_sink<Mutex> {
spdlog::details::log_msg new_log_msg;
new_log_msg.payload = std::string_view{buffer_.data(), buffer_.length()};
internal_sink_->log(std::move(new_log_msg));
buffer_.clear();
}
internal_sink_->flush();
}

private:
Expand Down
4 changes: 3 additions & 1 deletion src/ray/util/stream_redirection_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ absl::flat_hash_map<int, RedirectionFileHandle> redirection_file_handles;
// ONCE** at program termination.
std::once_flag stream_exit_once_flag;
void SyncOnStreamRedirection() {
for (auto &[_, handle] : redirection_file_handles) {
for (auto &[stream_fd, handle] : redirection_file_handles) {
// `dup2` leaves two pipe write fd, have to close them both.
RAY_CHECK_OK(Close(stream_fd));
handle.Close();
}
}
Expand Down
20 changes: 20 additions & 0 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,26 @@ ray_cc_test(
],
)

ray_cc_test(
name = "stream_redirection_exit_test",
srcs = ["stream_redirection_exit_test.cc"],
deps = [
"//src/ray/common/test:testing",
"//src/ray/util",
"//src/ray/util:stream_redirection_utils",
"@com_google_googletest//:gtest_main",
],
size = "small",
tags = [
"team:core",
# TSAN fails to understand the synchronization logic; from the stacktrace, it shows we flush
# ostream concurrently at pipe dumper thread and main thread, which we have ordered
# properly. Disable the complete test suite here since it always contains exactly one test
# case.
"no_tsan",
],
)

ray_cc_test(
name = "cmd_line_utils_test",
srcs = ["cmd_line_utils_test.cc"],
Expand Down
25 changes: 21 additions & 4 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world\n";

TEST(PipeLoggerTest, RedirectionTest) {
class PipeReadBufferSizeSetter {
dentiny marked this conversation as resolved.
Show resolved Hide resolved
public:
PipeReadBufferSizeSetter(size_t pipe_buffer_size) {
setEnv(kPipeLogReadBufSizeEnv.data(), absl::StrFormat("%d", pipe_buffer_size).data());
}
~PipeReadBufferSizeSetter() { unsetEnv(kPipeLogReadBufSizeEnv.data()); }
};

class PipeLoggerTest : public ::testing::TestWithParam<size_t> {};

TEST_P(PipeLoggerTest, RedirectionTest) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -53,7 +64,8 @@ TEST(PipeLoggerTest, RedirectionTest) {
EXPECT_EQ(*actual_content, expected_content);
}

TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
TEST_P(PipeLoggerTest, RedirectionWithTee) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -79,7 +91,8 @@ TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
EXPECT_EQ(*actual_content, absl::StrFormat("%s%s", kLogLine1, kLogLine2));
}

TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {
TEST_P(PipeLoggerTest, RotatedRedirectionWithTee) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto uuid = GenerateUUIDV4();
const auto test_file_path = scoped_directory.GetDirectory() / uuid;
Expand Down Expand Up @@ -117,7 +130,9 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {

// Testing scenario: log to stdout and file; check whether these two sinks generate
// expected output.
TEST(PipeLoggerCompatTest, CompatibilityTest) {
TEST_P(PipeLoggerTest, CompatibilityTest) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};

// Testing-1: No newliner in the middle nor at the end.
{
constexpr std::string_view kContent = "hello";
Expand Down Expand Up @@ -298,6 +313,8 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
}
}

INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3));

} // namespace

} // namespace ray
21 changes: 21 additions & 0 deletions src/ray/util/tests/spdlog_newliner_sink_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ std::shared_ptr<spdlog::logger> CreateLogger() {
return logger;
}

// Testing senario: Keep writing to spdlog after flush, and check whether all written
dentiny marked this conversation as resolved.
Show resolved Hide resolved
// content is correctly reflected.
TEST(NewlinerSinkTest, WriteAfterFlush) {
auto logger = CreateLogger();
constexpr std::string_view kContent = "hello";

// First time write and flush.
testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent);
logger->flush();
std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);

// Write after flush.
testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent);
logger->flush();
stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
}

TEST(NewlinerSinkTest, AppendAndFlushTest) {
// Case-1: string with newliner at the end.
{
Expand Down
58 changes: 58 additions & 0 deletions src/ray/util/tests/stream_redirection_exit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This test case only checks whether stream redirection process could exit normally.

#include <gtest/gtest.h>

#include <chrono>
#include <iostream>
#include <thread>

#include "ray/common/test/testing.h"
#include "ray/util/filesystem.h"
#include "ray/util/stream_redirection_utils.h"
#include "ray/util/util.h"

namespace ray {

namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world";
} // namespace

TEST(LoggingUtilTest, RedirectStderr) {
const std::string test_file_path = absl::StrFormat("%s.err", GenerateUUIDV4());

// Works via `dup`, so have to execute before we redirect via `dup2` and close stderr.
testing::internal::CaptureStderr();

// Redirect stderr for testing, so we could have stdout for debugging.
StreamRedirectionOption opts;
opts.file_path = test_file_path;
opts.tee_to_stderr = true;
RedirectStderr(opts);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we decouple RedirectStream with atexit() so that we can test better and don't need to create a file for each test.

Ideally we can manually call SyncOnStreamRedirection to close the redirection.

RedirectStderr();
// test
SyncOnStreamRedirection(); // restore to the old state
// some checks


RedirectStderr();
// another test
SyncOnStreamRedirection();
// some checks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That way you need to return redirection handler to the caller? Or return the exit terminator.


std::cerr << kLogLine1 << std::flush;
std::cerr << kLogLine2 << std::flush;

// TODO(hjiang): Current implementation is flaky intrinsically, sleep for a while to
// make sure pipe content has been read over to spdlog.
std::this_thread::sleep_for(std::chrono::seconds(2));
FlushOnRedirectedStderr();

// Make sure flush hook works fine and process terminates with no problem.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are not checking anything here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See documentation above.

// This test case only checks whether stream redirection process could exit normally.

}

} // namespace ray