Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions kill_containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import logging
import time

from gridappsd import GridAPPSD
from gridappsd.docker_handler import run_gridappsd_container, run_dependency_containers, Containers

logging.basicConfig(level=logging.DEBUG)

_log = logging.getLogger(__name__)
Containers.reset_all_containers(ignore_list=["gridappsd_dev"])
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
SPARQLWrapper
gridappsd-python
gridappsd-python>=1.0
92 changes: 92 additions & 0 deletions run-sensor-in-thread-debug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import logging
import os
from threading import Thread

from gridappsd import GridAPPSD
from gridappsd.topics import simulation_output_topic, service_output_topic
from sensors import Sensors

logging.basicConfig(level=logging.INFO,
format="%(asctime)-15s %(process)d %(module)-10s %(message)s")

sim_request = {"power_system_config": {
"GeographicalRegion_name": "_73C512BD-7249-4F50-50DA-D93849B89C43",
"SubGeographicalRegion_name": "_A1170111-942A-6ABD-D325-C64886DC4D7D",
"Line_name": "_AAE94E4A-2465-6F5E-37B1-3E72183A4E44"
},
"application_config": {"applications": []},
"simulation_config": {"start_time": "1628531887", "duration": "900", "simulator": "GridLAB-D",
"timestep_frequency": "1000", "timestep_increment": "1000", "run_realtime": True,
"simulation_name": "final9500node", "power_flow_solver_method": "NR",
"model_creation_config": {"load_scaling_factor": "1",
"schedule_name": "ieeezipload", "z_fraction": "0",
"i_fraction": "1", "p_fraction": "0",
"randomize_zipload_fractions": False,
"use_houses": False}},
"test_config": {"events": [], "appId": ""}, "service_configs": [{"id": "gridappsd-sensor-simulator",
"user_options": {
"default-perunit-confidence-band": 0.02,
"simulate-all": True,
"default-normal-value": 100,
"random-seed": 150,
"default-aggregation-interval": 6,
"passthrough-if-not-specified": False,
"default-perunit-drop-rate": 0.00,
"randomize-sensor-offset": False
#,
#"sensors-config": {
# "_b68f03f8-372e-41cd-87cd-1f7580e9669e": {}
#}
}}]}

simulation_id = 1000


def run_sensor_service():
feeder = sim_request["power_system_config"]["Line_name"]
service_id = "gridappsd-sensor-simulator"
user_options = sim_request["service_configs"][0]["user_options"]

gapp = GridAPPSD(username="system",
password="manager",
stomp_port=61613,
stomp_address="gridappsd")

# gapp.get_logger().setLevel(opts.log_level)
read_topic = simulation_output_topic(simulation_id)
write_topic = service_output_topic(service_id, simulation_id)

log_file = "/tmp/gridappsd_tmp/{}/sensors.log".format(simulation_id)
if not os.path.exists(os.path.dirname(log_file)):
os.makedirs(os.path.dirname(log_file))

import sys
from pprint import pprint
from sensors.measurements import Measurements

meas = Measurements()
meta = meas.get_sensors_meta(feeder)

with open(log_file, 'w') as fp:
logging.basicConfig(stream=fp, level=logging.INFO)
logging.getLogger().info("Almost ready to create sensors!")
logging.getLogger().info(f"read topic: {read_topic}\nwrite topic: {write_topic}")
logging.getLogger().info(f"user options: {user_options}")
logging.getLogger().debug(f"Meta: {meta}")
run_sensors = Sensors(gapp, read_topic=read_topic, write_topic=write_topic,
user_options=user_options, measurements=meta)
run_sensors.main_loop()


if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)

os.environ['GRIDAPPSD_APPLICATION_ID'] = 'gridappsd-sensor-simulator'
os.environ['GRIDAPPSD_APPLICATION_STATUS'] = 'STARTED'
os.environ['GRIDAPPSD_SIMULATION_ID'] = str(simulation_id)
os.environ['GRIDAPPSD_USER'] = 'system'
os.environ['GRIDAPPSD_PASSWORD'] = 'manager'

t = Thread(target=run_sensor_service)

t.run()
4 changes: 2 additions & 2 deletions sensor_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ def get_opts():
# parser.add_argument("--interval", type=float, default=30.0,
# help="Interval in seconds for min, max, average aggregation.")

