Skip to content

[core] spdlog newliner sink #50333

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ ray_cc_library(
],
)

ray_cc_library(
name = "spdlog_newliner_sink",
hdrs = ["spdlog_newliner_sink.h"],
deps = [
":compat",
":util",
"@com_github_spdlog//:spdlog",
],
)

ray_cc_library(
name = "temporary_directory",
hdrs = ["temporary_directory.h"],
Expand Down
83 changes: 83 additions & 0 deletions src/ray/util/spdlog_newliner_sink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <spdlog/sinks/base_sink.h>

#include <iostream>
#include <string_view>

#include "absl/strings/str_split.h"
#include "ray/util/compat.h"
#include "ray/util/util.h"

namespace ray {

// A sink which streams only after a newliner or explicit flush.
template <typename Mutex>
class spdlog_newliner_sink final : public spdlog::sinks::base_sink<Mutex> {
public:
explicit spdlog_newliner_sink(spdlog::sink_ptr internal_sink)
: internal_sink_(std::move(internal_sink)) {}

protected:
void sink_it_(const spdlog::details::log_msg &msg) override {
if (msg.payload.size() == 0) {
return;
}

const std::string_view new_content{msg.payload.data(), msg.payload.size()};
auto pos = new_content.find('\n');
if (pos == std::string_view::npos) {
buffer_ += new_content;
return;
}
std::vector<std::string_view> segments = absl::StrSplit(new_content, '\n');
for (int idx = 0; idx < static_cast<int>(segments.size()) - 1; ++idx) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int -> size_t ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safer to use int, because segment could be empty, then possible to suffer underflow.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it can be empty in our case but ok.

std::string cur_message = std::move(buffer_);
buffer_.clear();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it safe to call clear on a moved string and use it later? Should we do buffer_ = "" instead?

Copy link
Contributor Author

@dentiny dentiny Feb 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it safe to call clear on a moved string and use it later?

What do you mean? I didn't access its value. clear is a well-defined function.

cur_message += segments[idx];
// Compensate the missing newliner we miss when split.
cur_message += '\n';

spdlog::details::log_msg new_log_msg;
new_log_msg.payload = std::string_view{cur_message.data(), cur_message.length()};
internal_sink_->log(new_log_msg);
}

// If the last character for payload is already newliner, we've already flushed out
// everything; otherwise need to keep left bytes in buffer.
if (msg.payload[msg.payload.size() - 1] != '\n') {
buffer_ = std::string{segments.back()};
}
}
void flush_() override {
if (!buffer_.empty()) {
spdlog::details::log_msg new_log_msg;
new_log_msg.payload = std::string_view{buffer_.data(), buffer_.length()};
internal_sink_->log(std::move(new_log_msg));
}
}

private:
spdlog::sink_ptr internal_sink_;
// Sink flushes in lines, buffer keeps unflushed bytes for the next new line.
std::string buffer_;
};

using spdlog_newliner_sink_mt = spdlog_newliner_sink<std::mutex>;
using spdlog_newliner_sink_st = spdlog_newliner_sink<spdlog::details::null_mutex>;

} // namespace ray
12 changes: 12 additions & 0 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,18 @@ ray_cc_test(
tags = ["team:core"],
)

ray_cc_test(
name = "spdlog_newliner_sink_test",
srcs = ["spdlog_newliner_sink_test.cc"],
deps = [
"//src/ray/util:spdlog_fd_sink",
"//src/ray/util:spdlog_newliner_sink",
"@com_google_googletest//:gtest_main",
],
size = "small",
tags = ["team:core"],
)

ray_cc_test(
name = "temporary_directory_test",
srcs = ["temporary_directory_test.cc"],
Expand Down
110 changes: 110 additions & 0 deletions src/ray/util/tests/spdlog_newliner_sink_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/util/spdlog_newliner_sink.h"

#include <gtest/gtest.h>

#include <string_view>

#include "ray/util/compat.h"
#include "ray/util/spdlog_fd_sink.h"
#include "spdlog/sinks/basic_file_sink.h"

namespace ray {

namespace {

std::shared_ptr<spdlog::logger> CreateLogger() {
auto fd_formatter = std::make_unique<spdlog::pattern_formatter>(
"%v", spdlog::pattern_time_type::local, std::string(""));
auto fd_sink = std::make_shared<non_owned_fd_sink_st>(GetStdoutFd());
// We have to manually set the formatter, since it's not managed by logger.
fd_sink->set_formatter(std::move(fd_formatter));

auto sink = std::make_shared<spdlog_newliner_sink_st>(std::move(fd_sink));
auto logger_formatter = std::make_unique<spdlog::pattern_formatter>(
"%v", spdlog::pattern_time_type::local, std::string(""));
auto logger = std::make_shared<spdlog::logger>(/*name=*/"logger", std::move(sink));
logger->set_formatter(std::move(logger_formatter));
return logger;
}

TEST(NewlinerSinkTest, AppendAndFlushTest) {
// Case-1: string with newliner at the end.
{
auto logger = CreateLogger();
constexpr std::string_view kContent = "hello\n";

testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent);
logger->flush();
const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
}

// Case-2: string with no newliner at the end.
{
auto logger = CreateLogger();
constexpr std::string_view kContent = "hello";

testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent);
logger->flush();
const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
}

// Case-3: newliner in the middle, with trailing newliner.
{
auto logger = CreateLogger();
constexpr std::string_view kContent = "hello\nworld\n";

testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent);
logger->flush();
const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
}

// Case-4: newliner in the middle, without trailing newliner.
{
auto logger = CreateLogger();
constexpr std::string_view kContent = "hello\nworld";

testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent);
logger->flush();
const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
}

// Case-5: multiple writes.
{
auto logger = CreateLogger();
constexpr std::string_view kContent1 = "hello\nworld";
constexpr std::string_view kContent2 = "hello\nworld\n";

testing::internal::CaptureStdout();
logger->log(spdlog::level::info, kContent1);
logger->log(spdlog::level::info, kContent2);
logger->flush();
const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, absl::StrFormat("%s%s", kContent1, kContent2));
}
}

} // namespace

} // namespace ray