Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix std::flush with istream::readsome #50248

Merged
merged 13 commits into from
Feb 12, 2025
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -939,3 +939,6 @@ RAY_CONFIG(bool, enable_export_api_write, false)
// src/ray/protobuf/export_api/export_event.proto
// Example config: `export RAY_enable_export_api_write_config='EXPORT_ACTOR,EXPORT_TASK'`
RAY_CONFIG(std::vector<std::string>, enable_export_api_write_config, {})

// Size, in bytes, of the pre-allocated buffer the pipe logger reads pipe content into.
RAY_CONFIG(uint64_t, pipe_logger_read_buf_size, 1024)
4 changes: 3 additions & 1 deletion src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,15 @@ ray_cc_library(
deps = [
":compat",
":spdlog_fd_sink",
":spdlog_newliner_sink",
":stream_redirection_options",
":thread_utils",
":util",
"//src/ray/common:ray_config",
"@boost//:iostreams",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/container:inlined_vector",
"@com_google_absl//absl/strings",
"@com_github_spdlog//:spdlog",
],
)

Expand Down
51 changes: 37 additions & 14 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

#include "absl/container/inlined_vector.h"
#include "absl/strings/str_split.h"
#include "ray/common/ray_config.h"
#include "ray/util/spdlog_fd_sink.h"
#include "ray/util/spdlog_newliner_sink.h"
#include "ray/util/thread_utils.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
Expand Down Expand Up @@ -57,24 +59,39 @@ void StartStreamDump(
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

std::string newline;
const size_t buf_size = RayConfig::instance().pipe_logger_read_buf_size();
// Pre-allocate stream buffer to avoid excessive syscall.
// TODO(hjiang): Should resize without initialization.
std::string readsome_buffer(buf_size, '\0');

std::string cur_segment{"a"};
while (pipe_instream->read(cur_segment.data(), /*count=*/1)) {
// Read available bytes in non-blocking style.
while (true) {
auto bytes_read =
pipe_instream->readsome(readsome_buffer.data(), readsome_buffer.length());
if (bytes_read == 0) {
break;
}
std::string_view cur_readsome_buffer{readsome_buffer.data(),
static_cast<uint64_t>(bytes_read)};
cur_segment += cur_readsome_buffer;
}

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
// Already read all we have at the moment, stream into logger.
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_segment));
cur_segment.clear();
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
// Read later bytes in blocking style.
cur_segment = "a";
}

RAY_CHECK(pipe_instream->eof());
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}
// Reached EOF.
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}).detach();

std::thread([stream_dumper = stream_dumper,
Expand Down Expand Up @@ -127,7 +144,13 @@ std::shared_ptr<spdlog::logger> CreateLogger(
stream_redirect_opt.file_path);
}
file_sink->set_level(spdlog::level::info);
sinks.emplace_back(std::move(file_sink));
// Spdlog logger's formatter only applies for its sink (which is newliner sink here),
// but not internal sinks recursively (aka, rotation file sink won't be set); so have to
// manually set formatter here.
file_sink->set_formatter(std::make_unique<spdlog::pattern_formatter>(
"%v", spdlog::pattern_time_type::local, std::string("")));
auto newliner_sink = std::make_shared<spdlog_newliner_sink_st>(std::move(file_sink));
sinks.emplace_back(std::move(newliner_sink));

// Setup fd sink for stdout and stderr.
#if defined(__APPLE__) || defined(__linux__)
Expand Down
2 changes: 2 additions & 0 deletions src/ray/util/spdlog_newliner_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class spdlog_newliner_sink final : public spdlog::sinks::base_sink<Mutex> {
spdlog::details::log_msg new_log_msg;
new_log_msg.payload = std::string_view{buffer_.data(), buffer_.length()};
internal_sink_->log(std::move(new_log_msg));
buffer_.clear();
}
internal_sink_->flush();
}

