diff --git a/.gitignore b/.gitignore
index 8d5a3f14..dca2d880 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,7 +12,7 @@ Plugins/*
 !Plugins/plugin_template/**
 !Plugins/tpms/**
 !Plugins/wifi/**
-
+!Plugins/test/
 
 # MacOS Files
 .DS_Store
@@ -30,3 +30,4 @@ Plugins/*
 .env
 # Some currently unused env files, but often contains secrets, so better be safe than sorry
 .venv
 env.py
+artifacts*
\ No newline at end of file
diff --git a/Plugins/test/install_files/artifact_create.py b/Plugins/test/install_files/artifact_create.py
new file mode 100644
index 00000000..3fad0db2
--- /dev/null
+++ b/Plugins/test/install_files/artifact_create.py
@@ -0,0 +1,76 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""Artifact Creation Test Operation
+"""
+import asyncio
+import logging
+import os
+import sys
+from typing import Callable, Union
+
+try:
+    from fissure.utils.plugins.operations import Operation, ArtifactManager
+except ImportError:
+    # add fissure to path and import modules
+    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')))
+    from fissure.utils.plugins.operations import Operation, ArtifactManager
+
+class OperationMain(Operation):
+    """Artifact Creation Test Operation"""
+    def __init__(self, frequency: int = 10, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: Union[Callable, None] = None, tak_cot_callback: Union[Callable, None] = None, artifact_manager: Union[ArtifactManager, None] = None) -> None:
+        """Initialize the Artifact Creation Test Operation.
+
+        Parameters
+        ----------
+        frequency : int, optional
+            The interval in seconds between artifact creations, by default 10
+        sensor_node_id : Union[int, str], optional
+            The ID of the sensor node, by default 0
+        logger : logging.Logger, optional
+            Logger instance for logging, by default the module logger
+        alert_callback : Union[Callable, None], optional
+            Callback function for alerts, by default None for logger-only alerts
+        tak_cot_callback : Union[Callable, None], optional
+            Callback function for TAK CoT messages, by default None for logger-only TAK CoT messages
+        artifact_manager : Union[ArtifactManager, None], optional
+            ArtifactManager instance for managing artifacts, by default None to use the global artifact manager
+        """
+        # templated common init
+        super().__init__(sensor_node_id=sensor_node_id, logger=logger, alert_callback=alert_callback, tak_cot_callback=tak_cot_callback, artifact_manager=artifact_manager)
+
+        # developer defined init
+        self.frequency = int(frequency)
+
+        self._stop = False
+
+    async def run(self) -> None:
+        """Run the Artifact Creation Test Operation."""
+        self.logger.info("Starting Artifact Creation Test Operation")
+        count = 0
+        while not self._stop:
+            count += 1
+            self.logger.info(f"Creating test artifact number {count}")
+            art_fname = self.artifact_manager.get_filename_for_artifact(self.opid, '.txt')
+            with open(art_fname, 'w') as art_fd:
+                self.logger.debug(f"Writing to artifact file: {art_fname}")
+                art_fd.write(f"This is test artifact number {count}\n")
+            self.logger.debug(f"Finished writing to artifact file: {art_fname}")
+            _ = self.create_artifact(
+                file_path=art_fname,
+                name=f"Test artifact {count}",
+                artifact_type="text/plain",
+                metadata={"description": f"Test artifact number {count}"}
+            )
+            self.logger.debug(f"Created artifact {count} with ID {_}")
+            self.logger.info(f"Artifact creation test operation count={count}")
+            await asyncio.sleep(self.frequency)
+
+if __name__ == "__main__":
+    """Run the plugin script as a standalone program for testing purposes.
+    """
+    from fissure.utils.plugins.test_operation import run_test
+    run_test(
+        OperationMain,
+        {'frequency': 10},
+        {}
+    )
\ No newline at end of file
diff --git a/Plugins/test/install_files/artifact_create_image.py b/Plugins/test/install_files/artifact_create_image.py
new file mode 100644
index 00000000..c0247a4c
--- /dev/null
+++ b/Plugins/test/install_files/artifact_create_image.py
@@ -0,0 +1,87 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""Artifact Creation Test Operation
+"""
+import asyncio
+import logging
+import numpy as np
+from PIL import Image
+import os
+import sys
+from typing import Callable, Union
+
+try:
+    from fissure.utils.plugins.operations import Operation, ArtifactManager
+except ImportError:
+    # add fissure to path and import modules
+    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')))
+    from fissure.utils.plugins.operations import Operation, ArtifactManager
+
+class OperationMain(Operation):
+    """Artifact Creation Test Operation"""
+    def __init__(self, frequency: int = 10, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: Union[Callable, None] = None, tak_cot_callback: Union[Callable, None] = None, artifact_manager: Union[ArtifactManager, None] = None) -> None:
+        """Initialize the Artifact Creation Test Operation.
+
+        Parameters
+        ----------
+        frequency : int, optional
+            The interval in seconds between artifact creations, by default 10
+        sensor_node_id : Union[int, str], optional
+            The ID of the sensor node, by default 0
+        logger : logging.Logger, optional
+            Logger instance for logging, by default the module logger
+        alert_callback : Union[Callable, None], optional
+            Callback function for alerts, by default None for logger-only alerts
+        tak_cot_callback : Union[Callable, None], optional
+            Callback function for TAK CoT messages, by default None for logger-only TAK CoT messages
+        artifact_manager : Union[ArtifactManager, None], optional
+            ArtifactManager instance for managing artifacts, by default None to use the global artifact manager
+        """
+        # templated common init
+        super().__init__(sensor_node_id=sensor_node_id, logger=logger, alert_callback=alert_callback, tak_cot_callback=tak_cot_callback, artifact_manager=artifact_manager)
+
+        # developer defined init
+        self.frequency = int(frequency)
+
+        self._stop = False
+
+    async def run(self) -> None:
+        """Run the Artifact Creation Test Operation."""
+        self.logger.info("Starting Artifact Creation Test Operation")
+        count = 0
+        while not self._stop:
+            color = np.random.randint(0, 256, size=3, dtype=np.uint8)
+            count += 1
+            self.logger.info(f"Creating test artifact number {count}")
+            art_fname = self.artifact_manager.get_filename_for_artifact(self.opid, '.png')
+            with open(art_fname, 'wb') as art_fd:
+                self.logger.debug(f"Writing to artifact file: {art_fname}")
+                # Create a small image (e.g., 100x100 pixels) with the random color
+                width, height = 100, 100
+                image = Image.new('RGB', (width, height), tuple(color))
+                image.save(art_fd, format='PNG')
+
+            self.logger.debug(f"Finished writing to artifact file: {art_fname}")
+            _ = self.create_artifact(
+                file_path=art_fname,
+                name=f"Test artifact image data {count}",
+                artifact_type="image/png",
+                metadata={
+                    "description": f"Test image artifact number {count}",
+                    'color_space': 'RGB',
+                    'color': f'R:{color[0]} G:{color[1]} B:{color[2]}'
+                }
+            )
+            self.logger.debug(f"Created artifact {count} with ID {_}")
+            self.logger.info(f"Artifact creation test operation count={count}")
+            await asyncio.sleep(self.frequency)
+
+if __name__ == "__main__":
+    """Run the plugin script as a standalone program for testing purposes.
+    """
+    from fissure.utils.plugins.test_operation import run_test
+    run_test(
+        OperationMain,
+        {'frequency': 10},
+        {}
+    )
\ No newline at end of file
diff --git a/Plugins/test/install_files/artifact_create_iq.py b/Plugins/test/install_files/artifact_create_iq.py
new file mode 100644
index 00000000..b627fdad
--- /dev/null
+++ b/Plugins/test/install_files/artifact_create_iq.py
@@ -0,0 +1,87 @@
+#! /usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""Artifact Creation Test Operation
+"""
+import asyncio
+import logging
+import numpy as np
+import os
+import sys
+from typing import Callable, Union
+
+try:
+    from fissure.utils.plugins.operations import Operation, ArtifactManager
+except ImportError:
+    # add fissure to path and import modules
+    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../../..')))
+    from fissure.utils.plugins.operations import Operation, ArtifactManager
+
+class OperationMain(Operation):
+    """Artifact Creation Test Operation"""
+    def __init__(self, frequency: int = 10, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: Union[Callable, None] = None, tak_cot_callback: Union[Callable, None] = None, artifact_manager: Union[ArtifactManager, None] = None) -> None:
+        """Initialize the Artifact Creation Test Operation.
+
+        Parameters
+        ----------
+        frequency : int, optional
+            The interval in seconds between artifact creations, by default 10
+        sensor_node_id : Union[int, str], optional
+            The ID of the sensor node, by default 0
+        logger : logging.Logger, optional
+            Logger instance for logging, by default the module logger
+        alert_callback : Union[Callable, None], optional
+            Callback function for alerts, by default None for logger-only alerts
+        tak_cot_callback : Union[Callable, None], optional
+            Callback function for TAK CoT messages, by default None for logger-only TAK CoT messages
+        artifact_manager : Union[ArtifactManager, None], optional
+            ArtifactManager instance for managing artifacts, by default None to use the global artifact manager
+        """
+        # templated common init
+        super().__init__(sensor_node_id=sensor_node_id, logger=logger, alert_callback=alert_callback, tak_cot_callback=tak_cot_callback, artifact_manager=artifact_manager)
+
+        # developer defined init
+        self.frequency = int(frequency)
+
+        self._stop = False
+
+    async def run(self) -> None:
+        """Run the Artifact Creation Test Operation."""
+        self.logger.info("Starting Artifact Creation Test Operation")
+        count = 0
+        while not self._stop:
+            fc = np.random.uniform(-0.5, 0.5)
+            snr_db = np.random.uniform(0, 20)
+            count += 1
+            self.logger.info(f"Creating test artifact number {count}")
+            art_fname = self.artifact_manager.get_filename_for_artifact(self.opid, '.32cf')
+            with open(art_fname, 'wb') as art_fd:  # binary mode: raw complex64 samples
+                self.logger.debug(f"Writing to artifact file: {art_fname}")
+                noise = (np.random.randn(1024) + 1j*np.random.randn(1024))/np.sqrt(2)
+                signal = np.exp(1j * 2 * np.pi * fc * np.arange(1024))
+                # amplitude factor 10^(-SNR/20) scales unit-power noise to power 10^(-SNR/10)
+                samples = signal + 10**(-snr_db/20) * noise
+                samples.astype(np.complex64).tofile(art_fd)
+            self.logger.debug(f"Finished writing to artifact file: {art_fname}")
+            _ = self.create_artifact(
+                file_path=art_fname,
+                name=f"Test artifact IQ data {count}",
+                artifact_type="iq/32cf",
+                metadata={
+                    "description": f"Test IQ artifact number {count}",
+                    'center_frequency': fc,
+                    'sample_rate': 1.0,
'snr_db': snr_db + } + ) + self.logger.debug(f"Created artifact {count} with ID {_}") + self.logger.info(f"Artifact creation test operation count={count}") + await asyncio.sleep(self.frequency) + +if __name__ == "__main__": + """Run the plugin script as a standalone program for testing purposes. + """ + from fissure.utils.plugins.test_operation import run_test + run_test( + OperationMain, + {'frequency': 10}, + {} + ) \ No newline at end of file diff --git a/Plugins/wifi/install_files/wifi_scan_ap.py b/Plugins/wifi/install_files/wifi_scan_ap.py index 2dfd5e0b..4cc861cf 100644 --- a/Plugins/wifi/install_files/wifi_scan_ap.py +++ b/Plugins/wifi/install_files/wifi_scan_ap.py @@ -9,6 +9,7 @@ """ import asyncio import csv +import json import logging import numpy as np import os @@ -26,20 +27,20 @@ # add wifi_lib to path and import modules sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) -from wifi_lib.query_iface import verify_interface, get_channels +from wifi_lib.query_iface import verify_interface, get_channels, choose_interface from wifi_lib.configure_iface import set_monitor_mode, set_channel from wifi_lib.oui import OUILookup, get_vendor_common_name class OperationMain(Operation): """WiFi AP Scanner """ - def __init__(self, dev: str, duration: float = -1, dwell: float = 1, power: float = -100, channels: Union[List[int], None] = None, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: callable = None, tak_cot_callback: callable = None) -> None: + def __init__(self, dev: Union[str, None] = None, duration: float = -1, dwell: float = 1, power: float = -100, channels: Union[List[int], None] = None, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: callable = None, tak_cot_callback: callable = None) -> None: """ Initialize the Wifi AP Scanner. Parameters ---------- - dev : str + dev : str, optional Network interface to use for scanning (e.g., 'wlan0'). duration : float, optional Duration of the scan in seconds. Default is -1 for indefinite scanning. 
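
For reference, the .32cf artifacts produced by artifact_create_iq.py above can be sanity-checked with a few lines of NumPy. This is an editor's illustrative sketch, not part of the patch: it assumes the raw np.complex64 layout and the normalized metadata units (sample_rate = 1.0) written above, and the file path is hypothetical.

import numpy as np

# hypothetical path to one generated artifact file
samples = np.fromfile("artifact.32cf", dtype=np.complex64)

# The FFT peak should land near the 'center_frequency' metadata value,
# expressed in cycles/sample within [-0.5, 0.5) since sample_rate is 1.0.
spectrum = np.fft.fftshift(np.fft.fft(samples))
freqs = np.fft.fftshift(np.fft.fftfreq(samples.size, d=1.0))
fc_est = freqs[np.argmax(np.abs(spectrum))]
print(f"estimated center frequency: {fc_est:+.4f} cycles/sample")
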
@@ -62,6 +63,12 @@ def __init__(self, dev: str, duration: float = -1, dwell: float = 1, power: floa super().__init__(sensor_node_id=sensor_node_id, logger=logger, alert_callback=alert_callback, tak_cot_callback=tak_cot_callback) # developer defined init + if dev is None: + # choose interface + dev = choose_interface(logger=self.logger) + if dev is None: + self.logger.error("No suitable WiFi interface found for scanning.") + raise ValueError("No suitable WiFi interface found for scanning.") self.dev = dev self.duration = float(duration) self.dwell = float(dwell) @@ -80,7 +87,9 @@ def __init__(self, dev: str, duration: float = -1, dwell: float = 1, power: floa 'dev': self.dev } - self.aps = {} # AP table + self.ssids = {} # SSID table + + self.artifacts = {} # artifact table # build tshark command self.fields_ordered = [ @@ -181,6 +190,7 @@ async def run(self) -> None: """ # run end time tend = np.inf if self.duration == -1 else time.time() + self.duration + self._mac_reported = {} while not self._stop: for channel in self.channels: # yield event loop and check for stop conditions @@ -194,13 +204,6 @@ async def run(self) -> None: await self.stop() return - # set monitor mode - configured = set_monitor_mode(self.dev, raise_error=True, logger=self.logger) - if not configured: - self.logger.debug(f"Failed to set monitor mode on interface {self.dev}.") - self.logger.error("Operation environment setup failed.") - return - # set channel channel_set = set_channel(self.dev, channel) if not channel_set: @@ -233,27 +236,65 @@ async def run(self) -> None: # get ssid ssid_raw = row[self.fields.get('ssid')] - if ssid_raw == '' or ssid_raw == '' or ssid_raw is None: - # no ssid - ssid = None + if ssid_raw in ['', '']: + ssid = ssid_raw + elif ssid_raw is None: + ssid = '' else: ssid = '' for i in range(0,len(ssid_raw),2): ssid += chr(int(ssid_raw[i:i+2], 16)) - # report AP if beacon or association request - type_subtype = int(row[self.fields.get('type_subtype')], 16) - freq = float(row[self.fields.get('freq')]) + # report AP and stations based on frame type/subtype + subtype_idx = self.fields.get('type_subtype') + if subtype_idx is None: + self.logger.debug("type_subtype field index not found; skipping row.") + continue + type_subtype = int(row[subtype_idx], 16) + freq_idx = self.fields.get('freq') + if freq_idx is None: + self.logger.debug("freq field index not found; skipping row.") + continue + freq = float(row[freq_idx]) + sa_idx = self.fields.get('sa') + da_idx = self.fields.get('da') + + # Management frames (Type 0) if type_subtype == 8: # beacon - await self.add_ap(ssid, row[self.fields.get('sa')], freq, power) + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='ap') + elif type_subtype == 5: # probe response (from AP) + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='ap') + elif type_subtype == 4: # probe request (from station) + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='station') elif type_subtype == 0: # association request - await self.add_ap(ssid, row[self.fields.get('da')], freq, power) + if da_idx is not None: + await self.add_ap(ssid, row[da_idx], freq, power, emtype='ap') # DA is AP + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='station') # SA is station + elif type_subtype == 1: # association response + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='ap') # SA is AP + if da_idx is not None: + await 
self.add_ap(ssid, row[da_idx], freq, power, emtype='station') # DA is station + elif type_subtype == 2: # reassociation request + if da_idx is not None: + await self.add_ap(ssid, row[da_idx], freq, power, emtype='ap') # DA is AP + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='station') # SA is station + elif type_subtype == 3: # reassociation response + if sa_idx is not None: + await self.add_ap(ssid, row[sa_idx], freq, power, emtype='ap') # SA is AP + if da_idx is not None: + await self.add_ap(ssid, row[da_idx], freq, power, emtype='station') # DA is station else: - self.logger.debug(f"Frame type_subtype {type_subtype} not beacon or association request; skipping.") + self.logger.debug(f"Frame type_subtype {type_subtype} not handled; skipping.") if nrows == 0: self.logger.warning(f"No data captured on channel {channel}.") - async def add_ap(self, ssid: str, mac: str, freq: float, power: float) -> None: + async def add_ap(self, ssid: str, mac: str, freq: float, power: float, emtype: str) -> None: """ Add the AP to the AP table if not already present and report. @@ -262,56 +303,117 @@ async def add_ap(self, ssid: str, mac: str, freq: float, power: float) -> None: ssid : str SSID of the access point. mac : str - MAC address of the access point. + MAC address of the emitter. freq : float - Frequency of the access point. + Frequency of the emitter. power : float - Signal power of the access point. + Signal power of the emitter. + emtype : str + Type of the emitter (e.g., 'ap' or 'station'). """ - if mac == '': - self.logger.debug("Empty MAC address received; skipping.") + vendor = self.oui_lookup.match(mac) + + # update ssid document + self.update_ssid_doc(ssid, mac, freq, power, emtype, vendor) + + # check if reported recently + if mac in self._mac_reported: + last_time = self._mac_reported[mac] + if time.time() - last_time < self.dwell: + # already reported in this scan cycle + self.logger.debug(f"AP with MAC {mac} already reported recently; skipping.") + return + else: + self._mac_reported[mac] = time.time() + else: + self._mac_reported[mac] = time.time() - elif mac not in self._curr_ap: - self._curr_ap.append(mac) - vendor = self.oui_lookup.match(mac) + if ssid is None or ssid == '': + ssid = f'{emtype.upper()}_{mac}' - # get vendor plain text - vendor_plain = get_vendor_common_name(vendor) + # send alert + await self.alert_callback(self.sensor_node_id, self.opid, f'Wifi AP vendor "{vendor}" Detected: SSID={ssid} MAC={mac} FREQ={freq}MHz POWER={power}dBm', logger=self.logger) - # add to AP table - self.aps[mac] = { - 'vendor': vendor, - 'vendor_plain': vendor_plain, - 'ssid': ssid, - 'stations': {} - } + # send tak cot + await self.tak_cot_callback(self.sensor_node_id, self.opid, uid=ssid, remarks=f'Wifi AP vendor "{vendor}" Detected: SSID={ssid} MAC={mac} FREQ={freq}MHz POWER={power}dBm', lat=True, lon=True, alt=True, time=True, type='a-h-G-E-S', logger=self.logger) - if ssid is None or ssid == '': - ssid = mac + def update_ssid_doc(self, ssid: str, mac: str, freq: float, power: float, emtype: str, vendor: Union[str, None]) -> None: + """ + Update the SSID document in the database. - # send alert - await self.alert_callback(self.sensor_node_id, self.opid, f'Wifi AP vendor "{vendor}" Detected: SSID={ssid} MAC={mac} FREQ={freq}MHz POWER={power}dBm', logger=self.logger) + Parameters + ---------- + ssid : str + SSID of the access point. + mac : str + MAC address of the emitter. + freq : float + Frequency of the emitter (in MHz). 
+ power : float + Signal power of the emitter (in dBm). + emtype : str + Type of the emitter ('ap' or 'station'). + vendor : Union[str, None] + Vendor of the emitter. + """ + if ssid not in self.ssids: + self.ssids[ssid] = { + 'ssid': ssid, + 'aps': [{'mac': mac, 'freq': freq, 'power': power, 'vendor': vendor}] if emtype == 'ap' else [], + 'stations': [{'mac': mac, 'freq': freq, 'power': power, 'vendor': vendor}] if emtype == 'station' else [], + } - # send tak cot - await self.tak_cot_callback(self.sensor_node_id, self.opid, uid=ssid, remarks=f'Wifi AP vendor "{vendor}" Detected: SSID={ssid} MAC={mac} FREQ={freq}MHz POWER={power}dBm', lat=True, lon=True, alt=True, time=True, type='a-h-G-E-S', logger=self.logger) + # create artifact for SSID + art_fname = self.artifact_manager.get_filename_for_artifact(self.opid, '.json') + with open(art_fname, 'w') as f: + json.dump(self.ssids[ssid], f, indent=4) + artifact_id = self.create_artifact(art_fname, f'Wifi SSID={ssid}', 'json') + self.artifacts[ssid] = artifact_id else: - self.logger.debug(f"AP with MAC {mac} already reported in this scan cycle.") + update = False + if emtype == 'ap' and mac not in [entry['mac'] for entry in self.ssids[ssid]['aps']]: + self.ssids[ssid]['aps'].append({'mac': mac, 'freq': freq, 'power': power, 'vendor': vendor}) + update = True + elif emtype == 'station' and mac not in [entry['mac'] for entry in self.ssids[ssid]['stations']]: + self.ssids[ssid]['stations'].append({'mac': mac, 'freq': freq, 'power': power, 'vendor': vendor}) + update = True + + if update: + # update artifact for SSID + artid = self.artifacts.get(ssid) + if artid is not None: + artifact = self.artifact_manager.get_artifact(artid) + if artifact is None: + self.logger.warning(f"Artifact with ID {artid} not found for SSID {ssid}.") + + else: + art_fname = artifact.file_path + with open(art_fname, 'w') as f: + json.dump(self.ssids[ssid], f, indent=4) + flag = self.update_artifact(artid) + if not flag: + self.logger.warning(f"Failed to update artifact for SSID {ssid}.") + else: + self.logger.warning(f"No artifact found for SSID {ssid} to update.") + + self.logger.debug(f"Updating SSID document: SSID={ssid} MAC={mac}") if __name__ == "__main__": """Run the plugin script as a standalone program for testing purposes. 
""" from fissure.utils.plugins.test_operation import run_test + dev = choose_interface() run_test( OperationMain, { - 'dev': 'wlx00c0cab5f8c9', + 'dev': dev, 'duration': -1, 'dwell': 0.5, 'power': -100, 'channels': list(range(1,10)) + [124,128,140] }, { - 'dev': 'wlx00c0cab5f8c9' + 'dev': dev } ) \ No newline at end of file diff --git a/YAML/User Configs/default.yaml b/YAML/User Configs/default.yaml index e1c46fc5..a12f327a 100644 --- a/YAML/User Configs/default.yaml +++ b/YAML/User Configs/default.yaml @@ -178,7 +178,8 @@ startup_automation_mode: Manual tak: cert: /home/user/Installed_by_FISSURE/takserver-docker-5.3-RELEASE-24/tak/certs/files/takserver.pem connect_mode: disabled # auto/manual/disabled - ip_addr: localhost + ip_addr: localhost # tak server IP for local communication + external_ip: 169.254.152.101 # tak server external IP for clients to connect key: /home/user/Installed_by_FISSURE/takserver-docker-5.3-RELEASE-24/tak/certs/files/takserver.key port: 8089 tak_on_startup: 'False' diff --git a/fissure/Sensor_Node/SensorNode.py b/fissure/Sensor_Node/SensorNode.py index fc9125c5..decaa256 100644 --- a/fissure/Sensor_Node/SensorNode.py +++ b/fissure/Sensor_Node/SensorNode.py @@ -29,6 +29,7 @@ import fissure.comms import fissure.utils from fissure.utils import PLUGIN_DIR +from fissure.utils.artifacts import ArtifactManager import uuid import logging @@ -293,8 +294,22 @@ def __init__(self, local_flag): self.gps_position['latitude'], self.gps_position['longitude'] ) - self.operations = {} + self.operations = {} # operation tracking dictionary + # initialize artifact manager + self.artifact_manager = ArtifactManager(logger=self.logger) + + # Store reference to original create_artifact method + self._original_create_artifact = self.artifact_manager.create_artifact + + # overload artifact manager create artifact to notify hiprfisr + def create_artifact_wrapper(source_id: str, operation_id: str, file_path: str, name: str, artifact_type: str, metadata: Union[Dict[str, Any], None] = None) -> str: + # Call original synchronous method + artifact_id = self._original_create_artifact(self.uuid, operation_id, file_path, name, artifact_type, metadata) + # Schedule async notification in background + asyncio.create_task(self._notify_hiprfisr_of_artifact(artifact_id)) + return artifact_id + self.artifact_manager.create_artifact = create_artifact_wrapper async def initialize_comms(self): if self.network_type == "IP": @@ -590,11 +605,6 @@ async def send_tak_cot(self, msg: dict) -> None: }, } - await self.hiprfisr_socket.send_msg( - fissure.comms.MessageTypes.COMMANDS, msg_out - ) - return - # -------------------------------------------------- # MESHTASTIC MODE → legacy LT list format # -------------------------------------------------- @@ -624,11 +634,6 @@ async def send_tak_cot(self, msg: dict) -> None: fissure.comms.MessageFields.PARAMETERS: PARAMETERS, } - await self.hiprfisr_socket.send_msg( - fissure.comms.MessageTypes.COMMANDS, msg_out - ) - return - # -------------------------------------------------- # Unknown network type # -------------------------------------------------- @@ -636,6 +641,34 @@ async def send_tak_cot(self, msg: dict) -> None: self.logger.error(f"Unknown network type for TAK: {self.network_type}") return + await self.hiprfisr_socket.send_msg( + fissure.comms.MessageTypes.COMMANDS, msg_out + ) + + + async def _notify_hiprfisr_of_artifact(self, artifact_id: str) -> None: + """Notify HIPRFISR of a new artifact (async helper method). 
+ + Parameters + ---------- + artifact_id : str + The artifact ID to notify about + """ + # notify hiprfisr of new artifact + artifact = self.artifact_manager.get_artifact(artifact_id) + if artifact is None: + self.logger.error(f"Failed to retrieve newly created artifact {artifact_id} for notification.") + return + PARAMETERS = { + "artifact": artifact.to_dict() + } + msg = { + fissure.comms.MessageFields.IDENTIFIER: self.identifier, + fissure.comms.MessageFields.MESSAGE_NAME: "updateArtifact", + fissure.comms.MessageFields.PARAMETERS: PARAMETERS + } + await self.hiprfisr_socket.send_msg(fissure.comms.MessageTypes.COMMANDS, msg) + @@ -738,11 +771,19 @@ async def run_plugin_operation( parameters["sensor_node_id"] = sensor_node_id parameters["alert_callback"] = self.send_alert parameters["tak_cot_callback"] = self.send_tak_cot + parameters["artifact_manager"] = self.artifact_manager parameters["logger"] = self.logger # Initialize the operation class instance try: - operation_inst = operation_main(**parameters) + # Get the init signature to check for supported parameters + init_signature = inspect.signature(operation_main.__init__) + init_params = set(init_signature.parameters.keys()) + + # Filter parameters to only include those accepted by the class + filtered_parameters = {k: v for k, v in parameters.items() if k in init_params} + + operation_inst = operation_main(**filtered_parameters) except Exception as e: tb_str = traceback.format_exc() self.logger.error(f"Error initializing operation class from {plugin_script_path}: {e}\n{tb_str}") diff --git a/fissure/Server/HiprFisr.py b/fissure/Server/HiprFisr.py index 7a6a5f83..d67e97d0 100644 --- a/fissure/Server/HiprFisr.py +++ b/fissure/Server/HiprFisr.py @@ -24,6 +24,7 @@ import pytak from fissure.utils.tak_server import load_config as load_tak_config from fissure.utils.tak_server import TakReceiver +from fissure.utils.artifacts import ArtifactTracker HEARTBEAT_LOOP_DELAY = 0.1 # Seconds EVENT_LOOP_DELAY = 0.1 @@ -163,6 +164,9 @@ def __init__(self, address: fissure.comms.Address): self.register_callbacks(fissure.callbacks.HiprFisrCallbacks) self.register_callbacks(fissure.callbacks.HiprFisrCallbacksLT) + # Initialize artifact tracker + self.artifact_tracker = ArtifactTracker(logger=self.logger) + self.logger.info("=== READY ===") self.logger.info(f"Server listening @ {listen_addr}") diff --git a/fissure/callbacks/HiprFisrCallbacks.py b/fissure/callbacks/HiprFisrCallbacks.py index 1ddde4a9..5bef5598 100644 --- a/fissure/callbacks/HiprFisrCallbacks.py +++ b/fissure/callbacks/HiprFisrCallbacks.py @@ -2,9 +2,10 @@ # from fissure.comms.constants import * # from fissure_libutils import * import pytak -from typing import List +from typing import List, Optional import xml.etree.ElementTree as ET +import base64 import binascii import fissure.comms import fissure.utils @@ -12,6 +13,8 @@ from fissure.utils.common import PLUGIN_DIR from fissure.utils import plugin from fissure.utils import plugin_editor +from fissure.utils.artifacts import ArtifactTracker +from fissure.utils.tak_messages import create_artifact_data_package import os import time import yaml @@ -4742,4 +4745,147 @@ async def pluginOperationStopped(component: object, sensor_node_id: int, operati fissure.comms.MessageFields.PARAMETERS: PARAMETERS, } if component.dashboard_connected: - await component.dashboard_socket.send_msg(fissure.comms.MessageTypes.COMMANDS, msg) \ No newline at end of file + await component.dashboard_socket.send_msg(fissure.comms.MessageTypes.COMMANDS, msg) + + +async 
def updateArtifact(component: object, artifact: dict) -> None:
+    """Handle New or Updated Artifact Event
+
+    Parameters
+    ----------
+    component : object
+        Component
+    artifact : dict
+        Artifact data
+    """
+    artifact_id = artifact.get('id', None)
+    if artifact_id is None:
+        component.logger.error("Artifact missing 'id' field")
+        return
+
+    source_id = artifact.get('source_id')
+    if source_id != component.local_node_uuid:
+        # Remote artifact; handle file path
+        file_path = artifact.get('file_path')
+        checksum = artifact.get('checksum')
+        if file_path is not None and os.path.exists(file_path) and checksum is not None:
+            # a file exists; get existing artifact to compare checksum
+            existing_artifact = component.artifact_tracker.get_artifact(artifact_id)
+            if existing_artifact is not None:
+                # existing artifact; compare checksums
+                if existing_artifact.checksum == checksum:
+                    # checksums match; use existing file path
+                    existing_file_path = existing_artifact.file_path
+                    if os.path.exists(existing_file_path):
+                        artifact['file_path'] = existing_file_path
+                else:
+                    # checksums do not match; set file path to sensor URI
+                    artifact['file_path'] = f"sensor-{source_id}://{artifact['file_path']}"
+            else:
+                # new artifact that points to a local file that exists;
+                # calculate checksum of local file to verify
+                checksum = fissure.utils.calculate_file_checksum(file_path)
+                if checksum == artifact.get('checksum'):
+                    artifact['file_path'] = file_path
+                else:
+                    artifact['file_path'] = f"sensor-{source_id}://{artifact['file_path']}"
+        else:
+            # artifact file does not exist locally; set file path to sensor URI
+            artifact['file_path'] = f"sensor-{source_id}://{artifact['file_path']}"
+
+    # Update artifact tracker
+    component.artifact_tracker.update_artifact(artifact)
+
+    component.logger.debug(f"Preparing to send artifact metadata to TAK for source: {source_id}")
+
+    name = artifact.get("name", None)
+    if name is None:
+        component.logger.error("Artifact missing 'name' field, cannot send metadata to TAK")
+        return
+
+    timestamp = artifact.get("modified_at", None)
+    if timestamp is None:
+        component.logger.error("Artifact missing 'modified_at' field, cannot send metadata to TAK")
+        return
+
+    artid = artifact.get("id", None)
+    if artid is None:
+        component.logger.error("Artifact missing 'id' field, cannot send metadata to TAK")
+        return
+
+    event_uid = f"{source_id}-artifact_metadata-{int(time.time()*1000)}"
+    msg = {
+        "msg_type": "event",
+        "uid": event_uid,
+        "data": {
+            "event_type": "artifact_metadata",
+            "name": name,
+            "timestamp": timestamp,
+            "artid": artid
+        }
+    }
+
+    await fissure.utils.tak_messages.send(component, msg)
+
+
+async def transferArtifactRequest(component: object, artifact_id: str, destination: str, data: Optional[bytes]) -> None:
+    """Handle Artifact Transfer Request
+
+    Parameters
+    ----------
+    component : object
+        Component
+    artifact_id : str
+        Artifact ID
+    destination : str
+        Destination path, currently supported: 'tak', 'hiprfisr'
+    data : Optional[bytes]
+        File data if sent from source
+    """
+    artifact_tracker: ArtifactTracker = component.artifact_tracker
+
+    if data is not None:
+        # Received file data; save to local artifact path
+        if not artifact_tracker.save_data(artifact_id, data, compressed=True):
+            component.logger.error(f"Failed to save artifact data for artifact ID {artifact_id}")
+            return
+    else:
+        # No data received; determine if local data exists
+        data = artifact_tracker.get_data(artifact_id)
+
+        if data is None:
+            # no local data; request transfer from the artifact's source node
+            artifact = artifact_tracker.get_artifact(artifact_id)
+            if artifact is None:
+                component.logger.error(f"Artifact not found: {artifact_id}")
+                return
+            node_uuid = artifact.source_id
+            PARAMETERS = {
+                "artifact_id": artifact_id,
+                "destination": destination,
+                "data": None
+            }
+            msg = {
+                fissure.comms.MessageFields.IDENTIFIER: component.identifier,
+                fissure.comms.MessageFields.MESSAGE_NAME: "transferArtifactRequest",
+                fissure.comms.MessageFields.PARAMETERS: PARAMETERS,
+            }
+
+            # Resolve Identity
+            identity = component.nodes[node_uuid].get("identity", None)
+            if identity is None:
+                component.logger.error(f"Could not resolve identity for sensor node UUID {node_uuid}")
+                return
+
+            # Send through ROUTER
+            await component.sensor_node_router.send_msg(
+                fissure.comms.MessageTypes.COMMANDS,
+                msg,
+                target_ids=[identity]
+            )
+
+    if data is not None:
+        # Send data to destination
+        if destination == 'tak':
+            # Send artifact via TAK
+            artifact = component.artifact_tracker.get_artifact(artifact_id)
+            await fissure.utils.tak_messages.send_artifact_event(component, artifact, data)
+
+        elif destination == 'hiprfisr':
+            component.logger.info(f"Artifact {artifact_id} saved to hiprfisr")
diff --git a/fissure/callbacks/SensorNodeCallbacks.py b/fissure/callbacks/SensorNodeCallbacks.py
index cbd3c9e9..4cbb07ce 100644
--- a/fissure/callbacks/SensorNodeCallbacks.py
+++ b/fissure/callbacks/SensorNodeCallbacks.py
@@ -3,6 +3,8 @@
 import fissure.utils
 import fissure.utils.hardware
 from fissure.utils import plugin
+from fissure.utils.artifacts import ArtifactManager
+import logging
 import os
 import shutil
 import subprocess
@@ -15,6 +17,7 @@
 import zmq
 from typing import List
 import re
+from typing import Optional
 
 
 async def updateLoggingLevels(component: object, new_console_level="", new_file_level=""):
@@ -1675,4 +1678,42 @@ async def iwconfigIP(component: object, sensor_node_id: str):
         fissure.comms.MessageFields.MESSAGE_NAME: "iwconfigIP_Return",
         fissure.comms.MessageFields.PARAMETERS: PARAMETERS,
     }
-    await component.hiprfisr_socket.send_msg(fissure.comms.MessageTypes.COMMANDS, msg)
\ No newline at end of file
+    await component.hiprfisr_socket.send_msg(fissure.comms.MessageTypes.COMMANDS, msg)
+
+
+async def transferArtifactRequest(component: object, artifact_id: str, destination: str, data: Optional[bytes]) -> None:
+    """
+    Transfer Artifact Request
+
+    Parameters
+    ----------
+    component : object
+        Component
+    artifact_id : str
+        Artifact ID
+    destination : str
+        Transfer destination ('tak' or 'hiprfisr')
+    data : Optional[bytes]
+        Artifact data, currently unused
+    """
+    logger: logging.Logger = component.logger  # type: ignore
+    artifact_manager: ArtifactManager = component.artifact_manager  # type: ignore
+
+    data = artifact_manager.get_data(artifact_id, compress=True)
+    if data is None:
+        logger.error(f"Artifact data not found or could not be read: {artifact_id}")
+        return
+
+    PARAMETERS = {
+        "artifact_id": artifact_id,
+        "destination": destination,
+        "data": data,
+    }
+    msg = {
+        fissure.comms.MessageFields.IDENTIFIER: component.identifier,
+        fissure.comms.MessageFields.MESSAGE_NAME: "transferArtifactRequest",
+        fissure.comms.MessageFields.PARAMETERS: PARAMETERS,
+    }
+    await component.hiprfisr_socket.send_msg(
+        fissure.comms.MessageTypes.COMMANDS, msg
+    )
diff --git a/fissure/utils/artifacts.py b/fissure/utils/artifacts.py
new file mode 100644
index 00000000..fbacc40a
--- /dev/null
+++ b/fissure/utils/artifacts.py
@@ -0,0 +1,773 @@
+#!/usr/bin/env python3
+"""Artifact Management for FISSURE Operations
+"""
+import json
+import os
+import uuid
+import hashlib
+from datetime import datetime
+from dataclasses import dataclass, asdict
+from typing import List,
Optional, Dict, Any, Union, Tuple +import logging +import gzip +import re + +ARTIFACT_NODE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + "/artifacts_node" +ARTIFACT_SYSTEM_DIR = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) + "/artifacts_system" + + +def calculate_file_checksum(file_path: str) -> str: + """Calculate SHA256 checksum of a file. + + Parameters + ---------- + file_path : str + Path to the file + + Returns + ------- + str + SHA256 checksum of the file + """ + sha256_hash = hashlib.sha256() + with open(file_path, "rb") as f: + for byte_block in iter(lambda: f.read(4096), b""): + sha256_hash.update(byte_block) + return sha256_hash.hexdigest() + + +@dataclass +class Artifact: + """Represents an artifact created by an operation.""" + id: str # Unique ID for the artifact + source_id: str # ID of the source that created the artifact + operation_id: str # ID of the operation that created the artifact + name: str # Human-readable name for the artifact + file_path: str # Path to the artifact file + artifact_type: str # Type of artifact (e.g., "log", "data", "image") + file_size: int # Size of the artifact file in bytes + created_at: str # ISO formatted creation timestamp + modified_at: str # ISO formatted modification timestamp + metadata: Dict[str, Any] # Additional metadata for the artifact + checksum: str # SHA256 checksum of the artifact file + + def __post_init__(self): + """Validate that no fields are None.""" + for field_name, field_value in self.__dict__.items(): + if field_value is None: + raise ValueError(f"Field '{field_name}' cannot be None") + + def to_dict(self) -> Dict[str, Any]: + """Convert artifact to dictionary for JSON serialization.""" + return asdict(self) + + @classmethod + def from_dict(cls, data: Dict[str, Any], enforce_fields: bool = False) -> 'Artifact': + """Create artifact from dictionary. 
+ + Parameters + ---------- + data : Dict[str, Any] + Dictionary representation of the artifact + enforce_fields : bool, optional + Whether to enforce presence of all fields, by default False + """ + if 'id' not in data or data['id'] is None: + if enforce_fields: + raise ValueError("Missing required field: id") + data['id'] = str(uuid.uuid4()) + + if 'source_id' not in data or data['source_id'] is None: + if enforce_fields: + raise ValueError("Missing required field: source_id") + data['source_id'] = "unknown_source" + + if 'operation_id' not in data or data['operation_id'] is None: + if enforce_fields: + raise ValueError("Missing required field: operation_id") + data['operation_id'] = "unknown_operation" + + if 'name' not in data or data['name'] is None: + if enforce_fields: + raise ValueError("Missing required field: name") + data['name'] = "unknown_artifact" + + if 'file_path' not in data or data['file_path'] is None: + if enforce_fields: + raise ValueError("Missing required field: file_path") + data['file_path'] = "" + + if 'artifact_type' not in data or data['artifact_type'] is None: + if enforce_fields: + raise ValueError("Missing required field: artifact_type") + data['artifact_type'] = "unknown_type" + + if 'file_size' not in data or data['file_size'] is None: + if enforce_fields: + raise ValueError("Missing required field: file_size") + if 'file_path' in data and os.path.exists(data['file_path']): + data['file_size'] = os.path.getsize(data['file_path']) + else: + data['file_size'] = 0 + + if 'created_at' not in data or data['created_at'] is None: + if enforce_fields: + raise ValueError("Missing required field: created_at") + data['created_at'] = datetime.now().isoformat() + + if 'modified_at' not in data or data['modified_at'] is None: + if enforce_fields: + raise ValueError("Missing required field: modified_at") + data['modified_at'] = data['created_at'] + + if 'metadata' not in data or data['metadata'] is None: + if enforce_fields: + raise ValueError("Missing required field: metadata") + data['metadata'] = {} + + if 'checksum' not in data or data['checksum'] is None: + if enforce_fields: + raise ValueError("Missing required field: checksum") + if 'file_path' in data and os.path.exists(data['file_path']): + data['checksum'] = calculate_file_checksum(data['file_path']) + else: + data['checksum'] = "" + + return cls(**data) + + +class ArtifactManager(object): + """Manages artifacts on the sensor node.""" + + def __init__(self, base_dir: str = ARTIFACT_NODE_DIR, logger: Union[logging.Logger, None] = None): + """Initialize the artifact manager. + + Parameters + ---------- + base_dir : Union[str, None], optional + Base directory for storing artifacts, defaults to ARTIFACT_NODE_DIR + logger : Union[logging.Logger, None], optional + Logger instance, defaults to None to use module logger + """ + self.base_dir = base_dir + os.makedirs(base_dir, exist_ok=True) + self.logger = logger or logging.getLogger(__name__) + self.index_file = os.path.join(base_dir, "index.json") + self._artifacts = self._load_index() + + def _load_index(self) -> Dict[str, Artifact]: + """Load the artifact index from disk. 
+ + Returns + ------- + Dict[str, Artifact] + Mapping of artifact IDs to Artifact instances + """ + if not os.path.exists(self.index_file): + return {} + try: + with open(self.index_file, 'r') as f: + data = json.load(f) + return {aid: Artifact.from_dict(artifact_data) for aid, artifact_data in data.items()} + except Exception as e: + self.logger.error(f"Failed to load artifact index: {e}") + return {} + + def _save_index(self) -> None: + """Save the artifact index to disk.""" + try: + data = {aid: artifact.to_dict() for aid, artifact in self._artifacts.items()} + with open(self.index_file, 'w') as f: + json.dump(data, f, indent=2) + except Exception as e: + self.logger.error(f"Failed to save artifact index: {e}") + + def _calculate_checksum(self, file_path: str) -> str: + """Calculate SHA256 checksum of a file. + + Parameters + ---------- + file_path : str + Path to the file + + Returns + ------- + str + SHA256 checksum of the file + """ + try: + return calculate_file_checksum(file_path) + except Exception as e: + self.logger.error(f"Failed to calculate checksum for {file_path}: {e}") + return "" + + def _get_operation_dir(self, operation_id: str) -> str: + """Get the directory path for an operation's artifacts. + + Parameters + ---------- + operation_id : str + The operation ID + + Returns + ------- + str + The directory path for the operation's artifacts + """ + return os.path.join(self.base_dir, operation_id) + + def create_operation_dir(self, operation_id: str) -> Tuple[str, str]: + """Create directory for an operation's artifacts. + + Parameters + ---------- + operation_id : str + The operation ID + + Returns + ------- + str, str + The directory path for the operation's artifacts and the directory path for the operation's files + """ + op_dir = self._get_operation_dir(operation_id) + os.makedirs(op_dir, exist_ok=True) + file_dir = os.path.join(op_dir, "files") + os.makedirs(file_dir, exist_ok=True) + return op_dir, file_dir + + def get_filename_for_artifact(self, operation_id: str, ext: str) -> str: + """Generate a unique filename for an artifact file. + + Parameters + ---------- + operation_id : str + The operation ID + ext : str + The file extension + + Returns + ------- + str + Unique filename + """ + _, file_dir = self.create_operation_dir(operation_id) + return os.path.join(file_dir, str(uuid.uuid4()) + ext) + + def create_artifact(self, source_id: str, operation_id: str, file_path: str, name: str, artifact_type: str, metadata: Union[Dict[str, Any], None] = None) -> str: + """Create a new artifact record. 
+ + Parameters + ---------- + source_id : str + ID of the source that created the artifact + operation_id : str + ID of the operation that created the artifact + file_path : str + Path to the artifact file + name : str + Human-readable name for the artifact + artifact_type : str + Type of artifact (e.g., "log", "data", "image") + metadata : Dict[str, Any], optional + Additional metadata for the artifact + + Returns + ------- + str + The artifact ID + """ + if not os.path.exists(file_path): + self.logger.error(f"Artifact file does not exist: {file_path}") + return "" + + # Check if file_path basename is a UUID4 + basename = os.path.basename(file_path) + basename_no_ext = os.path.splitext(basename)[0] + try: + parsed_uuid = uuid.UUID(basename_no_ext, version=4) + # Verify it's actually a UUID4 + if parsed_uuid.version != 4: + self.logger.debug(f"File basename '{basename_no_ext}' is a UUID but not version 4; will generate new UUID for artifact ID") + artifact_id = str(uuid.uuid4()) + else: + self.logger.info(f"File basename '{basename_no_ext}' is a valid UUID4 and will be used as artifact ID") + artifact_id = basename_no_ext + except ValueError: + self.logger.debug(f"File basename '{basename_no_ext}' is not a valid UUID4; will generate new UUID for artifact ID") + artifact_id = str(uuid.uuid4()) + + # Create operation directory if it doesn't exist + _ = self.create_operation_dir(operation_id) + + # Calculate file size and checksum + file_size = os.path.getsize(file_path) + checksum = self._calculate_checksum(file_path) + + # Create artifact record + created_at = datetime.now().isoformat() + artifact = Artifact( + id=artifact_id, + source_id=source_id, + operation_id=operation_id, + name=name, + file_path=file_path, + artifact_type=artifact_type, + created_at=created_at, + modified_at=created_at, + file_size=file_size, + checksum=checksum, + metadata=metadata or {} + ) + + # Store in index + self._artifacts[artifact_id] = artifact + self._save_index() + + self.logger.info(f"Created artifact {artifact_id}: {name} ({artifact_type})") + return artifact_id + + def get_artifact(self, artifact_id: str) -> Optional[Artifact]: + """Get an artifact by ID. + + Parameters + ---------- + artifact_id : str + The artifact ID + + Returns + ------- + Optional[Artifact] + The artifact or None if not found + """ + return self._artifacts.get(artifact_id) + + def get_artifacts_by_operation(self, operation_id: str) -> List[Artifact]: + """Get all artifacts for a specific operation. + + Parameters + ---------- + operation_id : str + The operation ID + + Returns + ------- + List[Artifact] + List of artifacts for the operation + """ + return [artifact for artifact in self._artifacts.values() + if artifact.operation_id == operation_id] + + def get_all_artifacts(self) -> List[Artifact]: + """Get all artifacts. + + Returns + ------- + List[Artifact] + List of all artifacts + """ + return list(self._artifacts.values()) + + def get_data(self, artifact_id: str, compress: bool = False) -> Optional[bytes]: + """Retrieve data from an artifact's file. 
+ + Parameters + ---------- + artifact_id : str + The artifact ID + compress : bool, optional + Whether to compress the data after retrieval, by default False + + Returns + ------- + Optional[bytes] + The artifact data or None if not found/error + """ + artifact = self._artifacts.get(artifact_id) + if not artifact: + self.logger.error(f"Artifact not found: {artifact_id}") + return None + + if re.match(r'^sensor-[\w-]+://', artifact.file_path): + self.logger.error(f"Cannot retrieve data from remote artifact: {artifact.file_path}") + return None + elif not os.path.exists(artifact.file_path): + self.logger.error(f"Artifact file does not exist: {artifact.file_path}") + return None + + try: + with open(artifact.file_path, 'rb') as f: + data = f.read() + except Exception as e: + self.logger.error(f"Failed to read data from artifact {artifact_id}: {e}") + return None + + if compress: + data = gzip.compress(data) + + return data + + def update_artifact(self, artifact_id: str, file_path: Union[str, None] = None, metadata: Union[Dict[str, Any], None] = None) -> bool: + """Update an artifact's metadata and optionally its file. + + Parameters + ---------- + artifact_id : str + The artifact ID + file_path : Union[str, None], optional + New file path for the artifact, by default None + metadata : Union[Dict[str, Any], None], optional + New metadata to merge into existing metadata, by default None + + Returns + ------- + bool + True if updated successfully, False otherwise + """ + artifact = self._artifacts.get(artifact_id) + if not artifact: + self.logger.error(f"Artifact not found: {artifact_id}") + return False + + if file_path: + if not os.path.exists(file_path): + self.logger.error(f"New artifact file does not exist: {file_path}") + return False + artifact.file_path = file_path + else: + file_path = artifact.file_path + artifact.file_size = os.path.getsize(file_path) + artifact.checksum = self._calculate_checksum(file_path) + + if metadata: + artifact.metadata.update(metadata) + + artifact.modified_at = datetime.now().isoformat() + + self._save_index() + + self.logger.info(f"Updated artifact {artifact_id}: {artifact.name}") + return True + + def delete_artifact(self, artifact_id: str) -> bool: + """Delete an artifact and its file. + + Parameters + ---------- + artifact_id : str + The artifact ID + + Returns + ------- + bool + True if deleted successfully, False otherwise + """ + artifact = self._artifacts.get(artifact_id) + if not artifact: + self.logger.error(f"Artifact not found: {artifact_id}") + return False + + # Remove file if it exists + if os.path.exists(artifact.file_path): + try: + os.remove(artifact.file_path) + except Exception as e: + self.logger.error(f"Failed to delete artifact file {artifact.file_path}: {e}") + return False + + # Remove from index + del self._artifacts[artifact_id] + self._save_index() + + self.logger.info(f"Deleted artifact {artifact_id}: {artifact.name}") + return True + + def cleanup_operation(self, operation_id: str) -> int: + """Delete all artifacts for an operation. 
+ + Parameters + ---------- + operation_id : str + The operation ID + + Returns + ------- + int + Number of artifacts deleted + """ + artifacts = self.get_artifacts_by_operation(operation_id) + deleted_count = 0 + + for artifact in artifacts: + if self.delete_artifact(artifact.id): + deleted_count += 1 + + # Remove operation directory if empty + op_dir = self._get_operation_dir(operation_id) + if os.path.exists(op_dir) and not os.listdir(op_dir): + try: + os.rmdir(op_dir) + except Exception as e: + self.logger.error(f"Failed to remove operation directory {op_dir}: {e}") + + return deleted_count + + +class ArtifactTracker(object): + """Tracks artifacts across the system.""" + def __init__(self, base_dir: str = ARTIFACT_SYSTEM_DIR, logger: Union[logging.Logger, None] = None): + """Initialize the artifact tracker. + + Parameters + ---------- + base_dir : Union[str, None], optional + Base directory for storing artifacts, defaults to ARTIFACT_SYSTEM_DIR + logger : Union[logging.Logger, None], optional + Logger instance, defaults to None to use module logger + """ + self.base_dir = base_dir + self.logger = logger or logging.getLogger(__name__) + self.index_file = os.path.join(base_dir, "index.json") + + # Ensure base directory exists + os.makedirs(base_dir, exist_ok=True) + + # Load existing index + self._artifacts = self._load_index() + + def _load_index(self) -> Dict[str, Artifact]: + """Load the artifact index from disk. + + Returns + ------- + Dict[str, Artifact] + Mapping of artifact IDs to Artifact instances + """ + if not os.path.exists(self.index_file): + return {} + try: + with open(self.index_file, 'r') as f: + data = json.load(f) + return {aid: Artifact.from_dict(artifact_data) for aid, artifact_data in data.items()} + except Exception as e: + self.logger.error(f"Failed to load artifact index: {e}") + return {} + + def _save_index(self) -> None: + """Save the artifact index to disk.""" + try: + data = {aid: artifact.to_dict() for aid, artifact in self._artifacts.items()} + with open(self.index_file, 'w') as f: + json.dump(data, f, indent=2) + except Exception as e: + self.logger.error(f"Failed to save artifact index: {e}") + + def sync_index(self, artifacts: List[Union[Artifact, dict]]) -> None: + """Update the artifact index with a provided list of artifacts. + + Parameters + ---------- + artifacts : List[Union[Artifact, dict]] + List of artifacts to sync + """ + for artifact in artifacts: + self.add_artifact(artifact, update_index=False) + self._save_index() + + def add_artifact(self, artifact: Union[Artifact, dict], update_index: bool = True) -> None: + """Add an artifact to the tracker. + + Parameters + ---------- + artifact : Union[Artifact, dict] + The artifact to add, either as an Artifact instance or a dictionary + update_index : bool, optional + Whether to update the index file after adding, by default True + """ + if isinstance(artifact, dict): + artifact = Artifact.from_dict(artifact) + if artifact.id in self._artifacts and self._artifacts[artifact.id].checksum == artifact.checksum: + self.logger.debug(f"Artifact {artifact.id} already exists with same checksum; skipping add") + return # No action needed for duplicate with same checksum + self.logger.debug(f"Adding artifact {artifact.id}: {artifact.name}") + self._artifacts[artifact.id] = artifact + if update_index: + self.logger.info(f"Artifact {artifact.id} added to tracker") + self._save_index() + + def get_artifact(self, artifact_id: str) -> Optional[Artifact]: + """Get an artifact by ID. 
+ + Parameters + ---------- + artifact_id : str + The artifact ID + + Returns + ------- + Optional[Artifact] + The artifact or None if not found + """ + return self._artifacts.get(artifact_id) + + def update_artifact(self, artifact: Union[Artifact, dict]) -> bool: + """Update an artifact's metadata and optionally its file. + + Parameters + ---------- + artifact : Union[Artifact, dict] + The artifact to update, either as an Artifact instance or a dictionary + + Returns + ------- + bool + True if updated successfully, False otherwise + """ + if isinstance(artifact, dict): + artifact = Artifact.from_dict(artifact) + + existing_artifact = self._artifacts.get(artifact.id) + if not existing_artifact: + self.add_artifact(artifact) + return True + + existing_artifact.name = artifact.name + existing_artifact.artifact_type = artifact.artifact_type + existing_artifact.metadata.update(artifact.metadata) + if artifact.file_path: + existing_artifact.file_path = artifact.file_path + existing_artifact.file_size = artifact.file_size + existing_artifact.modified_at = artifact.modified_at + existing_artifact.checksum = artifact.checksum + + self._save_index() + self.logger.info(f"Updated artifact {artifact.id}: {artifact.name}") + return True + + def save_data(self, artifact_id: str, data: bytes, compressed: bool = False) -> bool: + """Save data to an artifact's file. + + Parameters + ---------- + artifact_id : str + The artifact ID + data : bytes + The data to write to the artifact file + compressed : bool, optional + Whether to compress the data before saving, by default False + + Returns + ------- + bool + True if saved successfully, False otherwise + """ + artifact = self._artifacts.get(artifact_id) + if not artifact: + self.logger.error(f"Artifact not found: {artifact_id}") + return False + + target_path = os.path.join(self.base_dir, artifact.source_id, artifact.operation_id, os.path.basename(artifact.file_path)) + + if compressed: + data = gzip.decompress(data) + + try: + with open(target_path, 'wb') as f: + f.write(data) + except Exception as e: + self.logger.error(f"Failed to save data to artifact {artifact_id}: {e}") + return False + + artifact.file_path = target_path + + file_size = os.path.getsize(target_path) + if file_size != artifact.file_size: + self.logger.warning(f"File size mismatch when saving data to artifact {artifact_id}: expected {artifact.file_size}, got {file_size}") + artifact.file_size = file_size + + checksum_calc = calculate_file_checksum(target_path) + if checksum_calc != artifact.checksum: + self.logger.warning(f"Checksum mismatch when saving data to artifact {artifact_id}: expected {artifact.checksum}, got {checksum_calc}") + artifact.checksum = checksum_calc + + self._artifacts[artifact_id] = artifact + self._save_index() + self.logger.info(f"Saved data to artifact {artifact_id}: {artifact.name}") + return True + + def get_data(self, artifact_id: str, compress: bool = False) -> Optional[bytes]: + """Retrieve data from an artifact's file. 
+ + Parameters + ---------- + artifact_id : str + The artifact ID + compress : bool, optional + Whether to compress the data after retrieval, by default False + + Returns + ------- + Optional[bytes] + The artifact data or None if not found/error + """ + artifact = self._artifacts.get(artifact_id) + if not artifact: + self.logger.error(f"Artifact not found: {artifact_id}") + return None + + if re.match(r'^sensor-[\w-]+://', artifact.file_path): + self.logger.error(f"Cannot retrieve data from remote artifact: {artifact.file_path}") + return None + elif not os.path.exists(artifact.file_path): + self.logger.error(f"Artifact file does not exist: {artifact.file_path}") + return None + + try: + with open(artifact.file_path, 'rb') as f: + data = f.read() + except Exception as e: + self.logger.error(f"Failed to read data from artifact {artifact_id}: {e}") + return None + + if compress: + data = gzip.compress(data) + + return data + + def get_artifacts_source_id(self, source_id: str, sortby: Optional[str] = None) -> List[Artifact]: + """Get all artifacts created by a specific source. + + Parameters + ---------- + source_id : str + The source ID + sortby : Optional[str], optional + Metadata key to sort by, by default None + + Returns + ------- + List[Artifact] + List of artifacts created by the source + """ + artifacts = [artifact for artifact in self._artifacts.values() if artifact.source_id == source_id] + + if sortby is not None: + artifacts = sorted(artifacts, key=lambda x: x.__getattribute__(sortby)) + + return artifacts + +# Global artifact manager instance +_artifact_manager = None + +def get_artifact_manager() -> ArtifactManager: + """Get the global artifact manager instance. + + Returns + ------- + ArtifactManager + The global artifact manager instance + """ + global _artifact_manager + if _artifact_manager is None: + _artifact_manager = ArtifactManager() + return _artifact_manager + diff --git a/fissure/utils/plugins/operations.py b/fissure/utils/plugins/operations.py index 1c7cad59..e564c47d 100644 --- a/fissure/utils/plugins/operations.py +++ b/fissure/utils/plugins/operations.py @@ -9,11 +9,12 @@ import uuid import re import sys -from typing import Dict, Any, Union +from typing import Dict, Any, Union, Callable from fissure.Sensor_Node.utils.resources import Resource +from fissure.utils.artifacts import ArtifactManager, get_artifact_manager -_base_params = ['self', 'sensor_node_id', 'logger', 'alert_callback', 'tak_cot_callback'] +_base_params = ['self', 'sensor_node_id', 'logger', 'alert_callback', 'tak_cot_callback', 'artifact_manager'] async def send_alert(sensor_node_id: Union[int, str], opid: str, message: str, logger=logging.getLogger(__name__)) -> None: """Placeholder for alert callback if none is provided. @@ -105,7 +106,7 @@ async def wrapper(self) -> None: return return wrapper -def stop_decorator(func) -> None: +def stop_decorator(func) -> Callable: async def wrapper(self) -> None: self.logger.info(f"Stopping {self.__class__.__name__}...") self._stop = True @@ -116,7 +117,7 @@ async def wrapper(self) -> None: self.logger.info(f"{self.__class__.__name__} stopped.") return wrapper -def teardown_decorator(func) -> None: +def teardown_decorator(func) -> Callable: async def wrapper(self) -> None: self.logger.info(f"Tearing down operation environment for {self.__class__.__name__}...") @@ -182,7 +183,7 @@ def dec_init(self, *args, **kwargs): class Operation(object): """Base class for plugin operations. 
""" - def __init__(self, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: callable = None, tak_cot_callback: callable = None) -> None: + def __init__(self, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = logging.getLogger(__name__), alert_callback: Union[Callable, None] = None, tak_cot_callback: Union[Callable, None] = None, artifact_manager: Union[ArtifactManager, None] = None) -> None: """Initialize the Operation class. Parameters @@ -191,10 +192,12 @@ def __init__(self, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = The ID of the sensor node, by default 0 logger : logging.Logger, optional Logger instance for logging, by default logging.getLogger(__name__) - alert_callback : callable, optional - Callback function for alerts, by default None - tak_cot_callback : callable, optional - Callback function for TAK CoT messages, by default None + alert_callback : Union[Callable, None], optional + Callback function for alerts, by default None for logger-only alerts + tak_cot_callback : Union[Callable, None], optional + Callback function for TAK CoT messages, by default None for logger-only TAK CoT messages + artifact_manager : Union[ArtifactManager, None], optional + ArtifactManager instance for managing artifacts, by default None to use the global artifact manager """ # input parameters self.sensor_node_id = sensor_node_id @@ -205,6 +208,13 @@ def __init__(self, sensor_node_id: Union[int, str] = 0, logger: logging.Logger = tak_cot_callback = send_tak_cot self.alert_callback = alert_callback self.tak_cot_callback = tak_cot_callback + if artifact_manager is not None: + self.artifact_manager = artifact_manager + else: + self.artifact_manager = get_artifact_manager() + + # unique operation ID + self.opid = str(uuid.uuid4()) def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) @@ -348,7 +358,7 @@ def prepare_resources(self) -> None: self.logger.info(f"Resources defined: {resources}") self.resources = [ Resource( - pid=os.getpid(), + pid=str(os.getpid()), op_uuid=self.opid, type=res_info.get('type'), model=res_info.get('model'), @@ -404,6 +414,80 @@ async def teardown(self) -> None: """ return + def create_artifact(self, file_path: str, name: str, artifact_type: str, metadata: Union[dict, None] = None) -> str: + """Create an artifact record for a file generated by the operation. + + This method creates an artifact record in the system and returns the artifact ID. + + Parameters + ---------- + file_path : str + The file path of the artifact. + name : str + The name of the artifact. + artifact_type : str + The type of the artifact (e.g., "log", "data", "image"). + metadata : Union[dict, None], optional + Additional metadata for the artifact, by default None + + Returns + ------- + str + The artifact ID. + """ + try: + return self.artifact_manager.create_artifact( + source_id=str(self.sensor_node_id), + operation_id=self.opid, + file_path=file_path, + name=name, + artifact_type=artifact_type, + metadata=metadata or {} + ) + except Exception as e: + self.logger.error(f"Failed to create artifact: {e}") + return "" + + def update_artifact(self, artifact_id: str, file_path: Union[str, None] = None, metadata: Union[Dict[str, Any], None] = None) -> bool: + """Update the metadata of an existing artifact. + + Parameters + ---------- + artifact_id : str + The ID of the artifact to update. + file_path : str, optional + The new file path for the artifact. + metadata : dict, optional + The metadata to update. 
+    def update_artifact(self, artifact_id: str, file_path: Union[str, None] = None, metadata: Union[Dict[str, Any], None] = None) -> bool:
+        """Update the metadata of an existing artifact.
+
+        Parameters
+        ----------
+        artifact_id : str
+            The ID of the artifact to update.
+        file_path : Union[str, None], optional
+            The new file path for the artifact.
+        metadata : Union[Dict[str, Any], None], optional
+            The metadata to update.
+
+        Returns
+        -------
+        bool
+            True if the update was successful, False otherwise.
+        """
+        if not self.artifact_manager:
+            self.logger.error("Artifact manager is not initialized.")
+            return False
+
+        artifact = self.artifact_manager.get_artifact(artifact_id)
+        if artifact is None:
+            self.logger.error(f"Artifact {artifact_id} not found.")
+            return False
+
+        if artifact.operation_id != self.opid:
+            self.logger.error(f"Artifact {artifact_id} does not belong to operation {self.opid}.")
+            return False
+
+        try:
+            return self.artifact_manager.update_artifact(
+                artifact_id=artifact_id,
+                file_path=file_path,
+                metadata=metadata
+            )
+        except Exception as e:
+            self.logger.error(f"Failed to update artifact {artifact_id}: {e}")
+            return False
+
 def main(*args, **kwargs) -> object:
     """Create an instance of the operation class.
diff --git a/fissure/utils/tak_messages.py b/fissure/utils/tak_messages.py
index a057a2fa..acbf6b70 100644
--- a/fissure/utils/tak_messages.py
+++ b/fissure/utils/tak_messages.py
@@ -1,11 +1,21 @@
 #!/usr/bin/env python3
-import xml.etree.ElementTree as ET
+import aiohttp
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.serialization import pkcs12
+import datetime
+import hashlib
+import os
 import pytak
-from datetime import datetime, timezone
-from fissure.utils.common import extractFrequencyFromUID
-from fissure.utils.library import classifyFrequencyFromTextDirect
-
+import ssl
+import tempfile
+from typing import Union, Tuple
+import xml.etree.ElementTree as ET
+import xml.dom.minidom
+import zipfile
+from fissure.utils.artifacts import Artifact
+from fissure.utils.common import extractFrequencyFromUID, get_fissure_config
+from fissure.utils.library import classifyFrequencyFromTextDirect
 
 # ---------------------------------------------------------
 # Base COT builder
@@ -20,8 +30,18 @@
     """
     msg = pytak.gen_cot_xml(uid=uid, stale=stale)
 
-    # Normalize in case pytak returned a string
-    if not isinstance(msg, ET.Element):
+    # Normalize in case pytak returned a string or None
+    if msg is None:
+        # Create minimal CoT structure if pytak fails; compute real timestamps
+        # (stale is an offset in seconds, not an hour field)
+        msg = ET.Element("event")
+        msg.set("version", "2.0")
+        msg.set("uid", uid)
+        now = datetime.datetime.utcnow()
+        now_str = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+        msg.set("time", now_str)
+        msg.set("start", now_str)
+        msg.set("stale", (now + datetime.timedelta(seconds=stale)).strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
+    elif not isinstance(msg, ET.Element):
         msg = ET.fromstring(msg)
 
     detail = msg.find("detail")
@@ -271,4 +289,316 @@ def _serialize_payload(parent, data, skip_keys=None):
         return
 
     # SCALAR VALUE (fallback)
-    parent.text = str(data)
\ No newline at end of file
+    parent.text = str(data)
+
+
+def _format_xml_pretty(element: ET.Element) -> str:
+    """Format XML element with proper indentation
+
+    Parameters
+    ----------
+    element : xml.etree.ElementTree.Element
+        XML element to format
+
+    Returns
+    -------
+    str
+        The pretty-printed XML document as a UTF-8 string
+    """
+    rough_string = ET.tostring(element, encoding='UTF-8', xml_declaration=True)
+    reparsed = xml.dom.minidom.parseString(rough_string)
+    return reparsed.toprettyxml(indent="  ", encoding='UTF-8').decode('UTF-8')
+
+
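+# The helper below builds a TAK "Mission Package": a ZIP laid out as follows
+# (sketch, per ATAK data package conventions):
+#
+#   DP-<NAME>.zip
+#   ├── <subdir>/<artifact file>
+#   └── MANIFEST/manifest.xml  (MissionPackageManifest with name/uid
+#                               parameters and one Content entry per file)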
+def create_artifact_data_package(artifact: Union[Artifact, dict], file_data: bytes) -> Tuple[bytes, str]:
+    """Create TAK-compatible data package for artifact
+
+    Parameters
+    ----------
+    artifact : Union[Artifact, dict]
+        Artifact object or dict
+    file_data : bytes
+        Raw bytes of the artifact file
+
+    Returns
+    -------
+    Tuple[bytes, str]
+        ZIP data as bytes and the artifact filename
+    """
+    if isinstance(artifact, dict):
+        artifact = Artifact.from_dict(artifact)
+
+    # Generate a proper package name
+    package_name = f"DP-{artifact.name[:20].upper().replace(' ', '_')}"
+    package_uid = artifact.id
+
+    # Create subdirectory
+    subdir = artifact.id.replace('-', '').replace('_', '')[:32]
+    if not subdir:
+        subdir = "artifacts"
+
+    # Create temporary ZIP file
+    with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as temp_zip:
+        with zipfile.ZipFile(temp_zip.name, 'w', zipfile.ZIP_DEFLATED) as zipf:
+            # Add the artifact file in subdirectory structure
+            artifact_filename = os.path.basename(artifact.file_path)
+            # Ensure we have a valid filename
+            if not artifact_filename:
+                artifact_filename = f"{artifact.name}.bin"
+
+            zip_entry_path = f"{subdir}/{artifact_filename}"
+            zipf.writestr(zip_entry_path, file_data)
+
+            # Create simple MANIFEST.xml
+            manifest = ET.Element("MissionPackageManifest", version="2")
+
+            # Configuration - name first, then uid
+            config = ET.SubElement(manifest, "Configuration")
+            ET.SubElement(config, "Parameter", name="name", value=package_name)
+            ET.SubElement(config, "Parameter", name="uid", value=package_uid)
+
+            # Contents section
+            contents = ET.SubElement(manifest, "Contents")
+            ET.SubElement(contents, "Content",
+                          zipEntry=zip_entry_path,
+                          ignore="false")
+
+            # Format XML
+            manifest_xml = _format_xml_pretty(manifest)
+
+            # Save manifest in MANIFEST folder
+            zipf.writestr("MANIFEST/manifest.xml", manifest_xml.encode('UTF-8'))
+
+    # Read the ZIP data
+    with open(temp_zip.name, "rb") as f:
+        zip_data = f.read()
+
+    # Clean up temp file
+    os.unlink(temp_zip.name)
+
+    return zip_data, artifact_filename
+
+
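+# Intended flow (sketch using this module's names): package the bytes, then
+# upload and announce them in one step:
+#
+#     zip_data, fname = create_artifact_data_package(artifact, file_bytes)
+#     await send_artifact_event(component, artifact, file_bytes)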
Artifact event not sent.") # type: ignore + return + + # Create fileshare element directly under detail (TAK standard) + fileshare = ET.SubElement(detail, "fileshare") + + # Add attributes + fileshare.set("filename", package_filename) + fileshare.set("senderUrl", sender_url) + fileshare.set("sizeInBytes", str(len(tak_data))) + fileshare.set("sha256", sha256_hash) + fileshare.set("senderUid", f"FISSURE-{artifact.source_id}") + fileshare.set("senderCallsign", f"FISSURE-{artifact.source_id[:8]}") + fileshare.set("name", f"DP-{artifact.name[:20].upper().replace(' ', '_')}") + + # Add ackrequest element + ackrequest = ET.SubElement(detail, "ackrequest") + ackrequest.set("uid", f"ack-{artifact_id[:8]}") + ackrequest.set("ackrequested", "true") + ackrequest.set("tag", f"DP-{artifact.name[:20].upper().replace(' ', '_')}") + + # Set point coordinates (0,0 with high uncertainty) + point = msg.find("point") + if point is None: + point = ET.SubElement(msg, "point") + point.set("lat", "0.0") + point.set("lon", "0.0") + point.set("hae", "0.0") + point.set("ce", "9999999.0") + point.set("le", "9999999.0") + + return _send_to_tak(component, msg) + + +async def upload_data_package_to_tak_server(tak_data: bytes, sha256_hash: str, filename: str, component) -> Union[str, None]: + """Upload data package to TAK server sync endpoint + + Parameters + ---------- + tak_data : bytes + Raw bytes of the TAK data package (ZIP) + sha256_hash : str + SHA256 hash of the data package + filename : str + Filename of the data package + component : object + FISSURE component with logger + """ + # Get TAK server configuration + fissure_config = get_fissure_config() + tak_config = fissure_config.get('tak', {}) + + tak_internal_ip = tak_config.get('ip_addr', 'localhost') + api_port = '8443' # Standard TAK server HTTPS API port + p12_cert_path = tak_config.get('webadmin_cert', '') + + # Get external/host IP address for clients to access + external_ip = tak_config.get('external_ip') + + if not external_ip: + component.logger.error("TAK external_ip not found in configuration") + return None + component.logger.info(f"TAK server - Internal IP: {tak_internal_ip}:{api_port}, External IP: {external_ip}") + + # Download URL for clients + download_url = f"https://{external_ip}:{api_port}/Marti/sync/content?hash={sha256_hash}" + + # Check if P12 certificate exists + if not p12_cert_path or not os.path.exists(p12_cert_path): + component.logger.error(f"Client certificate not found: {p12_cert_path} - TAK server upload requires client certificate authentication") + return None + + component.logger.info(f"Using client certificate: {p12_cert_path}") + + # Convert P12 certificate to PEM for aiohttp + cert_pem_path = None + key_pem_path = None + + try: + # Import cryptography if available + try: + # Load P12 certificate + with open(p12_cert_path, 'rb') as f: + p12_data = f.read() + + private_key, certificate, additional_certificates = pkcs12.load_key_and_certificates( + p12_data, b'atakatak' # Standard TAK password + ) + + if private_key and certificate: + # Convert to PEM format + key_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + ) + + cert_pem = certificate.public_bytes(serialization.Encoding.PEM) + + # Save to temporary files + with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.key') as key_file: + key_file.write(key_pem) + key_pem_path = key_file.name + + with tempfile.NamedTemporaryFile(mode='wb', 
+async def upload_data_package_to_tak_server(tak_data: bytes, sha256_hash: str, filename: str, component) -> Union[str, None]:
+    """Upload data package to TAK server sync endpoint
+
+    Parameters
+    ----------
+    tak_data : bytes
+        Raw bytes of the TAK data package (ZIP)
+    sha256_hash : str
+        SHA256 hash of the data package
+    filename : str
+        Filename of the data package
+    component : object
+        FISSURE component with logger
+
+    Returns
+    -------
+    Union[str, None]
+        The client-facing download URL for the package, or None if the TAK
+        configuration or client certificate prerequisites are missing
+    """
+    # Get TAK server configuration
+    fissure_config = get_fissure_config()
+    tak_config = fissure_config.get('tak', {})
+
+    tak_internal_ip = tak_config.get('ip_addr', 'localhost')
+    api_port = '8443'  # Standard TAK server HTTPS API port
+    p12_cert_path = tak_config.get('webadmin_cert', '')
+
+    # Get external/host IP address for clients to access
+    external_ip = tak_config.get('external_ip')
+
+    if not external_ip:
+        component.logger.error("TAK external_ip not found in configuration")
+        return None
+    component.logger.info(f"TAK server - Internal IP: {tak_internal_ip}:{api_port}, External IP: {external_ip}")
+
+    # Download URL for clients
+    download_url = f"https://{external_ip}:{api_port}/Marti/sync/content?hash={sha256_hash}"
+
+    # Check if P12 certificate exists
+    if not p12_cert_path or not os.path.exists(p12_cert_path):
+        component.logger.error(f"Client certificate not found: {p12_cert_path} - TAK server upload requires client certificate authentication")
+        return None
+
+    component.logger.info(f"Using client certificate: {p12_cert_path}")
+
+    # Convert P12 certificate to PEM for aiohttp
+    cert_pem_path = None
+    key_pem_path = None
+
+    try:
+        # Load the P12 certificate; cryptography is imported at module level,
+        # so no runtime availability check is needed here
+        with open(p12_cert_path, 'rb') as f:
+            p12_data = f.read()
+
+        private_key, certificate, additional_certificates = pkcs12.load_key_and_certificates(
+            p12_data, b'atakatak'  # Standard TAK password
+        )
+
+        if private_key and certificate:
+            # Convert to PEM format
+            key_pem = private_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.NoEncryption()
+            )
+
+            cert_pem = certificate.public_bytes(serialization.Encoding.PEM)
+
+            # Save to temporary files
+            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.key') as key_file:
+                key_file.write(key_pem)
+                key_pem_path = key_file.name
+
+            with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.crt') as cert_file:
+                cert_file.write(cert_pem)
+                cert_pem_path = cert_file.name
+
+            component.logger.info("Successfully converted P12 to PEM format")
+        else:
+            raise Exception("Could not extract key/certificate from P12 file")
+
+        # Create SSL context with client certificate
+        ssl_context = ssl.create_default_context()
+        ssl_context.check_hostname = False
+        ssl_context.verify_mode = ssl.CERT_NONE  # Accept self-signed certs
+        ssl_context.load_cert_chain(cert_pem_path, key_pem_path)
+
+        component.logger.info("SSL context created with client certificate")
+
+        # Prepare upload data
+        data = aiohttp.FormData()
+        data.add_field('assetfile', tak_data, filename=filename, content_type='application/zip')
+
+        # Headers
+        headers = {
+            'User-Agent': 'FISSURE-TAK-Client/1.0',
+            'Accept': 'application/json, */*',
+            'Connection': 'close'
+        }
+
+        # Try upload to TAK server
+        upload_url = f"https://{tak_internal_ip}:{api_port}/Marti/sync/upload"
+        component.logger.info(f"Uploading to: {upload_url}")
+
+        async with aiohttp.ClientSession(
+            connector=aiohttp.TCPConnector(ssl=ssl_context),
+            timeout=aiohttp.ClientTimeout(total=60)
+        ) as session:
+
+            component.logger.info(f"Uploading {len(tak_data)} bytes with client certificate...")
+
+            async with session.post(upload_url, data=data, headers=headers) as response:
+                component.logger.info(f"Upload response status: {response.status}")
+
+                # Read response for debugging
+                try:
+                    response_text = await response.text()
+                    if response_text:
+                        component.logger.info(f"Upload response: {response_text[:300]}")
+                except Exception as e:
+                    component.logger.warning(f"Could not read response body: {e}")
+
+                if response.status in [200, 201, 202]:
+                    component.logger.info("Successfully uploaded data package to TAK server!")
+                    return download_url
+                elif response.status == 409:
+                    component.logger.info("Data package already exists on TAK server")
+                    return download_url
+                elif response.status == 401:
+                    component.logger.error("Authentication failed - check client certificate")
+                    return download_url
+                elif response.status == 403:
+                    component.logger.error("Forbidden - check TAK server permissions")
+                    return download_url
+                else:
+                    component.logger.error(f"Upload failed with HTTP {response.status}")
+                    return download_url
+
+    except Exception as e:
+        component.logger.error(f"Certificate upload failed: {e}")
+        return download_url
+
+    finally:
+        # Clean up temporary certificate files
+        if cert_pem_path and os.path.exists(cert_pem_path):
+            os.unlink(cert_pem_path)
+        if key_pem_path and os.path.exists(key_pem_path):
+            os.unlink(key_pem_path)
diff --git a/fissure/utils/tak_server.py b/fissure/utils/tak_server.py
index fd05723f..53a98c68 100644
--- a/fissure/utils/tak_server.py
+++ b/fissure/utils/tak_server.py
@@ -197,6 +197,32 @@ async def handle_data(self, data: bytes) -> None:
         try:
             root = ET.fromstring(data_decode)
+
+            # Deleting after testing Artifact Download
+#            uid = root.get("uid", "unknown")
+#            remarks = root.find(".//detail/remarks")
+#            if remarks is not None and remarks.text:
+#                if remarks.text == "FTN Requested: Plugin Names":
+#                    self._logger.info(f"FTN Plugin Names Requested for UID: {uid}")
+#                    await HiprFisrCallbacks.sendPluginNamesTak(self.hipfisr, uid, sensor_node_id=0)
+#                elif remarks.text.startswith("FTN Requested: Plugin Actions:"):
+#                    plugin_name = remarks.text.split("FTN Requested: Plugin Actions:")[-1].strip()
self._logger.info(f"FTN Plugin Action Names Requested: {plugin_name} for UID: {uid}") +# await HiprFisrCallbacks.sendPluginActionNamesTak(self.hipfisr, uid, plugin_name, sensor_node_id=0) +# elif remarks.text.startswith("FTN Requested: Plugin Action:"): +# remaining = remarks.text.split("FTN Requested: Plugin Action:")[-1].strip() +# plugin_name, action_name = remaining.split(":", 1) +# plugin_name = plugin_name.strip() +# action_name = action_name.strip() +# self._logger.info(f"FTN Plugin Action Requested: {action_name} for UID: {uid}") +# await HiprFisrCallbacks.sendPluginActionTak(self.hipfisr, uid, sensor_node_id=0, plugin_name=plugin_name, action_name=action_name, parameters={}) +# elif remarks.text.startswith("FTN Requested: Plugin Action Stop"): +# self._logger.info(f"FTN Plugin Stop Requested for UID: {uid}") +# await HiprFisrCallbacks.stop_all_plugin_operations(self.hipfisr, uid, sensor_node_id=0) +# elif remarks.text.startswith("FTN Requested: Artifact Download:"): +# artid = remarks.text.split("FTN Requested: Artifact Download:")[-1].strip() +# self._logger.info(f"FTN Artifact Download Requested for artifact id: {artid}") +# await HiprFisrCallbacks.transferArtifactRequest(self.hipfisr, artid, 'tak', None) # Event metadata event_uid = root.get("uid", "unknown") diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..13bc1da9 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,6 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +addopts = -v --tb=short \ No newline at end of file diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/utils/test_artifacts.py b/tests/utils/test_artifacts.py new file mode 100644 index 00000000..c0fb63c9 --- /dev/null +++ b/tests/utils/test_artifacts.py @@ -0,0 +1,531 @@ +#!/usr/bin/env python3 +"""Tests for the FISSURE artifact management system.""" + +import pytest +import tempfile +import os +import uuid +import shutil +from unittest.mock import patch + +# Import the modules we're testing +from fissure.utils.artifacts import Artifact, ArtifactManager, get_artifact_manager + + +class TestArtifact: + """Test cases for the Artifact dataclass.""" + + def test_artifact_creation(self): + """Test creating an Artifact instance.""" + artifact = Artifact( + id="test-id", + source_id="source-123", + operation_id="op-123", + name="Test Artifact", + file_path="/tmp/test.log", + artifact_type="log", + created_at="2024-01-01T00:00:00", + file_size=1024, + metadata={"test": True}, + checksum="abc123", + modified_at="2024-01-01T01:00:00" + ) + + assert artifact.id == "test-id" + assert artifact.source_id == "source-123" + assert artifact.operation_id == "op-123" + assert artifact.name == "Test Artifact" + assert artifact.artifact_type == "log" + assert artifact.file_size == 1024 + assert artifact.metadata == {"test": True} + assert artifact.modified_at == "2024-01-01T01:00:00" + + def test_artifact_creation_without_modified_at(self): + """Test creating an Artifact instance with modified_at same as created_at.""" + artifact = Artifact( + id="test-id", + source_id="source-123", + operation_id="op-123", + name="Test Artifact", + file_path="/tmp/test.log", + artifact_type="log", + created_at="2024-01-01T00:00:00", + file_size=1024, + metadata={"test": True}, + checksum="abc123", + modified_at="2024-01-01T00:00:00" # Same as created_at + 
+        )
+
+        assert artifact.modified_at == artifact.created_at
+
+    def test_artifact_to_dict(self):
+        """Test converting artifact to dictionary."""
+        artifact = Artifact(
+            id="test-id",
+            source_id="source-123",
+            operation_id="op-123",
+            name="Test Artifact",
+            file_path="/tmp/test.log",
+            artifact_type="log",
+            created_at="2024-01-01T00:00:00",
+            file_size=1024,
+            metadata={"test": True},
+            checksum="abc123",
+            modified_at="2024-01-01T01:00:00"
+        )
+
+        artifact_dict = artifact.to_dict()
+
+        assert isinstance(artifact_dict, dict)
+        assert artifact_dict["id"] == "test-id"
+        assert artifact_dict["source_id"] == "source-123"
+        assert artifact_dict["operation_id"] == "op-123"
+        assert artifact_dict["metadata"] == {"test": True}
+        assert artifact_dict["modified_at"] == "2024-01-01T01:00:00"
+
+    def test_artifact_from_dict(self):
+        """Test creating artifact from dictionary."""
+        data = {
+            "id": "test-id",
+            "source_id": "source-123",
+            "operation_id": "op-123",
+            "name": "Test Artifact",
+            "file_path": "/tmp/test.log",
+            "artifact_type": "log",
+            "created_at": "2024-01-01T00:00:00",
+            "file_size": 1024,
+            "metadata": {"test": True},
+            "checksum": "abc123",
+            "modified_at": "2024-01-01T01:00:00"
+        }
+
+        artifact = Artifact.from_dict(data)
+
+        assert artifact.id == "test-id"
+        assert artifact.source_id == "source-123"
+        assert artifact.operation_id == "op-123"
+        assert artifact.metadata == {"test": True}
+        assert artifact.modified_at == "2024-01-01T01:00:00"
+
+
+class TestArtifactManager:
+    """Test cases for the ArtifactManager class."""
+
+    @pytest.fixture
+    def temp_dir(self):
+        """Create a temporary directory for testing."""
+        temp_dir = tempfile.mkdtemp()
+        yield temp_dir
+        # Cleanup
+        shutil.rmtree(temp_dir, ignore_errors=True)
+
+    @pytest.fixture
+    def artifact_manager(self, temp_dir):
+        """Create an ArtifactManager instance with temporary directory."""
+        return ArtifactManager(base_dir=temp_dir)
+
+    @pytest.fixture
+    def test_file(self, temp_dir):
+        """Create a test file."""
+        test_file_path = os.path.join(temp_dir, "test_file.txt")
+        with open(test_file_path, 'w') as f:
+            f.write("This is a test file for artifact testing.")
+        return test_file_path
+
+    def test_artifact_manager_init(self, temp_dir):
+        """Test ArtifactManager initialization."""
+        am = ArtifactManager(base_dir=temp_dir)
+
+        assert am.base_dir == temp_dir
+        assert os.path.exists(temp_dir)
+        assert isinstance(am._artifacts, dict)
+
+    def test_artifact_manager_init_default_path(self):
+        """Test ArtifactManager initialization with default path."""
+        with patch('os.path.dirname') as mock_dirname:
+            mock_dirname.return_value = "/mock/path"
+            with patch('os.makedirs'):
+                am = ArtifactManager()
+                expected_path = "/mock/path/artifacts"
+                assert expected_path in am.base_dir
+
+    def test_create_operation_dir(self, artifact_manager, temp_dir):
+        """Test creating operation directory."""
+        operation_id = "test-op-123"
+
+        op_dir, file_dir = artifact_manager.create_operation_dir(operation_id)
+
+        assert os.path.exists(op_dir)
+        assert os.path.exists(file_dir)
+        assert op_dir == os.path.join(temp_dir, operation_id)
+        assert file_dir == os.path.join(temp_dir, operation_id, "files")
+
+    def test_get_filename_for_artifact(self, artifact_manager):
+        """Test generating filename for artifact."""
+        operation_id = "test-op-123"
+        ext = ".log"
+
+        filename = artifact_manager.get_filename_for_artifact(operation_id, ext)
+
+        assert filename.endswith(ext)
+        assert operation_id in filename
+        assert "files" in filename
+        # Check that it's a UUID in the filename
+        basename = os.path.basename(filename)
+        uuid_part = os.path.splitext(basename)[0]
+        uuid.UUID(uuid_part)  # This will raise if not a valid UUID
+
+    def test_create_artifact_success(self, artifact_manager, test_file):
+        """Test creating an artifact successfully."""
+        operation_id = "test-op-123"
+
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file,
+            name="Test Log",
+            artifact_type="log",
+            metadata={"source": "test"}
+        )
+
+        assert artifact_id != ""
+        assert uuid.UUID(artifact_id)  # Valid UUID
+
+        # Check artifact was stored
+        artifact = artifact_manager.get_artifact(artifact_id)
+        assert artifact is not None
+        assert artifact.name == "Test Log"
+        assert artifact.operation_id == operation_id
+        assert artifact.artifact_type == "log"
+        assert artifact.metadata == {"source": "test"}
+        assert artifact.file_size > 0
+        assert artifact.checksum != ""
+
+    def test_create_artifact_with_uuid_filename(self, artifact_manager, temp_dir):
+        """Test creating an artifact with UUID filename uses that UUID as artifact ID."""
+        operation_id = "test-op-123"
+        test_uuid = str(uuid.uuid4())
+        test_file_path = os.path.join(temp_dir, f"{test_uuid}.txt")
+
+        with open(test_file_path, 'w') as f:
+            f.write("Test content")
+
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file_path,
+            name="Test File",
+            artifact_type="data"
+        )
+
+        assert artifact_id == test_uuid
+
+    def test_create_artifact_nonexistent_file(self, artifact_manager):
+        """Test creating artifact with nonexistent file."""
+        operation_id = "test-op-123"
+
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path="/nonexistent/file.txt",
+            name="Missing File",
+            artifact_type="log"
+        )
+
+        assert artifact_id == ""
+
+    def test_get_artifact(self, artifact_manager, test_file):
+        """Test getting an artifact by ID."""
+        operation_id = "test-op-123"
+
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file,
+            name="Test Log",
+            artifact_type="log"
+        )
+
+        artifact = artifact_manager.get_artifact(artifact_id)
+
+        assert artifact is not None
+        assert artifact.id == artifact_id
+        assert artifact.name == "Test Log"
+
+    def test_get_nonexistent_artifact(self, artifact_manager):
+        """Test getting a nonexistent artifact."""
+        artifact = artifact_manager.get_artifact("nonexistent-id")
+        assert artifact is None
+
+    def test_get_artifacts_by_operation(self, artifact_manager, test_file, temp_dir):
+        """Test getting artifacts by operation ID."""
+        operation_id = "test-op-123"
+        other_operation_id = "other-op-456"
+
+        # Create test files
+        test_file2 = os.path.join(temp_dir, "test_file2.txt")
+        with open(test_file2, 'w') as f:
+            f.write("Another test file")
+
+        # Create artifacts for first operation
+        id1 = artifact_manager.create_artifact("test-source", operation_id, test_file, "File 1", "log")
+        id2 = artifact_manager.create_artifact("test-source", operation_id, test_file2, "File 2", "data")
+
+        # Create artifact for second operation
+        id3 = artifact_manager.create_artifact("test-source", other_operation_id, test_file, "File 3", "log")
+
+        artifacts = artifact_manager.get_artifacts_by_operation(operation_id)
+
+        assert len(artifacts) == 2
+        artifact_ids = [a.id for a in artifacts]
+        assert id1 in artifact_ids
+        assert id2 in artifact_ids
+        assert id3 not in artifact_ids
+
+    def test_get_all_artifacts(self, artifact_manager, test_file):
+        """Test getting all artifacts."""
+        operation_id1 = "test-op-123"
+        operation_id2 = "test-op-456"
+
+        artifact_manager.create_artifact("test-source", operation_id1, test_file, "File 1", "log")
+        artifact_manager.create_artifact("test-source", operation_id2, test_file, "File 2", "data")
+
+        all_artifacts = artifact_manager.get_all_artifacts()
+
+        assert len(all_artifacts) == 2
+
+    def test_delete_artifact(self, artifact_manager, test_file):
+        """Test deleting an artifact."""
+        operation_id = "test-op-123"
+
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file,
+            name="Test Log",
+            artifact_type="log"
+        )
+
+        # Verify artifact exists
+        assert artifact_manager.get_artifact(artifact_id) is not None
+
+        # Delete artifact
+        result = artifact_manager.delete_artifact(artifact_id)
+
+        assert result is True
+        assert artifact_manager.get_artifact(artifact_id) is None
+        assert not os.path.exists(test_file)  # File should be deleted
+
+    def test_delete_nonexistent_artifact(self, artifact_manager):
+        """Test deleting a nonexistent artifact."""
+        result = artifact_manager.delete_artifact("nonexistent-id")
+        assert result is False
+
+    def test_cleanup_operation(self, artifact_manager, test_file, temp_dir):
+        """Test cleaning up all artifacts for an operation."""
+        operation_id = "test-op-123"
+        other_operation_id = "other-op-456"
+
+        # Create additional test file
+        test_file2 = os.path.join(temp_dir, "test_file2.txt")
+        with open(test_file2, 'w') as f:
+            f.write("Another test file")
+
+        # Create artifacts for target operation
+        artifact_manager.create_artifact("test-source", operation_id, test_file, "File 1", "log")
+        artifact_manager.create_artifact("test-source", operation_id, test_file2, "File 2", "data")
+
+        # Create artifact for other operation
+        artifact_manager.create_artifact("test-source", other_operation_id, test_file, "File 3", "log")
+
+        # Verify initial state
+        target_artifacts = artifact_manager.get_artifacts_by_operation(operation_id)
+        other_artifacts = artifact_manager.get_artifacts_by_operation(other_operation_id)
+        assert len(target_artifacts) == 2
+        assert len(other_artifacts) == 1
+
+        # Cleanup target operation
+        deleted_count = artifact_manager.cleanup_operation(operation_id)
+
+        assert deleted_count == 2
+        assert len(artifact_manager.get_artifacts_by_operation(operation_id)) == 0
+        assert len(artifact_manager.get_artifacts_by_operation(other_operation_id)) == 1
+
+    def test_index_persistence(self, temp_dir, test_file):
+        """Test that the artifact index persists across manager instances."""
+        operation_id = "test-op-123"
+
+        # Create artifact with first manager instance
+        am1 = ArtifactManager(base_dir=temp_dir)
+        artifact_id = am1.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file,
+            name="Persistent Test",
+            artifact_type="log"
+        )
+
+        # Create second manager instance
+        am2 = ArtifactManager(base_dir=temp_dir)
+
+        # Verify artifact is accessible from second instance
+        artifact = am2.get_artifact(artifact_id)
+        assert artifact is not None
+        assert artifact.name == "Persistent Test"
+
+    def test_checksum_calculation(self, artifact_manager, temp_dir):
+        """Test checksum calculation."""
+        test_content = "This is test content for checksum calculation."
+        test_file_path = os.path.join(temp_dir, "checksum_test.txt")
+
+        with open(test_file_path, 'w') as f:
+            f.write(test_content)
+
+        checksum = artifact_manager._calculate_checksum(test_file_path)
+
+        assert checksum != ""
+        assert len(checksum) == 64  # SHA256 hex string length
+
+        # Verify checksum is consistent
+        checksum2 = artifact_manager._calculate_checksum(test_file_path)
+        assert checksum == checksum2
+
+    def test_update_artifact_metadata_only(self, artifact_manager, test_file):
+        """Test updating artifact metadata without changing file."""
+        operation_id = "test-op-123"
+
+        # Create artifact
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file,
+            name="Test Log",
+            artifact_type="log",
+            metadata={"version": 1, "source": "test"}
+        )
+
+        # Get original artifact
+        original_artifact = artifact_manager.get_artifact(artifact_id)
+        original_modified_at = original_artifact.modified_at
+        assert original_modified_at == original_artifact.created_at  # Initially same
+        original_checksum = original_artifact.checksum
+
+        # Update metadata
+        result = artifact_manager.update_artifact(
+            artifact_id=artifact_id,
+            metadata={"version": 2, "updated_by": "test_suite"}
+        )
+
+        assert result is True
+
+        # Verify updates
+        updated_artifact = artifact_manager.get_artifact(artifact_id)
+        assert updated_artifact.modified_at != original_modified_at  # Should be different now
+        assert updated_artifact.metadata["version"] == 2
+        assert updated_artifact.metadata["source"] == "test"  # Original metadata preserved
+        assert updated_artifact.metadata["updated_by"] == "test_suite"
+        assert updated_artifact.checksum == original_checksum  # File unchanged
+
+    def test_update_artifact_with_new_file(self, artifact_manager, test_file, temp_dir):
+        """Test updating artifact with a new file."""
+        operation_id = "test-op-123"
+
+        # Create artifact
+        artifact_id = artifact_manager.create_artifact(
+            source_id="test-source",
+            operation_id=operation_id,
+            file_path=test_file,
+            name="Test Log",
+            artifact_type="log"
+        )
+
+        # Create new file with different content
+        new_test_file = os.path.join(temp_dir, "new_test_file.txt")
+        with open(new_test_file, 'w') as f:
+            f.write("This is updated content for the test file.")
+
+        # Get original artifact
+        original_artifact = artifact_manager.get_artifact(artifact_id)
+        original_size = original_artifact.file_size
+        original_checksum = original_artifact.checksum
+
+        # Update with new file
+        result = artifact_manager.update_artifact(
+            artifact_id=artifact_id,
+            file_path=new_test_file,
+            metadata={"updated_reason": "new_content"}
+        )
+
+        assert result is True
+
+        # Verify updates
+        updated_artifact = artifact_manager.get_artifact(artifact_id)
+        assert updated_artifact.modified_at is not None
+        assert updated_artifact.file_path == new_test_file
+        assert updated_artifact.file_size != original_size
+        assert updated_artifact.checksum != original_checksum
+        assert updated_artifact.metadata["updated_reason"] == "new_content"
+
+    def test_update_nonexistent_artifact(self, artifact_manager):
+        """Test updating a nonexistent artifact."""
+        result = artifact_manager.update_artifact(
+            artifact_id="nonexistent-id",
+            metadata={"test": "data"}
+        )
+        assert result is False
+
+    def test_update_artifact_with_nonexistent_file(self, artifact_manager, test_file):
+        """Test updating artifact with nonexistent new file."""
+        operation_id = "test-op-123"
+
+        # Create artifact
+        artifact_id = artifact_manager.create_artifact(
source_id="test-source", + operation_id=operation_id, + file_path=test_file, + name="Test Log", + artifact_type="log" + ) + + # Try to update with nonexistent file + result = artifact_manager.update_artifact( + artifact_id=artifact_id, + file_path="/nonexistent/file.txt" + ) + + assert result is False + + def test_create_artifact_has_modified_at_set_to_created_at(self, artifact_manager, test_file): + """Test that newly created artifacts have modified_at set to created_at.""" + operation_id = "test-op-123" + + artifact_id = artifact_manager.create_artifact( + source_id="test-source", + operation_id=operation_id, + file_path=test_file, + name="Test Log", + artifact_type="log" + ) + + artifact = artifact_manager.get_artifact(artifact_id) + assert artifact.modified_at == artifact.created_at + + +class TestGlobalArtifactManager: + """Test cases for the global artifact manager.""" + + def test_get_artifact_manager_singleton(self): + """Test that get_artifact_manager returns the same instance.""" + am1 = get_artifact_manager() + am2 = get_artifact_manager() + + assert am1 is am2 + + def test_get_artifact_manager_type(self): + """Test that get_artifact_manager returns ArtifactManager instance.""" + am = get_artifact_manager() + assert isinstance(am, ArtifactManager) + + +if __name__ == "__main__": + pytest.main([__file__]) \ No newline at end of file