Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update scheduler interface to accept updates for completed tasks. Add two new schedulers #252

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface Task : Resource {
*/
public val launchedAt: Instant?

/**
* Time the task was skipped by the scheduler because no host with enough resources was available.
*/
public var timesSkipped: Int

/**
* Request the task to be started.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class StartStopHostFault(
val tasks = host.instances

val sortedTasks = tasks.sortedBy { it.name }
val snapshots = sortedTasks.map { (it.meta["workload"] as SimWorkload).snapshot() }
val snapshots = sortedTasks.map { (it.meta["workload"] as SimWorkload).getSnapshot() }
host.fail()

for ((task, snapshot) in sortedTasks.zip(snapshots)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class TraceBasedFailureModel(
service: ComputeService,
random: RandomGenerator,
pathToTrace: String,
private val repeat: Boolean = false,
private val repeat: Boolean = true,
) : FailureModel(context, clock, service, random) {
private val failureList = loadTrace(pathToTrace)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.time.Duration;
import java.time.Instant;
import java.time.InstantSource;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -100,7 +100,7 @@ public final class ComputeService implements AutoCloseable {
/**
* The tasks that should be launched by the service.
*/
private final Deque<SchedulingRequest> taskQueue = new ArrayDeque<>();
private final LinkedHashSet<ServiceTask> taskQueue = new LinkedHashSet<>();

/**
* The active tasks in the system.
Expand Down Expand Up @@ -175,6 +175,8 @@ public void onStateChanged(@NotNull Host host, @NotNull Task task, @NotNull Task
hv.provisionedCores -= flavor.getCoreCount();
hv.instanceCount--;
hv.availableMemory += flavor.getMemorySize();

scheduler.removeTask(task, hv);
} else {
LOGGER.error("Unknown host {}", host);
}
Expand Down Expand Up @@ -309,17 +311,17 @@ public void close() {
/**
* Enqueue the specified [task] to be scheduled onto a host.
*/
SchedulingRequest schedule(ServiceTask task) {
void schedule(ServiceTask task) {
LOGGER.debug("Enqueueing task {} to be assigned to host", task.getUid());

long now = clock.millis();
SchedulingRequest request = new SchedulingRequest(task, now);
task.submitTime = now;
task.isCancelled = false;

task.launchedAt = Instant.ofEpochMilli(now);
taskQueue.add(request);
taskQueue.add(task);
tasksPending++;
requestSchedulingCycle();
return request;
}

void delete(ServiceFlavor flavor) {
Expand Down Expand Up @@ -354,29 +356,28 @@ private void requestSchedulingCycle() {
*/
private void doSchedule() {
// reorder tasks
Iterator<ServiceTask> iterator = taskQueue.iterator();

while (!taskQueue.isEmpty()) {
SchedulingRequest request = taskQueue.peek();
while (iterator.hasNext()) {
ServiceTask task = iterator.next();

if (request.isCancelled) {
taskQueue.poll();
if (task.isCancelled) {
iterator.remove();
tasksPending--;
continue;
}

final ServiceTask task = request.task;
// Check if all dependencies are met
// otherwise continue

final ServiceFlavor flavor = task.getFlavor();
final HostView hv = scheduler.select(request.task);
final HostView hv = scheduler.select(iterator);

if (hv == null || !hv.getHost().canFit(task)) {
LOGGER.trace("Task {} selected for scheduling but no capacity available for it at the moment", task);

if (flavor.getMemorySize() > maxMemory || flavor.getCoreCount() > maxCores) {
// Remove the incoming image
taskQueue.poll();
iterator.remove();
tasksPending--;
attemptsFailure++;

Expand All @@ -392,7 +393,7 @@ private void doSchedule() {
Host host = hv.getHost();

// Remove request from queue
taskQueue.poll();
iterator.remove();
tasksPending--;

LOGGER.info("Assigned task {} to host {}", task, host);
Expand All @@ -413,6 +414,7 @@ private void doSchedule() {
activeTasks.put(task, host);
} catch (Exception cause) {
LOGGER.error("Failed to deploy VM", cause);
scheduler.removeTask(task, hv);
attemptsError++;
}
}
Expand Down Expand Up @@ -601,19 +603,4 @@ public void rescheduleTask(@NotNull Task task, @NotNull SimWorkload workload) {
internalTask.start();
}
}

/**
* A request to schedule a {@link ServiceTask} onto one of the {@link Host}s.
*/
static class SchedulingRequest {
final ServiceTask task;
final long submitTime;

boolean isCancelled;

SchedulingRequest(ServiceTask task, long submitTime) {
this.task = task;
this.submitTime = submitTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class HostView {
long availableMemory;
int provisionedCores;

// Additional metadata to be used by scheduler
public int priorityIndex = -1;
public int listIndex = -1;

/**
* Construct a {@link HostView} instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public final class ServiceTask implements Task {
private TaskState state = TaskState.TERMINATED;
Instant launchedAt = null;
Host host = null;
private ComputeService.SchedulingRequest request = null;

// Scheduling request related fields
long submitTime;
boolean isCancelled;
int timesSkipped;

ServiceTask(
ComputeService service,
Expand All @@ -75,6 +79,9 @@ public final class ServiceTask implements Task {
this.image = image;
this.labels = labels;
this.meta = meta;

this.submitTime = -1;
this.isCancelled = false;
}

@NotNull
Expand Down Expand Up @@ -153,8 +160,8 @@ public void start() {
default:
LOGGER.info("User requested to start task {}", uid);
setState(TaskState.PROVISIONING);
assert request == null : "Scheduling request already active";
request = service.schedule(this);
assert submitTime == -1 : "Scheduling request already active";
service.schedule(this);
break;
}
}
Expand Down Expand Up @@ -246,10 +253,16 @@ void setState(TaskState state) {
* Cancel the provisioning request if active.
*/
private void cancelProvisioningRequest() {
final ComputeService.SchedulingRequest request = this.request;
if (request != null) {
this.request = null;
request.isCancelled = true;
}
this.isCancelled = true;
}

@Override
public int getTimesSkipped() {
return timesSkipped;
}

@Override
public void setTimesSkipped(int i) {
timesSkipped = i;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ public interface ComputeScheduler {
public fun removeHost(host: HostView)

/**
* Select a host for the specified [task].
* Select a host for the specified [iter].
* We implicity assume that the task has been scheduled onto the host.
*
* @param task The server to select a host for.
* @param iter The server to select a host for.
* @return The host to schedule the server on or `null` if no server is available.
*/
public fun select(task: Task): HostView?
public fun select(iter: MutableIterator<Task>): HostView?

/**
* Inform the scheduler that a task has been removed from the host.
* Could be due to completion or failure.
*/
public fun removeTask(task: Task, hv: HostView)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package org.opendc.compute.service.scheduler

import org.opendc.compute.api.Task
import org.opendc.compute.service.ComputeService
import org.opendc.compute.service.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import org.opendc.compute.service.scheduler.weights.HostWeigher
Expand Down Expand Up @@ -65,7 +66,8 @@ public class FilterScheduler(
hosts.remove(host)
}

override fun select(task: Task): HostView? {
override fun select(iter: MutableIterator<Task>): HostView? {
val task = iter.next()
val hosts = hosts
val filteredHosts = hosts.filter { host -> filters.all { filter -> filter.test(host, task) } }

Expand Down Expand Up @@ -108,4 +110,8 @@ public class FilterScheduler(
else -> subset[random.nextInt(maxSize)]
}
}

override fun removeTask(task: Task, hv: HostView) {
TODO("Not yet implemented")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.opendc.compute.service.scheduler

import org.opendc.compute.api.Task
import org.opendc.compute.service.HostView
import org.opendc.compute.service.scheduler.filters.HostFilter
import java.util.SplittableRandom
import java.util.random.RandomGenerator

public class MemorizingScheduler(
private val filters: List<HostFilter>,
private val random: RandomGenerator = SplittableRandom(0),
private val maxTimesSkipped: Int = 7
): ComputeScheduler {

// We assume that there will be max 200 tasks per host.
// The index of a host list is the number of tasks on that host.
private val hostsQueue = List(200, { mutableListOf<HostView>() })
private var minAvailableHost = 0
private var numHosts = 0

override fun addHost(host: HostView) {
val zeroQueue = hostsQueue[0]
zeroQueue.add(host)
host.listIndex = zeroQueue.size - 1
numHosts++
minAvailableHost = 0
}

override fun removeHost(host: HostView) {
val priorityIdx = host.priorityIndex
val listIdx = host.listIndex
val chosenList = hostsQueue[priorityIdx]

if (listIdx == chosenList.size - 1) {
chosenList.removeLast()
if (listIdx == minAvailableHost) {
for (i in minAvailableHost+1 .. hostsQueue.lastIndex) {
if (hostsQueue[i].size > 0) {
minAvailableHost = i
break
}
}
}
} else {
val lastItem = chosenList.removeLast()
chosenList[listIdx] = lastItem
lastItem.listIndex = listIdx
}
numHosts--
}

override fun select(iter: MutableIterator<Task>): HostView? {
if (numHosts == 0) {
return null
}

val chosenList = hostsQueue[minAvailableHost]
val nextList = hostsQueue[minAvailableHost + 1]
var chosenHost = chosenList.elementAt(random.nextInt(chosenList.size))

var taskFound = false
for (task in iter) {
val satisfied = filters.all { filter -> filter.test(chosenHost, task) }
if (satisfied) {
iter.remove()
taskFound = true
break
} else if (task.timesSkipped >= maxTimesSkipped) {
// Don't skip over it. Wait till a suitable host becomes available
val q = hostsQueue[minAvailableHost]
for (h in q) {
val satisfied = filters.all { filter -> filter.test(h, task) }
if (satisfied) {
chosenHost = h
taskFound = true
break
}
}
if (!taskFound) {
return null
}
} else {
task.timesSkipped++
}
}
if (!taskFound) return null // No task found that fits in the host

val listIdx = chosenHost.listIndex

if (listIdx == chosenList.size - 1) {
chosenList.removeLast()
if (chosenList.isEmpty()) minAvailableHost++
} else {
val lastItem = chosenList.removeLast()
chosenList[listIdx] = lastItem
lastItem.listIndex = listIdx
}
nextList.add(chosenHost)
chosenHost.priorityIndex = minAvailableHost + 1
chosenHost.listIndex = nextList.size - 1

return chosenHost
}

override fun removeTask(task: Task, hv: HostView) {
val priorityIdx = hv.priorityIndex
val listIdx = hv.listIndex
val chosenList = hostsQueue[priorityIdx]
val nextList = hostsQueue[priorityIdx - 1]

if (listIdx == chosenList.size - 1) {
chosenList.removeLast()
if (listIdx == minAvailableHost) {
minAvailableHost--
}
} else {
val lastItem = chosenList.removeLast()
chosenList[listIdx] = lastItem
lastItem.listIndex = listIdx
}
nextList.add(hv)
hv.priorityIndex = priorityIdx - 1
hv.listIndex = nextList.size - 1
}
}
Loading
Loading