private:
Expand Down
46 changes: 41 additions & 5 deletions src/ray/util/stream_redirection_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,34 @@ namespace ray {

namespace {

// Bundles a redirection file handle with the state needed to undo the redirection.
struct RedirectionHandleWrapper {
// Handle for the redirection target; flushed and closed through this wrapper.
RedirectionFileHandle redirection_file_handle;
// Duplicate of the original stream handle, taken before the stream is redirected.
// Used for restoration.
MEMFD_TYPE_NON_UNIQUE saved_stream_handle;
};

// TODO(hjiang): Revisit later, should be able to save some heap allocation with
// absl::InlinedVector.
//
// Maps from original stream file handle (i.e. stdout/stderr) to its stream redirector.
absl::flat_hash_map<int, RedirectionFileHandle> redirection_file_handles;
absl::flat_hash_map<int, RedirectionHandleWrapper> redirection_file_handles;

// Block and synchronize on completion of stream redirection; should be called **EXACTLY
// ONCE** at program termination.
std::once_flag stream_exit_once_flag;
void SyncOnStreamRedirection() {
for (auto &[_, handle] : redirection_file_handles) {
handle.Close();
for (auto &[stream_fd, handle] : redirection_file_handles) {
// Restore old stream fd.
#if defined(__APPLE__) || defined(__linux__)
RAY_CHECK_NE(dup2(handle.saved_stream_handle, stream_fd), -1)
<< "Fails to restore file descritor " << strerror(errno);
#elif defined(_WIN32)
int duped_fd = _open_osfhandle(reinterpret_cast<intptr_t>(handle.saved_stream_handle),
_O_WRONLY);
RAY_CHECK_NE(_dup2(duped_fd, stream_fd), -1) << "Fails to duplicate file descritor.";
#endif

handle.redirection_file_handle.Close();
}
}

Expand All @@ -57,25 +73,45 @@ void RedirectStream(int stream_fd, const StreamRedirectionOption &opt) {
RedirectionFileHandle handle = CreateRedirectionFileHandle(opt);

#if defined(__APPLE__) || defined(__linux__)
// Duplicate stream fd for later restoration.
MEMFD_TYPE_NON_UNIQUE duped_stream_fd = dup(stream_fd);
RAY_CHECK_NE(duped_stream_fd, -1)
<< "Fails to duplicate stream fd " << stream_fd << " because " << strerror(errno);

RAY_CHECK_NE(dup2(handle.GetWriteHandle(), stream_fd), -1)
<< "Fails to duplicate file descritor " << strerror(errno);
#elif defined(_WIN32)
// Duplicate stream fd for later restoration.
MEMFD_TYPE_NON_UNIQUE duped_stream_fd;
BOOL result = DuplicateHandle(GetCurrentProcess(),
(HANDLE)_get_osfhandle(stream_fd),
GetCurrentProcess(),
&duped_stream_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result);

int pipe_write_fd =
_open_osfhandle(reinterpret_cast<intptr_t>(handle.GetWriteHandle()), _O_WRONLY);
RAY_CHECK_NE(_dup2(pipe_write_fd, stream_fd), -1)
<< "Fails to duplicate file descritor.";
#endif

RedirectionHandleWrapper handle_wrapper;
handle_wrapper.redirection_file_handle = std::move(handle);
handle_wrapper.saved_stream_handle = duped_stream_fd;

const bool is_new =
redirection_file_handles.emplace(stream_fd, std::move(handle)).second;
redirection_file_handles.emplace(stream_fd, std::move(handle_wrapper)).second;
RAY_CHECK(is_new) << "Redirection has been register for stream " << stream_fd;
}

void FlushOnRedirectedStream(int stream_fd) {
auto iter = redirection_file_handles.find(stream_fd);
RAY_CHECK(iter != redirection_file_handles.end())
<< "Stream with file descriptor " << stream_fd << " is not registered.";
iter->second.Flush();
iter->second.redirection_file_handle.Flush();
}

} // namespace
Expand Down
21 changes: 21 additions & 0 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ ray_cc_test(
"//src/ray/common/test:testing",
"//src/ray/util",
"//src/ray/util:pipe_logger",
"//src/ray/util:scoped_env_setter",
"@com_google_googletest//:gtest_main",
],
size = "small",
Expand All @@ -267,6 +268,26 @@ ray_cc_test(
],
)

ray_cc_test(
name = "stream_redirection_exit_test",
srcs = ["stream_redirection_exit_test.cc"],
deps = [
"//src/ray/common/test:testing",
"//src/ray/util",
"//src/ray/util:stream_redirection_utils",
"@com_google_googletest//:gtest_main",
],
size = "small",
tags = [
"team:core",
# TSAN fails to understand the synchronization logic: its stacktrace reports that we
# flush the ostream concurrently from the pipe dumper thread and the main thread, even
# though we have ordered these properly. Disable the complete test suite here since it
# always contains exactly one test case.
"no_tsan",
],
)

ray_cc_test(
name = "cmd_line_utils_test",
srcs = ["cmd_line_utils_test.cc"],
Expand Down
26 changes: 22 additions & 4 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "ray/common/test/testing.h"
#include "ray/util/filesystem.h"
#include "ray/util/scoped_env_setter.h"
#include "ray/util/temporary_directory.h"
#include "ray/util/util.h"

Expand All @@ -34,7 +35,12 @@ namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world\n";

TEST(PipeLoggerTest, RedirectionTest) {
// Fixture parameterized on the pipe logger read buffer size; the tests below export the
// parameter through the `RAY_pipe_logger_read_buf_size` environment variable.
class PipeLoggerTest : public ::testing::TestWithParam<size_t> {};

TEST_P(PipeLoggerTest, RedirectionTest) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -53,7 +59,10 @@ TEST(PipeLoggerTest, RedirectionTest) {
EXPECT_EQ(*actual_content, expected_content);
}

TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
TEST_P(PipeLoggerTest, RedirectionWithTee) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -79,7 +88,10 @@ TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
EXPECT_EQ(*actual_content, absl::StrFormat("%s%s", kLogLine1, kLogLine2));
}

TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {
TEST_P(PipeLoggerTest, RotatedRedirectionWithTee) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};
ScopedTemporaryDirectory scoped_directory;
const auto uuid = GenerateUUIDV4();
const auto test_file_path = scoped_directory.GetDirectory() / uuid;
Expand Down Expand Up @@ -117,7 +129,11 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {

// Testing scenario: log to stdout and file; check whether these two sinks generate
// expected output.
TEST(PipeLoggerCompatTest, CompatibilityTest) {
TEST_P(PipeLoggerTest, CompatibilityTest) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};

// Testing-1: No newliner in the middle nor at the end.
{
constexpr std::string_view kContent = "hello";
Expand Down Expand Up @@ -298,6 +314,8 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
}
}

// Run every case with a 1024-byte read buffer and with a 3-byte one, which is shorter
// than the test log lines.
INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3));

} // namespace

} // namespace ray
21 changes: 21 additions & 0 deletions src/ray/util/tests/spdlog_newliner_sink_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ std::shared_ptr<spdlog::logger> CreateLogger() {
return logger;
}

