Skip to content

Commit

Permalink
rewritten the checkpointing model (atlarge-research#250)
Browse files Browse the repository at this point in the history
* Updated the checkpointing system to use SimTrace. The checkpoint model can now also scale, which means the interval between checkpoints can increase or decrease over time.

* spotless kotlin

* Fixed tests

* spotless apply
  • Loading branch information
DanteNiewenhuis authored and sacheendra committed Sep 12, 2024
1 parent 304cbfb commit 920f02d
Show file tree
Hide file tree
Showing 18 changed files with 536 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class StartStopHostFault(
val tasks = host.instances

val sortedTasks = tasks.sortedBy { it.name }
val snapshots = sortedTasks.map { (it.meta["workload"] as SimWorkload).snapshot() }
val snapshots = sortedTasks.map { (it.meta["workload"] as SimWorkload).getSnapshot() }
host.fail()

for ((task, snapshot) in sortedTasks.zip(snapshots)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class TraceBasedFailureModel(
service: ComputeService,
random: RandomGenerator,
pathToTrace: String,
private val repeat: Boolean = false,
private val repeat: Boolean = true,
) : FailureModel(context, clock, service, random) {
private val failureList = loadTrace(pathToTrace)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ internal object DefaultWorkloadMapper : SimWorkloadMapper {

override fun createWorkload(task: Task): SimWorkload {
val workload = delegate.createWorkload(task)

// FIXME: look at connecting this to frontend. Probably not needed since the duration is so small
val bootWorkload = SimWorkloads.runtime(Duration.ofMillis(1), 0.8, 0L, 0L)
return SimWorkloads.chain(bootWorkload, workload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ public suspend fun ComputeService.replay(
delay(max(0, (start - now - simulationOffset)))
}

val checkpointTime = checkpointModelSpec?.checkpointTime ?: 0L
val checkpointWait = checkpointModelSpec?.checkpointWait ?: 0L
val checkpointInterval = checkpointModelSpec?.checkpointInterval ?: 0L
val checkpointDuration = checkpointModelSpec?.checkpointDuration ?: 0L
val checkpointIntervalScaling = checkpointModelSpec?.checkpointIntervalScaling ?: 1.0

// val workload = SimRuntimeWorkload(
// entry.duration,
Expand All @@ -131,7 +132,7 @@ public suspend fun ComputeService.replay(
// checkpointWait
// )

val workload = entry.trace.createWorkload(start, checkpointTime, checkpointWait)
val workload = entry.trace.createWorkload(start, checkpointInterval, checkpointDuration, checkpointIntervalScaling)
val meta = mutableMapOf<String, Any>("workload" to workload)

launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ public fun runScenario(
addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id)

val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
service.replay(timeSource, tasks, failureModelSpec = scenario.failureModelSpec, seed = seed)
service.replay(
timeSource,
tasks,
failureModelSpec = scenario.failureModelSpec,
checkpointModelSpec = scenario.checkpointModelSpec,
seed = seed,
)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kotlinx.serialization.Serializable

/**
 * Serializable specification of the checkpointing behaviour applied to replayed tasks.
 *
 * The replay code reads [checkpointInterval], [checkpointDuration] and [checkpointIntervalScaling]
 * from this spec and hands them to the workload it creates for every task.
 *
 * Presumably all durations are expressed in milliseconds (the defaults would then be one hour and
 * five minutes) — TODO confirm against the consumer.
 *
 * NOTE(review): [checkpointWait] and [checkpointTime] carry the same defaults as [checkpointInterval]
 * and [checkpointDuration] and look like their previous names — confirm whether both pairs are
 * meant to coexist.
 */
@Serializable
public data class CheckpointModelSpec(
val checkpointWait: Long = 60 * 60 * 1000,
val checkpointTime: Long = 5 * 60 * 1000,
// Time between two checkpoints.
val checkpointInterval: Long = 60 * 60 * 1000,
// Time attributed to a single checkpoint.
val checkpointDuration: Long = 5 * 60 * 1000,
// Factor applied to the interval after each checkpoint; 1.0 keeps the interval constant.
val checkpointIntervalScaling: Double = 1.0,
)
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,25 @@ public class SimTFDevice(
output = null
}

// Intentionally a no-op: nothing is recorded when a snapshot is requested.
override fun makeSnapshot(now: Long) = Unit

// Intentionally a no-op: the offset is ignored.
override fun setOffset(now: Long) = Unit

override fun snapshot(): SimWorkload = throw UnsupportedOperationException()
/**
 * Snapshots are not supported by this workload.
 *
 * @throws UnsupportedOperationException always.
 */
override fun getSnapshot(): SimWorkload {
    throw UnsupportedOperationException()
}

// Intentionally a no-op: no checkpoint model is created.
override fun createCheckpointModel() = Unit

/**
 * Always returns -1.
 *
 * A chain workload only starts its checkpoint model for an interval greater than zero, so this
 * value presumably marks checkpointing as not applicable here.
 */
override fun getCheckpointInterval(): Long = -1L

/** Always returns -1; presumably a sentinel meaning that no checkpoint duration applies. */
override fun getCheckpointDuration(): Long = -1L

/** Always returns -1.0; presumably a sentinel meaning that no interval scaling applies. */
override fun getCheckpointIntervalScaling(): Double = -1.0

override fun onUpdate(
ctx: FlowStage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public abstract static class Context implements SimMachineContext {
private final Map<String, Object> meta;
private final Consumer<Exception> completion;
private boolean isClosed;
private SimWorkload snapshot;

/**
* Construct a new {@link Context} instance.
Expand All @@ -131,8 +132,13 @@ public final Map<String, Object> getMeta() {
}

@Override
public SimWorkload snapshot() {
return workload.snapshot();
// Caches whatever snapshot the workload currently exposes so that it can be handed out later.
// NOTE(review): `now` is not forwarded to the workload here — confirm the workload's own
// snapshot is refreshed elsewhere before this is called.
public void makeSnapshot(long now) {
    snapshot = workload.getSnapshot();
}

/**
 * Return the snapshot cached by the last call to {@code makeSnapshot}.
 *
 * @param now Timestamp of the request; not used by this implementation.
 * @return The cached snapshot, or {@code null} when no snapshot has been made yet.
 */
@Override
public SimWorkload getSnapshot(long now) {
    return snapshot;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public interface SimMachineContext {
*
* @throws UnsupportedOperationException if the workload does not support snapshotting.
*/
SimWorkload snapshot();
void makeSnapshot(long now);

SimWorkload getSnapshot(long now);

/**
* Reset all resources of the machine.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,35 @@ public void onStop(SimMachineContext ctx) {
}

@Override
public SimWorkload snapshot() {
// A hypervisor cannot be snapshotted, so any request fails immediately.
public void makeSnapshot(long now) {
    final String reason = "Unable to snapshot hypervisor";
    throw new UnsupportedOperationException(reason);
}

/**
 * A hypervisor has no snapshot to return.
 *
 * @throws UnsupportedOperationException always.
 */
@Override
public SimWorkload getSnapshot() {
    final String reason = "Unable to snapshot hypervisor";
    throw new UnsupportedOperationException(reason);
}

/**
 * Checkpointing is not available for a hypervisor.
 *
 * @throws UnsupportedOperationException always.
 */
@Override
public void createCheckpointModel() {
    final String reason = "Unable to create a checkpointing system for a hypervisor";
    throw new UnsupportedOperationException(reason);
}

/** Always returns -1; presumably a sentinel meaning that checkpointing does not apply to a hypervisor. */
@Override
public long getCheckpointInterval() {
    return -1L;
}

/** Always returns -1; presumably a sentinel meaning that no checkpoint duration applies to a hypervisor. */
@Override
public long getCheckpointDuration() {
    return -1L;
}

/** Always returns -1.0; presumably a sentinel meaning that no interval scaling applies to a hypervisor. */
@Override
public double getCheckpointIntervalScaling() {
    return -1.0;
}

/**
* The context which carries the state when the hypervisor is running on a machine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package org.opendc.simulator.compute.workload;

import java.time.InstantSource;
import java.util.List;
import java.util.Map;
import org.opendc.simulator.compute.SimMachineContext;
Expand All @@ -30,6 +31,8 @@
import org.opendc.simulator.compute.SimProcessingUnit;
import org.opendc.simulator.compute.SimStorageInterface;
import org.opendc.simulator.flow2.FlowGraph;
import org.opendc.simulator.flow2.FlowStage;
import org.opendc.simulator.flow2.FlowStageLogic;

/**
* A {@link SimWorkload} that composes two {@link SimWorkload}s.
Expand All @@ -40,6 +43,13 @@ final class SimChainWorkload implements SimWorkload {

private Context activeContext;

private long checkpointInterval = 0;
private long checkpointDuration = 0;

private double checkpointIntervalScaling = 1.0;
private CheckPointModel checkpointModel;
private SimChainWorkload snapshot;

/**
* Construct a {@link SimChainWorkload} instance.
*
Expand All @@ -48,6 +58,13 @@ final class SimChainWorkload implements SimWorkload {
*/
SimChainWorkload(SimWorkload[] workloads, int activeWorkloadIndex) {
this.workloads = workloads;

if (this.workloads.length > 1) {
checkpointInterval = this.workloads[1].getCheckpointInterval();
checkpointDuration = this.workloads[1].getCheckpointDuration();
checkpointIntervalScaling = this.workloads[1].getCheckpointIntervalScaling();
}

this.activeWorkloadIndex = activeWorkloadIndex;
}

Expand All @@ -60,6 +77,21 @@ final class SimChainWorkload implements SimWorkload {
this(workloads, 0);
}

/** Return the checkpoint interval this chain was configured with at construction. */
@Override
public long getCheckpointInterval() {
    return this.checkpointInterval;
}

/** Return the checkpoint duration this chain was configured with at construction. */
@Override
public long getCheckpointDuration() {
    return this.checkpointDuration;
}

/** Return the factor by which the checkpoint interval is scaled after each checkpoint. */
@Override
public double getCheckpointIntervalScaling() {
    return this.checkpointIntervalScaling;
}

@Override
public void setOffset(long now) {
for (SimWorkload workload : this.workloads) {
Expand All @@ -79,6 +111,11 @@ public void onStart(SimMachineContext ctx) {
final Context context = new Context(ctx);
activeContext = context;

if (checkpointInterval > 0) {
this.createCheckpointModel();
this.checkpointModel.start();
}

tryThrow(context.doStart(workloads[activeWorkloadIndex]));
}

Expand All @@ -94,27 +131,106 @@ public void onStop(SimMachineContext ctx) {
final Context context = activeContext;
activeContext = null;

if (this.checkpointModel != null) {
this.checkpointModel.stop();
}

tryThrow(context.doStop(workloads[activeWorkloadIndex]));
}

@Override
public SimChainWorkload snapshot() {
// Builds a new chain from the snapshots of the active workload and every workload after it,
// and stores it so that getSnapshot() can return it. Workloads before the active index are dropped.
public void makeSnapshot(long now) {
final int activeWorkloadIndex = this.activeWorkloadIndex;
final SimWorkload[] workloads = this.workloads;
// Only the workloads from the active one onwards are part of the snapshot.
final SimWorkload[] newWorkloads = new SimWorkload[workloads.length - activeWorkloadIndex];

for (int i = 0; i < newWorkloads.length; i++) {
newWorkloads[i] = workloads[activeWorkloadIndex + i].snapshot();
// Ask the workload to refresh its snapshot at `now` before collecting it.
workloads[activeWorkloadIndex + i].makeSnapshot(now);
newWorkloads[i] = workloads[activeWorkloadIndex + i].getSnapshot();
}

// The stored chain starts at index 0 because the finished workloads were left out.
this.snapshot = new SimChainWorkload(newWorkloads, 0);
}

/**
 * Return the chain stored by the last call to {@code makeSnapshot}.
 *
 * @return The stored snapshot, or {@code null} when no snapshot has been made yet.
 */
@Override
public SimChainWorkload getSnapshot() {
    return snapshot;
}

/**
 * Create the checkpoint model for this chain from the currently active context and the chain's
 * checkpoint settings. The model is only created here, not started.
 */
@Override
public void createCheckpointModel() {
    final CheckPointModel model = new CheckPointModel(
            activeContext,
            this,
            this.checkpointInterval,
            this.checkpointDuration,
            this.checkpointIntervalScaling);
    this.checkpointModel = model;
}

private class CheckPointModel implements FlowStageLogic {
private SimChainWorkload workload;
private long checkpointInterval;
private long checkpointDuration;
private double checkpointIntervalScaling;
private FlowStage stage;

private long startOfInterval;
private Boolean firstCheckPoint = true;

CheckPointModel(
        Context context,
        SimChainWorkload workload,
        long checkpointInterval,
        long checkpointDuration,
        double checkpointIntervalScaling) {
    this.workload = workload;
    this.checkpointInterval = checkpointInterval;
    this.checkpointDuration = checkpointDuration;
    this.checkpointIntervalScaling = checkpointIntervalScaling;

    // Register this model as its own stage in the graph of the context it is attached to.
    this.stage = context.getGraph().newStage(this);

    // The first interval is measured from the moment the model is constructed.
    this.startOfInterval = this.stage.getGraph().getEngine().getClock().millis();
}

/**
 * Advance the checkpoint schedule.
 *
 * When the current interval has not elapsed yet, nothing happens and the end of the interval is
 * returned as the next deadline. Otherwise the workload is asked to make a snapshot, the interval
 * is scaled by the configured factor and the next deadline is scheduled.
 *
 * @param ctx The stage that is being updated (not used).
 * @param now The current timestamp.
 * @return The timestamp at which this stage should be updated next.
 */
@Override
public long onUpdate(FlowStage ctx, long now) {
    // Every interval after the first one also has to cover the time spent on the checkpoint itself.
    final long intervalLength =
            this.firstCheckPoint ? this.checkpointInterval : this.checkpointInterval + this.checkpointDuration;
    final long remainingTime = intervalLength - (now - this.startOfInterval);

    // Interval not completed
    if (remainingTime > 0) {
        return now + remainingTime;
    }

    this.workload.makeSnapshot(now);
    this.firstCheckPoint = false;

    // Measure the next interval from this checkpoint. Without this, the elapsed time would keep
    // being measured from the creation of the model, and an update arriving before the deadline
    // would compute a wrong deadline or trigger a premature snapshot.
    this.startOfInterval = now;

    // Scale the interval time between checkpoints based on the provided scaling
    this.checkpointInterval = (long) (this.checkpointInterval * this.checkpointIntervalScaling);

    return now + this.checkpointInterval + this.checkpointDuration;
}

/** Start the model by synchronising its stage. */
public void start() {
    stage.sync();
}

return new SimChainWorkload(newWorkloads, 0);
/** Stop the model by closing its stage. */
public void stop() {
    stage.close();
}
}

/**
* A {@link SimMachineContext} that intercepts the shutdown calls.
*/
private class Context implements SimMachineContext {
private final SimMachineContext ctx;
private SimWorkload snapshot;

private Context(SimMachineContext ctx) {
this.ctx = ctx;
Expand Down Expand Up @@ -151,9 +267,16 @@ public List<? extends SimStorageInterface> getStorageInterfaces() {
}

@Override
public SimWorkload snapshot() {
// Stores the snapshot exposed by the workload that is currently active in the chain.
// NOTE(review): `now` is not forwarded to the workload here — confirm the workload's own
// snapshot is refreshed elsewhere before this is called.
public void makeSnapshot(long now) {
final SimWorkload workload = workloads[activeWorkloadIndex];
return workload.snapshot();
this.snapshot = workload.getSnapshot();
}

/**
 * Refresh the stored snapshot and return it.
 *
 * @param now Timestamp passed on to {@code makeSnapshot}.
 * @return The snapshot stored by the refresh.
 */
@Override
public SimWorkload getSnapshot(long now) {
    makeSnapshot(now);
    return snapshot;
}

@Override
Expand Down Expand Up @@ -192,6 +315,9 @@ public void shutdown(Exception cause) {
}
}

if (SimChainWorkload.this.checkpointModel != null) {
SimChainWorkload.this.checkpointModel.stop();
}
ctx.shutdown(cause);
}

Expand Down
Loading

0 comments on commit 920f02d

Please sign in to comment.