parser.add_argument("-u", "--username", default=utils.get_gridappsd_user(),
parser.add_argument("-u", "--username",
help="The username to authenticate with the message bus.")
parser.add_argument("-p", "--password", default=utils.get_gridappsd_pass(),
parser.add_argument("-p", "--password",
help="The password to authenticate with the message bus.")
parser.add_argument("-a", "--address", default=utils.get_gridappsd_address(),
help="The tcp://addr:port that gridappsd is located on.")
Expand Down
47 changes: 24 additions & 23 deletions sensors/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
import random
import time
import traceback

from .measurements import Measurements

Expand Down Expand Up @@ -111,7 +112,7 @@ def __init__(self, gridappsd, read_topic, write_topic, measurements, user_option
DEFAULT_SENSOR_CONFIG['default-aggregation-interval'])
self.default_normal_value = user_options.get('default-normal-value',
DEFAULT_SENSOR_CONFIG['default-normal-value'])
self._measurements = deepcopy(measurements) # Measurements()
self._measurements = deepcopy(measurements) # Measurements()
if self.simulate_all:
_log.debug("Using passed 'measurements' as configurations")
sensors_config = self._measurements # self._measurements.get_sensors_config(feeder)
Expand Down Expand Up @@ -195,8 +196,10 @@ def on_simulation_message(self, headers, message):
:param message:
Simulation measurement message.
"""

_log.info(f"starting on_simulation_message: {len(message['message']['measurements'])} num sensors: {len(self._sensors)}")
measurement_out = {}
# if passthrough set then copy over the measurmments of the entire message
# if pass-through set then copy over the measurements of the entire message
# into the output, then we will update the ones that are specified as
# sensors input.
if self.passthrough_if_not_specified:
Expand All @@ -206,12 +209,15 @@ def on_simulation_message(self, headers, message):
# _log.info(f"Detected measurement at timestamp {timestamp}")
try:

# Loop over the configured sensor andding measurements for each of them
# Loop over the configured sensor adding measurements for each of them
i = 0
for mrid in self._sensors:
i += 1
new_measurement = dict(
measurement_mrid=mrid
)

# Get the passed sensor values from from the original measurement.
item = message['message']['measurements'].get(mrid)

if not item:
Expand All @@ -220,7 +226,6 @@ def on_simulation_message(self, headers, message):
self._reported_invalid.add(mrid)
continue

debug_set = set()
# Create new values for data from the sensor.
for prop, value in item.items():
# transfer mrid to new_measurement dictionary
Expand All @@ -230,13 +235,10 @@ def on_simulation_message(self, headers, message):
new_value = None

sensor = self._sensors[mrid]
if not mrid in debug_set:
_log.info(f"prop: {prop} ts: {timestamp} interval start: "
f"{sensor.interval_start_time} interval end: {sensor.interval_end_time} "
f"left: {sensor.interval_end_time - timestamp} {mrid}")
debug_set.add(mrid)


# Only provide simulation for angle and magnitude for the sensors.
if prop in ('angle', 'magnitude'):
# look up the sub-sensor for the property angle or magnitude.
sensor_prop = sensor.get_property_sensor(prop)
if sensor_prop is None:
# sensor is normally 0
Expand All @@ -245,22 +247,20 @@ def on_simulation_message(self, headers, message):
sensor_prop = sensor.get_property_sensor(prop)
new_value = sensor_prop.get_new_value(timestamp, value)
else:
# Keep values other than angle and magnitued the same for now.
# Keep values other than angle and magnitude the same for now.
new_measurement[prop] = value

if new_value is None:
continue

_log.debug(f"mrid: {mrid} timestamp: {timestamp} prop: {prop} new_value: {new_value}")
new_measurement[prop] = new_value

# Make sure there is more properties than just the mrid
if len(list(new_measurement.keys())) > 1:
# _log.info(f"Adding measurement: {new_measurement}")
measurement_out[mrid] = new_measurement
if len(measurement_out) > 0:
message['message']['measurements'] = measurement_out
if self._log_statistics:
self._log_sensors()
self._gappsd.send(self._write_topic, message)
_log.info(f"For timestamp: {timestamp} publishing service {len(measurement_out)} measurements")
else:
Expand All @@ -272,22 +272,19 @@ def on_simulation_message(self, headers, message):
_log.error(stack)
except Exception as ex2:
_log.error(ex2)

def _log_sensors(self):
for s in self._sensors:
_log.debug(s)
self._logger.debug(s)
_log.info(f"ending on_simulation_message measurement_out {len(measurement_out)}")

def main_loop(self):
print(f"Read topic {self._read_topic}")
self._gappsd.subscribe(self._read_topic, self.on_simulation_message)

while True and not self._simulation_complete:
time.sleep(0.001)
time.sleep(0.01)


class Sensor(object):
def __init__(self, normal_value, aggregation_interval, perunit_drop_rate,
perunit_confidence_band, parent=None, randomize_offset=True):
perunit_confidence_band, parent: "Sensor" = None, randomize_offset=True):
"""
An object modeling an individual sensor.

Expand Down Expand Up @@ -323,7 +320,10 @@ def __init__(self, normal_value, aggregation_interval, perunit_drop_rate,
# A secondary list of sensors
self._properties = {}
self._offset = 0
self._randomize_offset = randomize_offset
if parent:
self._randomize_offset = parent._randomize_offset
else:
self._randomize_offset = randomize_offset

self._LOG.debug(self)

Expand All @@ -333,7 +333,8 @@ def add_property_sensor(self, key, normal_value, aggregation_interval, perunit_d
if key in self._properties:
raise KeyError(f"key {key} already exists in the sensor properties")

self._properties[key] = Sensor(normal_value, aggregation_interval, perunit_drop_rate, perunit_confidence_band, self)
self._properties[key] = Sensor(normal_value, aggregation_interval, perunit_drop_rate, perunit_confidence_band,
parent=self)

def get_property_sensor(self, key):
if key == 'magnitude':
Expand Down
22 changes: 22 additions & 0 deletions start_containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import logging
import time

from gridappsd import GridAPPSD
from gridappsd.docker_handler import run_gridappsd_container, run_dependency_containers, Containers

logging.basicConfig(level=logging.DEBUG)

_log = logging.getLogger(__name__)
Containers.reset_all_containers(ignore_list=["mysql", "proven", "influxdb", "redis", "blazegraph", "mysql",
"gridappsd_dev"])

with run_dependency_containers(stop_after=False):
with run_gridappsd_container(stop_after=False):
goss = GridAPPSD()
goss.connect()
assert goss.connected
goss.disconnect()

_log.debug("After with statements!")
_log.debug(f"Containers are {Containers.container_list()}")