diff --git a/show/memory_statistics.py b/show/memory_statistics.py new file mode 100644 index 0000000000..a727cc9231 --- /dev/null +++ b/show/memory_statistics.py @@ -0,0 +1,407 @@ +import sys +import socket +import json +import click +import syslog +import time +import os +from typing import Dict, Any, Union +from dataclasses import dataclass +from difflib import get_close_matches +from swsscommon.swsscommon import ConfigDBConnector + + +@dataclass +class Config: + SOCKET_PATH: str = '/var/run/dbus/memstats.socket' + SOCKET_TIMEOUT: int = 30 + BUFFER_SIZE: int = 8192 + MAX_RETRIES: int = 3 + RETRY_DELAY: float = 1.0 + DEFAULT_CONFIG = { + "enabled": "false", + "retention_period": "Unknown", + "sampling_interval": "Unknown" + } + + +class ConnectionError(Exception): + """Custom exception for connection-related errors.""" + pass + + +class Dict2Obj: + """Converts dictionaries or lists into objects with attribute-style access.""" + def __init__(self, d: Union[Dict[str, Any], list]) -> None: + if not isinstance(d, (dict, list)): + raise ValueError("Input should be a dictionary or a list") + + if isinstance(d, dict): + for key, value in d.items(): + if isinstance(value, (list, tuple)): + setattr( + self, + key, + [Dict2Obj(x) if isinstance(x, dict) else x for x in value], + ) + else: + setattr( + self, key, Dict2Obj(value) if isinstance(value, dict) else value + ) + elif isinstance(d, list): + self.items = [Dict2Obj(x) if isinstance(x, dict) else x for x in d] + + def to_dict(self) -> Dict[str, Any]: + """Converts the object back to a dictionary format.""" + result = {} + if hasattr(self, "items"): + return [x.to_dict() if isinstance(x, Dict2Obj) else x for x in self.items] + + for key in self.__dict__: + value = getattr(self, key) + if isinstance(value, Dict2Obj): + result[key] = value.to_dict() + elif isinstance(value, list): + result[key] = [v.to_dict() if isinstance(v, Dict2Obj) else v for v in value] + else: + result[key] = value + return result + + def __repr__(self) -> str: + """Provides a string representation of the object for debugging.""" + return f"<{self.__class__.__name__} {self.to_dict()}>" + + +class SonicDBConnector: + """Handles interactions with SONiC's configuration database with improved connection handling.""" + def __init__(self) -> None: + """Initialize the database connector with retry mechanism.""" + self.config_db = ConfigDBConnector() + self.connect_with_retry() + + def connect_with_retry(self, max_retries: int = 3, retry_delay: float = 1.0) -> None: + """ + Attempts to connect to the database with a retry mechanism. + + Args: + max_retries: Maximum number of connection attempts + retry_delay: Delay between retries in seconds + + Raises: + ConnectionError: If connection fails after all retries + """ + retries = 0 + last_error = None + + while retries < max_retries: + try: + self.config_db.connect() + syslog.syslog(syslog.LOG_INFO, "Successfully connected to SONiC config database") + return + except Exception as e: + last_error = e + retries += 1 + if retries < max_retries: + syslog.syslog(syslog.LOG_WARNING, + f"Failed to connect to database (attempt {retries}/{max_retries}): {str(e)}") + time.sleep(retry_delay) + + error_msg = ( + f"Failed to connect to SONiC config database after {max_retries} attempts. " + f"Last error: {str(last_error)}" + ) + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def get_memory_statistics_config(self) -> Dict[str, str]: + """ + Retrieves memory statistics configuration with error handling. + + Returns: + Dict containing configuration values or default config + + Raises: + RuntimeError: If there's an error retrieving the configuration + """ + try: + config = self.config_db.get_table('MEMORY_STATISTICS') + if not config or 'memory_statistics' not in config: + syslog.syslog(syslog.LOG_WARNING, + "Memory statistics configuration not found, using defaults") + return Config.DEFAULT_CONFIG + return config['memory_statistics'] + except Exception as e: + error_msg = f"Error retrieving memory statistics configuration: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise RuntimeError(error_msg) + + +class SocketManager: + """Manages Unix domain socket connections with improved reliability.""" + def __init__(self, socket_path: str = Config.SOCKET_PATH): + self.socket_path = socket_path + self.sock = None + self._validate_socket_path() + + def _validate_socket_path(self) -> None: + """Validates the socket path exists or can be created.""" + socket_dir = os.path.dirname(self.socket_path) + if not os.path.exists(socket_dir): + error_msg = f"Socket directory {socket_dir} does not exist" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def connect(self) -> None: + """Establishes socket connection with improved error handling.""" + retries = 0 + last_error = None + + while retries < Config.MAX_RETRIES: + try: + if self.sock: + self.close() + + self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + self.sock.settimeout(Config.SOCKET_TIMEOUT) + self.sock.connect(self.socket_path) + syslog.syslog(syslog.LOG_INFO, "Successfully connected to memory statistics service") + return + except socket.error as e: + last_error = e + retries += 1 + if retries < Config.MAX_RETRIES: + syslog.syslog(syslog.LOG_WARNING, + f"Failed to connect to socket (attempt {retries}/{Config.MAX_RETRIES}): {str(e)}") + time.sleep(Config.RETRY_DELAY) + self.close() + + error_msg = ( + f"Failed to connect to memory statistics service after {Config.MAX_RETRIES} " + f"attempts. Last error: {str(last_error)}. " + f"Please verify that the service is running and socket file exists at {self.socket_path}" + ) + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def receive_all(self) -> str: + """Receives all data with improved error handling.""" + if not self.sock: + raise ConnectionError("No active socket connection") + + chunks = [] + while True: + try: + chunk = self.sock.recv(Config.BUFFER_SIZE) + if not chunk: + break + chunks.append(chunk) + except socket.timeout: + error_msg = f"Socket operation timed out after {Config.SOCKET_TIMEOUT} seconds" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + except socket.error as e: + error_msg = f"Socket error during receive: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + return b''.join(chunks).decode('utf-8') + + def send(self, data: str) -> None: + """Sends data with improved error handling.""" + if not self.sock: + raise ConnectionError("No active socket connection") + + try: + self.sock.sendall(data.encode('utf-8')) + except socket.error as e: + error_msg = f"Failed to send data: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ConnectionError(error_msg) + + def close(self) -> None: + """Closes the socket connection safely.""" + if self.sock: + try: + self.sock.close() + except Exception as e: + syslog.syslog(syslog.LOG_WARNING, f"Error closing socket: {str(e)}") + finally: + self.sock = None + + +def send_data(command: str, data: Dict[str, Any], quiet: bool = False) -> Dict2Obj: + """Sends a command and data to the memory statistics service.""" + socket_manager = SocketManager() + + try: + socket_manager.connect() + request = {"command": command, "data": data} + socket_manager.sock.sendall(json.dumps(request).encode('utf-8')) + + response = socket_manager.receive_all() + if not response: + raise ConnectionError("No response received from memory statistics service") + + try: + jdata = json.loads(response) + except json.JSONDecodeError as e: + error_msg = f"Failed to parse server response: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise ValueError(error_msg) + + if not isinstance(jdata, dict): + raise ValueError("Invalid response format from server") + + response_obj = Dict2Obj(jdata) + if not getattr(response_obj, 'status', True): + error_msg = getattr(response_obj, 'msg', 'Unknown error occurred') + raise RuntimeError(error_msg) + + return response_obj + + except Exception as e: + if not quiet: + click.echo(f"Error: {str(e)}", err=True) + raise + finally: + socket_manager.close() + + +@click.group() +@click.pass_context +def cli(ctx: click.Context) -> None: + """Main entry point for the SONiC Memory Statistics CLI.""" + ctx.ensure_object(dict) + + +def validate_command(command: str, valid_commands: list) -> None: + """Validates the user's command input against a list of valid commands.""" + match = get_close_matches(command, valid_commands, n=1, cutoff=0.6) + if match: + error_msg = f"Error: Invalid command '{command}'. Did you mean '{match[0]}'?" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.UsageError(error_msg) + else: + error_msg = f"Error: Invalid command '{command}'." + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.UsageError(error_msg) + + +@click.group() +def show(): + """Show commands for memory statistics.""" + pass + + +@show.command(name="memory-stats") +@click.option( + '--from', 'from_time', + help='Start time for memory statistics (e.g., "15 hours ago", "7 days ago", "ISO Format")' +) +@click.option( + '--to', 'to_time', + help='End time for memory statistics (e.g., "now", "ISO Format")' +) +@click.option( + '--select', 'select_metric', + help='Show statistics for specific metric (e.g., total_memory, used_memory)' +) +@click.option( + '--config', 'show_config', is_flag=True, + help='Show memory statistics configuration' +) +@click.pass_context +def memory_stats(ctx: click.Context, from_time: str, to_time: str, select_metric: str, show_config: bool) -> None: + """Displays memory statistics or configuration.""" + try: + if show_config: + try: + db_connector = SonicDBConnector() + display_config(db_connector) + except Exception as e: + click.echo(f"Error initializing database connection: {str(e)}", err=True) + sys.exit(1) + else: + display_statistics(ctx, from_time, to_time, select_metric) + except Exception as e: + click.echo(f"Error: {str(e)}", err=True) + sys.exit(1) + + +def display_config(db_connector: SonicDBConnector) -> None: + """Displays memory statistics configuration.""" + try: + config = db_connector.get_memory_statistics_config() + enabled = format_field_value("enabled", config.get("enabled", "Unknown")) + retention = format_field_value("retention_period", config.get("retention_period", "Unknown")) + sampling = format_field_value("sampling_interval", config.get("sampling_interval", "Unknown")) + + click.echo(f"{'Configuration Field':<30}{'Value'}") + click.echo("-" * 50) + click.echo(f"{'Enabled':<30}{enabled}") + click.echo(f"{'Retention Time (days)':<30}{retention}") + click.echo(f"{'Sampling Interval (minutes)':<30}{sampling}") + except Exception as e: + error_msg = f"Failed to retrieve configuration: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.ClickException(error_msg) + + +def display_statistics(ctx: click.Context, from_time: str, to_time: str, select_metric: str) -> None: + """Retrieves and displays memory statistics.""" + request_data = { + "type": "system", + "metric_name": select_metric, + "from": from_time, + "to": to_time + } + + try: + response = send_data("memory_statistics_command_request_handler", request_data) + if isinstance(response, Dict2Obj): + clean_and_print(response.to_dict()) + else: + error_msg = f"Unexpected response type: {type(response)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.ClickException(error_msg) + except Exception as e: + error_msg = f"Failed to retrieve memory statistics: {str(e)}" + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.ClickException(error_msg) + + +def format_field_value(field_name: str, value: str) -> str: + """Formats configuration field values for display.""" + if field_name == "enabled": + return "True" if value.lower() == "true" else "False" + return value if value != "Unknown" else "Not configured" + + +def clean_and_print(data: Dict[str, Any]) -> None: + """Formats and prints memory statistics.""" + if isinstance(data, dict): + memory_stats = data.get("data", "") + cleaned_output = memory_stats.replace("\n", "\n").strip() + print(f"Memory Statistics:\n{cleaned_output}") + else: + error_msg = "Invalid data format received" + syslog.syslog(syslog.LOG_ERR, error_msg) + print(f"Error: {error_msg}") + + +def main(): + """Entry point for the CLI application.""" + cli.add_command(show) + cli() + + +if __name__ == '__main__': + valid_commands = ['show'] + user_input = sys.argv[1:] + if user_input: + command = user_input[0] + if command not in valid_commands: + error_msg = f"Error: Invalid command '{command}'." + syslog.syslog(syslog.LOG_ERR, error_msg) + raise click.UsageError(error_msg) + main() diff --git a/tests/memory_statistics_test.py b/tests/memory_statistics_test.py new file mode 100644 index 0000000000..29c1c3a18c --- /dev/null +++ b/tests/memory_statistics_test.py @@ -0,0 +1,408 @@ +import unittest +from unittest.mock import patch, MagicMock +import socket +import os +import click +from click.testing import CliRunner +import syslog +import pytest +import json +from unittest.mock import Mock +# from show.memory_statistics import cli +from show.memory_statistics import ( + Config, + Dict2Obj, + send_data, + SonicDBConnector, + SocketManager, + display_config, + format_field_value, + clean_and_print, + validate_command, +) + + +class TestDict2Obj(unittest.TestCase): + """Test cases for Dict2Obj class""" + def test_dict_conversion(self): + """Test dictionary to object conversion""" + test_dict = { + "name": "test", + "values": [1, 2, 3], + "nested": {"key": "value"} + } + obj = Dict2Obj(test_dict) + self.assertEqual(obj.name, "test") + self.assertEqual(obj.values, [1, 2, 3]) + self.assertEqual(obj.nested.key, "value") + + def test_list_conversion(self): + """Test list conversion""" + test_list = [{"a": 1}, {"b": 2}] + obj = Dict2Obj(test_list) + self.assertEqual(obj.items[0].a, 1) + self.assertEqual(obj.items[1].b, 2) + + def test_invalid_input(self): + """Test invalid input handling""" + with self.assertRaises(ValueError): + Dict2Obj("invalid") + + def test_to_dict_conversion(self): + """Test converting object back to dictionary""" + test_dict = { + "name": "test", + "nested": {"key": "value"}, + "list": [{"item": 1}, {"item": 2}] + } + obj = Dict2Obj(test_dict) + result = obj.to_dict() + self.assertEqual(result, test_dict) + + def test_repr_method(self): + """Test string representation of Dict2Obj""" + test_dict = {"name": "test"} + obj = Dict2Obj(test_dict) + expected_repr = "" + self.assertEqual(repr(obj), expected_repr) + + +class TestSonicDBConnector(unittest.TestCase): + def setUp(self): + self.mock_config_db = MagicMock() + self.patcher = patch('show.memory_statistics.ConfigDBConnector', + return_value=self.mock_config_db) + self.patcher.start() + + def tearDown(self): + self.patcher.stop() + + @patch('show.memory_statistics.ConfigDBConnector') + def test_get_memory_statistics_config(self, mock_config_db): + """Test retrieving memory statistics configuration""" + test_config = { + 'memory_statistics': { + 'enabled': 'true', + 'retention_period': '7', + 'sampling_interval': '1' + } + } + mock_config_db.return_value.get_table.return_value = test_config + connector = SonicDBConnector() + config = connector.get_memory_statistics_config() + self.assertEqual(config, test_config['memory_statistics']) + + @patch('show.memory_statistics.ConfigDBConnector') + def test_get_default_config(self, mock_config_db): + """Test retrieving default configuration when none exists""" + mock_config_db.return_value.get_table.return_value = {} + connector = SonicDBConnector() + config = connector.get_memory_statistics_config() + self.assertEqual(config, Config.DEFAULT_CONFIG) + + def test_successful_connection(self): + """Test successful database connection on first attempt""" + SonicDBConnector() + self.mock_config_db.connect.assert_called_once() + + def test_connection_retry_success(self): + """Test successful connection after initial failures""" + self.mock_config_db.connect.side_effect = [Exception("First try"), Exception("Second try"), None] + SonicDBConnector() + self.assertEqual(self.mock_config_db.connect.call_count, 3) + + def test_get_memory_statistics_config_success(self): + """Test successful retrieval of memory statistics configuration""" + test_config = { + 'memory_statistics': { + 'enabled': 'true', + 'retention_period': '7', + 'sampling_interval': '1' + } + } + connector = SonicDBConnector() + self.mock_config_db.get_table.return_value = test_config + config = connector.get_memory_statistics_config() + self.assertEqual(config, test_config['memory_statistics']) + + def test_get_memory_statistics_config_empty(self): + """Test handling of empty configuration""" + connector = SonicDBConnector() + self.mock_config_db.get_table.return_value = {} + config = connector.get_memory_statistics_config() + self.assertEqual(config, Config.DEFAULT_CONFIG) + + def test_get_memory_statistics_config_error(self): + """Test error handling in configuration retrieval""" + connector = SonicDBConnector() + self.mock_config_db.get_table.side_effect = Exception("Database error") + with self.assertRaises(RuntimeError) as context: + connector.get_memory_statistics_config() + self.assertIn("Error retrieving memory statistics configuration", str(context.exception)) + + +class TestSocketManager(unittest.TestCase): + """Test cases for SocketManager class""" + def setUp(self): + self.socket_path = '/tmp/test_socket' + self.socket_manager = SocketManager(self.socket_path) + + @patch('socket.socket') + def test_successful_connection(self, mock_socket): + """Test successful socket connection""" + mock_socket.return_value.connect.return_value = None + self.socket_manager.connect() + mock_socket.assert_called_with(socket.AF_UNIX, socket.SOCK_STREAM) + + @patch('socket.socket') + def test_connection_retry(self, mock_socket): + """Test connection retry mechanism""" + mock_socket.return_value.connect.side_effect = [ + socket.error("Connection failed"), + None + ] + self.socket_manager.connect() + self.assertEqual(mock_socket.return_value.connect.call_count, 2) + + @patch('os.path.exists') + def test_validate_socket_path_success(self, mock_exists): + """Test successful socket path validation""" + mock_exists.return_value = True + self.socket_manager._validate_socket_path() + mock_exists.assert_called_once_with(os.path.dirname(self.socket_path)) + + @patch('socket.socket') + def test_connect_success(self, mock_socket): + """Test successful socket connection""" + mock_sock = MagicMock() + mock_socket.return_value = mock_sock + self.socket_manager.connect() + mock_sock.settimeout.assert_called_with(Config.SOCKET_TIMEOUT) + mock_sock.connect.assert_called_with(self.socket_path) + + @patch('socket.socket') + def test_connect_retry_success(self, mock_socket): + """Test successful connection after retries""" + mock_sock = MagicMock() + mock_socket.return_value = mock_sock + mock_sock.connect.side_effect = [socket.error(), socket.error(), None] + self.socket_manager.connect() + self.assertEqual(mock_sock.connect.call_count, 3) + + @patch('socket.socket') + def test_receive_all_success(self, mock_socket): + """Test successful data reception""" + mock_sock = MagicMock() + mock_socket.return_value = mock_sock + mock_sock.recv.side_effect = [b'test', b'data', b''] + self.socket_manager.sock = mock_sock + result = self.socket_manager.receive_all() + self.assertEqual(result, 'testdata') + + def test_close_success(self): + """Test successful socket closure""" + mock_sock = MagicMock() + self.socket_manager.sock = mock_sock + self.socket_manager.close() + mock_sock.close.assert_called_once() + self.assertIsNone(self.socket_manager.sock) + + def test_close_with_error(self): + """Test socket closure with error""" + mock_sock = MagicMock() + mock_sock.close.side_effect = Exception("Close error") + self.socket_manager.sock = mock_sock + self.socket_manager.close() + self.assertIsNone(self.socket_manager.sock) + + @patch('socket.socket') + def test_send_data_success(self, mock_socket): + """Test successful data sending""" + mock_sock = MagicMock() + self.socket_manager.sock = mock_sock + test_data = "test message" + self.socket_manager.send(test_data) + mock_sock.sendall.assert_called_with(test_data.encode('utf-8')) + + def test_to_dict_method_with_list_of_dict2obj(self): + """Test to_dict method with a list of Dict2Obj instances.""" + class TestDict2Obj(Dict2Obj): + def to_dict(self): + return {"test_key": "test_value"} + + test_obj = Dict2Obj({"items": [TestDict2Obj({"key": "value"}), "plain_value"]}) + result = test_obj.to_dict() + assert result == [{"test_key": "test_value"}, "plain_value"] + + @patch('click.echo') + def test_display_config_exception(self, mock_echo): + """Test display_config function with database connection error.""" + mock_db_connector = Mock() + mock_db_connector.get_memory_statistics_config.side_effect = Exception("DB Error") + + with patch('syslog.syslog') as mock_syslog: + with pytest.raises(click.ClickException) as excinfo: + display_config(mock_db_connector) + + assert "Failed to retrieve configuration: DB Error" in str(excinfo.value) + mock_syslog.assert_called_once_with( + syslog.LOG_ERR, "Failed to retrieve configuration: DB Error" + ) + + +class TestCLICommands(unittest.TestCase): + """Test cases for CLI commands""" + def setUp(self): + self.runner = CliRunner() + self.ctx = click.Context(click.Command('test')) + + def test_validate_command_invalid_with_suggestion(self): + """Test command validation with invalid command but close match""" + valid_commands = ['show', 'config'] + with self.assertRaises(click.UsageError) as context: + validate_command('shw', valid_commands) + self.assertIn("Did you mean 'show'", str(context.exception)) + + def test_validate_command_invalid_no_suggestion(self): + """Test command validation with no close match""" + valid_commands = ['show', 'config'] + with self.assertRaises(click.UsageError): + validate_command('unknown', valid_commands) + + def test_format_field_value_valid(self): + """Test formatting field values.""" + # Call the function with the correct order of arguments + formatted_value = format_field_value("enabled", "true") + # Assert the formatted output + self.assertEqual(formatted_value, "True") + + formatted_value = format_field_value("enabled", "false") + self.assertEqual(formatted_value, "False") + + formatted_value = format_field_value("enabled", "unknown") + self.assertEqual(formatted_value, "False") + + formatted_value = format_field_value("some_field", "Unknown") + self.assertEqual(formatted_value, "Not configured") + + def test_clean_and_print_success(self): + """Test cleaning and printing of memory statistics.""" + with patch('builtins.print') as mock_print: + # Provide the required dictionary input + test_data = {"data": "Example memory statistics\nAnother line"} + clean_and_print(test_data) + # Verify the print output + mock_print.assert_called_once_with("Memory Statistics:\nExample memory statistics\nAnother line") + + +class TestSendData(unittest.TestCase): + """Test cases for the send_data function.""" + + @patch('show.memory_statistics.SocketManager') + def test_send_data_success(self, mock_socket_manager): + """Test successful execution of send_data.""" + mock_socket = MagicMock() + mock_socket_manager.return_value = mock_socket + mock_socket.receive_all.return_value = json.dumps({ + "status": True, + "data": {"key": "value"} + }) + + result = send_data("test_command", {"key": "value"}) + self.assertTrue(result.status) + self.assertEqual(result.data.key, "value") + + @patch('show.memory_statistics.SocketManager') + def test_send_data_no_response(self, mock_socket_manager): + """Test no response received from the server.""" + # Mock the SocketManager instance + mock_socket = MagicMock() + mock_socket_manager.return_value = mock_socket + + # Simulate no response from the server + mock_socket.receive_all.return_value = "" + + # Ensure a ConnectionError is raised with the expected message + with self.assertRaises(ConnectionError) as context: + send_data("test_command", {"key": "value"}) + self.assertIn("No response received from memory statistics service", str(context.exception)) + + @patch('show.memory_statistics.SocketManager') + def test_send_data_json_parse_error(self, mock_socket_manager): + """Test JSON parsing error from server response.""" + mock_socket = MagicMock() + mock_socket_manager.return_value = mock_socket + mock_socket.receive_all.return_value = "Invalid JSON" + + with patch('syslog.syslog') as mock_syslog: + with self.assertRaises(ValueError) as context: + send_data("test_command", {"key": "value"}) + self.assertIn("Failed to parse server response", str(context.exception)) + mock_syslog.assert_called_once() + + @patch('show.memory_statistics.SocketManager') + def test_send_data_invalid_response_format(self, mock_socket_manager): + """Test invalid response format (not a dictionary).""" + mock_socket = MagicMock() + mock_socket_manager.return_value = mock_socket + mock_socket.receive_all.return_value = json.dumps(["not", "a", "dict"]) + + with self.assertRaises(ValueError) as context: + send_data("test_command", {"key": "value"}) + self.assertIn("Invalid response format from server", str(context.exception)) + + @patch('show.memory_statistics.SocketManager') + def test_send_data_failure_status(self, mock_socket_manager): + """Test response with failure status.""" + mock_socket = MagicMock() + mock_socket_manager.return_value = mock_socket + mock_socket.receive_all.return_value = json.dumps({ + "status": False, + "msg": "Simulated error" + }) + + with self.assertRaises(RuntimeError) as context: + send_data("test_command", {"key": "value"}) + self.assertIn("Simulated error", str(context.exception)) + + @patch('show.memory_statistics.SocketManager') + @patch('click.echo') + def test_send_data_quiet_mode(self, mock_click_echo, mock_socket_manager): + """Test send_data in quiet mode, ensuring no output on error.""" + mock_socket = MagicMock() + mock_socket_manager.return_value = mock_socket + mock_socket.receive_all.return_value = "" + + with self.assertRaises(ConnectionError): + send_data("test_command", {"key": "value"}, quiet=True) + mock_click_echo.assert_not_called() + + # def test_main_valid_command(self): + # """Test main CLI with a valid command.""" + # runner = CliRunner() + + # # Mock sys.argv to simulate valid command input + # with patch("sys.argv", ["main", "show"]): + # # Pass the correct prog_name explicitly + # result = runner.invoke(main, ["show"], prog_name="main") + + # # Validate the result + # assert result.exit_code == 0, f"Unexpected exit code: {result.exit_code}. Output: {result.output}" + # assert "Expected output from show command" in result.output + + # def test_main_invalid_command(self): + # """Test main CLI with an invalid command.""" + # # Mock `sys.argv` to simulate an invalid command + # with patch("sys.argv", ["main", "invalid_command"]): + # with pytest.raises(click.UsageError) as exc_info: + # main() + + # # Verify the error message + # assert "Error: Invalid command 'invalid_command'." in str(exc_info.value) + + # @patch("show.memory_statistics.show.show_memory_statistics") + # def test_show(self, mock_show_memory_statistics): + # """Test 'show' command""" + # result = self.runner.invoke(show) + # self.assertEqual(result.exit_code, 0) + # mock_show_memory_statistics.assert_called_once()