Skip to content

Commit

Permalink
Communicate task completion to Scheduler interface. Add two schedulers
Browse files Browse the repository at this point in the history
  • Loading branch information
sacheendra committed Sep 12, 2024
1 parent ad8051f commit ae25b88
Show file tree
Hide file tree
Showing 13 changed files with 500 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface Task : Resource {
*/
public val launchedAt: Instant?

/**
* Time the task was skipped by the scheduler because no host with enough resources was available.
*/
public var timesSkipped: Int

/**
* Request the task to be started.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.time.Duration;
import java.time.Instant;
import java.time.InstantSource;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -100,7 +100,7 @@ public final class ComputeService implements AutoCloseable {
/**
* The tasks that should be launched by the service.
*/
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
private final LinkedHashSet<ServiceTask> taskQueue = new LinkedHashSet<>();

/**
* The active tasks in the system.
Expand Down Expand Up @@ -175,6 +175,8 @@ public void onStateChanged(@NotNull Host host, @NotNull Task task, @NotNull Task
hv.provisionedCores -= flavor.getCoreCount();
hv.instanceCount--;
hv.availableMemory += flavor.getMemorySize();

scheduler.removeTask(task, hv);
} else {
LOGGER.error("Unknown host {}", host);
}
Expand Down Expand Up @@ -309,17 +311,17 @@ public void close() {
/**
* Enqueue the specified [task] to be scheduled onto a host.
*/
SchedulingRequest schedule(ServiceTask task) {
void schedule(ServiceTask task) {
LOGGER.debug("Enqueueing task {} to be assigned to host", task.getUid());

long now = clock.millis();
SchedulingRequest request = new SchedulingRequest(task, now);
task.submitTime = now;
task.isCancelled = false;

task.launchedAt = Instant.ofEpochMilli(now);
taskQueue.add(request);
taskQueue.add(task);
tasksPending++;
requestSchedulingCycle();
return request;
}

void delete(ServiceFlavor flavor) {
Expand Down Expand Up @@ -354,29 +356,28 @@ private void requestSchedulingCycle() {
*/
private void doSchedule() {
// reorder tasks
Iterator<ServiceTask> iterator = taskQueue.iterator();

while (!taskQueue.isEmpty()) {
SchedulingRequest request = taskQueue.peek();
while (iterator.hasNext()) {
ServiceTask task = iterator.next();

if (request.isCancelled) {
taskQueue.poll();
if (task.isCancelled) {
iterator.remove();
tasksPending--;
continue;
}

final ServiceTask task = request.task;
// Check if all dependencies are met
// otherwise continue

final ServiceFlavor flavor = task.getFlavor();
final HostView hv = scheduler.select(request.task);
final HostView hv = scheduler.select(iterator);

if (hv == null || !hv.getHost().canFit(task)) {
LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task);

if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) {
// Remove the incoming image
taskQueue.poll();
iterator.remove();
tasksPending--;
attemptsFailure++;

Expand All @@ -392,7 +393,7 @@ private void doSchedule() {
Host host = hv.getHost();

// Remove request from queue
taskQueue.poll();
iterator.remove();
tasksPending--;

LOGGER.info("Assigned task {} to host {}", task, host);
Expand All @@ -413,6 +414,7 @@ private void doSchedule() {
activeTasks.put(task, host);
} catch (Exception cause) {
LOGGER.error("Failed to deploy VM", cause);
scheduler.removeTask(task, hv);
attemptsError++;
}
}
Expand Down Expand Up @@ -601,19 +603,4 @@ public void rescheduleTask(@NotNull Task task, @NotNull SimWorkload workload) {
internalTask.start();
}
}

/**
* A request to schedule a {@link ServiceTask} onto one of the {@link Host}s.
*/
static class SchedulingRequest {
final ServiceTask task;
final long submitTime;

boolean isCancelled;

SchedulingRequest(ServiceTask task, long submitTime) {
this.task = task;
this.submitTime = submitTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class HostView {
long availableMemory;
int provisionedCores;

// Additional metadata to be used by scheduler
public int priorityIndex = -1;
public int listIndex = -1;

/**
* Construct a {@link HostView} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public final class ServiceTask implements Task {
private TaskState state = TaskState.TERMINATED;
Instant launchedAt = null;
Host host = null;
private ComputeService.SchedulingRequest request = null;

// Scheduling request related fields
long submitTime;
boolean isCancelled;
int timesSkipped;

ServiceTask(
ComputeService service,
Expand All @@ -75,6 +79,9 @@ public final class ServiceTask implements Task {
this.image = image;
this.labels = labels;
this.meta = meta;

this.submitTime = -1;
this.isCancelled = false;
}

@NotNull
Expand Down Expand Up @@ -153,8 +160,8 @@ public void start() {
default:
LOGGER.info("User requested to start task {}", uid);
setState(TaskState.PROVISIONING);
assert request == null : "Scheduling request already active";
request = service.schedule(this);
assert submitTime == -1 : "Scheduling request already active";
service.schedule(this);
break;
}
}
Expand Down Expand Up @@ -246,10 +253,16 @@ void setState(TaskState state) {
* Cancel the provisioning request if active.
*/
private void cancelProvisioningRequest() {
final ComputeService.SchedulingRequest request = this.request;
if (request != null) {
this.request = null;
request.isCancelled = true;
}
this.isCancelled = true;
}

@Override
public int getTimesSkipped() {
return timesSkipped;
}

@Override
public void setTimesSkipped(int i) {
timesSkipped = i;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ public interface ComputeScheduler {
public fun removeHost(host: HostView)

/**
* Select a host for the specified [task].
* Select a host for the specified [iter].
* We implicity assume that the task has been scheduled onto the host.
*
* @param task The server to select a host for.
* @param iter The server to select a host for.
* @return The host to schedule the server on or `null` if no server is available.
*/
public fun select(task: Task): HostView?
public fun select(iter: MutableIterator<Task>): HostView?

/**
* Inform the scheduler that a task has been removed from the host.
* Could be due to completion or failure.
*/
public fun removeTask(task: Task, hv: HostView)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package org.opendc.compute.service.scheduler

import org.opendc.compute.api.Task
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
Expand Down Expand Up @@ -65,7 +66,8 @@ public class FilterScheduler(
hosts.remove(host)
}

override fun select(task: Task): HostView? {
override fun select(iter: MutableIterator<Task>): HostView? {
val task = iter.next()
val hosts = hosts
val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } }

Expand Down Expand Up @@ -108,4 +110,8 @@ public class FilterScheduler(
else -> subset[random.nextInt(maxSize)]
}
}

override fun removeTask(task: Task, hv: HostView) {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.opendc.compute.service.scheduler

import org.opendc.compute.api.Task
import org.opendc.compute.service.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import java.util.SplittableRandom
import java.util.random.RandomGenerator

public class MemorizingScheduler(
private val filters: List<HostFilter>,
private val random: RandomGenerator = SplittableRandom(0),
private val maxTimesSkipped: Int = 7
): ComputeScheduler {

// We assume that there will be max 200 tasks per host.
// The index of a host list is the number of tasks on that host.
private val hostsQueue = List(200, { mutableListOf<HostView>() })
private var minAvailableHost = 0
private var numHosts = 0

override fun addHost(host: HostView) {
val zeroQueue = hostsQueue[0]
zeroQueue.add(host)
host.listIndex = zeroQueue.size - 1
numHosts++
minAvailableHost = 0
}

override fun removeHost(host: HostView) {
val priorityIdx = host.priorityIndex
val listIdx = host.listIndex
val chosenList = hostsQueue[priorityIdx]

if (listIdx == chosenList.size - 1) {
chosenList.removeLast()
if (listIdx == minAvailableHost) {
for (i in minAvailableHost+1 .. hostsQueue.lastIndex) {
if (hostsQueue[i].size > 0) {
minAvailableHost = i
break
}
}
}
} else {
val lastItem = chosenList.removeLast()
chosenList[listIdx] = lastItem
lastItem.listIndex = listIdx
}
numHosts--
}

override fun select(iter: MutableIterator<Task>): HostView? {
if (numHosts == 0) {
return null
}

val chosenList = hostsQueue[minAvailableHost]
val nextList = hostsQueue[minAvailableHost + 1]
var chosenHost = chosenList.elementAt(random.nextInt(chosenList.size))

var taskFound = false
for (task in iter) {
val satisfied = filters.all { filter -> filter.test(chosenHost, task) }
if (satisfied) {
iter.remove()
taskFound = true
break
} else if (task.timesSkipped >= maxTimesSkipped) {
// Don't skip over it. Wait till a suitable host becomes available
val q = hostsQueue[minAvailableHost]
for (h in q) {
val satisfied = filters.all { filter -> filter.test(h, task) }
if (satisfied) {
chosenHost = h
taskFound = true
break
}
}
if (!taskFound) {
return null
}
} else {
task.timesSkipped++
}
}
if (!taskFound) return null // No task found that fits in the host

val listIdx = chosenHost.listIndex

if (listIdx == chosenList.size - 1) {
chosenList.removeLast()
if (chosenList.isEmpty()) minAvailableHost++
} else {
val lastItem = chosenList.removeLast()
chosenList[listIdx] = lastItem
lastItem.listIndex = listIdx
}
nextList.add(chosenHost)
chosenHost.priorityIndex = minAvailableHost + 1
chosenHost.listIndex = nextList.size - 1

return chosenHost
}

override fun removeTask(task: Task, hv: HostView) {
val priorityIdx = hv.priorityIndex
val listIdx = hv.listIndex
val chosenList = hostsQueue[priorityIdx]
val nextList = hostsQueue[priorityIdx - 1]

if (listIdx == chosenList.size - 1) {
chosenList.removeLast()
if (listIdx == minAvailableHost) {
minAvailableHost--
}
} else {
val lastItem = chosenList.removeLast()
chosenList[listIdx] = lastItem
lastItem.listIndex = listIdx
}
nextList.add(hv)
hv.priorityIndex = priorityIdx - 1
hv.listIndex = nextList.size - 1
}
}
Loading

0 comments on commit ae25b88

Please sign in to comment.