// Testing scenario: the logger must stay usable after a flush. The same payload is
// written and flushed twice, and each round's captured stdout must equal the payload.
TEST(NewlinerSinkTest, WriteAfterFlush) {
  constexpr std::string_view kContent = "hello";
  auto logger = CreateLogger();

  // Round 0 is the initial write-and-flush; round 1 is the write after a flush.
  for (int round = 0; round < 2; ++round) {
    testing::internal::CaptureStdout();
    logger->log(spdlog::level::info, kContent);
    logger->flush();
    const std::string captured = testing::internal::GetCapturedStdout();
    EXPECT_EQ(captured, kContent);
  }
}

TEST(NewlinerSinkTest, AppendAndFlushTest) {
// Case-1: string with newliner at the end.
{
Expand Down
58 changes: 58 additions & 0 deletions src/ray/util/tests/stream_redirection_exit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This test case only checks whether stream redirection process could exit normally.

#include <gtest/gtest.h>

#include <chrono>
#include <iostream>
#include <thread>

#include "ray/common/test/testing.h"
#include "ray/util/filesystem.h"
#include "ray/util/stream_redirection_utils.h"
#include "ray/util/util.h"

namespace ray {

namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world";
} // namespace

TEST(LoggingUtilTest, RedirectStderr) {
const std::string test_file_path = absl::StrFormat("%s.err", GenerateUUIDV4());

// Works via `dup`, so have to execute before we redirect via `dup2` and close stderr.
testing::internal::CaptureStderr();

// Redirect stderr for testing, so we could have stdout for debugging.
StreamRedirectionOption opts;
opts.file_path = test_file_path;
opts.tee_to_stderr = true;
RedirectStderr(opts);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we decouple RedirectStream from atexit() so that it is easier to test and we don't need to create a file for each test?

Ideally we can manually call SyncOnStreamRedirection to close the redirection.

RedirectStderr();
// test
SyncOnStreamRedirection(); // restore to the old state
// some checks


RedirectStderr();
// another test
SyncOnStreamRedirection();
// some checks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That way, wouldn't you need to return the redirection handler to the caller? Or return the exit terminator?


std::cerr << kLogLine1 << std::flush;
std::cerr << kLogLine2 << std::flush;

// TODO(hjiang): Current implementation is flaky intrinsically, sleep for a while to
// make sure pipe content has been read over to spdlog.
std::this_thread::sleep_for(std::chrono::seconds(2));
FlushOnRedirectedStderr();

// Make sure flush hook works fine and process terminates with no problem.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we not checking anything here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See documentation above.

// This test case only checks whether stream redirection process could exit normally.

}

} // namespace ray