Skip to content

Commit

Permalink
[core] Fix std::flush with istream::readsome (#50248)
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny authored Feb 12, 2025
1 parent 6351f89 commit b11c571
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 24 deletions.
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -939,3 +939,6 @@ RAY_CONFIG(bool, enable_export_api_write, false)
// src/ray/protobuf/export_api/export_event.proto
// Example config: `export RAY_enable_export_api_write_config='EXPORT_ACTOR,EXPORT_TASK'`
RAY_CONFIG(std::vector<std::string>, enable_export_api_write_config, {})

// Configuration for pipe logger buffer size.
// This is the length of the buffer the pipe reader thread pre-allocates and hands to
// each `istream::readsome` call when draining the pipe; defaults to 1024.
RAY_CONFIG(uint64_t, pipe_logger_read_buf_size, 1024)
4 changes: 3 additions & 1 deletion src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,15 @@ ray_cc_library(
deps = [
":compat",
":spdlog_fd_sink",
":spdlog_newliner_sink",
":stream_redirection_options",
":thread_utils",
":util",
"//src/ray/common:ray_config",
"@boost//:iostreams",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/container:inlined_vector",
"@com_google_absl//absl/strings",
"@com_github_spdlog//:spdlog",
],
)

Expand Down
51 changes: 37 additions & 14 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@

#include "absl/container/inlined_vector.h"
#include "absl/strings/str_split.h"
#include "ray/common/ray_config.h"
#include "ray/util/spdlog_fd_sink.h"
#include "ray/util/spdlog_newliner_sink.h"
#include "ray/util/thread_utils.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
Expand Down Expand Up @@ -57,24 +59,39 @@ void StartStreamDump(
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

std::string newline;
const size_t buf_size = RayConfig::instance().pipe_logger_read_buf_size();
// Pre-allocate stream buffer to avoid excessive syscall.
// TODO(hjiang): Should resize without initialization.
std::string readsome_buffer(buf_size, '\0');

std::string cur_segment{"a"};
while (pipe_instream->read(cur_segment.data(), /*count=*/1)) {
// Read available bytes in non-blocking style.
while (true) {
auto bytes_read =
pipe_instream->readsome(readsome_buffer.data(), readsome_buffer.length());
if (bytes_read == 0) {
break;
}
std::string_view cur_readsome_buffer{readsome_buffer.data(),
static_cast<uint64_t>(bytes_read)};
cur_segment += cur_readsome_buffer;
}

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
// Already read all we have at the moment, stream into logger.
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_segment));
cur_segment.clear();
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
// Read later bytes in blocking style.
cur_segment = "a";
}

RAY_CHECK(pipe_instream->eof());
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}
// Reached EOF.
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}).detach();

std::thread([stream_dumper = stream_dumper,
Expand Down Expand Up @@ -127,7 +144,13 @@ std::shared_ptr<spdlog::logger> CreateLogger(
stream_redirect_opt.file_path);
}
file_sink->set_level(spdlog::level::info);
sinks.emplace_back(std::move(file_sink));
// Spdlog logger's formatter only applies for its sink (which is newliner sink here),
// but not internal sinks recursively (aka, rotation file sink won't be set); so have to
// manually set formatter here.
file_sink->set_formatter(std::make_unique<spdlog::pattern_formatter>(
"%v", spdlog::pattern_time_type::local, std::string("")));
auto newliner_sink = std::make_shared<spdlog_newliner_sink_st>(std::move(file_sink));
sinks.emplace_back(std::move(newliner_sink));

// Setup fd sink for stdout and stderr.
#if defined(__APPLE__) || defined(__linux__)
Expand Down
2 changes: 2 additions & 0 deletions src/ray/util/spdlog_newliner_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ class spdlog_newliner_sink final : public spdlog::sinks::base_sink<Mutex> {
spdlog::details::log_msg new_log_msg;
new_log_msg.payload = std::string_view{buffer_.data(), buffer_.length()};
internal_sink_->log(std::move(new_log_msg));
buffer_.clear();
}
internal_sink_->flush();
}

