Skip to content

Commit

Permalink
Add test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
xumia committed Jul 6, 2023
1 parent 4dff827 commit 170072a
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 18 deletions.
29 changes: 12 additions & 17 deletions scripts/hostcfgd
Original file line number Diff line number Diff line change
Expand Up @@ -1801,8 +1801,8 @@ class FipsCfg(object):
"""

def __init__(self, state_db_conn):
self.enabled = False
self.enforced = False
self.enable = False
self.enforce = False
self.restart_services = DEFAULT_FIPS_RESTART_SERVICES
self.state_db_conn = state_db_conn

Expand All @@ -1822,21 +1822,15 @@ class FipsCfg(object):
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped the FIPS config, the FIPS setting is empty.')
return
self.read_config()
mode = common_config.get('mode', '').lower()
self.enabled = False
self.enforced = False
if mode == 'enforce':
self.enabled = True
self.enforced = True
elif mode == 'noneenforce':
self.enabled = True
self.enforce = is_true(common_config.get('enforce', 'false'))
self.enable = self.enforce or is_true(common_config.get('enable', 'false'))
self.update()

def fips_handler(self, data):
self.load(data)

def update(self):
syslog.syslog(syslog.LOG_DEBUG, f'FipsCfg: update fips option enabled: {self.enabled}, enforced: {self.enforced}.')
syslog.syslog(syslog.LOG_DEBUG, f'FipsCfg: update fips option enable: {self.enable}, enforce: {self.enforce}.')
self.update_enforce_config()
self.update_noneenforce_config()
self.state_db_conn.hset('FIPS_STATE|state', 'config_datetime', datetime.utcnow().isoformat())
Expand All @@ -1849,7 +1843,7 @@ class FipsCfg(object):
cur_fips_enabled = f.read().strip()

expected_fips_enabled = '0'
if self.enabled:
if self.enable:
expected_fips_enabled = '1'

# If the runtime config is not as expected, change the config
Expand All @@ -1875,22 +1869,23 @@ class FipsCfg(object):

# Restart the services required and in the running state
output = run_cmd_output(['sudo', 'systemctl', '-t', 'service', '--state=running', '--no-pager', '-o', 'json'])
services = [s for s in json.dumps(output)]
services = [s['unit'] for s in json.loads(output)]
for service in self.restart_services:
if service in services or service + '.service' in services:
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: restart service {service}.')
run_cmd(['sudo', 'systemctl', 'restart', service])


def update_enforce_config(self):
fips_state = run_cmd_output(['sudo', 'sonic-installer', 'get-fips'])
next_enforced = 'enabled' in fips_state
if next_enforced != self.enforced:
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped to configure the enforce option {self.enforced}, since the config has already been set.')
if next_enforced == self.enforce:
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: skipped to configure the enforce option {self.enforce}, since the config has already been set.')
return
fips_option = '--disable-fips'
if self.enforced:
if self.enforce:
fips_option = '--enable-fips'
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: update the FIPS enforce option {self.enforced}.')
syslog.syslog(syslog.LOG_INFO, f'FipsCfg: update the FIPS enforce option {self.enforce}.')
run_cmd(['sudo', 'sonic-installer', 'set-fips', fips_option])

class HostConfigDaemon:
Expand Down
14 changes: 13 additions & 1 deletion tests/common/mock_configdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,16 @@ def pop(self):

class MockDBConnector():
def __init__(self, db, val, tcpFlag=False, name=None):
pass
self.data = {}

def hget(self, key, field):
if key not in self.data:
return None
if field not in self.data[key]:
return None
return self.data[key][field]

def hset(self, key, field, value):
if key not in self.data:
self.data[key] = {}
self.data[key][field] = value
130 changes: 130 additions & 0 deletions tests/hostcfgd/hostcfgd_fips_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import importlib.machinery
import importlib.util
import filecmp
import json
import shutil
import os
import sys
from swsscommon import swsscommon

from parameterized import parameterized
from unittest import TestCase, mock
from tests.common.mock_configdb import MockConfigDb, MockDBConnector
from sonic_py_common.general import getstatusoutput_noshell
from mock_open import MockOpen


test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
modules_path = os.path.dirname(test_path)
scripts_path = os.path.join(modules_path, "scripts")
sys.path.insert(0, modules_path)

# Load the file under test
hostcfgd_path = os.path.join(scripts_path, 'hostcfgd')
loader = importlib.machinery.SourceFileLoader('hostcfgd', hostcfgd_path)
spec = importlib.util.spec_from_loader(loader.name, loader)
hostcfgd = importlib.util.module_from_spec(spec)
loader.exec_module(hostcfgd)
sys.modules['hostcfgd'] = hostcfgd
original_syslog = hostcfgd.syslog

# Mock swsscommon classes
hostcfgd.ConfigDBConnector = MockConfigDb
hostcfgd.DBConnector = MockDBConnector
hostcfgd.Table = mock.Mock()
running_services = [{"unit":"ssh.service","load":"loaded","active":"active","sub":"running","description":"OpenBSD Secure Shell server"},
{"unit":"restapi.service","load":"loaded","active":"active","sub":"running","description":"SONiC Restful API Service"}]

class TestHostcfgdFIPS(TestCase):
"""
Test hostcfd daemon - FIPS
"""
def run_diff(self, file1, file2):
_, output = getstatusoutput_noshell(['diff', '-uR', file1, file2])
return output

def setUp(self):
self._workPath =os.path.join('/tmp/test_fips/', self._testMethodName)
self.test_data = {'DEVICE_METADATA':{},'FIPS': {}}
self.test_data['DEVICE_METADATA'] = {'localhost': {'hostname': 'fips'}}
self.test_data['FIPS']['global'] = {'enable': 'false', 'enforce': 'false'}
hostcfgd.FIPS_CONFIG_FILE = os.path.join(self._workPath + 'eips.json')
hostcfgd.OPENSSL_FIPS_CONFIG_FILE = os.path.join(self._workPath, 'fips_enabled')
hostcfgd.PROC_CMDLINE = os.path.join(self._workPath, 'cmdline')
os.makedirs(self._workPath, exist_ok=True)
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE, 'w') as f:
f.write('0')

def tearDown(self):
shutil.rmtree(self._workPath, ignore_errors=True)

def assert_fips_runtime_config(self, result='1'):
with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE) as f:
assert f.read() == result

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_enable(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
self.test_data['FIPS']['global']['enable'] = 'true'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option False, since the config has already been set.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config()

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_disable(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
with open(hostcfgd.OPENSSL_FIPS_CONFIG_FILE, 'w') as f:
f.write('1')
self.test_data['FIPS']['global']['enable'] = 'false'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option False, since the config has already been set.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config('0')

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['FIPS is disabled', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_enforce(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=0')
self.test_data['FIPS']['global']['enforce'] = 'true'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service ssh.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: restart service restapi.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: update the FIPS enforce option True.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config()

@mock.patch('syslog.syslog')
@mock.patch('subprocess.check_output', side_effect=['FIPS is enabled', json.dumps(running_services)])
@mock.patch('subprocess.check_call')
def test_hostcfgd_fips_enforce_reconf(self, mock_check_call, mock_check_output, mock_syslog):
with open(hostcfgd.PROC_CMDLINE, 'w') as f:
f.write('swiotlb=65536 sonic_fips=1')
self.test_data['FIPS']['global']['enforce'] = 'true'
MockConfigDb.set_config_db(self.test_data)
host_config_daemon = hostcfgd.HostConfigDaemon()
host_config_daemon.fips_config_handler("FIPS", '', self.test_data)
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to restart services, since FIPS enforced.')
mock_syslog.assert_any_call(original_syslog.LOG_INFO, 'FipsCfg: skipped to configure the enforce option True, since the config has already been set.')
mock_syslog.assert_called_with(original_syslog.LOG_DEBUG, 'FipsCfg: update fips option complete.')
self.assert_fips_runtime_config()

0 comments on commit 170072a

Please sign in to comment.