From 016501517363f590c9826b6649ba1008145d8288 Mon Sep 17 00:00:00 2001 From: dentiny Date: Wed, 5 Mar 2025 05:00:44 +0000 Subject: [PATCH] add cgroup in worker Signed-off-by: dentiny --- BUILD.bazel | 4 +++ src/ray/common/BUILD | 1 + src/ray/common/cgroup/BUILD | 10 +++++++ src/ray/common/cgroup/cgroup_context.h | 4 --- src/ray/common/cgroup/cgroup_manager.cc | 39 +++++++++++++++++++++++++ src/ray/common/cgroup/cgroup_manager.h | 26 +++++++++++++++++ src/ray/common/task/task_util.cc | 29 ++++++++++++++++++ src/ray/common/task/task_util.h | 6 ++++ src/ray/raylet/local_task_manager.cc | 36 ++++++++++++++++------- src/ray/raylet/local_task_manager.h | 6 ++-- 10 files changed, 144 insertions(+), 17 deletions(-) create mode 100644 src/ray/common/cgroup/cgroup_manager.cc create mode 100644 src/ray/common/cgroup/cgroup_manager.h create mode 100644 src/ray/common/task/task_util.cc diff --git a/BUILD.bazel b/BUILD.bazel index a4ac6caa8328e..bd4248ceedcf8 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1047,6 +1047,10 @@ ray_cc_library( ":scheduler", ":stats_lib", ":worker_rpc", + "//src/ray/common/cgroup:base_cgroup_setup", + "//src/ray/common/cgroup:cgroup_context", + "//src/ray/common/cgroup:cgroup_manager", + "//src/ray/common/cgroup:scoped_cgroup_handle", "//src/ray/protobuf:agent_manager_cc_proto", "//src/ray/protobuf:common_cc_proto", "//src/ray/protobuf:runtime_env_agent_cc_proto", diff --git a/src/ray/common/BUILD b/src/ray/common/BUILD index d732e0817058b..5c629d20471c2 100644 --- a/src/ray/common/BUILD +++ b/src/ray/common/BUILD @@ -161,6 +161,7 @@ ray_cc_library( "scheduling/scheduling_ids.cc", "task/task.cc", "task/task_spec.cc", + "task/task_util.cc", ], hdrs = [ "bundle_location_index.h", diff --git a/src/ray/common/cgroup/BUILD b/src/ray/common/cgroup/BUILD index 74cb31eba8b18..e58f12be1573d 100644 --- a/src/ray/common/cgroup/BUILD +++ b/src/ray/common/cgroup/BUILD @@ -58,3 +58,13 @@ ray_cc_library( "@com_google_absl//absl/synchronization", ], ) + +ray_cc_library( + name = "cgroup_manager", + srcs = ["cgroup_manager.cc"], + hdrs = ["cgroup_manager.h"], + deps = [ + ":base_cgroup_setup", + "@com_github_gflags_gflags//:gflags", + ], +) diff --git a/src/ray/common/cgroup/cgroup_context.h b/src/ray/common/cgroup/cgroup_context.h index 1efb40d01a1e8..72e3ad0127139 100644 --- a/src/ray/common/cgroup/cgroup_context.h +++ b/src/ray/common/cgroup/cgroup_context.h @@ -23,10 +23,6 @@ namespace ray { // Context used to setup cgroupv2 for a task / actor. struct AppProcCgroupMetadata { - // Directory for cgroup, which is applied to application process. - // - // TODO(hjiang): Revisit if we could save some CPU/mem with string view. - std::string cgroup_directory; // A unique id to uniquely identity a certain task / actor attempt. std::string id; // PID for the process. diff --git a/src/ray/common/cgroup/cgroup_manager.cc b/src/ray/common/cgroup/cgroup_manager.cc new file mode 100644 index 0000000000000..eafee94e3e0f0 --- /dev/null +++ b/src/ray/common/cgroup/cgroup_manager.cc @@ -0,0 +1,39 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/common/cgroup/cgroup_manager.h" + +#include + +DEFINE_bool(enable_cgroup, + false, + "A global config flag to decide whether to use cgroup, it applies for all " + "ray system components (like raylet and GCS) and application process."); + +namespace ray { + +// Here the possible types of cgroup setup classes are small, so we use if-else branch +// instead of registry pattern. +BaseCgroupSetup &GetCgroupSetup() { + if (FLAGS_enable_cgroup) { + // TODO(hjiang): Enable real cgroup setup after PR: + // https://github.com/ray-project/ray/pull/49941 + static auto noop_cgroup_setup = std::make_unique(); + return *noop_cgroup_setup; + } + static auto noop_cgroup_setup = std::make_unique(); + return *noop_cgroup_setup; +} + +} // namespace ray diff --git a/src/ray/common/cgroup/cgroup_manager.h b/src/ray/common/cgroup/cgroup_manager.h new file mode 100644 index 0000000000000..2fcf57ff342b8 --- /dev/null +++ b/src/ray/common/cgroup/cgroup_manager.h @@ -0,0 +1,26 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +#include "ray/common/cgroup/base_cgroup_setup.h" + +namespace ray { + +// A util function which gets cgroup setup. +BaseCgroupSetup &GetCgroupSetup(); + +} // namespace ray diff --git a/src/ray/common/task/task_util.cc b/src/ray/common/task/task_util.cc new file mode 100644 index 0000000000000..d93c64540973c --- /dev/null +++ b/src/ray/common/task/task_util.cc @@ -0,0 +1,29 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/common/task/task_util.h" + +#include +#include + +#include "absl/strings/str_format.h" + +namespace ray { + +std::string GetTaskAttemptId(const TaskID &task_id) { + static std::atomic global_attempt_id = 0; // Global attempt id. + return absl::StrFormat("%s_%u", task_id.Hex(), global_attempt_id++); +} + +} // namespace ray diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index 8d2f904672485..96ce85fe37b5b 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -14,6 +14,8 @@ #pragma once +#include + #include "ray/common/buffer.h" #include "ray/common/ray_object.h" #include "ray/common/task/task_spec.h" @@ -304,4 +306,8 @@ class TaskSpecBuilder { std::shared_ptr message_; }; +// Get a unique attempt id for the given task. +// Task attempt is is formatted as _. +std::string GetTaskAttemptId(const TaskID &task_id); + } // namespace ray diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 08be91eb189e9..3e8b33f0ab68e 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -18,6 +18,9 @@ #include +#include "ray/common/cgroup/cgroup_context.h" +#include "ray/common/cgroup/cgroup_manager.h" +#include "ray/common/task/task_util.h" #include "ray/stats/metric_defs.h" #include "ray/util/logging.h" @@ -354,7 +357,12 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { // confident we're ready to dispatch the task after all checks have // passed. sched_cls_info.next_update_time = std::numeric_limits::max(); - sched_cls_info.running_tasks.insert(spec.TaskId()); + auto new_task_iter = + sched_cls_info.running_tasks + .emplace(spec.TaskId(), std::make_unique()) + .first; + auto *scoped_cgroup_handler = new_task_iter->second.get(); + // The local node has the available resources to run the task, so we should run // it. work->allocated_instances = allocated_instances; @@ -365,13 +373,15 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { // task might be hanging. worker_pool_.PopWorker( spec, - [this, task_id, scheduling_class, work, is_detached_actor, owner_address]( - const std::shared_ptr worker, - PopWorkerStatus status, - const std::string &runtime_env_setup_error_message) -> bool { - // TODO(hjiang): After getting the ready-to-use worker and task id, we're - // able to get physical execution context. - // + [this, + scoped_cgroup_handler, + task_id, + scheduling_class, + work, + is_detached_actor, + owner_address](const std::shared_ptr worker, + PopWorkerStatus status, + const std::string &runtime_env_setup_error_message) -> bool { // ownership chain: raylet has-a node manager, node manager has-a local task // manager. // @@ -379,6 +389,14 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() { // - Attempt id: could pass a global attempt id generator from raylet // - Cgroup application folder: could pass from raylet + if (worker != nullptr) { + AppProcCgroupMetadata cgroup_metadata; + cgroup_metadata.id = GetTaskAttemptId(task_id); + cgroup_metadata.pid = worker->GetProcess().GetId(); + *scoped_cgroup_handler = + GetCgroupSetup().ApplyCgroupContext(cgroup_metadata); + } + return PoppedWorkerHandler(worker, status, task_id, @@ -721,8 +739,6 @@ void LocalTaskManager::RemoveFromRunningTasksIfExists(const RayTask &task) { auto sched_cls = task.GetTaskSpecification().GetSchedulingClass(); auto it = info_by_sched_cls_.find(sched_cls); if (it != info_by_sched_cls_.end()) { - // TODO(hjiang): After remove the task id from `running_tasks`, corresponding cgroup - // will be updated. it->second.running_tasks.erase(task.GetTaskSpecification().TaskId()); if (it->second.running_tasks.size() == 0) { info_by_sched_cls_.erase(it); diff --git a/src/ray/raylet/local_task_manager.h b/src/ray/raylet/local_task_manager.h index b50e14fcf8b94..aa7f6cc273066 100644 --- a/src/ray/raylet/local_task_manager.h +++ b/src/ray/raylet/local_task_manager.h @@ -16,6 +16,8 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include "ray/common/cgroup/base_cgroup_setup.h" +#include "ray/common/cgroup/scoped_cgroup_handle.h" #include "ray/common/ray_object.h" #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" @@ -297,9 +299,7 @@ class LocalTaskManager : public ILocalTaskManager { capacity(cap), next_update_time(std::numeric_limits::max()) {} /// Track the running task ids in this scheduling class. - /// - /// TODO(hjiang): Store cgroup manager along with task id as the value for map. - absl::flat_hash_set running_tasks; + absl::flat_hash_map> running_tasks; /// The total number of tasks that can run from this scheduling class. const uint64_t capacity; /// The next time that a new task of this scheduling class may be dispatched.