-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathalgo.py
156 lines (127 loc) · 5.93 KB
/
algo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import numpy as np
import utility as util
import GetTransPower
import RunPF
class algo:
    """Shared state and helper queries for EV-charging scheduling algorithms.

    The instance tracks the current time slot and the remaining demand of each
    EV, and answers questions such as "which EVs are connected right now" or
    "how much transformer capacity is available". ``update`` is an empty hook
    for the per-slot scheduling logic.

    ``env`` is a mapping; the keys read by this class are ``'demand'``,
    ``'evNumber'``, ``'evArrival'``, ``'evDuration'``, ``'evClaimedDuration'``,
    ``'discrepancy'``, ``'evDriverType'``, ``'evNodeNumber'``, ``'loadPhase'``
    and ``'transRating'``.
    """

    # Used by get_TAU to map an EV node number to a transformer index
    # (node // NODES_PER_TRANS + 1) and to a position in env['loadPhase']
    # (node % NODES_PER_TRANS).
    # NOTE(review): 55 is presumably the node count per secondary transformer
    # of the feeder model -- confirm against the network data.
    NODES_PER_TRANS = 55
    # The first N_PHASES entries of the transformer load vector are treated as
    # the primary transformer (one entry per phase); the rest are decoded as
    # (transformer, phase) = (i // N_PHASES, i % N_PHASES).
    N_PHASES = 3

    def __init__(self, DSSObj, env, P, Q, slot_len_in_min=10, start_hr=0, mode='change', params=None):
        """Store the simulation handles and convert EV timings to slot units.

        Args:
            DSSObj: Handle passed through to ``RunPF.runPF``.
            env: Environment mapping (see the class docstring for the keys).
            P, Q: Initial load matrices; column 0 is used in 'constant' mode.
            slot_len_in_min: Length of one time slot in minutes.
            start_hr: Starting hour; converted to a starting slot index.
            mode: 'change' or 'constant'; selects the loads used by
                ``get_trans_load``.
            params: Optional algorithm parameters. ``None`` gives each
                instance its own fresh empty dict.
        """
        self.DSSObj = DSSObj
        self.env = env
        self.mode = mode
        # A fresh dict per instance: a shared ``{}`` default would leak
        # parameters between otherwise unrelated instances.
        self.params = {} if params is None else params
        self.P_init = P
        self.Q_init = Q
        # Fraction of the spare transformer capacity that get_available()
        # hands out.
        self.trans_accu = 0.90

        self.slot_len_in_min = slot_len_in_min
        self.n_slot_per_hr = 60 // self.slot_len_in_min
        self.current_slot = start_hr*self.n_slot_per_hr

        self.remaining_demand = np.array(env['demand'])  # In kWmin
        self.max_rate = 8.0*np.ones(env['evNumber'])  # In kW
        self.max_rate_scaler = 1.0

        # Timings are given in minutes in env; keep them in slot units here.
        # arrival/duration/claimed_duration are rounded to whole slots,
        # discrepancy is left fractional.
        self.arrival = np.round(np.array(self.env['evArrival'])/slot_len_in_min)
        self.duration = np.round(np.array(self.env['evDuration'])/slot_len_in_min)
        self.claimed_duration = np.round(np.array(self.env['evClaimedDuration'])/slot_len_in_min)
        self.discrepancy = np.array(self.env['discrepancy'])/slot_len_in_min

    def get_connected(self):
        """Return indices of EVs that are plugged in and still need energy.

        An EV counts as connected when the current slot lies within
        ``[arrival, arrival + duration]`` (both ends inclusive) and its
        remaining demand is positive. The result is always an integer array,
        so it can be used for indexing even when it is empty.
        """
        slot = self.current_slot
        connected = [
            i for i in range(self.env['evNumber'])
            if slot >= self.arrival[i]
            and slot <= self.arrival[i] + self.duration[i]
            and self.remaining_demand[i] > 0.0
        ]
        return np.array(connected, dtype=int)

    def get_over_time(self, connected):
        """Return, per connected EV, whether it has outstayed its claimed duration."""
        return [
            bool(self.current_slot > self.arrival[c] + self.claimed_duration[c])
            for c in connected
        ]

    def get_claimed(self, connected):
        """Return the claimed duration (in slots) of each connected EV as a list."""
        return [self.claimed_duration[c] for c in connected]

    def get_urgent(self, connected):
        """Return the connected EVs whose claimed departure is still ahead.

        The result is always an integer array (see ``get_connected``).
        """
        urgent = [
            e for e in connected
            if self.current_slot < self.arrival[e] + self.claimed_duration[e]
        ]
        return np.array(urgent, dtype=int)

    def get_laxity(self, urgent, scale=1.0):
        """Return the laxity, in slots, of each EV in ``urgent``.

        Laxity is the time left until the claimed departure, plus
        ``scale * discrepancy``, minus the number of slots needed to serve
        the remaining demand at the maximum charging rate.
        """
        laxity = []
        for u in urgent:
            time_left = self.arrival[u] + self.claimed_duration[u] - self.current_slot
            slots_needed = (self.remaining_demand[u]/self.max_rate[u])/self.slot_len_in_min
            laxity.append(time_left + scale*self.discrepancy[u] - slots_needed)
        return np.array(laxity)

    def get_discrepancy(self, connected):
        """Return env['discrepancy'] (unscaled, as given in env) for each connected EV."""
        return np.array([self.env['discrepancy'][c] for c in connected])

    def get_driver_type(self, connected):
        """Return env['evDriverType'] for each connected EV."""
        return np.array([self.env['evDriverType'][c] for c in connected])

    def update_remaining_demand(self, ev_power, duration):
        """Subtract the delivered energy ``ev_power * duration``, clamping at zero.

        NOTE(review): remaining demand is kept in kWmin, so ``duration`` is
        presumably in minutes and ``ev_power`` in kW -- confirm with callers.
        """
        self.remaining_demand = np.maximum(0.0, self.remaining_demand - ev_power*duration)

    def get_trans_load(self, ev_power, P, Q):  # In kVA
        """Run a power flow and return the load magnitude of each transformer.

        In 'change' mode the column of ``P``/``Q`` matching the current slot
        within the hour is used; in 'constant' mode column 0 of the initial
        ``P``/``Q`` given to the constructor is used.

        Raises:
            ValueError: If ``self.mode`` is neither 'change' nor 'constant'.
        """
        if self.mode == 'change':
            within_hr = int(self.current_slot % (60 // self.slot_len_in_min))
            DSSCircuit = RunPF.runPF(self.DSSObj, P[:, within_hr], Q[:, within_hr], self.env['evNodeNumber'], ev_power)
        elif self.mode == 'constant':
            DSSCircuit = RunPF.runPF(self.DSSObj, self.P_init[:, 0], self.Q_init[:, 0], self.env['evNodeNumber'], ev_power)
        else:
            # Fail with a clear message instead of an UnboundLocalError on
            # DSSCircuit further down.
            raise ValueError("unknown mode %r; expected 'change' or 'constant'" % (self.mode,))

        # Get the transformer power magnitudes: the flattened output is
        # consumed as consecutive pairs, each combined as sqrt(a^2 + b^2).
        # NOTE(review): the pairs are presumably (P, Q) per transformer --
        # confirm against GetTransPower.
        trans_loads = np.ravel(GetTransPower.getTransPower(DSSCircuit))
        magnitudes = [
            np.sqrt(trans_loads[i]**2 + trans_loads[i+1]**2)
            for i in range(0, len(trans_loads), 2)
        ]
        return np.array(magnitudes)

    def get_available(self, trans_loads):
        """Return the capacity each transformer can still offer.

        Computed as ``trans_accu * max(0, rating - load) / max_rate_scaler``
        using ``env['transRating']``.
        """
        headroom = np.maximum(0.0, np.array(self.env['transRating']) - trans_loads)
        return self.trans_accu*headroom/self.max_rate_scaler

    def get_UB(self, connected):
        """Return the charging-rate upper bound (``max_rate``) of each connected EV."""
        return np.array([self.max_rate[e] for e in connected])

    def get_TAU(self, connected, P, Q, whole=0):
        """Build the transformer constraint data for the given EVs.

        Args:
            connected: Indices of the EVs to consider.
            P, Q: Load matrices forwarded to ``get_trans_load``.
            whole: If 1, ignore ``connected`` and use every EV, and keep every
                transformer row even when no EV is attached to it.

        Returns:
            A tuple ``(T, A, U)``: ``T`` holds one 0/1 membership row per kept
            transformer entry (columns follow ``connected``), ``A`` the
            available capacity of that entry, and ``U`` its index into the
            transformer load vector.
        """
        if whole == 1:
            connected = np.arange(self.env['evNumber'])

        nodes = self.NODES_PER_TRANS
        n_phases = self.N_PHASES
        trans_number = [self.env['evNodeNumber'][e]//nodes + 1 for e in connected]
        phase_number = [self.env['loadPhase'][self.env['evNodeNumber'][e] % nodes] - 1 for e in connected]
        # Capacity is evaluated with no EV charging at all.
        available = self.get_available(self.get_trans_load(np.zeros(self.env['evNumber']), P, Q))

        T = []
        A = []
        U = []

        def add_constraint(index, members):
            """Record entry ``index`` if any EV is attached (or whole == 1)."""
            row = np.array([1.0 if m else 0.0 for m in members], dtype=float)
            if row.any() or whole == 1:
                T.append(row)
                A.append(available[index])
                U.append(index)

        # For primary transformers: every EV on phase i loads entry i.
        for i in range(n_phases):
            add_constraint(i, [p == i for p in phase_number])

        # For secondary transformers: entry i serves transformer i // n_phases
        # on phase i % n_phases.
        for i in range(n_phases, len(available)):
            add_constraint(i, [
                t == (i // n_phases) and p == (i % n_phases)
                for t, p in zip(trans_number, phase_number)
            ])

        return (np.array(T), np.array(A), np.array(U, dtype=int))

    def update(self, P, Q):
        """Per-time-slot update hook; this implementation does nothing.

        NOTE(review): presumably meant to be filled in or overridden with the
        scheduling logic of a concrete algorithm.
        """
        return