-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdatasource_manager.py
149 lines (120 loc) · 5.6 KB
/
datasource_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import logging
import json
from ngsi_ld import broker_interface
from ngsi_ld import ngsi_parser
from ngsi_ld.ngsi_parser import NGSI_Type
from other.metadata_matcher_filebased import MetadataMatcher
# Module-level logger, obtained under the fixed name 'semanticenrichment'.
logger = logging.getLogger('semanticenrichment')
class DatasourceManager:
    """In-memory registry of NGSI-LD entities and broker subscriptions.

    Five dictionaries keyed by id are kept: subscriptions, streams, sensors,
    observations and observable properties. Broker communication is
    delegated to ``broker_interface`` and entity field access to
    ``ngsi_parser``.
    """

    def __init__(self):
        """Create the empty stores and the metadata matcher."""
        self.subscriptions = {}
        self.streams = {}
        self.sensors = {}
        self.observations = {}
        self.observableproperties = {}
        self.matcher = MetadataMatcher()

    def delete_stream(self, stream_id):
        """Drop a stream together with its sensor and that sensor's observation.

        Nothing happens when ``stream_id`` is unknown. The sensor is only
        removed when the id the stream refers to is actually stored, and the
        observation only when the sensor was removed.
        """
        if stream_id not in self.streams:
            return
        removed_stream = self.streams.pop(stream_id)

        sensor_key = ngsi_parser.get_stream_generatedBy(removed_stream)
        if sensor_key not in self.sensors:
            return
        removed_sensor = self.sensors.pop(sensor_key)

        observation_key = ngsi_parser.get_sensor_madeObservation(removed_sensor)
        # Looked up but intentionally not used for deletion yet.
        # TODO: observable properties are kept because one property might be
        # used by multiple streams.
        _observed_property_key = ngsi_parser.get_sensor_observes(removed_sensor)
        self.observations.pop(observation_key, None)

    def clear(self):
        """Delete every subscription, then empty all entity stores."""
        self.del_all_subscriptions()
        for store in (
            self.streams,
            self.sensors,
            self.observations,
            self.observableproperties,
        ):
            store.clear()

    def update(self, ngsi_type, ngsi_id, ngsi_data):
        """Store ``ngsi_data`` under ``ngsi_id`` in the store for ``ngsi_type``.

        IoTStream entities get extra handling for their sensor relation (see
        ``_store_stream``); StreamObservation, Sensor and ObservableProperty
        entities are stored as they are. Any other type is ignored.
        """
        if ngsi_type is NGSI_Type.IoTStream:
            self._store_stream(ngsi_id, ngsi_data)
            return

        if ngsi_type is NGSI_Type.StreamObservation:
            target = self.observations
        elif ngsi_type is NGSI_Type.Sensor:
            target = self.sensors
        elif ngsi_type is NGSI_Type.ObservableProperty:
            target = self.observableproperties
        else:
            return
        target[ngsi_id] = ngsi_data

    def _store_stream(self, ngsi_id, ngsi_data):
        """Handle a new or updated IoTStream and store it.

        A stream arrives either because it is new or because it changed. For
        an already known stream whose sensor id differs from the stored one,
        the old sensor is dropped. Whenever the incoming stream names a
        sensor, ``broker_interface.handleNewSensor`` is called for it; a
        stream without a sensor id is logged as an error. The stream itself
        is stored in every case.
        """
        new_sensor_id = ngsi_parser.get_stream_generatedBy(ngsi_data)

        if not new_sensor_id:
            message = "SensorId for IotStream missing:" + json.dumps(ngsi_data)
            logger.error(message)
        else:
            if ngsi_id in self.streams:
                # Known stream: forget the previous sensor if the relation changed.
                previous = self.streams[ngsi_id]
                previous_sensor_id = ngsi_parser.get_stream_generatedBy(previous)
                if new_sensor_id != previous_sensor_id:
                    self.sensors.pop(previous_sensor_id, None)
            # Request the sensor and subscribe to its observable properties
            # and stream observations.
            broker_interface.handleNewSensor(
                new_sensor_id,
                self.sensors,
                self.observableproperties,
                self.subscriptions,
            )

        # Always keep the latest version of the stream.
        self.streams[ngsi_id] = ngsi_data

    def get_sensor(self, sensor_id):
        """Return the stored sensor for ``sensor_id``, or None if unknown."""
        return self.sensors.get(sensor_id)

    def get_observation(self, observation_id):
        """Return the stored observation for ``observation_id``, or None."""
        return self.observations.get(observation_id)

    def get_observableproperty(self, observableproperty_id):
        """Return the stored observable property for the id, or None."""
        return self.observableproperties.get(observableproperty_id)

    def link_qoi(self, stream_id, qoi_id):
        """Attach ``qoi_id`` to a stored stream via ``ngsi_parser``.

        A KeyError (for example an unknown ``stream_id``) is ignored and
        leaves the stores untouched.
        """
        try:
            entry = self.streams[stream_id]
            ngsi_parser.update_stream_hasQuality(entry, qoi_id)
        except KeyError:
            return
        else:
            self.streams[stream_id] = entry

    def initialise_subscriptions(self):
        """Forward to ``broker_interface.initialise_subscriptions``."""
        broker_interface.initialise_subscriptions(self.subscriptions)

    def get_active_subscriptions(self):
        """Forward to ``broker_interface.get_active_subscriptions``.

        Returns nothing itself; the subscription store is handed to the
        broker interface.
        """
        broker_interface.get_active_subscriptions(self.subscriptions)

    def add_subscription(self, subscription):
        """Forward ``subscription`` and the store to ``broker_interface``."""
        broker_interface.add_subscription(subscription, self.subscriptions)

    def del_subscription(self, subid):
        """Remove ``subid`` from the store and delete it via the broker interface.

        Raises:
            KeyError: if ``subid`` is not in the subscription store.
        """
        removed = self.subscriptions.pop(subid)
        broker_interface.del_subscription(removed)

    def del_all_subscriptions(self):
        """Delete every stored subscription, one at a time."""
        # Snapshot the ids first: del_subscription mutates the dictionary.
        pending_ids = list(self.subscriptions)
        for pending_id in pending_ids:
            self.del_subscription(pending_id)

    def get_subscriptions(self):
        """Return the subscription dictionary itself (not a copy)."""
        return self.subscriptions

    def get_stream(self, stream_id):
        """Return the stored stream for ``stream_id``, or None if unknown."""
        return self.streams.get(stream_id)

    def getStoredMetadata(self, sensor, field):
        """Look up ``field`` in the metadata matched for a sensor's observed type.

        The chain is: sensor -> observable property id -> stored observable
        property -> its label -> ``self.matcher.match(label)`` -> ``field``.
        None is returned as soon as any step yields nothing, or when the
        matched metadata has no such field.
        """
        # TODO: move the observable property lookup to initialisation?
        property_id = ngsi_parser.get_sensor_observes(sensor)
        if not property_id:
            return None

        observed_property = self.get_observableproperty(property_id)
        if not observed_property:
            return None

        label = ngsi_parser.get_obsproperty_label(observed_property)
        if not label:
            return None

        matched = self.matcher.match(label)
        if not matched:
            return None

        try:
            return matched[field]
        except KeyError:
            return None