Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Fix std::flush with istream::readsome #50248

Merged
merged 13 commits into from
Feb 12, 2025
71 changes: 59 additions & 12 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ namespace ray {

namespace {

// Default pipe log read buffer size.
constexpr size_t kDefaultPipeLogReadBufSize = 1024;

size_t GetPipeLogReadSizeOrDefault() {
// TODO(hjiang): Write a util function `GetEnvOrDefault`.
dentiny marked this conversation as resolved.
Show resolved Hide resolved
const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data());
if (var_value != nullptr) {
size_t read_buf_size = 0;
if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) {
return read_buf_size;
}
}
return kDefaultPipeLogReadBufSize;
}

struct StreamDumper {
absl::Mutex mu;
bool stopped ABSL_GUARDED_BY(mu) = false;
Expand All @@ -56,24 +71,56 @@ void StartStreamDump(
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

std::string newline;
const size_t buf_size = GetPipeLogReadSizeOrDefault();
// Pre-allocate stream buffer to avoid excessive syscall.
// TODO(hjiang): Should resize without initialization.
std::string readsome_buffer(buf_size, '\0');
// Logging are written in lines, `last_line` records part of the strings left in
// last `read` syscall.
std::string last_line;

std::string cur_new_line{"a"};
dentiny marked this conversation as resolved.
Show resolved Hide resolved
while (pipe_instream->read(cur_new_line.data(), /*count=*/1)) {
// Read available bytes in non-blocking style.
while (true) {
auto bytes_read =
pipe_instream->readsome(readsome_buffer.data(), readsome_buffer.length());
if (bytes_read == 0) {
break;
}
std::string_view cur_readsome_buffer{readsome_buffer.data(),
static_cast<uint64_t>(bytes_read)};
cur_new_line += cur_readsome_buffer;
}

// After we read a chunk of bytes continuously, split into lines, send to write
// thread and multiple sinks.
std::vector<std::string_view> newlines = absl::StrSplit(cur_new_line, '\n');
for (size_t idx = 0; !newlines.empty() && idx < newlines.size() - 1; ++idx) {
std::string cur_segment = std::move(last_line);
last_line.clear();
cur_segment += newlines[idx];
cur_segment += '\n';

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_segment));
}

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
// Record last segment.
if (!newlines.empty()) {
last_line = newlines.back();
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
// Read later bytes in blocking style.
cur_new_line = "a";
}

RAY_CHECK(pipe_instream->eof());
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
// Reached EOF.
absl::MutexLock lock(&stream_dumper->mu);
if (!last_line.empty()) {
stream_dumper->content.emplace_back(std::move(last_line));
}
stream_dumper->stopped = true;
}).detach();

std::thread([stream_dumper = stream_dumper,
Expand Down
6 changes: 6 additions & 0 deletions src/ray/util/pipe_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

namespace ray {

// Environmenr variable, which indicates the pipe size of read.
//
// TODO(hjiang): Should document the env variable after end-to-end integration has
// finished.
inline constexpr std::string_view kPipeLogReadBufSizeEnv = "RAY_PIPE_LOG_READ_BUF_SIZE";
dentiny marked this conversation as resolved.
Show resolved Hide resolved

// File handle requires active destruction via owner calling [Close].
class RedirectionFileHandle {
public:
Expand Down
25 changes: 21 additions & 4 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,18 @@ namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world\n";

TEST(PipeLoggerTest, RedirectionTest) {
class PipeReadBufferSizeSetter {
dentiny marked this conversation as resolved.
Show resolved Hide resolved
public:
PipeReadBufferSizeSetter(size_t pipe_buffer_size) {
setEnv(kPipeLogReadBufSizeEnv.data(), absl::StrFormat("%d", pipe_buffer_size).data());
}
~PipeReadBufferSizeSetter() { unsetEnv(kPipeLogReadBufSizeEnv.data()); }
};

class PipeLoggerTest : public ::testing::TestWithParam<size_t> {};

TEST_P(PipeLoggerTest, RedirectionTest) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -59,7 +70,8 @@ TEST(PipeLoggerTest, RedirectionTest) {
EXPECT_EQ(*actual_content, expected_content);
}

TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
TEST_P(PipeLoggerTest, RedirectionWithTee) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand Down Expand Up @@ -90,7 +102,8 @@ TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
EXPECT_EQ(*actual_content, absl::StrFormat("%s%s", kLogLine1, kLogLine2));
}

TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {
TEST_P(PipeLoggerTest, RotatedRedirectionWithTee) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto uuid = GenerateUUIDV4();
const auto test_file_path = scoped_directory.GetDirectory() / uuid;
Expand Down Expand Up @@ -134,7 +147,9 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {

// Testing senario: log to stdout and file; check whether these two sinks generate
// expected output.
TEST(PipeLoggerCompatTest, CompatibilityTest) {
TEST_P(PipeLoggerTest, CompatibilityTest) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};

// Testing-1: No newliner in the middle nor at the end.
{
constexpr std::string_view kContent = "hello";
Expand Down Expand Up @@ -315,6 +330,8 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
}
}

INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3));

} // namespace

} // namespace ray