private:
Expand Down
46 changes: 41 additions & 5 deletions src/ray/util/stream_redirection_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,34 @@ namespace ray {

namespace {

// Pairs the redirection file handle of a redirected stream with the duplicate of the
// stream handle that the redirection replaced.
struct RedirectionHandleWrapper {
// Handle for the redirection target; it is flushed on request and closed at exit.
RedirectionFileHandle redirection_file_handle;
// Used for restoration.
// Holds the duplicate of the original stream handle taken before the stream was
// redirected, so the stream can be pointed back at it when redirection is torn down.
MEMFD_TYPE_NON_UNIQUE saved_stream_handle;
};

// TODO(hjiang): Revisit later, should be able to save some heap allocation with
// absl::InlinedVector.
//
// Maps from original stream file handle (i.e. stdout/stderr) to its stream redirector.
absl::flat_hash_map<int, RedirectionFileHandle> redirection_file_handles;
absl::flat_hash_map<int, RedirectionHandleWrapper> redirection_file_handles;

// Blocks until all stream redirection related work has completed; should be called
// **EXACTLY ONCE** at program termination.
std::once_flag stream_exit_once_flag;
void SyncOnStreamRedirection() {
for (auto &[_, handle] : redirection_file_handles) {
handle.Close();
for (auto &[stream_fd, handle] : redirection_file_handles) {
// Restore old stream fd.
#if defined(__APPLE__) || defined(__linux__)
RAY_CHECK_NE(dup2(handle.saved_stream_handle, stream_fd), -1)
<< "Fails to restore file descritor " << strerror(errno);
#elif defined(_WIN32)
int duped_fd = _open_osfhandle(reinterpret_cast<intptr_t>(handle.saved_stream_handle),
_O_WRONLY);
RAY_CHECK_NE(_dup2(duped_fd, stream_fd), -1) << "Fails to duplicate file descritor.";
#endif

handle.redirection_file_handle.Close();
}
}

Expand All @@ -57,25 +73,45 @@ void RedirectStream(int stream_fd, const StreamRedirectionOption &opt) {
RedirectionFileHandle handle = CreateRedirectionFileHandle(opt);

#if defined(__APPLE__) || defined(__linux__)
// Duplicate stream fd for later restoration.
MEMFD_TYPE_NON_UNIQUE duped_stream_fd = dup(stream_fd);
RAY_CHECK_NE(duped_stream_fd, -1)
<< "Fails to duplicate stream fd " << stream_fd << " because " << strerror(errno);

RAY_CHECK_NE(dup2(handle.GetWriteHandle(), stream_fd), -1)
<< "Fails to duplicate file descritor " << strerror(errno);
#elif defined(_WIN32)
// Duplicate stream fd for later restoration.
MEMFD_TYPE_NON_UNIQUE duped_stream_fd;
BOOL result = DuplicateHandle(GetCurrentProcess(),
(HANDLE)_get_osfhandle(stream_fd),
GetCurrentProcess(),
&duped_stream_fd,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result);

int pipe_write_fd =
_open_osfhandle(reinterpret_cast<intptr_t>(handle.GetWriteHandle()), _O_WRONLY);
RAY_CHECK_NE(_dup2(pipe_write_fd, stream_fd), -1)
<< "Fails to duplicate file descritor.";
#endif

RedirectionHandleWrapper handle_wrapper;
handle_wrapper.redirection_file_handle = std::move(handle);
handle_wrapper.saved_stream_handle = duped_stream_fd;

const bool is_new =
redirection_file_handles.emplace(stream_fd, std::move(handle)).second;
redirection_file_handles.emplace(stream_fd, std::move(handle_wrapper)).second;
RAY_CHECK(is_new) << "Redirection has been register for stream " << stream_fd;
}

void FlushOnRedirectedStream(int stream_fd) {
auto iter = redirection_file_handles.find(stream_fd);
RAY_CHECK(iter != redirection_file_handles.end())
<< "Stream with file descriptor " << stream_fd << " is not registered.";
iter->second.Flush();
iter->second.redirection_file_handle.Flush();
}

} // namespace
Expand Down
21 changes: 21 additions & 0 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ ray_cc_test(
"//src/ray/common/test:testing",
"//src/ray/util",
"//src/ray/util:pipe_logger",
"//src/ray/util:scoped_env_setter",
"@com_google_googletest//:gtest_main",
],
size = "small",
Expand All @@ -268,6 +269,26 @@ ray_cc_test(
],
)

ray_cc_test(
name = "stream_redirection_exit_test",
srcs = ["stream_redirection_exit_test.cc"],
deps = [
"//src/ray/common/test:testing",
"//src/ray/util",
"//src/ray/util:stream_redirection_utils",
"@com_google_googletest//:gtest_main",
],
size = "small",
tags = [
"team:core",
# TSAN fails to understand the synchronization logic: its stacktrace reports that we
# flush the ostream concurrently from the pipe dumper thread and the main thread, even
# though we have ordered these flushes properly. Disable the complete test suite here,
# since it always contains exactly one test case.
"no_tsan",
],
)

ray_cc_test(
name = "cmd_line_utils_test",
srcs = ["cmd_line_utils_test.cc"],
Expand Down
26 changes: 22 additions & 4 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "ray/common/test/testing.h"
#include "ray/util/filesystem.h"
#include "ray/util/scoped_env_setter.h"
#include "ray/util/temporary_directory.h"
#include "ray/util/util.h"

Expand All @@ -34,7 +35,12 @@ namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world\n";

TEST(PipeLoggerTest, RedirectionTest) {
class PipeLoggerTest : public ::testing::TestWithParam<size_t> {};

TEST_P(PipeLoggerTest, RedirectionTest) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -53,7 +59,10 @@ TEST(PipeLoggerTest, RedirectionTest) {
EXPECT_EQ(*actual_content, expected_content);
}

TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
TEST_P(PipeLoggerTest, RedirectionWithTee) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -79,7 +88,10 @@ TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
EXPECT_EQ(*actual_content, absl::StrFormat("%s%s", kLogLine1, kLogLine2));
}

TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {
TEST_P(PipeLoggerTest, RotatedRedirectionWithTee) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};
ScopedTemporaryDirectory scoped_directory;
const auto uuid = GenerateUUIDV4();
const auto test_file_path = scoped_directory.GetDirectory() / uuid;
Expand Down Expand Up @@ -117,7 +129,11 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {

// Testing scenario: log to stdout and file; check whether these two sinks generate
// expected output.
TEST(PipeLoggerCompatTest, CompatibilityTest) {
TEST_P(PipeLoggerTest, CompatibilityTest) {
const std::string pipe_buffer_size = absl::StrFormat("%d", GetParam());
ScopedEnvSetter scoped_env_setter{"RAY_pipe_logger_read_buf_size",
pipe_buffer_size.data()};

// Testing-1: No newliner in the middle nor at the end.
{
constexpr std::string_view kContent = "hello";
Expand Down Expand Up @@ -298,6 +314,8 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
}
}

INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3));

} // namespace

} // namespace ray
21 changes: 21 additions & 0 deletions src/ray/util/tests/spdlog_newliner_sink_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,27 @@ std::shared_ptr<spdlog::logger> CreateLogger() {
return logger;
}

