-
Notifications
You must be signed in to change notification settings - Fork 0
/
fileRateMonotonic.py
87 lines (79 loc) · 3.32 KB
/
fileRateMonotonic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Implementation of Rate Monotonic Algorithm (Cyclic Executive)
import numpy
import random
def getIntervals(tasks):
    """Collect the 'intrvl' value of every task into an integer NumPy array.

    Args:
        tasks: Mapping of task name -> task description dict; each
            description must carry an 'intrvl' entry.

    Returns:
        A 1-D ``numpy`` array (dtype ``int``) with one entry per task, in
        the mapping's iteration order.
    """
    periods = numpy.zeros(len(tasks), dtype=int)
    for slot, spec in enumerate(tasks.values()):
        periods[slot] = spec['intrvl']
    return periods
def eachTask_cyclicExecutionCost(period, interval, executionCost):
    """Return the execution cost one task accumulates over ``period``.

    The number of releases (``period / interval``) is computed with true
    division, multiplied by the per-release cost, and the product is
    truncated to an ``int``.
    """
    releases = period / interval
    return int(releases * executionCost)
def check_conditions(tasks):
    """Check that the task set's total demand fits in one hyper-period.

    The hyper-period is the least common multiple of all task intervals.
    Each task contributes its first 'wcet' entry once per release inside
    that hyper-period (see ``eachTask_cyclicExecutionCost``).

    Args:
        tasks: Mapping of task name -> dict with 'intrvl' and 'wcet'
            (a sequence whose first element is used here).

    Returns:
        ``True`` when the summed cost does not exceed the hyper-period,
        ``False`` otherwise.
    """
    hyper_period = numpy.lcm.reduce(getIntervals(tasks).astype(int))
    demand = sum(
        eachTask_cyclicExecutionCost(hyper_period, spec['intrvl'], spec['wcet'][0])
        for spec in tasks.values()
    )
    # Wrap in bool() so callers get a plain Python bool, not a NumPy scalar.
    return bool(demand <= hyper_period)
def get_priorities(tasks):
    """Return the task names ordered by ascending 'intrvl'.

    The position of a name in the returned list is its priority rank
    (index 0 = shortest interval). Tasks with equal intervals keep the
    relative order in which they appear in ``tasks``.
    """
    def interval_of(name):
        return tasks[name]['intrvl']

    return sorted(tasks, key=interval_of)
def dispatch_rm_scheduler(tasks, hyperInterval):
    """Build a tick-by-tick execution timeline using fixed priorities.

    Priorities come from ``get_priorities`` (rank 0 is served first). At
    every tick each task whose interval divides the tick releases a new
    job with a budget of ``wcet[0]`` ticks; the highest-priority pending
    job then runs for exactly one tick.

    Side effect: stores the computed rank in ``tasks[name]['priority']``.

    Args:
        tasks: Mapping of task name -> dict with 'intrvl' and 'wcet'
            (a sequence; only its first element is used here).
        hyperInterval: Number of ticks to schedule.

    Returns:
        A list of length ``hyperInterval`` holding, per tick, the name of
        the task that runs or the string 'Halt' when nothing is pending.
    """
    for rank, name in enumerate(get_priorities(tasks)):
        tasks[name]['priority'] = rank

    executionQueue = []
    readyQueue = []  # pending jobs: [name, release tick, remaining, priority]
    for tick in range(hyperInterval):
        # Release this tick's jobs directly instead of scanning a
        # precomputed list of every job in the hyper-period on each tick.
        for name in tasks:
            spec = tasks[name]
            budget = spec['wcet'][0]
            # A job without positive budget has nothing to execute; queuing
            # it would let it occupy a slot and never reach exactly zero.
            if tick % spec['intrvl'] == 0 and budget > 0:
                readyQueue.append([name, tick, budget, spec['priority']])

        if not readyQueue:
            executionQueue.append('Halt')
            continue

        # Stable sort: within one priority, earlier releases stay first.
        readyQueue.sort(key=lambda job: job[3])
        running = readyQueue[0]
        executionQueue.append(running[0])
        running[2] -= 1
        # "<= 0" (not "== 0") so fractional budgets still terminate.
        if running[2] <= 0:
            readyQueue.pop(0)
    return executionQueue
def dispatchRateMonotonic(tasks, hyperInterval):
    """Schedule ``tasks`` if they pass ``check_conditions``.

    Returns:
        The execution queue produced by ``dispatch_rm_scheduler`` when the
        feasibility check succeeds, otherwise ``None``.
    """
    if not check_conditions(tasks):
        return None
    return dispatch_rm_scheduler(tasks, hyperInterval)
def executeRateMonotonic(tasks, exeQueue, hyperInterval):
    """Replay a schedule with per-task execution times drawn from 'wcet'.

    For every task one value is picked at random from its 'wcet' sequence
    and used as the execution time of each of its releases. Slots that
    were reserved for a task whose current release has already used up
    that time are turned into 'Halt'.

    Side effects: writes 'exetime' and 'counter' into each task dict and
    modifies ``exeQueue`` in place.

    Args:
        tasks: Mapping of task name -> dict with 'intrvl' and 'wcet'.
        exeQueue: Per-tick list of task names or 'Halt'; must hold at
            least ``hyperInterval`` entries.
        hyperInterval: Number of ticks to replay.

    Returns:
        Tuple ``(exeQueue, taskExe)`` where ``exeQueue`` is the same list
        object that was passed in and ``taskExe`` maps each task name to
        the execution time chosen for it.
    """
    taskExe = {}
    for name, spec in tasks.items():
        chosen = random.choice(spec['wcet'])
        spec['exetime'] = chosen
        spec['counter'] = chosen
        taskExe[name] = chosen

    for tick in range(hyperInterval):
        # Refill budgets on every release tick, including ticks whose slot
        # is idle; otherwise a release that coincides with 'Halt' would be
        # missed and the task's later slot would be idled by mistake.
        for spec in tasks.values():
            if tick % spec['intrvl'] == 0:
                spec['counter'] = spec['exetime']

        name = exeQueue[tick]
        if name == 'Halt':
            continue

        running = tasks[name]
        # "<= 0" so a fractional execution time cannot skip past zero.
        if running['counter'] <= 0:
            exeQueue[tick] = 'Halt'
        else:
            running['counter'] -= 1
    return (exeQueue, taskExe)