Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime
from typing import Dict

from cimgraph.databases import ConnectionParameters
from cimgraph.databases.gridappsd import GridappsdConnection
from cimgraph.models import FeederModel
from cimgraph.models.distributed_area import DistributedArea
Expand Down Expand Up @@ -61,10 +60,7 @@ def __init__(self,
self.simulation_id = simulation_id
self.context = None

# TODO: Change params and connection to local connection
self.params = ConnectionParameters(cim_profile=CIM_PROFILE, iec61970_301=IEC61970_301)

self.connection = GridappsdConnection(self.params)
self.connection = GridappsdConnection()
self.connection.cim_profile = cim_profile

self.app_id = agent_config['app_id']
Expand All @@ -79,14 +75,10 @@ def __init__(self,
self.agent_area_dict = agent_area_dict

if upstream_message_bus_def is not None:
if upstream_message_bus_def.is_ot_bus:
self.upstream_message_bus = MessageBusFactory.create(upstream_message_bus_def)
# else:
# self.upstream_message_bus = VolttronMessageBus(upstream_message_bus_def)

self.upstream_message_bus = MessageBusFactory.create(upstream_message_bus_def)

if downstream_message_bus_def is not None:
if downstream_message_bus_def.is_ot_bus:
self.downstream_message_bus = MessageBusFactory.create(downstream_message_bus_def)
self.downstream_message_bus = MessageBusFactory.create(downstream_message_bus_def)

if self.downstream_message_bus is None and self.upstream_message_bus is None:
raise ValueError("Must have at least a downstream and/or upstream message bus specified")
Expand All @@ -97,11 +89,23 @@ def _connect(self):

if self.upstream_message_bus is not None:
self.upstream_message_bus.connect()
assert self.upstream_message_bus.is_connected(), "Failed to connect to upstream message bus"
_log.debug(f"Connected to upstream message bus: {self.upstream_message_bus.id}")
else:
_log.debug("No upstream message bus specified, skipping connection.")
if self.downstream_message_bus is not None:
self.downstream_message_bus.connect()
assert self.downstream_message_bus.is_connected(), "Failed to connect to downstream message bus"
_log.debug(f"Connected to downstream message bus: {self.downstream_message_bus.id}")
else:
_log.debug("No downstream message bus specified, skipping connection.")

if self.downstream_message_bus is None and self.upstream_message_bus is None:
raise ValueError("Either upstream or downstream bus must be specified!")

_log.debug(f"Upstream: {self.upstream_message_bus} downstream: {self.downstream_message_bus}")
_log.debug(f"Connected to message bus: {self.downstream_message_bus.id} and {self.upstream_message_bus.id}")
_log.debug(f"Agent ID: {self.agent_id} and App ID: {self.app_id}")
if ('context_manager' not in self.app_id):
self.agent_id = "da_" + self.app_id + "_" + self.downstream_message_bus.id

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import urllib

from loguru import logger as _log

from gridappsd import GridAPPSD
from gridappsd_field_bus.field_interface.interfaces import FieldMessageBus
from gridappsd_field_bus.field_interface.interfaces import MessageBusDefinition
Expand All @@ -23,17 +27,37 @@ def is_connected(self) -> bool:
"""
Is this object connected to the message bus
"""
pass
if self.gridappsd_obj is not None:
return self.gridappsd_obj.connected
else:
_log.error("GridAPPSD object is not initialized. Cannot check connection status.")
return False

def connect(self):
"""
Connect to the concrete message bus that implements this interface.
"""
self.gridappsd_obj = GridAPPSD()
_log.debug(f"Connecting to GridAPPSD message bus with address: {self._address}")
if self.gridappsd_obj is None:
addr = self._address
if not addr.startswith("tcp://"):
addr = "tcp://" + self._address
parsed = urllib.parse.urlparse(addr)
# TODO Handle a non-auth token field connection if possible
self.gridappsd_obj = GridAPPSD(stomp_address=parsed.hostname, stomp_port=parsed.port,
username=self._user,
password=self._password,
use_auth_token=False)
else:
_log.error("GridAPPSD object is not initialized. Cannot connect to message bus.")

Comment on lines +51 to +53
Copy link

Copilot AI Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic in the connect method is confusing: the else branch logs an error when gridappsd_obj is already set. Consider revising the condition to ensure that an existing, valid GridAPPSD object is used instead of logging an error.

Suggested change
else:
_log.error("GridAPPSD object is not initialized. Cannot connect to message bus.")
elif not self.gridappsd_obj.connected:
_log.debug("GridAPPSD object is not connected. Reconnecting to message bus.")
self.gridappsd_obj.connect()
else:
_log.debug("GridAPPSD object is already connected to the message bus.")

Copilot uses AI. Check for mistakes.

def subscribe(self, topic, callback):
_log.debug(f"Subscribing to topic: {topic}")
if self.gridappsd_obj is not None:
self.gridappsd_obj.subscribe(topic, callback)
else:
_log.error("GridAPPSD object is not connected. Cannot subscribe to topic.")

def unsubscribe(self, topic):
pass
Expand All @@ -42,8 +66,11 @@ def send(self, topic: str, message: Any):
"""
Publish device specific data to the concrete message bus.
"""
_log.debug(f"Sending message to topic: {topic} with message: {message}")
if self.gridappsd_obj is not None:
self.gridappsd_obj.send(topic, message)
else:
_log.error("GridAPPSD object is not connected. Cannot send message.")

def get_response(self, topic, message, timeout=5):
"""
Expand Down
93 changes: 63 additions & 30 deletions gridappsd-python-lib/gridappsd/goss.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
"""
import base64
import inspect
#import json
import logging
import os
import random
import re
import threading
from collections import defaultdict
from datetime import datetime
Expand All @@ -58,6 +58,10 @@
from stomp import Connection12 as Connection
from stomp.exception import NotConnectedException
from time import sleep
try: # python2.7
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse

from gridappsd import json_extension as json

Expand Down Expand Up @@ -119,6 +123,22 @@ def __init__(self,

if not self.__user__ or not self.__pass__:
raise ValueError("Invalid username/password specified.")

if self.stomp_address is None or self.stomp_address == '':
raise ValueError("Invalid stomp address specified.")

if not self.stomp_address.startswith("tcp://"):
self.stomp_address = "tcp://" + self.stomp_address

parsed_address = urlparse(self.stomp_address)

if self.stomp_port is None or self.stomp_port == '':
if parsed_address.port is not None:
self.stomp_port = parsed_address.port
else:
raise ValueError("Invalid stomp port specified.")

self._connection_tuple = [(parsed_address.hostname, int(self.stomp_port))]
self._conn = None
self._ids = set()
self._topic_set = set()
Expand Down Expand Up @@ -306,7 +326,7 @@ def _make_connection(self):
# send request to token topic
tokenTopic = "/topic/pnnl.goss.token.topic"

tmpConn = Connection([(self.stomp_address, self.stomp_port)])
tmpConn = Connection([*self._connection_tuple])
if self._override_thread_fc is not None:
tmpConn.transport.override_threading(self._override_thread_fc)
tmpConn.connect(self.__user__, self.__pass__, wait=True)
Expand Down Expand Up @@ -349,7 +369,7 @@ def on_disconnect(self, header, message):
sleep(1)
iter += 1

self._conn = Connection([(self.stomp_address, self.stomp_port)])
self._conn = Connection([*self._connection_tuple])
if self._override_thread_fc is not None:
self._conn.transport.override_threading(self._override_thread_fc)
try:
Expand All @@ -369,49 +389,62 @@ class CallbackRouter(object):

def __init__(self):
self.callbacks = {}
self._topics_callback_map = defaultdict(list)
self._queue_callerback = Queue()
self._topic_callbacks = defaultdict(list)
self._queue_callback = Queue()
self._thread = threading.Thread(target=self.run_callbacks)
self._thread.daemon = True
self._thread.start()

def run_callbacks(self):
_log.debug("Starting thread queue")
while True:
cb, hdrs, msg = self._queue_callerback.get()
callbacks, headers, msg = self._queue_callback.get()
try:
msg = json.loads(msg)
except:
pass
# msg = message

for c in cb:
c(hdrs, msg)
for callback in callbacks:
callback(headers, msg)
except Exception as e:
_log.error(f"Error in callback execution: {e}")
sleep(0.01)

def add_callback(self, topic, callback):
if not topic.startswith('/topic/') and not topic.startswith('/temp-queue/'):
topic = "/queue/{topic}".format(topic=topic)
if callback in self._topics_callback_map[topic]:
raise ValueError("Callbacks can only be used one time per topic")
_log.debug("Added callbac using topic {topic}".format(topic=topic))
self._topics_callback_map[topic].append(callback)

def remove_callback(self, topic, callback):
if topic in self._topics_callback_map:
callbacks = self._topics_callback_map[topic]
def add_callback(self, topic_pattern, callback):
""" Add a callback for a topic, supporting wildcards.

The topic can contain wildcards (based upon activmeq artemis
https://activemq.apache.org/components/classic/documentation/stomp-manual):

- '*' matches any level (e.g., `/topic/goss.gridappsd.field.heartbeat.*`)
- '**' matches multiple levels (e.g., `/topic/goss.gridappsd.**`)

:param topic_pattern: Topic string, allowing `*` or `**` as wildcards.
:param callback: Function to call when a matching message is received.
"""
regex_pattern = re.escape(topic_pattern).replace(r'\*', '[^/]+').replace(r'\*\*', '.*')
Copy link

Copilot AI Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing the single wildcard before the double wildcard may prevent the '**' conversion from ever being applied. It is recommended to replace the double wildcard pattern first.

Suggested change
regex_pattern = re.escape(topic_pattern).replace(r'\*', '[^/]+').replace(r'\*\*', '.*')
regex_pattern = re.escape(topic_pattern).replace(r'\*\*', '.*').replace(r'\*', '[^/]+')

Copilot uses AI. Check for mistakes.
self._topic_callbacks[regex_pattern].append(callback)
_log.debug(f"Added callback for topic pattern: {topic_pattern} (Regex: {regex_pattern})")

def remove_callback(self, topic_pattern, callback):
regex_pattern = re.escape(topic_pattern).replace(r'\*', '[^/]+').replace(r'\*\*', '.*')
if regex_pattern in self._topic_callbacks:
try:
callbacks.remove(callback)
self._topic_callbacks[regex_pattern].remove(callback)
if not self._topic_callbacks[regex_pattern]:
del self._topic_callbacks[regex_pattern]
except ValueError:
pass

def on_message(self, headers, message):
destination = headers['destination']
# _log.debug("Topic map keys are: {keys}".format(keys=self._topics_callback_map.keys()))
if destination in self._topics_callback_map:
self._queue_callerback.put((self._topics_callback_map[destination], headers, message))
""" Handles incoming messages by checking if any subscribed wildcard matches the topic. """
topic = headers.get("destination")
matching_callbacks = []

for pattern, callbacks in self._topic_callbacks.items():
if re.fullmatch(pattern, topic):
matching_callbacks.extend(callbacks)

if matching_callbacks:
self._queue_callback.put((matching_callbacks, headers, message))
else:
_log.error("INVALID DESTINATION {destination}".format(destination=destination))
_log.warning(f"No matching callback for topic: {topic}")

def on_error(self, header, message):
_log.error("Error in callback router")
Expand Down
9 changes: 6 additions & 3 deletions gridappsd-python-lib/gridappsd/gridappsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,19 @@ class GridAPPSD(GOSS):
# TODO Get the caller from the traceback/inspect module.
def __init__(self, simulation_id=None, address=None, **kwargs):

if address is None:
address = utils.get_gridappsd_address()

if 'stomp_address' in kwargs and 'stomp_port' in kwargs:
address = (kwargs.pop('stomp_address'), kwargs.pop('stomp_port'))
elif 'stomp_address' in kwargs and not 'stomp_port' in kwargs or \
'stomp_port' in kwargs and not 'stomp_address' in kwargs:
raise ValueError("If stomp_address is specified the so should stomp_port")

super(GridAPPSD, self).__init__(stomp_address=address[0], stomp_port=address[1], **kwargs)
if address is None:
super().__init__(**kwargs)
elif isinstance(address, tuple) or isinstance(address, list) and len(address) == 2:
Copy link

Copilot AI Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition combining isinstance checks may be ambiguous due to operator precedence. Consider adding explicit parentheses to clarify the intended logic, e.g., 'elif isinstance(address, tuple) or (isinstance(address, list) and len(address) == 2):'.

Suggested change
elif isinstance(address, tuple) or isinstance(address, list) and len(address) == 2:
elif isinstance(address, tuple) or (isinstance(address, list) and len(address) == 2):

Copilot uses AI. Check for mistakes.
super().__init__(stomp_address=address[0], stomp_port=address[1], **kwargs)
else:
raise ValueError("address must be a tuple of (hostname, port) or None")
self._houses = Houses(self)
self._simulation_log_topic = None
self._simulation_id = None
Expand Down
Loading