Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiplexer update #278

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@ public record GuestCpuStats(
long lostTime,
double capacity,
double usage,
double demand,
double utilization) {}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public class Guest(
counters.cpuLostTime / 1000L,
counters.cpuCapacity,
counters.cpuSupply,
counters.cpuDemand,
counters.cpuSupply / cpuLimit,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ public object DfltTaskExportColumns {
field = Types.required(FLOAT).named("cpu_limit"),
) { it.cpuLimit }

public val CPU_USAGE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(FLOAT).named("cpu_usage"),
) { it.cpuUsage }

public val CPU_DEMAND: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(FLOAT).named("cpu_demand"),
) { it.cpuDemand }

public val CPU_TIME_ACTIVE: ExportColumn<TaskTableReader> =
ExportColumn(
field = Types.required(INT64).named("cpu_time_active"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ public interface TaskTableReader : Exportable {
*/
public val cpuLimit: Double

/**
* The CPU given to this task (in MHz).
*/
public val cpuUsage: Double

/**
* The CPU demanded by this task (in MHz).
*/
public val cpuDemand: Double

/**
* The duration (in seconds) that a CPU was active in the task.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class TaskTableReaderImpl(
_timestampAbsolute = table.timestampAbsolute

_cpuLimit = table.cpuLimit
_cpuDemand = table.cpuDemand
_cpuUsage = table.cpuUsage
_cpuActiveTime = table.cpuActiveTime
_cpuIdleTime = table.cpuIdleTime
_cpuStealTime = table.cpuStealTime
Expand Down Expand Up @@ -128,6 +130,14 @@ public class TaskTableReaderImpl(
get() = _cpuLimit
private var _cpuLimit = 0.0

override val cpuUsage: Double
get() = _cpuUsage
private var _cpuUsage = 0.0

override val cpuDemand: Double
get() = _cpuDemand
private var _cpuDemand = 0.0

override val cpuActiveTime: Long
get() = _cpuActiveTime - previousCpuActiveTime
private var _cpuActiveTime = 0L
Expand Down Expand Up @@ -181,14 +191,16 @@ public class TaskTableReaderImpl(
_timestampAbsolute = now + startTime

_cpuLimit = cpuStats?.capacity ?: 0.0
_cpuActiveTime = cpuStats?.activeTime ?: 0
_cpuIdleTime = cpuStats?.idleTime ?: 0
_cpuStealTime = cpuStats?.stealTime ?: 0
_cpuLostTime = cpuStats?.lostTime ?: 0
_uptime = sysStats?.uptime?.toMillis() ?: 0
_downtime = sysStats?.downtime?.toMillis() ?: 0
_cpuDemand = cpuStats?.demand ?: 0.0
_cpuUsage = cpuStats?.usage ?: 0.0
_cpuActiveTime = cpuStats?.activeTime ?: _cpuActiveTime
_cpuIdleTime = cpuStats?.idleTime ?: _cpuIdleTime
_cpuStealTime = cpuStats?.stealTime ?: _cpuStealTime
_cpuLostTime = cpuStats?.lostTime ?: _cpuLostTime
_uptime = sysStats?.uptime?.toMillis() ?: _uptime
_downtime = sysStats?.downtime?.toMillis() ?: _downtime
_provisionTime = task.launchedAt
_bootTime = sysStats?.bootTime
_bootTime = sysStats?.bootTime ?: _bootTime
_creationTime = task.createdAt
_finishTime = task.finishedAt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ class ScenarioIntegrationTest {
assertAll(
{ assertEquals(0, monitor.tasksTerminated) { "Idle time incorrect" } },
{ assertEquals(1, monitor.tasksCompleted) { "Idle time incorrect" } },
{ assertEquals(4297000, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(5003000, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(4296000, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(5004000, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(14824, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(2860800.0, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
)
Expand Down Expand Up @@ -294,8 +294,8 @@ class ScenarioIntegrationTest {

// Note that these values have been verified beforehand
assertAll(
{ assertEquals(1803918431, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(787181569, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(1803918432, monitor.idleTime) { "Idle time incorrect" } },
{ assertEquals(787181568, monitor.activeTime) { "Active time incorrect" } },
{ assertEquals(0, monitor.stealTime) { "Steal time incorrect" } },
{ assertEquals(0, monitor.lostTime) { "Lost time incorrect" } },
{ assertEquals(6.7565629E8, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
Expand Down Expand Up @@ -341,8 +341,8 @@ class ScenarioIntegrationTest {
{ assertEquals(0, monitor.tasksActive, "All VMs should finish after a run") },
{ assertEquals(0, monitor.attemptsFailure, "No VM should be unscheduled") },
{ assertEquals(0, monitor.tasksPending, "No VM should not be in the queue") },
{ assertEquals(43101787498, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412502, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(43101787496, monitor.idleTime) { "Incorrect idle time" } },
{ assertEquals(3489412504, monitor.activeTime) { "Incorrect active time" } },
{ assertEquals(0, monitor.stealTime) { "Incorrect steal time" } },
{ assertEquals(0, monitor.lostTime) { "Incorrect lost time" } },
{ assertEquals(1.0016123392181786E10, monitor.energyUsage, 1E4) { "Incorrect energy usage" } },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ public final class SimCpu extends FlowNode implements FlowSupplier, FlowConsumer

private double currentCpuDemand = 0.0f; // cpu capacity demanded by the mux
private double currentCpuUtilization = 0.0f;
private double currentPowerDemand = 0.0f; // power demanded of the psu
private double currentCpuSupplied = 0.0f; // cpu capacity supplied to the mux

private double currentPowerDemand = 0.0f; // power demanded of the psu
private double currentPowerSupplied = 0.0f; // cpu capacity supplied by the psu

private double maxCapacity;
Expand Down Expand Up @@ -122,7 +123,7 @@ public SimCpu(FlowGraph graph, CpuModel cpuModel, CpuPowerModel powerModel, int
public long onUpdate(long now) {
updateCounters(now);

this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;
this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);

// Calculate Power Demand and send to PSU
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);
Expand All @@ -132,7 +133,7 @@ public long onUpdate(long now) {
}

// Calculate the amount of cpu this can provide
double cpuSupply = this.currentCpuDemand;
double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);

if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
Expand Down Expand Up @@ -205,6 +206,8 @@ public void handleDemand(FlowEdge consumerEdge, double newCpuDemand) {
this.currentCpuDemand = newCpuDemand;
this.currentCpuUtilization = this.currentCpuDemand / this.maxCapacity;

this.currentCpuUtilization = Math.min(this.currentCpuDemand / this.maxCapacity, 1.0);

// Calculate Power Demand and send to PSU
double powerDemand = this.cpuPowerModel.computePower(this.currentCpuUtilization);

Expand All @@ -223,7 +226,8 @@ public void handleSupply(FlowEdge supplierEdge, double newPowerSupply) {
this.currentPowerSupplied = newPowerSupply;

// Calculate the amount of cpu this can provide
double cpuSupply = this.currentCpuDemand;
double cpuSupply = Math.min(this.currentCpuDemand, this.maxCapacity);
;

if (cpuSupply != this.currentCpuSupplied) {
this.pushSupply(this.muxEdge, cpuSupply);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public class Multiplexer extends FlowNode implements FlowSupplier, FlowConsumer

private double totalDemand; // The total demand of all the consumers
private double totalSupply; // The total supply from the supplier

private boolean overProvisioned = false;
private int currentConsumerIdx = -1;

private double capacity; // What is the max capacity

public Multiplexer(FlowGraph graph) {
Expand All @@ -59,52 +63,63 @@ public double getCapacity() {

public long onUpdate(long now) {

if (this.totalDemand > this.capacity) {
redistributeSupply(this.consumerEdges, this.supplies, this.capacity);
} else {
for (int i = 0; i < this.demands.size(); i++) {
this.supplies.set(i, this.demands.get(i));
return Long.MAX_VALUE;
}

private void distributeSupply() {
// if supply >= demand -> push supplies to all tasks
// TODO: possible optimization -> Only has to be done for the specific consumer that changed demand
if (this.totalSupply >= this.totalDemand) {

// If this came from a state of over provisioning, provide all consumers with their demand
if (this.overProvisioned) {
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushSupply(this.consumerEdges.get(idx), this.demands.get(idx));
}
}
}

double totalSupply = 0;
for (int i = 0; i < this.consumerEdges.size(); i++) {
this.pushSupply(this.consumerEdges.get(i), this.supplies.get(i));
totalSupply += this.supplies.get(i);
if (this.currentConsumerIdx != -1) {
this.pushSupply(
this.consumerEdges.get(this.currentConsumerIdx), this.demands.get(this.currentConsumerIdx));
this.currentConsumerIdx = -1;
}

this.overProvisioned = false;
}

// Only update supplier if supply has changed
if (this.totalSupply != totalSupply) {
this.totalSupply = totalSupply;
// if supply < demand -> distribute the supply over all consumers
else {
this.overProvisioned = true;
double[] supplies = redistributeSupply(this.demands, this.totalSupply);

pushDemand(this.supplierEdge, this.totalSupply);
for (int idx = 0; idx < this.consumerEdges.size(); idx++) {
this.pushSupply(this.consumerEdges.get(idx), supplies[idx]);
}
}

return Long.MAX_VALUE;
}

private static double redistributeSupply(
ArrayList<FlowEdge> consumerEdges, ArrayList<Double> supplies, double capacity) {
final long[] consumers = new long[consumerEdges.size()];
private record Demand(int idx, double value) {}

for (int i = 0; i < consumers.length; i++) {
FlowEdge consumer = consumerEdges.get(i);
private static double[] redistributeSupply(ArrayList<Double> demands, double totalSupply) {
int inputSize = demands.size();

if (consumer == null) {
break;
}
final double[] supplies = new double[inputSize];
final Demand[] tempDemands = new Demand[inputSize];

consumers[i] = (Double.doubleToRawLongBits(consumer.getDemand()) << 32) | (i & 0xFFFFFFFFL);
for (int i = 0; i < inputSize; i++) {
tempDemands[i] = new Demand(i, demands.get(i));
}
Arrays.sort(consumers);

double availableCapacity = capacity;
int inputSize = consumers.length;
Arrays.sort(tempDemands, (o1, o2) -> {
Double i1 = o1.value;
Double i2 = o2.value;
return i1.compareTo(i2);
});

double availableCapacity = totalSupply; // totalSupply

for (int i = 0; i < inputSize; i++) {
long v = consumers[i];
int slot = (int) v;
double d = Double.longBitsToDouble((int) (v >> 32));
double d = tempDemands[i].value;

if (d == 0.0) {
continue;
Expand All @@ -113,12 +128,13 @@ private static double redistributeSupply(
double availableShare = availableCapacity / (inputSize - i);
double r = Math.min(d, availableShare);

supplies.set(slot, r); // Update the rates
int idx = tempDemands[i].idx;
supplies[idx] = r; // Update the rates
availableCapacity -= r;
}

// Return the used capacity
return capacity - availableCapacity;
return supplies;
}

/**
Expand All @@ -132,17 +148,13 @@ public void addConsumerEdge(FlowEdge consumerEdge) {
this.consumerEdges.add(consumerEdge);
this.demands.add(0.0);
this.supplies.add(0.0);

this.invalidate();
}

@Override
public void addSupplierEdge(FlowEdge supplierEdge) {
this.supplierEdge = supplierEdge;
this.capacity = supplierEdge.getCapacity();
this.totalSupply = 0;

this.invalidate();
}

@Override
Expand All @@ -164,7 +176,9 @@ public void removeConsumerEdge(FlowEdge consumerEdge) {
this.consumerEdges.get(i).setConsumerIndex(i);
}

this.invalidate();
this.currentConsumerIdx = -1;

this.pushDemand(this.supplierEdge, this.totalDemand);
}

@Override
Expand All @@ -178,28 +192,34 @@ public void removeSupplierEdge(FlowEdge supplierEdge) {
public void handleDemand(FlowEdge consumerEdge, double newDemand) {
int idx = consumerEdge.getConsumerIndex();

this.currentConsumerIdx = idx;

if (idx == -1) {
System.out.println("Error (Multiplexer): Demand pushed by an unknown consumer");
return;
}

// Update the total demand (This is cheaper than summing over all demands)
double prevDemand = demands.get(idx);
demands.set(idx, newDemand);

demands.set(idx, newDemand);
this.totalDemand += (newDemand - prevDemand);

if (this.totalDemand <= this.capacity) {

this.totalSupply = this.totalDemand;
this.pushDemand(this.supplierEdge, this.totalSupply);

this.pushSupply(consumerEdge, newDemand);
if (overProvisioned) {
distributeSupply();
}
// TODO: add behaviour if capacity is reached

// Send new totalDemand to CPU
// TODO: Look at what happens if total demand is not changed (if total demand is higher than totalSupply)
this.pushDemand(this.supplierEdge, this.totalDemand);
}

@Override
public void handleSupply(FlowEdge supplierEdge, double newSupply) {}
public void handleSupply(FlowEdge supplierEdge, double newSupply) {
this.totalSupply = newSupply; // Currently this is from a single supply, might turn into multiple suppliers

this.distributeSupply();
}

@Override
public void pushDemand(FlowEdge supplierEdge, double newDemand) {
Expand Down
Loading