From 309ebc0aa612e20c89969700d81166a3322e6a2c Mon Sep 17 00:00:00 2001 From: "C. Allwardt" <3979063+craig8@users.noreply.github.com> Date: Thu, 27 Mar 2025 15:43:42 -0700 Subject: [PATCH 1/5] fixed connection issues I found --- gridappsd-python-lib/gridappsd/goss.py | 24 +++++++++++++++++++-- gridappsd-python-lib/gridappsd/gridappsd.py | 9 +++++--- 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/gridappsd-python-lib/gridappsd/goss.py b/gridappsd-python-lib/gridappsd/goss.py index c5d3683..75ad64c 100644 --- a/gridappsd-python-lib/gridappsd/goss.py +++ b/gridappsd-python-lib/gridappsd/goss.py @@ -58,6 +58,10 @@ from stomp import Connection12 as Connection from stomp.exception import NotConnectedException from time import sleep +try: # python2.7 + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse from gridappsd import json_extension as json @@ -119,6 +123,22 @@ def __init__(self, if not self.__user__ or not self.__pass__: raise ValueError("Invalid username/password specified.") + + if self.stomp_address is None or self.stomp_address == '': + raise ValueError("Invalid stomp address specified.") + + if not self.stomp_address.startswith("tcp://"): + self.stomp_address = "tcp://" + self.stomp_address + + parsed_address = urlparse(self.stomp_address) + + if self.stomp_port is None or self.stomp_port == '': + if parsed_address.port is not None: + self.stomp_port = parsed_address.port + else: + raise ValueError("Invalid stomp port specified.") + + self._connection_tuple = [(parsed_address.hostname, int(self.stomp_port))] self._conn = None self._ids = set() self._topic_set = set() @@ -306,7 +326,7 @@ def _make_connection(self): # send request to token topic tokenTopic = "/topic/pnnl.goss.token.topic" - tmpConn = Connection([(self.stomp_address, self.stomp_port)]) + tmpConn = Connection([*self._connection_tuple]) if self._override_thread_fc is not None: tmpConn.transport.override_threading(self._override_thread_fc) tmpConn.connect(self.__user__, self.__pass__, wait=True) @@ -349,7 +369,7 @@ def on_disconnect(self, header, message): sleep(1) iter += 1 - self._conn = Connection([(self.stomp_address, self.stomp_port)]) + self._conn = Connection([*self._connection_tuple]) if self._override_thread_fc is not None: self._conn.transport.override_threading(self._override_thread_fc) try: diff --git a/gridappsd-python-lib/gridappsd/gridappsd.py b/gridappsd-python-lib/gridappsd/gridappsd.py index 11b81d3..9f02aa7 100644 --- a/gridappsd-python-lib/gridappsd/gridappsd.py +++ b/gridappsd-python-lib/gridappsd/gridappsd.py @@ -69,8 +69,6 @@ class GridAPPSD(GOSS): # TODO Get the caller from the traceback/inspect module. def __init__(self, simulation_id=None, address=None, **kwargs): - if address is None: - address = utils.get_gridappsd_address() if 'stomp_address' in kwargs and 'stomp_port' in kwargs: address = (kwargs.pop('stomp_address'), kwargs.pop('stomp_port')) @@ -78,7 +76,12 @@ def __init__(self, simulation_id=None, address=None, **kwargs): 'stomp_port' in kwargs and not 'stomp_address' in kwargs: raise ValueError("If stomp_address is specified the so should stomp_port") - super(GridAPPSD, self).__init__(stomp_address=address[0], stomp_port=address[1], **kwargs) + if address is None: + super().__init__(**kwargs) + elif isinstance(address, tuple) or isinstance(address, list) and len(address) == 2: + super().__init__(stomp_address=address[0], stomp_port=address[1], **kwargs) + else: + raise ValueError("address must be a tuple of (hostname, port) or None") self._houses = Houses(self) self._simulation_log_topic = None self._simulation_id = None From d25f1ab91626ac343ef73e678966a6ba618e3491 Mon Sep 17 00:00:00 2001 From: "C. Allwardt" <3979063+craig8@users.noreply.github.com> Date: Thu, 27 Mar 2025 16:20:48 -0700 Subject: [PATCH 2/5] remove params from gridapspdconnection --- .../gridappsd_field_bus/field_interface/agents/agents.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py index 0bbd295..38b87fa 100644 --- a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py @@ -62,9 +62,9 @@ def __init__(self, self.context = None # TODO: Change params and connection to local connection - self.params = ConnectionParameters(cim_profile=CIM_PROFILE, iec61970_301=IEC61970_301) + #self.params = ConnectionParameters(cim_profile=CIM_PROFILE, iec61970_301=IEC61970_301) - self.connection = GridappsdConnection(self.params) + self.connection = GridappsdConnection() self.connection.cim_profile = cim_profile self.app_id = agent_config['app_id'] From cad46b2aef55e17a05663eb2d036ff8b5179f677 Mon Sep 17 00:00:00 2001 From: "C. Allwardt" <3979063+craig8@users.noreply.github.com> Date: Thu, 27 Mar 2025 16:22:19 -0700 Subject: [PATCH 3/5] remove connectionparameters --- .../gridappsd_field_bus/field_interface/agents/agents.py | 1 - 1 file changed, 1 deletion(-) diff --git a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py index 38b87fa..e31d900 100644 --- a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py @@ -6,7 +6,6 @@ from datetime import datetime from typing import Dict -from cimgraph.databases import ConnectionParameters from cimgraph.databases.gridappsd import GridappsdConnection from cimgraph.models import FeederModel from cimgraph.models.distributed_area import DistributedArea From 2b38039b6f2b585c5d219460863db158e60170fe Mon Sep 17 00:00:00 2001 From: "C. Allwardt" <3979063+craig8@users.noreply.github.com> Date: Wed, 2 Apr 2025 17:10:39 -0700 Subject: [PATCH 4/5] Refactor DistributedAgent to streamline message bus connection logic and improve logging --- .../field_interface/agents/agents.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py index e31d900..83414ed 100644 --- a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py +++ b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/agents/agents.py @@ -60,9 +60,6 @@ def __init__(self, self.simulation_id = simulation_id self.context = None - # TODO: Change params and connection to local connection - #self.params = ConnectionParameters(cim_profile=CIM_PROFILE, iec61970_301=IEC61970_301) - self.connection = GridappsdConnection() self.connection.cim_profile = cim_profile @@ -78,14 +75,10 @@ def __init__(self, self.agent_area_dict = agent_area_dict if upstream_message_bus_def is not None: - if upstream_message_bus_def.is_ot_bus: - self.upstream_message_bus = MessageBusFactory.create(upstream_message_bus_def) - # else: - # self.upstream_message_bus = VolttronMessageBus(upstream_message_bus_def) - + self.upstream_message_bus = MessageBusFactory.create(upstream_message_bus_def) + if downstream_message_bus_def is not None: - if downstream_message_bus_def.is_ot_bus: - self.downstream_message_bus = MessageBusFactory.create(downstream_message_bus_def) + self.downstream_message_bus = MessageBusFactory.create(downstream_message_bus_def) if self.downstream_message_bus is None and self.upstream_message_bus is None: raise ValueError("Must have at least a downstream and/or upstream message bus specified") @@ -96,11 +89,23 @@ def _connect(self): if self.upstream_message_bus is not None: self.upstream_message_bus.connect() + assert self.upstream_message_bus.is_connected(), "Failed to connect to upstream message bus" + _log.debug(f"Connected to upstream message bus: {self.upstream_message_bus.id}") + else: + _log.debug("No upstream message bus specified, skipping connection.") if self.downstream_message_bus is not None: self.downstream_message_bus.connect() + assert self.downstream_message_bus.is_connected(), "Failed to connect to downstream message bus" + _log.debug(f"Connected to downstream message bus: {self.downstream_message_bus.id}") + else: + _log.debug("No downstream message bus specified, skipping connection.") + if self.downstream_message_bus is None and self.upstream_message_bus is None: raise ValueError("Either upstream or downstream bus must be specified!") + _log.debug(f"Upstream: {self.upstream_message_bus} downstream: {self.downstream_message_bus}") + _log.debug(f"Connected to message bus: {self.downstream_message_bus.id} and {self.upstream_message_bus.id}") + _log.debug(f"Agent ID: {self.agent_id} and App ID: {self.app_id}") if ('context_manager' not in self.app_id): self.agent_id = "da_" + self.app_id + "_" + self.downstream_message_bus.id From 6cb407d9e3abcd3677f77eb2fbf5f698233263ac Mon Sep 17 00:00:00 2001 From: "C. Allwardt" <3979063+craig8@users.noreply.github.com> Date: Wed, 2 Apr 2025 17:11:08 -0700 Subject: [PATCH 5/5] Enhance GridAPPSDMessageBus with connection checks and improved logging; refactor CallbackRouter for wildcard topic support --- .../field_interface/gridappsd_field_bus.py | 31 ++++++++- gridappsd-python-lib/gridappsd/goss.py | 69 +++++++++++-------- 2 files changed, 70 insertions(+), 30 deletions(-) diff --git a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/gridappsd_field_bus.py b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/gridappsd_field_bus.py index 724d6e0..a9e0a6d 100644 --- a/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/gridappsd_field_bus.py +++ b/gridappsd-field-bus-lib/gridappsd_field_bus/field_interface/gridappsd_field_bus.py @@ -1,3 +1,7 @@ +import urllib + +from loguru import logger as _log + from gridappsd import GridAPPSD from gridappsd_field_bus.field_interface.interfaces import FieldMessageBus from gridappsd_field_bus.field_interface.interfaces import MessageBusDefinition @@ -23,17 +27,37 @@ def is_connected(self) -> bool: """ Is this object connected to the message bus """ - pass + if self.gridappsd_obj is not None: + return self.gridappsd_obj.connected + else: + _log.error("GridAPPSD object is not initialized. Cannot check connection status.") + return False def connect(self): """ Connect to the concrete message bus that implements this interface. """ - self.gridappsd_obj = GridAPPSD() + _log.debug(f"Connecting to GridAPPSD message bus with address: {self._address}") + if self.gridappsd_obj is None: + addr = self._address + if not addr.startswith("tcp://"): + addr = "tcp://" + self._address + parsed = urllib.parse.urlparse(addr) + # TODO Handle a non-auth token field connection if possible + self.gridappsd_obj = GridAPPSD(stomp_address=parsed.hostname, stomp_port=parsed.port, + username=self._user, + password=self._password, + use_auth_token=False) + else: + _log.error("GridAPPSD object is not initialized. Cannot connect to message bus.") + def subscribe(self, topic, callback): + _log.debug(f"Subscribing to topic: {topic}") if self.gridappsd_obj is not None: self.gridappsd_obj.subscribe(topic, callback) + else: + _log.error("GridAPPSD object is not connected. Cannot subscribe to topic.") def unsubscribe(self, topic): pass @@ -42,8 +66,11 @@ def send(self, topic: str, message: Any): """ Publish device specific data to the concrete message bus. """ + _log.debug(f"Sending message to topic: {topic} with message: {message}") if self.gridappsd_obj is not None: self.gridappsd_obj.send(topic, message) + else: + _log.error("GridAPPSD object is not connected. Cannot send message.") def get_response(self, topic, message, timeout=5): """ diff --git a/gridappsd-python-lib/gridappsd/goss.py b/gridappsd-python-lib/gridappsd/goss.py index 75ad64c..cc6af95 100644 --- a/gridappsd-python-lib/gridappsd/goss.py +++ b/gridappsd-python-lib/gridappsd/goss.py @@ -44,10 +44,10 @@ """ import base64 import inspect -#import json import logging import os import random +import re import threading from collections import defaultdict from datetime import datetime @@ -389,8 +389,8 @@ class CallbackRouter(object): def __init__(self): self.callbacks = {} - self._topics_callback_map = defaultdict(list) - self._queue_callerback = Queue() + self._topic_callbacks = defaultdict(list) + self._queue_callback = Queue() self._thread = threading.Thread(target=self.run_callbacks) self._thread.daemon = True self._thread.start() @@ -398,40 +398,53 @@ def __init__(self): def run_callbacks(self): _log.debug("Starting thread queue") while True: - cb, hdrs, msg = self._queue_callerback.get() + callbacks, headers, msg = self._queue_callback.get() try: - msg = json.loads(msg) - except: - pass - # msg = message - - for c in cb: - c(hdrs, msg) + for callback in callbacks: + callback(headers, msg) + except Exception as e: + _log.error(f"Error in callback execution: {e}") sleep(0.01) - def add_callback(self, topic, callback): - if not topic.startswith('/topic/') and not topic.startswith('/temp-queue/'): - topic = "/queue/{topic}".format(topic=topic) - if callback in self._topics_callback_map[topic]: - raise ValueError("Callbacks can only be used one time per topic") - _log.debug("Added callbac using topic {topic}".format(topic=topic)) - self._topics_callback_map[topic].append(callback) - - def remove_callback(self, topic, callback): - if topic in self._topics_callback_map: - callbacks = self._topics_callback_map[topic] + def add_callback(self, topic_pattern, callback): + """ Add a callback for a topic, supporting wildcards. + + The topic can contain wildcards (based upon activmeq artemis + https://activemq.apache.org/components/classic/documentation/stomp-manual): + + - '*' matches any level (e.g., `/topic/goss.gridappsd.field.heartbeat.*`) + - '**' matches multiple levels (e.g., `/topic/goss.gridappsd.**`) + + :param topic_pattern: Topic string, allowing `*` or `**` as wildcards. + :param callback: Function to call when a matching message is received. + """ + regex_pattern = re.escape(topic_pattern).replace(r'\*', '[^/]+').replace(r'\*\*', '.*') + self._topic_callbacks[regex_pattern].append(callback) + _log.debug(f"Added callback for topic pattern: {topic_pattern} (Regex: {regex_pattern})") + + def remove_callback(self, topic_pattern, callback): + regex_pattern = re.escape(topic_pattern).replace(r'\*', '[^/]+').replace(r'\*\*', '.*') + if regex_pattern in self._topic_callbacks: try: - callbacks.remove(callback) + self._topic_callbacks[regex_pattern].remove(callback) + if not self._topic_callbacks[regex_pattern]: + del self._topic_callbacks[regex_pattern] except ValueError: pass def on_message(self, headers, message): - destination = headers['destination'] - # _log.debug("Topic map keys are: {keys}".format(keys=self._topics_callback_map.keys())) - if destination in self._topics_callback_map: - self._queue_callerback.put((self._topics_callback_map[destination], headers, message)) + """ Handles incoming messages by checking if any subscribed wildcard matches the topic. """ + topic = headers.get("destination") + matching_callbacks = [] + + for pattern, callbacks in self._topic_callbacks.items(): + if re.fullmatch(pattern, topic): + matching_callbacks.extend(callbacks) + + if matching_callbacks: + self._queue_callback.put((matching_callbacks, headers, message)) else: - _log.error("INVALID DESTINATION {destination}".format(destination=destination)) + _log.warning(f"No matching callback for topic: {topic}") def on_error(self, header, message): _log.error("Error in callback router")