forked from iandobbie/StatusLED
-
Notifications
You must be signed in to change notification settings - Fork 0
/
UDPSender.py
140 lines (102 loc) · 3.33 KB
/
UDPSender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""This module will emulate the UDP broadcasting of the FPGA
for testing purposes"""
import socket
from time import sleep, time
import json
MainFPGA_to_FSMachine_state = {
'0': 'default', # Default
'1': 'start', # Start
'2': 'configure', # Configuring
'3': 'idle', # Idle
'4': 'error', # Aborted
'5': 'action', # Running Action
'6': 'shutdown', # Shutdown
}
ActionFPGA_to_FSMachine_state = {
'0': 'default', # Default
'1': 'experiment', # Executing Experiment
'2': 'prepare', # Transferring Digitals
'3': 'prepare', # Transferring Analogues
'4': 'prepare', # Writing Indexes
'5': 'prepare', # Writing Digitals
'6': 'prepare', # Writing Analogue
'7': 'snap', # Taking Snap
'8': 'prepare', # Flushing FIFOs
'9': 'prepare', # Updating Repetitions
'10': 'mosaic', # Running Slow Mosaic
'11': 'mosaic', # Running Fast Mosaic
}
class Sender:
def __init__(self, ipAdress, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(1)
self.addr = (ipAdress, port)
self.msg = {'FPGA Main State': '0',
'Action State': '0',
'Timer': '0',
'Other Status Elements': 'WhatEver'}
def send_msg(self):
data = json.dumps(self.msg)
length = len(data)
length = str(length).rjust(4)
try:
self.sock.sendto(length.encode(), self.addr)
print('Sent:')
print(length)
except:
print('Could not send length')
self.sock.sendto(data.encode(), self.addr)
print('Sent:')
print(data)
def run_experiment(self, duration):
self.msg['FPGA Main State'] = '5'
self.msg['Action State'] = '1'
start_time = time()
end_time = start_time + duration
while time() <= end_time:
currentTime = str(round(((time() - start_time) / duration), 2))
self.msg['Timer'] = currentTime
self.send_msg()
sleep(0.2)
self.msg['FPGA Main State'] = '3'
self.msg['Action State'] = '0'
self.send_msg()
def run_snap(self):
self.msg['FPGA Main State'] = '5'
self.msg['Action State'] = '7'
self.send_msg()
self.msg['FPGA Main State'] = '3'
self.msg['Action State'] = '0'
self.send_msg()
def run_start(self):
self.msg['FPGA Main State'] = '1'
self.msg['Action State'] = '0'
self.send_msg()
sleep(2)
self.msg['FPGA Main State'] = '2'
self.msg['Action State'] = '0'
self.send_msg()
sleep(3)
self.msg['FPGA Main State'] = '3'
self.msg['Action State'] = '0'
self.send_msg()
sleep(5)
def run_abort(self):
self.msg['FPGA Main State'] = '4'
self.msg['Action State'] = '0'
self.send_msg()
def run_idle(self):
self.msg['FPGA Main State'] = '3'
self.msg['Action State'] = '0'
self.send_msg()
if __name__ == '__main__':
t = Sender(ipAdress='127.0.0.1', port=6666)
print('Tester created')
t.run_start()
print('Starting...')
sleep(4)
t.run_experiment(10)
t.run_idle()
sleep(5)
t.run_snap()
sleep(3)