// Testing scenario: a flush must not stop later writes from being emitted. Each
// write-then-flush round is expected to surface exactly the content written in it.
TEST(NewlinerSinkTest, WriteAfterFlush) {
  constexpr std::string_view kContent = "hello";
  auto logger = CreateLogger();

  // Logs `kContent` once, flushes the logger, and returns everything that reached
  // stdout while the capture was active.
  const auto write_flush_and_capture = [&]() -> std::string {
    testing::internal::CaptureStdout();
    logger->log(spdlog::level::info, kContent);
    logger->flush();
    return testing::internal::GetCapturedStdout();
  };

  // Round 1: first write and flush.
  const std::string first_round_output = write_flush_and_capture();
  EXPECT_EQ(first_round_output, kContent);

  // Round 2: write again after the flush above.
  const std::string second_round_output = write_flush_and_capture();
  EXPECT_EQ(second_round_output, kContent);
}

TEST(NewlinerSinkTest, AppendAndFlushTest) {
// Case-1: string with newliner at the end.
{
Expand Down
58 changes: 58 additions & 0 deletions src/ray/util/tests/stream_redirection_exit_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This test case only checks whether stream redirection process could exit normally.

#include <gtest/gtest.h>

#include <chrono>
#include <iostream>
#include <thread>

#include "ray/common/test/testing.h"
#include "ray/util/filesystem.h"
#include "ray/util/stream_redirection_utils.h"
#include "ray/util/util.h"

namespace ray {

namespace {
// First payload written to the redirected stderr; ends with a newline.
constexpr std::string_view kLogLine1 = "hello\n";
// Second payload written to the redirected stderr; has no trailing newline.
constexpr std::string_view kLogLine2 = "world";
} // namespace

TEST(LoggingUtilTest, RedirectStderr) {
  const std::string redirect_target = absl::StrFormat("%s.err", GenerateUUIDV4());

  // gtest's capture works via `dup`, so it has to be installed before stderr is
  // redirected via `dup2` and closed.
  testing::internal::CaptureStderr();

  // Only stderr is redirected for this test, which leaves stdout usable for debugging.
  StreamRedirectionOption redirect_option;
  redirect_option.file_path = redirect_target;
  redirect_option.tee_to_stderr = true;
  RedirectStderr(redirect_option);

  // Write both payloads, flushing the stream after each one.
  std::cerr << kLogLine1;
  std::cerr.flush();
  std::cerr << kLogLine2;
  std::cerr.flush();

  // TODO(hjiang): Current implementation is flaky intrinsically, so wait for a while to
  // make sure the pipe content has been read over to spdlog before flushing.
  constexpr std::chrono::seconds kPipeDrainWait{2};
  std::this_thread::sleep_for(kPipeDrainWait);
  FlushOnRedirectedStderr();

  // No assertions on purpose: the test passes when the flush hook works fine and the
  // process terminates with no problem.
}

} // namespace ray

0 comments on commit b11c571

Please sign in to comment.