Skip to content

Commit

Permalink
pipe readsome
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Feb 5, 2025
1 parent 27e5850 commit 0953813
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 16 deletions.
71 changes: 59 additions & 12 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ namespace ray {

namespace {

// Default pipe log read buffer size.
constexpr size_t kDefaultPipeLogReadBufSize = 1024;

size_t GetPipeLogReadSizeOrDefault() {
// TODO(hjiang): Write a util function `GetEnvOrDefault`.
const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data());
if (var_value != nullptr) {
size_t read_buf_size = 0;
if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) {
return read_buf_size;
}
}
return kDefaultPipeLogReadBufSize;
}

struct StreamDumper {
absl::Mutex mu;
bool stopped ABSL_GUARDED_BY(mu) = false;
Expand All @@ -56,24 +71,56 @@ void StartStreamDump(
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

std::string newline;
const size_t buf_size = GetPipeLogReadSizeOrDefault();
// Pre-allocate stream buffer to avoid excessive syscall.
// TODO(hjiang): Should resize without initialization.
std::string readsome_buffer(buf_size, '\0');
// Logging are written in lines, `last_line` records part of the strings left in
// last `read` syscall.
std::string last_line;

std::string cur_new_line{"a"};
while (pipe_instream->read(cur_new_line.data(), /*count=*/1)) {
// Read available bytes in non-blocking style.
while (true) {
auto bytes_read =
pipe_instream->readsome(readsome_buffer.data(), readsome_buffer.length());
if (bytes_read == 0) {
break;
}
std::string_view cur_readsome_buffer{readsome_buffer.data(),
static_cast<uint64_t>(bytes_read)};
cur_new_line += cur_readsome_buffer;
}

// After we read a chunk of bytes continuously, split into lines, send to write
// thread and multiple sinks.
std::vector<std::string_view> newlines = absl::StrSplit(cur_new_line, '\n');
for (size_t idx = 0; !newlines.empty() && idx < newlines.size() - 1; ++idx) {
std::string cur_segment = std::move(last_line);
last_line.clear();
cur_segment += newlines[idx];
cur_segment += '\n';

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_segment));
}

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
// Record last segment.
if (!newlines.empty()) {
last_line = newlines.back();
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
// Read later bytes in blocking style.
cur_new_line = "a";
}

RAY_CHECK(pipe_instream->eof());
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
// Reached EOF.
absl::MutexLock lock(&stream_dumper->mu);
if (!last_line.empty()) {
stream_dumper->content.emplace_back(std::move(last_line));
}
stream_dumper->stopped = true;
}).detach();

std::thread([stream_dumper = stream_dumper,
Expand Down
6 changes: 6 additions & 0 deletions src/ray/util/pipe_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@

namespace ray {

// Environmenr variable, which indicates the pipe size of read.
//
// TODO(hjiang): Should document the env variable after end-to-end integration has
// finished.
inline constexpr std::string_view kPipeLogReadBufSizeEnv = "RAY_PIPE_LOG_READ_BUF_SIZE";

// File handle requires active destruction via owner calling [Close].
class RedirectionFileHandle {
public:
Expand Down
25 changes: 21 additions & 4 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,18 @@ namespace {
constexpr std::string_view kLogLine1 = "hello\n";
constexpr std::string_view kLogLine2 = "world\n";

TEST(PipeLoggerTest, RedirectionTest) {
class PipeReadBufferSizeSetter {
public:
PipeReadBufferSizeSetter(size_t pipe_buffer_size) {
setEnv(kPipeLogReadBufSizeEnv.data(), absl::StrFormat("%d", pipe_buffer_size).data());
}
~PipeReadBufferSizeSetter() { unsetEnv(kPipeLogReadBufSizeEnv.data()); }
};

class PipeLoggerTest : public ::testing::TestWithParam<size_t> {};

TEST_P(PipeLoggerTest, RedirectionTest) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand All @@ -59,7 +70,8 @@ TEST(PipeLoggerTest, RedirectionTest) {
EXPECT_EQ(*actual_content, expected_content);
}

TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
TEST_P(PipeLoggerTest, RedirectionWithTee) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto test_file_path = scoped_directory.GetDirectory() / GenerateUUIDV4();

Expand Down Expand Up @@ -90,7 +102,8 @@ TEST(PipeLoggerTestWithTee, RedirectionWithTee) {
EXPECT_EQ(*actual_content, absl::StrFormat("%s%s", kLogLine1, kLogLine2));
}

TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {
TEST_P(PipeLoggerTest, RotatedRedirectionWithTee) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};
ScopedTemporaryDirectory scoped_directory;
const auto uuid = GenerateUUIDV4();
const auto test_file_path = scoped_directory.GetDirectory() / uuid;
Expand Down Expand Up @@ -134,7 +147,9 @@ TEST(PipeLoggerTestWithTee, RotatedRedirectionWithTee) {

// Testing senario: log to stdout and file; check whether these two sinks generate
// expected output.
TEST(PipeLoggerCompatTest, CompatibilityTest) {
TEST_P(PipeLoggerTest, CompatibilityTest) {
PipeReadBufferSizeSetter pipe_read_buffer_size_setter{GetParam()};

// Testing-1: No newliner in the middle nor at the end.
{
constexpr std::string_view kContent = "hello";
Expand Down Expand Up @@ -315,6 +330,8 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
}
}

INSTANTIATE_TEST_SUITE_P(PipeLoggerTest, PipeLoggerTest, testing::Values(1024, 3));

} // namespace

} // namespace ray

0 comments on commit 0953813

Please sign in to comment.