diff --git a/kill_containers.py b/kill_containers.py new file mode 100644 index 0000000..313f54f --- /dev/null +++ b/kill_containers.py @@ -0,0 +1,10 @@ +import logging +import time + +from gridappsd import GridAPPSD +from gridappsd.docker_handler import run_gridappsd_container, run_dependency_containers, Containers + +logging.basicConfig(level=logging.DEBUG) + +_log = logging.getLogger(__name__) +Containers.reset_all_containers(ignore_list=["gridappsd_dev"]) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 55592f0..a7879f7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ SPARQLWrapper -gridappsd-python +gridappsd-python>=1.0 diff --git a/run-sensor-in-thread-debug.py b/run-sensor-in-thread-debug.py new file mode 100644 index 0000000..97bc096 --- /dev/null +++ b/run-sensor-in-thread-debug.py @@ -0,0 +1,92 @@ +import logging +import os +from threading import Thread + +from gridappsd import GridAPPSD +from gridappsd.topics import simulation_output_topic, service_output_topic +from sensors import Sensors + +logging.basicConfig(level=logging.INFO, + format="%(asctime)-15s %(process)d %(module)-10s %(message)s") + +sim_request = {"power_system_config": { + "GeographicalRegion_name": "_73C512BD-7249-4F50-50DA-D93849B89C43", + "SubGeographicalRegion_name": "_A1170111-942A-6ABD-D325-C64886DC4D7D", + "Line_name": "_AAE94E4A-2465-6F5E-37B1-3E72183A4E44" +}, + "application_config": {"applications": []}, + "simulation_config": {"start_time": "1628531887", "duration": "900", "simulator": "GridLAB-D", + "timestep_frequency": "1000", "timestep_increment": "1000", "run_realtime": True, + "simulation_name": "final9500node", "power_flow_solver_method": "NR", + "model_creation_config": {"load_scaling_factor": "1", + "schedule_name": "ieeezipload", "z_fraction": "0", + "i_fraction": "1", "p_fraction": "0", + "randomize_zipload_fractions": False, + "use_houses": False}}, + "test_config": {"events": [], "appId": ""}, "service_configs": [{"id": "gridappsd-sensor-simulator", + "user_options": { + "default-perunit-confidence-band": 0.02, + "simulate-all": True, + "default-normal-value": 100, + "random-seed": 150, + "default-aggregation-interval": 6, + "passthrough-if-not-specified": False, + "default-perunit-drop-rate": 0.00, + "randomize-sensor-offset": False + #, + #"sensors-config": { + # "_b68f03f8-372e-41cd-87cd-1f7580e9669e": {} + #} + }}]} + +simulation_id = 1000 + + +def run_sensor_service(): + feeder = sim_request["power_system_config"]["Line_name"] + service_id = "gridappsd-sensor-simulator" + user_options = sim_request["service_configs"][0]["user_options"] + + gapp = GridAPPSD(username="system", + password="manager", + stomp_port=61613, + stomp_address="gridappsd") + + # gapp.get_logger().setLevel(opts.log_level) + read_topic = simulation_output_topic(simulation_id) + write_topic = service_output_topic(service_id, simulation_id) + + log_file = "/tmp/gridappsd_tmp/{}/sensors.log".format(simulation_id) + if not os.path.exists(os.path.dirname(log_file)): + os.makedirs(os.path.dirname(log_file)) + + import sys + from pprint import pprint + from sensors.measurements import Measurements + + meas = Measurements() + meta = meas.get_sensors_meta(feeder) + + with open(log_file, 'w') as fp: + logging.basicConfig(stream=fp, level=logging.INFO) + logging.getLogger().info("Almost ready to create sensors!") + logging.getLogger().info(f"read topic: {read_topic}\nwrite topic: {write_topic}") + logging.getLogger().info(f"user options: {user_options}") + logging.getLogger().debug(f"Meta: {meta}") + run_sensors = Sensors(gapp, read_topic=read_topic, write_topic=write_topic, + user_options=user_options, measurements=meta) + run_sensors.main_loop() + + +if __name__ == '__main__': + logging.basicConfig(level=logging.DEBUG) + + os.environ['GRIDAPPSD_APPLICATION_ID'] = 'gridappsd-sensor-simulator' + os.environ['GRIDAPPSD_APPLICATION_STATUS'] = 'STARTED' + os.environ['GRIDAPPSD_SIMULATION_ID'] = str(simulation_id) + os.environ['GRIDAPPSD_USER'] = 'system' + os.environ['GRIDAPPSD_PASSWORD'] = 'manager' + + t = Thread(target=run_sensor_service) + + t.run() diff --git a/sensor_simulator.py b/sensor_simulator.py index 97ecaba..c70203a 100644 --- a/sensor_simulator.py +++ b/sensor_simulator.py @@ -34,9 +34,9 @@ def get_opts(): # parser.add_argument("--interval", type=float, default=30.0, # help="Interval in seconds for min, max, average aggregation.") - parser.add_argument("-u", "--username", default=utils.get_gridappsd_user(), + parser.add_argument("-u", "--username", help="The username to authenticate with the message bus.") - parser.add_argument("-p", "--password", default=utils.get_gridappsd_pass(), + parser.add_argument("-p", "--password", help="The password to authenticate with the message bus.") parser.add_argument("-a", "--address", default=utils.get_gridappsd_address(), help="The tcp://addr:port that gridappsd is located on.") diff --git a/sensors/sensor.py b/sensors/sensor.py index fa1bda0..67738e9 100644 --- a/sensors/sensor.py +++ b/sensors/sensor.py @@ -4,6 +4,7 @@ import sys import random import time +import traceback from .measurements import Measurements @@ -111,7 +112,7 @@ def __init__(self, gridappsd, read_topic, write_topic, measurements, user_option DEFAULT_SENSOR_CONFIG['default-aggregation-interval']) self.default_normal_value = user_options.get('default-normal-value', DEFAULT_SENSOR_CONFIG['default-normal-value']) - self._measurements = deepcopy(measurements) # Measurements() + self._measurements = deepcopy(measurements) # Measurements() if self.simulate_all: _log.debug("Using passed 'measurements' as configurations") sensors_config = self._measurements # self._measurements.get_sensors_config(feeder) @@ -195,8 +196,10 @@ def on_simulation_message(self, headers, message): :param message: Simulation measurement message. """ + + _log.info(f"starting on_simulation_message: {len(message['message']['measurements'])} num sensors: {len(self._sensors)}") measurement_out = {} - # if passthrough set then copy over the measurmments of the entire message + # if pass-through set then copy over the measurements of the entire message # into the output, then we will update the ones that are specified as # sensors input. if self.passthrough_if_not_specified: @@ -206,12 +209,15 @@ def on_simulation_message(self, headers, message): # _log.info(f"Detected measurement at timestamp {timestamp}") try: - # Loop over the configured sensor andding measurements for each of them + # Loop over the configured sensor adding measurements for each of them + i = 0 for mrid in self._sensors: + i += 1 new_measurement = dict( measurement_mrid=mrid ) + # Get the passed sensor values from from the original measurement. item = message['message']['measurements'].get(mrid) if not item: @@ -220,7 +226,6 @@ def on_simulation_message(self, headers, message): self._reported_invalid.add(mrid) continue - debug_set = set() # Create new values for data from the sensor. for prop, value in item.items(): # transfer mrid to new_measurement dictionary @@ -230,13 +235,10 @@ def on_simulation_message(self, headers, message): new_value = None sensor = self._sensors[mrid] - if not mrid in debug_set: - _log.info(f"prop: {prop} ts: {timestamp} interval start: " - f"{sensor.interval_start_time} interval end: {sensor.interval_end_time} " - f"left: {sensor.interval_end_time - timestamp} {mrid}") - debug_set.add(mrid) - + + # Only provide simulation for angle and magnitude for the sensors. if prop in ('angle', 'magnitude'): + # look up the sub-sensor for the property angle or magnitude. sensor_prop = sensor.get_property_sensor(prop) if sensor_prop is None: # sensor is normally 0 @@ -245,22 +247,20 @@ def on_simulation_message(self, headers, message): sensor_prop = sensor.get_property_sensor(prop) new_value = sensor_prop.get_new_value(timestamp, value) else: - # Keep values other than angle and magnitued the same for now. + # Keep values other than angle and magnitude the same for now. new_measurement[prop] = value if new_value is None: continue - _log.debug(f"mrid: {mrid} timestamp: {timestamp} prop: {prop} new_value: {new_value}") new_measurement[prop] = new_value # Make sure there is more properties than just the mrid if len(list(new_measurement.keys())) > 1: + # _log.info(f"Adding measurement: {new_measurement}") measurement_out[mrid] = new_measurement if len(measurement_out) > 0: message['message']['measurements'] = measurement_out - if self._log_statistics: - self._log_sensors() self._gappsd.send(self._write_topic, message) _log.info(f"For timestamp: {timestamp} publishing service {len(measurement_out)} measurements") else: @@ -272,22 +272,19 @@ def on_simulation_message(self, headers, message): _log.error(stack) except Exception as ex2: _log.error(ex2) - - def _log_sensors(self): - for s in self._sensors: - _log.debug(s) - self._logger.debug(s) + _log.info(f"ending on_simulation_message measurement_out {len(measurement_out)}") def main_loop(self): + print(f"Read topic {self._read_topic}") self._gappsd.subscribe(self._read_topic, self.on_simulation_message) while True and not self._simulation_complete: - time.sleep(0.001) + time.sleep(0.01) class Sensor(object): def __init__(self, normal_value, aggregation_interval, perunit_drop_rate, - perunit_confidence_band, parent=None, randomize_offset=True): + perunit_confidence_band, parent: "Sensor" = None, randomize_offset=True): """ An object modeling an individual sensor. @@ -323,7 +320,10 @@ def __init__(self, normal_value, aggregation_interval, perunit_drop_rate, # A secondary list of sensors self._properties = {} self._offset = 0 - self._randomize_offset = randomize_offset + if parent: + self._randomize_offset = parent._randomize_offset + else: + self._randomize_offset = randomize_offset self._LOG.debug(self) @@ -333,7 +333,8 @@ def add_property_sensor(self, key, normal_value, aggregation_interval, perunit_d if key in self._properties: raise KeyError(f"key {key} already exists in the sensor properties") - self._properties[key] = Sensor(normal_value, aggregation_interval, perunit_drop_rate, perunit_confidence_band, self) + self._properties[key] = Sensor(normal_value, aggregation_interval, perunit_drop_rate, perunit_confidence_band, + parent=self) def get_property_sensor(self, key): if key == 'magnitude': diff --git a/start_containers.py b/start_containers.py new file mode 100644 index 0000000..a214577 --- /dev/null +++ b/start_containers.py @@ -0,0 +1,22 @@ +import logging +import time + +from gridappsd import GridAPPSD +from gridappsd.docker_handler import run_gridappsd_container, run_dependency_containers, Containers + +logging.basicConfig(level=logging.DEBUG) + +_log = logging.getLogger(__name__) +Containers.reset_all_containers(ignore_list=["mysql", "proven", "influxdb", "redis", "blazegraph", "mysql", + "gridappsd_dev"]) + +with run_dependency_containers(stop_after=False): + with run_gridappsd_container(stop_after=False): + goss = GridAPPSD() + goss.connect() + assert goss.connected + goss.disconnect() + +_log.debug("After with statements!") +_log.debug(f"Containers are {Containers.container_list()}") +