Skip to content

Commit

Permalink
compatibility function for file descriptor
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Feb 1, 2025
1 parent 42fbdf9 commit f1e4cf0
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ ray_cc_library(
ray_cc_library(
name = "compat",
hdrs = ["compat.h"],
srcs = ["compat.cc"],
deps = [
":logging",
"//src/ray/common:status",
],
)

ray_cc_library(
Expand Down
76 changes: 76 additions & 0 deletions src/ray/util/compat.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/util/compat.h"

#include <cstring>

#include "ray/util/logging.h"

#if defined(__APPLE__) || defined(__linux__)
#include <unistd.h>
#elif defined(_WIN32)
#include <windows.h>
#endif

namespace ray {

#if defined(__APPLE__) || defined(__linux__)
Status CompleteWrite(MEMFD_TYPE_NON_UNIQUE fd, const char *data, size_t len) {
const ssize_t ret = write(fd, data, len);
if (ret == -1) {
return Status::IOError("") << "Fails to write to file because " << strerror(errno);
}
if (ret != static_cast<ssize_t>(len)) {
return Status::IOError("") << "Fails to write all requested bytes, requests to write "
<< len << " bytes, but actually write " << ret << " bytes";
}
return Status::OK();
}
void Flush(MEMFD_TYPE_NON_UNIQUE fd) {
RAY_CHECK_EQ(fdatasync(fd), 0) << "Fails to flush file because " << strerror(errno);
}
Status Close(MEMFD_TYPE_NON_UNIQUE fd) {
const int ret = close(fd);
if (ret != 0) {
return Status::IOError("") << "Fails to flush file because " << strerror(errno);
}
return Status::OK();
}
#elif defined(_WIN32)
Status CompleteWrite(MEMFD_TYPE_NON_UNIQUE fd, const char *data, size_t len) {
DWORD bytes_written;
BOOL success = WriteFile(fd, data, (DWORD)len, &bytes_written, NULL);
if (!success) {
return Status::IOError("") << "Fails to write to file";
}
if ((DWORD)len != bytes_written) {
return Status::IOError("") << "Fails to write all requested bytes, requests to write "
<< len << " bytes, but actually write " << bytes_written
<< " bytes";
}
return Status::OK();
}
void Flush(MEMFD_TYPE_NON_UNIQUE fd) {
RAY_CHECK(FlushFileBuffers(fd)) << "Fails to flush file";
}
Status Close(MEMFD_TYPE_NON_UNIQUE fd) {
if (!CloseHandle(fd)) {
return Status::IOError("") << "Fails to close file handle";
}
return Status::OK();
}
#endif

} // namespace ray
15 changes: 15 additions & 0 deletions src/ray/util/compat.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

#pragma once

#include "ray/common/status.h"

// Workaround for multithreading on XCode 9, see
// https://issues.apache.org/jira/browse/ARROW-1622 and
// https://github.com/tensorflow/tensorflow/issues/13220#issuecomment-331579775
Expand Down Expand Up @@ -72,3 +74,16 @@ mach_port_t pthread_mach_thread_np(pthread_t);
// since fd values can get re-used by the operating system.
#define MEMFD_TYPE std::pair<MEMFD_TYPE_NON_UNIQUE, int64_t>
#define INVALID_UNIQUE_FD_ID 0

namespace ray {
// Write the whole content into file descriptor, if any error happens, or actual written
// content is less than expected, IO error status will be returned.
Status CompleteWrite(MEMFD_TYPE_NON_UNIQUE fd, const char *data, size_t len);
// Flush the given file descriptor, if any error happens, error message is logged and
// process exits directly.
// Reference to fsyncgate: https://wiki.postgresql.org/wiki/Fsync_Errors
void Flush(MEMFD_TYPE_NON_UNIQUE fd);
// Close the given file descriptor, if any error happens, IO error status will be
// returned.
Status Close(MEMFD_TYPE_NON_UNIQUE fd);
} // namespace ray
12 changes: 12 additions & 0 deletions src/ray/util/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,15 @@ ray_cc_test(
size = "small",
tags = ["team:core"],
)

ray_cc_test(
name = "compat_test",
srcs = ["compat_test.cc"],
deps = [
"//src/ray/util:compat",
"//src/ray/util:temporary_directory",
"@com_google_googletest//:gtest_main",
],
size = "small",
tags = ["team:core"],
)
80 changes: 80 additions & 0 deletions src/ray/util/tests/compat_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/util/compat.h"

#include <gtest/gtest.h>

#include <string_view>

#include "ray/util/filesystem.h"
#include "ray/util/temporary_directory.h"

#if defined(__APPLE__) || defined(__linux__)
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#elif defined(_WIN32)
#include <windows.h>
#endif

namespace ray {

namespace {

constexpr std::string_view kContent = "helloworld";

#if defined(__APPLE__) || defined(__linux__)
TEST(CompatTest, WriteTest) {
ScopedTemporaryDirectory temp_dir;
const auto file = temp_dir.GetDirectory() / "file";
const auto file_path_str = file.native();
const int fd = open(file_path_str.data(), O_CREAT | O_WRONLY | O_TRUNC, 0644);
ASSERT_GT(fd, 0);

RAY_CHECK_OK(CompleteWrite(fd, kContent.data(), kContent.length()));
Flush(fd);
RAY_CHECK_OK(Close(fd));

RAY_ASSIGN_OR_CHECK(const auto content, ReadEntireFile(file_path_str));
EXPECT_EQ(content, kContent);
}
#elif defined(_WIN32)
TEST(CompatTest, WriteTest) {
ScopedTemporaryDirectory temp_dir;
const auto file = temp_dir.GetDirectory() / "file";
const auto file_path_str = file.native();
HANDLE handle = CreateFile(file_path_str.c_str(), // File path
GENERIC_WRITE, // Open for writing
0, // No sharing
NULL, // Default security
CREATE_ALWAYS, // Create new file (overwrite if exists)
FILE_ATTRIBUTE_NORMAL, // Normal file attributes
NULL // No template
);
ASSERT_NE(handle, INVALID_HANDLE_VALUE);

RAY_CHECK_OK(CompleteWrite(fd, kContent.data(), kContent.length()));
Flush(fd);
RAY_CHECK_OK(Close(fd));

RAY_ASSIGN_OR_CHECK(const auto content, ReadEntireFile(file_path_str));
EXPECT_EQ(content, kContent);
}
#endif

} // namespace

} // namespace ray

0 comments on commit f1e4cf0

Please sign in to comment.