
Merge pull request #4 from shivramsrivastava/dev
Pod Affinity and Anti-affinity feature support
shivramsrivastava authored Jun 14, 2018
2 parents 8bece94 + 0ee4ac1 commit 7c616c5
Showing 13 changed files with 917 additions and 80 deletions.
2 changes: 2 additions & 0 deletions src/base/task_desc.proto
@@ -89,4 +89,6 @@ message TaskDescriptor {
repeated LabelSelector label_selectors = 33;
/* Affinity */
Affinity affinity = 34;
// Namespace
string task_namespace = 35;
}
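The new field sits next to the existing Affinity message that the scheduler changes below rely on. As a rough illustration only (assuming the generated C++ protobuf API for the firmament proto package; the pod_affinity submessage is merely created here, not populated):

#include "base/task_desc.pb.h"

void AnnotateTask(firmament::TaskDescriptor* td) {
  // New in this commit: the Kubernetes namespace the task (pod) belongs to.
  td->set_task_namespace("default");
  // Pod affinity/anti-affinity ride on the existing Affinity message;
  // creating the submessage is what makes has_pod_affinity() return true.
  td->mutable_affinity()->mutable_pod_affinity();
}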
2 changes: 1 addition & 1 deletion src/engine/coordinator.cc
@@ -122,7 +122,7 @@ Coordinator::Coordinator()
job_table_, associated_resources_, local_resource_topology_,
object_store_, task_table_, knowledge_base, topology_manager_,
m_adapter_, NULL, uuid_, FLAGS_listen_uri, time_manager_,
trace_generator_);
trace_generator_, NULL, NULL);
} else {
// Unknown scheduler specified, error.
LOG(FATAL) << "Unknown or unrecognized scheduler '" << FLAGS_scheduler
50 changes: 44 additions & 6 deletions src/scheduling/event_driven_scheduler.cc
@@ -66,9 +66,12 @@ EventDrivenScheduler::EventDrivenScheduler(
ResourceID_t coordinator_res_id,
const string& coordinator_uri,
TimeInterface* time_manager,
TraceGenerator* trace_generator)
: SchedulerInterface(job_map, knowledge_base, resource_map, resource_topology,
object_store, task_map),
TraceGenerator* trace_generator,
unordered_map<string, unordered_map<string, vector<TaskID_t>>>* labels_map,
vector<TaskID_t> *affinity_antiaffinity_tasks)
: SchedulerInterface(job_map, knowledge_base, resource_map,
resource_topology, object_store, task_map, labels_map,
affinity_antiaffinity_tasks),
coordinator_uri_(coordinator_uri),
coordinator_res_id_(coordinator_res_id),
event_notifier_(event_notifier),
@@ -646,9 +649,44 @@ void EventDrivenScheduler::LazyGraphReduction(
current_task->state() == TaskDescriptor::BLOCKING) {
if (!will_block || (current_task->dependencies_size() == 0
&& current_task->outputs_size() == 0)) {
current_task->set_state(TaskDescriptor::RUNNABLE);
InsertTaskIntoRunnables(JobIDFromString(current_task->job_id()),
current_task->uid());
// Pod affinity/anti-affinity
if (current_task->has_affinity() &&
(current_task->affinity().has_pod_affinity() ||
current_task->affinity().has_pod_anti_affinity())) {
// Only handle affinity tasks during the queue-based pass, and only if no
// affinity task has been made runnable in this round yet.
if (!queue_based_schedule || one_task_runnable)
continue;
for (auto task_itr = affinity_antiaffinity_tasks_->begin();
task_itr != affinity_antiaffinity_tasks_->end(); task_itr++) {
TaskDescriptor* tdp = FindPtrOrNull(*task_map_, *task_itr);
if (!tdp) continue;
if ((tdp->state() == TaskDescriptor::RUNNABLE) &&
!one_task_runnable) {
// Demote the task that was runnable in the previous round and rotate it
// to the back of the queue, so only one affinity task is runnable at a time.
TaskID_t task_id = *task_itr;
tdp->set_state(TaskDescriptor::CREATED);
affinity_antiaffinity_tasks_->erase(task_itr);
affinity_antiaffinity_tasks_->push_back(task_id);
JobID_t tdp_job_id = JobIDFromString(tdp->job_id());
runnable_tasks_[tdp_job_id].erase(task_id);
task_itr = affinity_antiaffinity_tasks_->begin();
continue;
}
if (tdp->state() == TaskDescriptor::CREATED) {
tdp->set_state(TaskDescriptor::RUNNABLE);
InsertTaskIntoRunnables(JobIDFromString(tdp->job_id()),
tdp->uid());
one_task_runnable = true;
break;
}
}
} else {
current_task->set_state(TaskDescriptor::RUNNABLE);
InsertTaskIntoRunnables(JobIDFromString(current_task->job_id()),
current_task->uid());
}
}
}
}
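The branch above lets at most one pod-affinity/anti-affinity task become runnable per queue-based round: the task that was runnable in the previous round is demoted back to CREATED and rotated to the back of the queue before the next candidate is promoted. A standalone sketch of that rotation, with hypothetical Task/State types standing in for TaskDescriptor and the runnables map:

#include <vector>

enum class State { CREATED, RUNNABLE };
struct Task { State state = State::CREATED; };

// Demote whichever task is currently runnable and move it to the back of the
// queue, then promote the first CREATED task, so that at most one
// affinity/anti-affinity task is runnable in any given round.
void RotateOneRunnable(std::vector<Task*>* queue) {
  for (auto it = queue->begin(); it != queue->end(); ++it) {
    if ((*it)->state == State::RUNNABLE) {
      Task* demoted = *it;
      demoted->state = State::CREATED;
      queue->erase(it);
      queue->push_back(demoted);
      break;
    }
  }
  for (Task* candidate : *queue) {
    if (candidate->state == State::CREATED) {
      candidate->state = State::RUNNABLE;
      break;
    }
  }
}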
8 changes: 7 additions & 1 deletion src/scheduling/event_driven_scheduler.h
@@ -62,7 +62,10 @@ class EventDrivenScheduler : public SchedulerInterface {
ResourceID_t coordinator_res_id,
const string& coordinator_uri,
TimeInterface* time_manager,
TraceGenerator* trace_generator);
TraceGenerator* trace_generator,
unordered_map<string, unordered_map<string, vector<TaskID_t>>>*
labels_map,
vector<TaskID_t> *affinity_antiaffinity_tasks);
~EventDrivenScheduler();
virtual void AddJob(JobDescriptor* jd_ptr);
ResourceID_t* BoundResourceForTask(TaskID_t task_id);
@@ -183,6 +186,9 @@ class EventDrivenScheduler : public SchedulerInterface {
shared_ptr<TopologyManager> topology_manager_;
TimeInterface* time_manager_;
TraceGenerator* trace_generator_;
// Pod affinity/anti-affinity
bool one_task_runnable;
bool queue_based_schedule;
};

} // namespace scheduler
95 changes: 94 additions & 1 deletion src/scheduling/firmament_scheduler_service.cc
@@ -19,6 +19,7 @@
*/

#include <grpc++/grpc++.h>
#include <chrono>

#include "base/resource_status.h"
#include "base/resource_topology_node_desc.pb.h"
@@ -57,6 +58,7 @@ DEFINE_string(firmament_scheduler_service_address, "127.0.0.1",
DEFINE_string(firmament_scheduler_service_port, "9090",
"The port of the scheduler service");
DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple");
DEFINE_uint64(queue_based_scheduling_time, 1,
"Time budget (in seconds) for the queue-based scheduling pass");

namespace firmament {

@@ -78,7 +80,8 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
job_map_, resource_map_,
top_level_res_status->mutable_topology_node(), obj_store_, task_map_,
knowledge_base_, topology_manager_, sim_messaging_adapter_, NULL,
top_level_res_id_, "", &wall_time_, trace_generator_);
top_level_res_id_, "", &wall_time_, trace_generator_, &labels_map_,
&affinity_antiaffinity_tasks_);
} else if (FLAGS_service_scheduler == "simple") {
scheduler_ = new SimpleScheduler(
job_map_, resource_map_,
@@ -142,6 +145,24 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
if (deltas.size()) {
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
}

// Schedule tasks having pod affinity/anti-affinity
chrono::high_resolution_clock::time_point start =
chrono::high_resolution_clock::now();
chrono::duration<unsigned int> time_spent(
chrono::duration_values<unsigned int>::zero());
while (affinity_antiaffinity_tasks_.size() &&
(time_spent.count() < FLAGS_queue_based_scheduling_time)) {
scheduler_->ScheduleAllQueueJobs(&sstat, &deltas);
chrono::high_resolution_clock::time_point end =
chrono::high_resolution_clock::now();
time_spent = chrono::duration_cast<chrono::seconds>(end - start);
}
if (deltas.size()) {
LOG(INFO) << "QueueBasedSchedule: Got " << deltas.size()
<< " scheduling deltas";
}

for (auto& d : deltas) {
// LOG(INFO) << "Delta: " << d.DebugString();
SchedulingDelta* ret_delta = reply->add_deltas();
@@ -162,6 +183,39 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
return Status::OK;
}

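The queue-based pass above runs repeatedly until either the affinity task queue drains or the FLAGS_queue_based_scheduling_time budget (in seconds) is used up. The same time-budget pattern in isolation, with a hypothetical do_one_round callback standing in for ScheduleAllQueueJobs:

#include <chrono>
#include <functional>

// Invoke do_one_round() until at least budget_seconds have elapsed.
void RunWithBudget(unsigned int budget_seconds,
                   const std::function<void()>& do_one_round) {
  auto start = std::chrono::steady_clock::now();
  std::chrono::seconds spent(0);
  while (static_cast<unsigned int>(spent.count()) < budget_seconds) {
    do_one_round();
    spent = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::steady_clock::now() - start);
  }
}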
// Pod affinity/anti-affinity
void RemoveTaskFromLabelsMap(const TaskDescriptor& td) {
for (const auto& label : td.labels()) {
unordered_map<string, vector<TaskID_t>>* label_values =
FindOrNull(labels_map_, label.key());
if (label_values) {
vector<TaskID_t>* labels_map_tasks =
FindOrNull(*label_values, label.value());
if (labels_map_tasks) {
vector<TaskID_t>::iterator it_pos = find(
labels_map_tasks->begin(), labels_map_tasks->end(), td.uid());
if (it_pos != labels_map_tasks->end()) {
labels_map_tasks->erase(it_pos);
if (!labels_map_tasks->size()) {
label_values->erase(label.value());
if (label_values->empty()) labels_map_.erase(label.key());
}
}
}
}
}
if (td.has_affinity() && (td.affinity().has_pod_affinity() ||
td.affinity().has_pod_anti_affinity())) {
vector<TaskID_t>::iterator it =
find(affinity_antiaffinity_tasks_.begin(),
affinity_antiaffinity_tasks_.end(), td.uid());
if (it != affinity_antiaffinity_tasks_.end()) {
affinity_antiaffinity_tasks_.erase(it);
}
}
}


Status TaskCompleted(ServerContext* context, const TaskUID* tid_ptr,
TaskCompletedResponse* reply) override {
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, tid_ptr->task_uid());
@@ -182,6 +236,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
return Status::OK;
}
td_ptr->set_finish_time(wall_time_.GetCurrentTimestamp());
RemoveTaskFromLabelsMap(*td_ptr);
TaskFinalReport report;
scheduler_->HandleTaskCompletion(td_ptr, &report);
kb_populator_->PopulateTaskFinalReport(*td_ptr, &report);
@@ -212,6 +267,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
RemoveTaskFromLabelsMap(*td_ptr);
scheduler_->HandleTaskFailure(td_ptr);
reply->set_type(TaskReplyType::TASK_FAILED_OK);
return Status::OK;
@@ -226,6 +282,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
reply->set_type(TaskReplyType::TASK_NOT_FOUND);
return Status::OK;
}
RemoveTaskFromLabelsMap(*td_ptr);
// TODO(jagadish): We need to remove below code once we start
// getting machine resource stats samples from poseidon i.e., heapster.
// Currently updating machine samples statically based on state of pod.
@@ -261,6 +318,38 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
return Status::OK;
}

// Pod affinity/anti-affinity
// Adding labels of task to the labels_map_
void AddTaskToLabelsMap(const TaskDescriptor& td) {
TaskID_t task_id = td.uid();
for (const auto& label : td.labels()) {
unordered_map<string, vector<TaskID_t>>* label_values =
FindOrNull(labels_map_, label.key());
if (!label_values) {
vector<TaskID_t> tasks;
tasks.push_back(task_id);
unordered_map<string, vector<TaskID_t>> values;
CHECK(InsertIfNotPresent(&values, label.value(), tasks));
CHECK(InsertIfNotPresent(&labels_map_, label.key(), values));
} else {
vector<TaskID_t>* labels_map_tasks =
FindOrNull(*label_values, label.value());
if (!labels_map_tasks) {
vector<TaskID_t> value_tasks;
value_tasks.push_back(task_id);
CHECK(
InsertIfNotPresent(&(*label_values), label.value(), value_tasks));
} else {
labels_map_tasks->push_back(task_id);
}
}
}
if (td.has_affinity() && (td.affinity().has_pod_affinity() ||
td.affinity().has_pod_anti_affinity())) {
affinity_antiaffinity_tasks_.push_back(task_id);
}
}
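labels_map_ is keyed by label key, then by label value, with the IDs of the tasks carrying that key/value pair as the leaves. A lookup against it might look like the following hypothetical helper (FindOrNull is the same map utility used above; the surrounding includes and typedefs are assumed):

vector<TaskID_t>* TasksWithLabel(
    unordered_map<string, unordered_map<string, vector<TaskID_t>>>* labels_map,
    const string& key, const string& value) {
  unordered_map<string, vector<TaskID_t>>* values = FindOrNull(*labels_map, key);
  if (!values) return NULL;
  return FindOrNull(*values, value);
}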

Status TaskSubmitted(ServerContext* context,
const TaskDescription* task_desc_ptr,
TaskSubmittedResponse* reply) override {
Expand All @@ -275,6 +364,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
reply->set_type(TaskReplyType::TASK_STATE_NOT_CREATED);
return Status::OK;
}
AddTaskToLabelsMap(task_desc_ptr->task_descriptor());
JobID_t job_id = JobIDFromString(task_desc_ptr->task_descriptor().job_id());
JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id);
// LOG(INFO) << "Job id is " << job_id ;
@@ -534,6 +624,9 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
unordered_map<JobID_t, uint64_t, boost::hash<boost::uuids::uuid>>
job_num_tasks_to_remove_;
KnowledgeBasePopulator* kb_populator_;
// Pod affinity/anti-affinity
unordered_map<string, unordered_map<string, vector<TaskID_t>>> labels_map_;
vector<TaskID_t> affinity_antiaffinity_tasks_;

ResourceStatus* CreateTopLevelResource() {
ResourceID_t res_id = GenerateResourceID();

