-
Notifications
You must be signed in to change notification settings - Fork 5
/
GS_Controller.py
450 lines (354 loc) · 15.6 KB
/
GS_Controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
'''
LEW-20210-1, Python Ground Station for a Core Flight System with CCSDS Electronic Data Sheets Support
Copyright (c) 2020 United States Government as represented by
the Administrator of the National Aeronautics and Space Administration.
All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
'''
cFS-Groundstation Controller:
The controller handles the business logic of the cFS-Groundstation.
The overall functionality includes:
- Initializes the EDS database objects and interfaces
- Telemetry Listener that receives and decodes EDS telemetry messages automatically
- Command generator that creates and sends command packets to core flight instances
'''
import socket
import time
from PyQt5.QtCore import QThread
from PyQt5.QtCore import pyqtSignal
import GS_Model
import EdsLib
import CFE_MissionLib
class TlmListener(QThread):
    '''
    This QThread based class is spawned when the "Start Listening" button is pressed
    in the Telemetry system. This will listen for UDP messages on a given port,
    decode them, and send the raw messages to the data model for local storage.
    The signal is emitted once per stored message with two strings: the telemetry
    type returned by the data model (or '' when it returns None) and the message.
    '''
    signal = pyqtSignal(str, str)

    def __init__(self, port):
        '''
        Inputs:
        port - UDP port number the listener binds to when the thread runs
        '''
        super().__init__()
        self.continue_listening = True
        self.port = port
        # Snapshot the mission name and interface database of the module-level controller
        self.mission = control.mission
        self.intf_db = control.intf_db

    def DecodeMessage(self, raw_message):
        '''
        Decodes a raw bytes message into an EDS object
        Inputs:
        raw_message - received bytes message
        Outputs:
        topic_id - The Telemetry TopicId associated with the raw_message
        eds_entry - The EDS function to create the associated telemetry object
        eds_object - The decoded EDS object
        '''
        eds_id, topic_id = self.intf_db.DecodeEdsId(raw_message)
        eds_entry = EdsLib.DatabaseEntry(self.mission, eds_id)
        eds_object = eds_entry(EdsLib.PackedObject(raw_message))
        return (topic_id, eds_entry, eds_object)

    def PauseListening(self):
        '''
        Sets the flag associated with the telemetry listening to false
        '''
        self.continue_listening = False

    def RestartListening(self):
        '''
        Sets the flag associated with the telemetry listening to true
        '''
        self.continue_listening = True

    def run(self):
        '''
        Method that is run when QThread spawns a new thread.
        This opens up a UDP socket bound to self.port. When messages are received,
        they are decoded and sent to the data model for storage, and the signal
        is emitted with the result. While listening is paused the loop only
        sleeps; the socket stays bound.
        '''
        # The context manager guarantees the socket is closed if bind fails or
        # an unexpected (non-socket) exception escapes the receive loop.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.bind(('', self.port))
            while True:
                if not self.continue_listening:
                    time.sleep(1)
                    continue
                try:
                    # Receive message (at most 4096 bytes are read per datagram)
                    datagram, host = sock.recvfrom(4096)
                    # Ignore datagram if not long enough (i.e. doesn't contain a tlm header)
                    if len(datagram) < 6:
                        continue
                    decode_output = self.DecodeMessage(datagram)
                    tlm_type, message = GS_Model.data.AddTlm(control.eds_db, host,
                                                             datagram, decode_output)
                    # The signal carries strings, so a missing type is sent as ''
                    self.signal.emit('' if tlm_type is None else tlm_type, message)
                # Handle socket errors
                except socket.error:
                    print("Ignored socket error.")
                    time.sleep(1)
def ValidPayload(payload_entry, payload_value):
    '''
    Checks to see if a given payload value is valid based on the payload_entry function
    Inputs:
    payload_entry - Indexable whose element [1] is the EDS function used to create
                    the object that is filled by payload_value
    payload_value - The user input value to be checked if an EDS object can be created
    Outputs:
    Boolean value of the payload_value validity: False when the lookup or the
    construction raises TypeError, True otherwise. Any other exception propagates.
    '''
    try:
        # Only the attempt matters; the constructed object is discarded
        payload_entry[1](payload_value)
    except TypeError:
        return False
    else:
        return True
class Controller(object):
    '''
    The Controller class contains routines that interact with the EDS and MissionLib databases
    - Update Instance, Topic, and Subcommand dictionaries/lists
    - Generate, set, pack, and send telecommand messages
    '''

    def __init__(self):
        '''
        Creates an uninitialized controller; every database handle and command
        object is None until InitializeDatabases / GetPayload fill them in.
        '''
        self.initialized = False
        self.mission = None
        self.eds_db = None
        self.intf_db = None
        self.telecommand = None
        self.telemetry = None
        self.cmd_entry = None
        self.cmd = None
        self.payload_entry = None
        self.payload = None
        self.payload_struct = None
        self.payload_values = None

    def InitializeDatabases(self, mission):
        '''
        Initialize the EDS and MissionLib databases as well as useful Interfaces
        and associated lists. Does nothing if initialization already succeeded.
        Inputs:
        mission - mission name
        Outputs:
        True if the databases are (or already were) initialized,
        False if a RuntimeError was raised during initialization
        '''
        if self.initialized:
            return True
        try:
            # If the mission name is invalid a RuntimeError will occur here
            self.eds_db = EdsLib.Database(mission)
            # Set the mission name and the rest of the CFE_MissionLib objects
            self.mission = mission
            self.intf_db = CFE_MissionLib.Database(self.mission, self.eds_db)
            self.telecommand = self.intf_db.Interface('CFE_SB/Telecommand')
            self.telemetry = self.intf_db.Interface('CFE_SB/Telemetry')
            # Call Data Model initialization function
            GS_Model.data.InitializeLists()
            self.initialized = True
            return True
        except RuntimeError:
            return False

    def GetInstances(self):
        '''
        Returns the instance dictionary based on the instances in the interface database.
        The first key is the data model's "chooser" placeholder mapped to 0.
        '''
        instance_list_dict = {GS_Model.data.instance_chooser: 0}
        for instance in self.intf_db:
            instance_list_dict[instance[0]] = instance[1]
        return instance_list_dict

    def GetTelecommandTopics(self):
        '''
        Returns a dictionary of Telecommand topics.
        The first key is the data model's "chooser" placeholder mapped to 0.
        '''
        topic_list_dict = {GS_Model.data.topic_chooser: 0}
        for topic in self.telecommand:
            topic_list_dict[topic[0]] = topic[1]
        return topic_list_dict

    def GetTelemetryTopics(self):
        '''
        Returns a dictionary of Telemetry topics.
        The first key is the data model's "chooser" placeholder mapped to 0.
        '''
        topic_list_dict = {GS_Model.data.topic_chooser: 0}
        for topic in self.telemetry:
            topic_list_dict[topic[0]] = topic[1]
        return topic_list_dict

    def GetSubcommands(self, input_topic):
        '''
        Returns a dictionary of Subcommands based on a given telecommand topic
        Inputs:
        input_topic - User specified telecommand topic name
        Outputs:
        Dictionary starting with the "chooser" placeholder mapped to 0, followed by
        whatever subcommands could be read before any RuntimeError
        '''
        subcommand_list_dict = {GS_Model.data.subcommand_chooser: 0}
        try:
            topic = self.telecommand.Topic(input_topic)
            for subcommand in topic:
                subcommand_list_dict[subcommand[0]] = subcommand[1]
        except RuntimeError:
            # Deliberate best effort: the caller still gets the placeholder entry.
            # NOTE(review): presumably raised for topics without subcommands - confirm
            pass
        return subcommand_list_dict

    def GetEdsIdFromTopic(self, topic_name):
        '''
        Returns the EdsId associated with a given topic name
        Inputs:
        topic_name - Telecommand topic name
        '''
        topic = self.telecommand.Topic(topic_name)
        return topic.EdsId

    def GetPayloadStruct(self, base_entry, base_object, base_name):
        '''
        Recursive function that goes through an EDS object structure (arrays and structs)
        To get down to the fundamental objects (ints, strings, and enumerations).
        Inputs:
        base_entry - EDS function to create the base_object
        base_object - EDS Object that is iterated over to find the structure
        base_name - Name used in the recursion to get the full name of a fundamental object
        Outputs:
        EDS Object data structure:
        - array: list with one sub-structure per element, named "<base_name>[i]"
        - container: dict of member name -> sub-structure, named "<base_name>.<member>"
        - enumeration: tuple (base_name, base_entry, 'enum', dict of the enum items)
        - anything else: tuple (base_name, base_entry, 'entry', None)
        '''
        # Arrays
        if self.eds_db.IsArray(base_object):
            # Get the type of an array element from the quoted fields of its type repr.
            # NOTE(review): assumes fields 1 and 3 are the two DatabaseEntry arguments - confirm
            array_type_split = str(type(base_object[0])).split("'")
            array_entry = EdsLib.DatabaseEntry(array_type_split[1], array_type_split[3])
            array_object = array_entry()
            # Loop over all the array elements; each one gets its indexed name
            return [self.GetPayloadStruct(array_entry, array_object, f"{base_name}[{i}]")
                    for i in range(len(base_object))]

        # Containers
        if self.eds_db.IsContainer(base_object):
            # Index the entry members by name once instead of rescanning per subobject
            entries_by_name = {subentry[0]: subentry for subentry in base_entry}
            struct = {}
            # Iterate over the subobjects within the container
            for subobj in base_object:
                subentry = entries_by_name.get(subobj[0])
                if subentry is None:
                    continue
                entry_eds = EdsLib.DatabaseEntry(subentry[1], subentry[2])
                struct_name = f"{base_name}.{subobj[0]}"
                struct[subobj[0]] = self.GetPayloadStruct(entry_eds, subobj[1], struct_name)
            return struct

        # Enumeration
        if self.eds_db.IsEnum(base_entry):
            # Iterate over the Enumeration labels
            enum_dict = {enum[0]: enum[1] for enum in base_entry}
            return (base_name, base_entry, 'enum', enum_dict)

        # Anything left over uses an entry field
        return (base_name, base_entry, 'entry', None)

    def GetPayload(self, eds_id):
        '''
        From a given eds_id this checks if a payload is present. If so, payload structure
        functions are called which are used in the Viewer to get all the appropriate user
        input widgets as well as creating the Payload structure itself.
        Also stores the command entry and a fresh command object on the controller.
        Inputs:
        eds_id - The EdsId associated with the command object
        Outputs:
        The payload structure of the EDS object associated with the input EdsId,
        or None when the command has no member named 'Payload'
        '''
        self.cmd_entry = EdsLib.DatabaseEntry(self.mission, eds_id)
        self.cmd = self.cmd_entry()

        payload_item = None
        for item in self.cmd_entry:
            if item[0] == 'Payload':
                payload_item = item

        if payload_item is not None:
            self.payload_entry = EdsLib.DatabaseEntry(payload_item[1], payload_item[2])
            self.payload = self.payload_entry()
            self.payload_struct = self.GetPayloadStruct(self.payload_entry, self.payload,
                                                        payload_item[0])
        else:
            self.payload_entry = None
            self.payload = None
            self.payload_struct = None
        return self.payload_struct

    def SetPubSub(self, instance_id, topic_id):
        '''
        Sets the Publisher/Subscribe parameters in a command header based on the instance_id
        and topic_id. We call the function SetPubSub defined in the CFE_Missionlib
        python bindings that set the header values of the cmd message based on the
        CFE_MissionLib runtime library.
        Inputs:
        instance_id - The ID associated with the destination core flight instance
        topic_id - The ID associated with the desired telecommand topic
        '''
        self.intf_db.SetPubSub(instance_id, topic_id, self.cmd)
        self.cmd.CCSDS.SeqFlag = 3  # SeqFlag is hardcoded to 3 in cmdUtil.c

    def SetPayloadValues(self, structure):
        '''
        Traverses through the payload structure found in GetPayloadStruct to create a payload
        structure and fill it with the user supplied values (self.payload_values).
        Inputs:
        structure - The payload structure returned from GetPayloadStruct
        Outputs:
        Python structure that can be used to fill the Payload of the EDS Command object
        (None, after printing a message, for an unsupported structure type)
        '''
        if isinstance(structure, dict):
            result = {name: self.SetPayloadValues(sub_struct)
                      for name, sub_struct in structure.items()}
        elif isinstance(structure, list):
            result = [self.SetPayloadValues(item) for item in structure]
        elif isinstance(structure, tuple):
            # Leaf: structure[1] builds the object, structure[0] is the key
            # of the user supplied value
            result = structure[1](self.payload_values[structure[0]])
        else:
            print("Something went wrong in the SetPayloadValues function")
            result = None
        return result

    def SendCommand(self, ip_address, base_port, instance_name, topic_name, subcommand_name, payload_values):
        '''
        Sends a command message to an instance of core flight
        - Checks to make sure all required parameters are set
        - Creates the EDS command object and sets the necessary header parameters
        - Generates and sets the payload values (if necessary)
        - opens up a socket and sends the packed message
        The command object is created from self.cmd_entry, which is set by GetPayload,
        so GetPayload must have been called for the selected command first.
        Inputs:
        ip_address - The destination IP Address
        base_port - The base port used to send the command
        instance_name - The name of the core flight instance to send the command message
        topic_name - The name of the Telecommand topic to send
        subcommand_name - The name of the subcommand to the telecommand topic
        payload_values - user supplied payload values, looked up by the full field
                         names stored in the payload structure; empty or None
                         means no payload is set
        Outputs:
        A packed bytes message sent via UDP to an instance of core flight
        On success, a tuple that contains:
        A flag if the message was successful
        A hex representation of the command message that was sent
        A timestamp of when the message was sent
        The port the command message was sent to
        On failure, a tuple of (False, error message)
        '''
        if instance_name == GS_Model.data.instance_chooser:
            return (False, "Please Choose an Instance")
        if topic_name == GS_Model.data.topic_chooser:
            return (False, "Please Choose a Topic")
        if (subcommand_name == GS_Model.data.subcommand_chooser and
                len(GS_Model.data.subcommand_keys) > 1):
            return (False, "Please Choose a Subcommand")

        instance_id = GS_Model.data.instance_dict[instance_name]
        topic_id = GS_Model.data.telecommand_topic_dict[topic_name]

        self.cmd = self.cmd_entry()
        self.SetPubSub(instance_id, topic_id)

        self.payload_values = payload_values
        # Truthiness also covers payload_values being None
        if self.payload_values:
            eds_payload = self.SetPayloadValues(self.payload_struct)
            self.payload = self.payload_entry(eds_payload)
            self.cmd['Payload'] = self.payload

        cmd_packed = EdsLib.PackedObject(self.cmd)
        # Instance ids are offset from the base port (instance 1 uses base_port itself)
        port = base_port + instance_id - 1
        try:
            # The context manager closes the socket after every send
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as opened_socket:
                opened_socket.sendto(bytes(cmd_packed), (ip_address, port))
            time_str = time.strftime("%Y-%m-%d__%H_%M_%S", time.gmtime())
            return (True, cmd_packed.hex(), time_str, port)
        except socket.error:
            return (False, "Failed to send message.")
# Module-level Controller instance. TlmListener in this module reads its
# mission, intf_db and eds_db attributes; they stay None until
# InitializeDatabases succeeds.
control = Controller()