Skip to content

Commit

Permalink
add cgroup in worker
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Mar 6, 2025
1 parent 678a8d7 commit 0165015
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 17 deletions.
4 changes: 4 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,10 @@ ray_cc_library(
":scheduler",
":stats_lib",
":worker_rpc",
"//src/ray/common/cgroup:base_cgroup_setup",
"//src/ray/common/cgroup:cgroup_context",
"//src/ray/common/cgroup:cgroup_manager",
"//src/ray/common/cgroup:scoped_cgroup_handle",
"//src/ray/protobuf:agent_manager_cc_proto",
"//src/ray/protobuf:common_cc_proto",
"//src/ray/protobuf:runtime_env_agent_cc_proto",
Expand Down
1 change: 1 addition & 0 deletions src/ray/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ ray_cc_library(
"scheduling/scheduling_ids.cc",
"task/task.cc",
"task/task_spec.cc",
"task/task_util.cc",
],
hdrs = [
"bundle_location_index.h",
Expand Down
10 changes: 10 additions & 0 deletions src/ray/common/cgroup/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,13 @@ ray_cc_library(
"@com_google_absl//absl/synchronization",
],
)

# Library providing `GetCgroupSetup()` (cgroup_manager.h / cgroup_manager.cc).
# gflags is a dependency because cgroup_manager.cc defines the `enable_cgroup` flag.
ray_cc_library(
name = "cgroup_manager",
srcs = ["cgroup_manager.cc"],
hdrs = ["cgroup_manager.h"],
deps = [
":base_cgroup_setup",
"@com_github_gflags_gflags//:gflags",
],
)
4 changes: 0 additions & 4 deletions src/ray/common/cgroup/cgroup_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ namespace ray {

// Context used to setup cgroupv2 for a task / actor.
struct AppProcCgroupMetadata {
// Directory for cgroup, which is applied to application process.
//
// TODO(hjiang): Revisit if we could save some CPU/mem with string view.
std::string cgroup_directory;
// A unique id to uniquely identify a certain task / actor attempt.
std::string id;
// PID for the process.
Expand Down
39 changes: 39 additions & 0 deletions src/ray/common/cgroup/cgroup_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/common/cgroup/cgroup_manager.h"

#include <gflags/gflags.h>

// Boolean gflags switch, defaulting to false. `GetCgroupSetup()` below reads it through
// `FLAGS_enable_cgroup` to decide which cgroup setup implementation to hand out.
DEFINE_bool(enable_cgroup,
false,
"A global config flag to decide whether to use cgroup, it applies for all "
"ray system components (like raylet and GCS) and application process.");

namespace ray {

// Returns the cgroup setup implementation selected by the `enable_cgroup` flag.
//
// Only a handful of cgroup setup classes exist, so a plain branch is used here rather
// than a registry pattern. Each branch owns its own lazily-created instance.
BaseCgroupSetup &GetCgroupSetup() {
  // Cgroup disabled: hand out the no-op implementation.
  if (!FLAGS_enable_cgroup) {
    static auto disabled_setup = std::make_unique<NoopCgroupSetup>();
    return *disabled_setup;
  }

  // TODO(hjiang): Enable real cgroup setup after PR:
  // https://github.com/ray-project/ray/pull/49941
  static auto enabled_setup = std::make_unique<NoopCgroupSetup>();
  return *enabled_setup;
}

} // namespace ray
26 changes: 26 additions & 0 deletions src/ray/common/cgroup/cgroup_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

#include "ray/common/cgroup/base_cgroup_setup.h"

namespace ray {

// A util function which gets the cgroup setup to use in this process.
//
// The returned reference refers to an instance owned by the implementation (a
// function-local static in cgroup_manager.cc); callers must not delete it.
BaseCgroupSetup &GetCgroupSetup();

} // namespace ray
29 changes: 29 additions & 0 deletions src/ray/common/task/task_util.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/common/task/task_util.h"

#include <atomic>
#include <cstdint>

#include "absl/strings/str_format.h"

namespace ray {

// Builds an attempt id of the form "<task-id-hex>_<counter>" for `task_id`.
std::string GetTaskAttemptId(const TaskID &task_id) {
  // Single counter shared across all tasks; every call consumes one value.
  static std::atomic<uint64_t> next_attempt_id{0};
  const uint64_t attempt_id = next_attempt_id.fetch_add(1);
  return absl::StrFormat("%s_%u", task_id.Hex(), attempt_id);
}

} // namespace ray
6 changes: 6 additions & 0 deletions src/ray/common/task/task_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <string>

#include "ray/common/buffer.h"
#include "ray/common/ray_object.h"
#include "ray/common/task/task_spec.h"
Expand Down Expand Up @@ -304,4 +306,8 @@ class TaskSpecBuilder {
std::shared_ptr<rpc::TaskSpec> message_;
};

// Get a unique attempt id for the given task.
// The task attempt id is formatted as <task-id>_<attempt-id>, where <task-id> is the
// hex form of `task_id` and <attempt-id> comes from a counter shared by all tasks.
std::string GetTaskAttemptId(const TaskID &task_id);

} // namespace ray
36 changes: 26 additions & 10 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include <boost/range/join.hpp>

#include "ray/common/cgroup/cgroup_context.h"
#include "ray/common/cgroup/cgroup_manager.h"
#include "ray/common/task/task_util.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/logging.h"

Expand Down Expand Up @@ -354,7 +357,12 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
// confident we're ready to dispatch the task after all checks have
// passed.
sched_cls_info.next_update_time = std::numeric_limits<int64_t>::max();
sched_cls_info.running_tasks.insert(spec.TaskId());
auto new_task_iter =
sched_cls_info.running_tasks
.emplace(spec.TaskId(), std::make_unique<ScopedCgroupHandler>())
.first;
auto *scoped_cgroup_handler = new_task_iter->second.get();

// The local node has the available resources to run the task, so we should run
// it.
work->allocated_instances = allocated_instances;
Expand All @@ -365,20 +373,30 @@ void LocalTaskManager::DispatchScheduledTasksToWorkers() {
// task might be hanging.
worker_pool_.PopWorker(
spec,
[this, task_id, scheduling_class, work, is_detached_actor, owner_address](
const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
// TODO(hjiang): After getting the ready-to-use worker and task id, we're
// able to get physical execution context.
//
[this,
scoped_cgroup_handler,
task_id,
scheduling_class,
work,
is_detached_actor,
owner_address](const std::shared_ptr<WorkerInterface> worker,
PopWorkerStatus status,
const std::string &runtime_env_setup_error_message) -> bool {
// ownership chain: raylet has-a node manager, node manager has-a local task
// manager.
//
// - PID: could get from available worker
// - Attempt id: could pass a global attempt id generator from raylet
// - Cgroup application folder: could pass from raylet

if (worker != nullptr) {
AppProcCgroupMetadata cgroup_metadata;
cgroup_metadata.id = GetTaskAttemptId(task_id);
cgroup_metadata.pid = worker->GetProcess().GetId();
*scoped_cgroup_handler =
GetCgroupSetup().ApplyCgroupContext(cgroup_metadata);
}

return PoppedWorkerHandler(worker,
status,
task_id,
Expand Down Expand Up @@ -721,8 +739,6 @@ void LocalTaskManager::RemoveFromRunningTasksIfExists(const RayTask &task) {
auto sched_cls = task.GetTaskSpecification().GetSchedulingClass();
auto it = info_by_sched_cls_.find(sched_cls);
if (it != info_by_sched_cls_.end()) {
// TODO(hjiang): After remove the task id from `running_tasks`, corresponding cgroup
// will be updated.
it->second.running_tasks.erase(task.GetTaskSpecification().TaskId());
if (it->second.running_tasks.size() == 0) {
info_by_sched_cls_.erase(it);
Expand Down
6 changes: 3 additions & 3 deletions src/ray/raylet/local_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "ray/common/cgroup/base_cgroup_setup.h"
#include "ray/common/cgroup/scoped_cgroup_handle.h"
#include "ray/common/ray_object.h"
#include "ray/common/task/task.h"
#include "ray/common/task/task_common.h"
Expand Down Expand Up @@ -297,9 +299,7 @@ class LocalTaskManager : public ILocalTaskManager {
capacity(cap),
next_update_time(std::numeric_limits<int64_t>::max()) {}
/// Track the running task ids in this scheduling class.
///
/// TODO(hjiang): Store cgroup manager along with task id as the value for map.
absl::flat_hash_set<TaskID> running_tasks;
absl::flat_hash_map<TaskID, std::unique_ptr<ScopedCgroupHandler>> running_tasks;
/// The total number of tasks that can run from this scheduling class.
const uint64_t capacity;
/// The next time that a new task of this scheduling class may be dispatched.
Expand Down

0 comments on commit 0165015

Please sign in to comment.