From 170072a327e3b2793687bf1f301cb4c2467107b8 Mon Sep 17 00:00:00 2001 From: xumia Date: Thu, 6 Jul 2023 07:07:09 +0000 Subject: [PATCH] Add test cases --- scripts/hostcfgd | 29 +++--- tests/common/mock_configdb.py | 14 ++- tests/hostcfgd/hostcfgd_fips_test.py | 130 +++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 18 deletions(-) create mode 100644 tests/hostcfgd/hostcfgd_fips_test.py diff --git a/scripts/hostcfgd b/scripts/hostcfgd index 7d6634fb..442e801d 100644 --- a/scripts/hostcfgd +++ b/scripts/hostcfgd @@ -1801,8 +1801,8 @@ class FipsCfg(object): """ def __init__(self, state_db_conn): - self.enabled = False - self.enforced = False + self.enable = False + self.enforce = False self.restart_services = DEFAULT_FIPS_RESTART_SERVICES self.state_db_conn = state_db_conn @@ -1822,21 +1822,15 @@ class FipsCfg(object): syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped the FIPS config, the FIPS setting is empty.') return self.read_config() - mode = common_config.get('mode', '').lower() - self.enabled = False - self.enforced = False - if mode == 'enforce': - self.enabled = True - self.enforced = True - elif mode == 'noneenforce': - self.enabled = True + self.enforce = is_true(common_config.get('enforce', 'false')) + self.enable = self.enforce or is_true(common_config.get('enable', 'false')) self.update() def fips_handler(self, data): self.load(data) def update(self): - syslog.syslog(syslog.LOG_DEBUG, f'FipsCfg: update fips option enabled: {self.enabled}, enforced: {self.enforced}.') + syslog.syslog(syslog.LOG_DEBUG, f'FipsCfg: update fips option enable: {self.enable}, enforce: {self.enforce}.') self.update_enforce_config() self.update_noneenforce_config() self.state_db_conn.hset('FIPS_STATE|state', 'config_datetime', datetime.utcnow().isoformat()) @@ -1849,7 +1843,7 @@ class FipsCfg(object): cur_fips_enabled = f.read().strip() expected_fips_enabled = '0' - if self.enabled: + if self.enable: expected_fips_enabled = '1' # If the runtime config is not as expected, change the config @@ -1875,22 +1869,23 @@ class FipsCfg(object): # Restart the services required and in the running state output = run_cmd_output(['sudo', 'systemctl', '-t', 'service', '--state=running', '--no-pager', '-o', 'json']) - services = [s for s in json.dumps(output)] + services = [s['unit'] for s in json.loads(output)] for service in self.restart_services: if service in services or service + '.service' in services: + syslog.syslog(syslog.LOG_INFO, f'FipsCfg: restart service {service}.') run_cmd(['sudo', 'systemctl', 'restart', service]) def update_enforce_config(self): fips_state = run_cmd_output(['sudo', 'sonic-installer', 'get-fips']) next_enforced = 'enabled' in fips_state - if next_enforced != self.enforced: - syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped to configure the enforce option {self.enforced}, since the config has already been set.') + if next_enforced == self.enforce: + syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped to configure the enforce option {self.enforce}, since the config has already been set.') return fips_option = '--disable-fips' - if self.enforced: + if self.enforce: fips_option = '--enable-fips' - syslog.syslog(syslog.LOG_INFO, f'FipsCfg: update the FIPS enforce option {self.enforced}.') + syslog.syslog(syslog.LOG_INFO, f'FipsCfg: update the FIPS enforce option {self.enforce}.') run_cmd(['sudo', 'sonic-installer', 'set-fips', fips_option]) class HostConfigDaemon: diff --git a/tests/common/mock_configdb.py b/tests/common/mock_configdb.py index 7c7a7510..15abc00b 100644 --- a/tests/common/mock_configdb.py +++ b/tests/common/mock_configdb.py @@ -139,4 +139,16 @@ def pop(self): class MockDBConnector(): def __init__(self, db, val, tcpFlag=False, name=None): - pass + self.data = {} + + def hget(self, key, field): + if key not in self.data: + return None + if field not in self.data[key]: + return None + return self.data[key][field] + + def hset(self, key, field, value): + if key not in self.data: + self.data[key] = {} + self.data[key][field] = value diff --git a/tests/hostcfgd/hostcfgd_fips_test.py b/tests/hostcfgd/hostcfgd_fips_test.py new file mode 100644 index 00000000..7d6190df --- /dev/null +++ b/tests/hostcfgd/hostcfgd_fips_test.py @@ -0,0 +1,130 @@ +import importlib.machinery +import importlib.util +import filecmp +import json +import shutil +import os +import sys +from swsscommon import swsscommon + +from parameterized import parameterized +from unittest import TestCase, mock +from tests.common.mock_configdb import MockConfigDb, MockDBConnector +from sonic_py_common.general import getstatusoutput_noshell +from mock_open import MockOpen + + +test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) + +# Load the file under test +hostcfgd_path = os.path.join(scripts_path, 'hostcfgd') +loader = importlib.machinery.SourceFileLoader('hostcfgd', hostcfgd_path) +spec = importlib.util.spec_from_loader(loader.name, loader) +hostcfgd = importlib.util.module_from_spec(spec) +loader.exec_module(hostcfgd) +sys.modules['hostcfgd'] = hostcfgd +original_syslog = hostcfgd.syslog + +# Mock swsscommon classes +hostcfgd.ConfigDBConnector = MockConfigDb +hostcfgd.DBConnector = MockDBConnector +hostcfgd.Table = mock.Mock() +running_services = [{"unit":"ssh.service","load":"loaded","active":"active","sub":"running","description":"OpenBSD Secure Shell server"}, + {"unit":"restapi.service","load":"loaded","active":"active","sub":"running","description":"SONiC Restful API Service"}] + +class TestHostcfgdFIPS(TestCase): + """ + Test hostcfd daemon - FIPS + """ + def run_diff(self, file1, file2): + _, output = getstatusoutput_noshell(['diff', '-uR', file1, file2]) + return output + + def setUp(self): + self._workPath =os.path.join('/tmp/test_fips/', self._testMethodName) + self.test_data = {'DEVICE_METADATA':{},'FIPS': {}} + self.test_data['DEVICE_METADATA'] = {'localhost': {'hostname': 'fips'}} + self.test_data['FIPS']['global'] = {'enable': 'false', 'enforce': 'false'} + hostcfgd.FIPS_CONFIG_FILE = os.path.join(self._workPath + 'eips.json') + hostcfgd.OPENSSL_FIPS_CONFIG_FILE = os.path.join(self._workPath, 'fips_enabled') + hostcfgd.PROC_CMDLINE = os.path.join(self._workPath, 'cmdline') + os.makedirs(self._workPath, exist_ok=True) + with open(hostcfgd.PROC_CMDLINE, 'w') as f: + f.write('swiotlb=65536 sonic_fips=0') + with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE, 'w') as f: + f.write('0') + + def tearDown(self): + shutil.rmtree(self._workPath, ignore_errors=True) + + def assert_fips_runtime_config(self, result='1'): + with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE) as f: + assert f.read() == result + + @mock.patch('syslog.syslog') + @mock.patch('subprocess.check_output', side_effect=['', json.dumps(running_services)]) + @mock.patch('subprocess.check_call') + def test_hostcfgd_fips_enable(self, mock_check_call, mock_check_output, mock_syslog): + with open(hostcfgd.PROC_CMDLINE, 'w') as f: + f.write('swiotlb=65536 sonic_fips=0') + self.test_data['FIPS']['global']['enable'] = 'true' + MockConfigDb.set_config_db(self.test_data) + host_config_daemon = hostcfgd.HostConfigDaemon() + host_config_daemon.fips_config_handler("FIPS", '', self.test_data) + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option False, since the config has already been set.') + mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.') + self.assert_fips_runtime_config() + + @mock.patch('syslog.syslog') + @mock.patch('subprocess.check_output', side_effect=['', json.dumps(running_services)]) + @mock.patch('subprocess.check_call') + def test_hostcfgd_fips_disable(self, mock_check_call, mock_check_output, mock_syslog): + with open(hostcfgd.PROC_CMDLINE, 'w') as f: + f.write('swiotlb=65536 sonic_fips=0') + with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE, 'w') as f: + f.write('1') + self.test_data['FIPS']['global']['enable'] = 'false' + MockConfigDb.set_config_db(self.test_data) + host_config_daemon = hostcfgd.HostConfigDaemon() + host_config_daemon.fips_config_handler("FIPS", '', self.test_data) + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option False, since the config has already been set.') + mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.') + self.assert_fips_runtime_config('0') + + @mock.patch('syslog.syslog') + @mock.patch('subprocess.check_output', side_effect=['FIPS is disabled', json.dumps(running_services)]) + @mock.patch('subprocess.check_call') + def test_hostcfgd_fips_enforce(self, mock_check_call, mock_check_output, mock_syslog): + with open(hostcfgd.PROC_CMDLINE, 'w') as f: + f.write('swiotlb=65536 sonic_fips=0') + self.test_data['FIPS']['global']['enforce'] = 'true' + MockConfigDb.set_config_db(self.test_data) + host_config_daemon = hostcfgd.HostConfigDaemon() + host_config_daemon.fips_config_handler("FIPS", '', self.test_data) + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: update the FIPS enforce option True.') + mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.') + self.assert_fips_runtime_config() + + @mock.patch('syslog.syslog') + @mock.patch('subprocess.check_output', side_effect=['FIPS is enabled', json.dumps(running_services)]) + @mock.patch('subprocess.check_call') + def test_hostcfgd_fips_enforce_reconf(self, mock_check_call, mock_check_output, mock_syslog): + with open(hostcfgd.PROC_CMDLINE, 'w') as f: + f.write('swiotlb=65536 sonic_fips=1') + self.test_data['FIPS']['global']['enforce'] = 'true' + MockConfigDb.set_config_db(self.test_data) + host_config_daemon = hostcfgd.HostConfigDaemon() + host_config_daemon.fips_config_handler("FIPS", '', self.test_data) + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to restart services, since FIPS enforced.') + mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option True, since the config has already been set.') + mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.') + self.assert_fips_runtime_config()