-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathMain.py
167 lines (132 loc) · 5.75 KB
/
Main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import MQTT_logger
import SMplatform as smp
import yaml, glob, os, time, sys, calendar, dotenv
import msvcrt
import threading
def getOldest(Directory):
for file in glob.glob(Directory + "\\*.csv"):
file_list.append(file)
if(file_list != []):
oldest_file = min(file_list, key=os.path.getctime)
print(oldest_file)
return os.path.splitext(os.path.basename(oldest_file))[0] + '.csv'
else:
print("No Files Present... Sleeping for 1 second...")
time.sleep(1)
return None
def background_MQTT_Logger_Start():
MQTT_logger.Start()
def ConnectionStatus_SMP(ConnectionStatus, config, Connector_Identifier, endpoint_url, header, verbose=False):
#QoD messaging?
print(f"executeing connection status ({ConnectionStatus}) mutation...")
tag_info = []
numberOfTopics = len(config['Topic_toSMP'])
for x in range(0, numberOfTopics):
FQ_Tag = f'''{Connector_Identifier}.{''.join(config['Topic_toSMP'][x])}'''
tag_info += [smp.findTagID_Create(Connector_Identifier, FQ_Tag, config['Topic_toSMP_dataType'][x],
endpoint_url, header, create = True, verbose = False)]
#TODO Need to check if this is proper ~~~~~~~~~~
for i in tag_info:
mutation = smp.build_UpdateTagTS_Mutation(i[1], "null", time="now", status=ConnectionStatus)
if(verbose==True):print(mutation)
result = smp.request(mutation, endpoint_url, header)
if 'errors' in result.keys():
raise Exception(result)
sys.exit()
#TODO error handling
#TODO~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
return tag_info
#read config from config.yml
try:
with open("config.yml", 'r') as data:
config = yaml.safe_load(data)
except Exception:
print("error importing config.yml")
exit()
#SMP header setup
dotenv.load_dotenv()
endpoint_url = os.environ.get("endpoint_url")
header = smp.SMP_auth()
print(endpoint_url)
print(header)
Connector_Identifier = "MQTT_Connector"
#File Organization
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
Source_DIR = f'{ROOT_DIR}\\MQTT_Logged\\'
Destination_DIR = f'{ROOT_DIR}\\uploaded\\'
#Initialize Variables
file_list=[]
Last_Run = False
Enable_Verbose = input('Verbose? (y/n)')
if(Enable_Verbose == 'y'):
Connector_Verbose = True
else:
Connector_Verbose = False
#Kick off the MQTT Logger
#TODO confirm MQTT_Logger is on/connected?
Enable_MQTT_Logger = input('Start MQTT Logger? (y/n)')
if(Enable_MQTT_Logger == 'y'):
mqttclientThread = threading.Thread(target=background_MQTT_Logger_Start)
mqttclientThread.daemon = True
mqttclientThread.start()
#UncertainInitialValue HEX 40920000 -> 1083310080
#tag_info = ConnectionStatus_SMP('1083310080', config, Connector_Identifier, endpoint_url, header)
#SMP doesnt deal with reconnection status
tag_info = []
while True:
#Check if this is the last run and Source Directory has any CSV files
if (Last_Run==True and any(".csv" in s for s in os.listdir(Source_DIR)) == False):
#grab last logged data
#TODO Search for any csv file instead of 'backlog.csv'???
file = 'backlog.csv'
if(os.path.exists(f'{ROOT_DIR}\\MQTT_Logging\\{file}')):
os.replace(f'{ROOT_DIR}\\MQTT_Logging\\{file}', f'{ROOT_DIR}\\pushing\\{file}')
else:
#Select Oldest File
file = getOldest(Source_DIR)
if(file != None): os.replace(f'{Source_DIR}{file}', f'{ROOT_DIR}\\pushing\\{file}')
current_file = f'{ROOT_DIR}\\pushing\\{file}'
if(os.path.exists(current_file) and file != None):
print("Pushing: " + str(file))
entries = smp.build_entries(config, current_file)
numberOfTopics = len(config['Topic_toSMP'])
for x in range(0, numberOfTopics):
FQ_Tag = f'''{Connector_Identifier}.{''.join(config['Topic_toSMP'][x])}'''
tag_info += [smp.findTagID_Create(Connector_Identifier, FQ_Tag, config['Topic_toSMP_dataType'][x],
endpoint_url, header, create = True, verbose = False)]
if(entries[x] != ''):
if(Connector_Verbose==True):print("these are the entries" + entries[x])
mutation = smp.build_UpdateMultipleTagTS_Mutation(tag_info[x][1], entries[x])
if(Connector_Verbose==True):print(mutation)
result = smp.request(mutation, endpoint_url, header) # Execute the mutation
#check for errors
if 'errors' in result.keys():
raise Exception(result)
sys.exit()
else:
print(f"nothing to push for: {FQ_Tag}")
#a little house cleaning
if file == 'backlog.csv':
timestamp_epoch = calendar.timegm(time.localtime())
file = f"backlog{timestamp_epoch}.csv"
os.rename(current_file,rf"{ROOT_DIR}\\pushing\\{file}")
os.rename(f'{ROOT_DIR}\\pushing\\{file}', f'{Destination_DIR}{file}')
file_list=[]
if Last_Run==True:
break
#if enter hit exit code
#note this only works on microsoft.... may need to rethink
if msvcrt.kbhit():
if msvcrt.getwche() == '\r':
print("Exiting Connecter")
Last_Run = True
if(Enable_MQTT_Logger == 'y'):
print('Stopping MQTT Logger')
MQTT_logger.Stop()
#TODO confirm MQTT_Logger is off? Sleep 0.5 as bandaid for now
time.sleep(0.5)
# I think this would be better: BadConnectionClosed HEX 80AE0000 -> 2158886912
# CESMII uses: BAD HEX 80000000 -> 2147483648
tag_info = ConnectionStatus_SMP('2147483648', config, Connector_Identifier, endpoint_url, header)
print('Exiting Program...')
sys.exit()