From f1e4cf09bcad986f544feba86951f1b6f9bbc641 Mon Sep 17 00:00:00 2001 From: dentiny Date: Sat, 1 Feb 2025 05:05:59 +0000 Subject: [PATCH] compatibility function for file descriptor Signed-off-by: dentiny --- src/ray/util/BUILD | 5 ++ src/ray/util/compat.cc | 76 +++++++++++++++++++++++++++++ src/ray/util/compat.h | 15 ++++++ src/ray/util/tests/BUILD | 12 +++++ src/ray/util/tests/compat_test.cc | 80 +++++++++++++++++++++++++++++++ 5 files changed, 188 insertions(+) create mode 100644 src/ray/util/compat.cc create mode 100644 src/ray/util/tests/compat_test.cc diff --git a/src/ray/util/BUILD b/src/ray/util/BUILD index 8aaea0862abe0..d98b45535e90d 100644 --- a/src/ray/util/BUILD +++ b/src/ray/util/BUILD @@ -256,6 +256,11 @@ ray_cc_library( ray_cc_library( name = "compat", hdrs = ["compat.h"], + srcs = ["compat.cc"], + deps = [ + ":logging", + "//src/ray/common:status", + ], ) ray_cc_library( diff --git a/src/ray/util/compat.cc b/src/ray/util/compat.cc new file mode 100644 index 0000000000000..91d88c09c228e --- /dev/null +++ b/src/ray/util/compat.cc @@ -0,0 +1,76 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/util/compat.h" + +#include + +#include "ray/util/logging.h" + +#if defined(__APPLE__) || defined(__linux__) +#include +#elif defined(_WIN32) +#include +#endif + +namespace ray { + +#if defined(__APPLE__) || defined(__linux__) +Status CompleteWrite(MEMFD_TYPE_NON_UNIQUE fd, const char *data, size_t len) { + const ssize_t ret = write(fd, data, len); + if (ret == -1) { + return Status::IOError("") << "Fails to write to file because " << strerror(errno); + } + if (ret != static_cast(len)) { + return Status::IOError("") << "Fails to write all requested bytes, requests to write " + << len << " bytes, but actually write " << ret << " bytes"; + } + return Status::OK(); +} +void Flush(MEMFD_TYPE_NON_UNIQUE fd) { + RAY_CHECK_EQ(fdatasync(fd), 0) << "Fails to flush file because " << strerror(errno); +} +Status Close(MEMFD_TYPE_NON_UNIQUE fd) { + const int ret = close(fd); + if (ret != 0) { + return Status::IOError("") << "Fails to flush file because " << strerror(errno); + } + return Status::OK(); +} +#elif defined(_WIN32) +Status CompleteWrite(MEMFD_TYPE_NON_UNIQUE fd, const char *data, size_t len) { + DWORD bytes_written; + BOOL success = WriteFile(fd, data, (DWORD)len, &bytes_written, NULL); + if (!success) { + return Status::IOError("") << "Fails to write to file"; + } + if ((DWORD)len != bytes_written) { + return Status::IOError("") << "Fails to write all requested bytes, requests to write " + << len << " bytes, but actually write " << bytes_written + << " bytes"; + } + return Status::OK(); +} +void Flush(MEMFD_TYPE_NON_UNIQUE fd) { + RAY_CHECK(FlushFileBuffers(fd)) << "Fails to flush file"; +} +Status Close(MEMFD_TYPE_NON_UNIQUE fd) { + if (!CloseHandle(fd)) { + return Status::IOError("") << "Fails to close file handle"; + } + return Status::OK(); +} +#endif + +} // namespace ray diff --git a/src/ray/util/compat.h b/src/ray/util/compat.h index 371192331084a..0df811c70cca0 100644 --- a/src/ray/util/compat.h +++ b/src/ray/util/compat.h @@ -31,6 +31,8 @@ #pragma once +#include "ray/common/status.h" + // Workaround for multithreading on XCode 9, see // https://issues.apache.org/jira/browse/ARROW-1622 and // https://github.com/tensorflow/tensorflow/issues/13220#issuecomment-331579775 @@ -72,3 +74,16 @@ mach_port_t pthread_mach_thread_np(pthread_t); // since fd values can get re-used by the operating system. #define MEMFD_TYPE std::pair #define INVALID_UNIQUE_FD_ID 0 + +namespace ray { +// Write the whole content into file descriptor, if any error happens, or actual written +// content is less than expected, IO error status will be returned. +Status CompleteWrite(MEMFD_TYPE_NON_UNIQUE fd, const char *data, size_t len); +// Flush the given file descriptor, if any error happens, error message is logged and +// process exits directly. +// Reference to fsyncgate: https://wiki.postgresql.org/wiki/Fsync_Errors +void Flush(MEMFD_TYPE_NON_UNIQUE fd); +// Close the given file descriptor, if any error happens, IO error status will be +// returned. +Status Close(MEMFD_TYPE_NON_UNIQUE fd); +} // namespace ray diff --git a/src/ray/util/tests/BUILD b/src/ray/util/tests/BUILD index d8365df7ea6b5..f5658ec42a38c 100644 --- a/src/ray/util/tests/BUILD +++ b/src/ray/util/tests/BUILD @@ -261,3 +261,15 @@ ray_cc_test( size = "small", tags = ["team:core"], ) + +ray_cc_test( + name = "compat_test", + srcs = ["compat_test.cc"], + deps = [ + "//src/ray/util:compat", + "//src/ray/util:temporary_directory", + "@com_google_googletest//:gtest_main", + ], + size = "small", + tags = ["team:core"], +) diff --git a/src/ray/util/tests/compat_test.cc b/src/ray/util/tests/compat_test.cc new file mode 100644 index 0000000000000..0519d1904b6b1 --- /dev/null +++ b/src/ray/util/tests/compat_test.cc @@ -0,0 +1,80 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/util/compat.h" + +#include + +#include + +#include "ray/util/filesystem.h" +#include "ray/util/temporary_directory.h" + +#if defined(__APPLE__) || defined(__linux__) +#include +#include +#include +#include +#elif defined(_WIN32) +#include +#endif + +namespace ray { + +namespace { + +constexpr std::string_view kContent = "helloworld"; + +#if defined(__APPLE__) || defined(__linux__) +TEST(CompatTest, WriteTest) { + ScopedTemporaryDirectory temp_dir; + const auto file = temp_dir.GetDirectory() / "file"; + const auto file_path_str = file.native(); + const int fd = open(file_path_str.data(), O_CREAT | O_WRONLY | O_TRUNC, 0644); + ASSERT_GT(fd, 0); + + RAY_CHECK_OK(CompleteWrite(fd, kContent.data(), kContent.length())); + Flush(fd); + RAY_CHECK_OK(Close(fd)); + + RAY_ASSIGN_OR_CHECK(const auto content, ReadEntireFile(file_path_str)); + EXPECT_EQ(content, kContent); +} +#elif defined(_WIN32) +TEST(CompatTest, WriteTest) { + ScopedTemporaryDirectory temp_dir; + const auto file = temp_dir.GetDirectory() / "file"; + const auto file_path_str = file.native(); + HANDLE handle = CreateFile(file_path_str.c_str(), // File path + GENERIC_WRITE, // Open for writing + 0, // No sharing + NULL, // Default security + CREATE_ALWAYS, // Create new file (overwrite if exists) + FILE_ATTRIBUTE_NORMAL, // Normal file attributes + NULL // No template + ); + ASSERT_NE(handle, INVALID_HANDLE_VALUE); + + RAY_CHECK_OK(CompleteWrite(fd, kContent.data(), kContent.length())); + Flush(fd); + RAY_CHECK_OK(Close(fd)); + + RAY_ASSIGN_OR_CHECK(const auto content, ReadEntireFile(file_path_str)); + EXPECT_EQ(content, kContent); +} +#endif + +} // namespace + +} // namespace ray