diff --git a/python/vyos/utils/auth.py b/python/vyos/utils/auth.py index 4623323327..a904318231 100644 --- a/python/vyos/utils/auth.py +++ b/python/vyos/utils/auth.py @@ -18,10 +18,12 @@ import re import string -from enum import StrEnum +from dataclasses import dataclass from decimal import Decimal -from pwd import getpwall -from pwd import getpwnam +from enum import StrEnum +from typing import List +from typing import Optional + from vyos.utils.process import cmd # Minimum UID used when adding system users @@ -146,12 +148,58 @@ def get_current_user() -> str: current_user = os.environ['USER'] return current_user +@dataclass +class PasswdEntry: + pw_name: str + pw_passwd: str + pw_uid: int + pw_gid: int + pw_gecos: str + pw_dir: str + pw_shell: str + +def get_local_passwd_entries(uid: Optional[int] = None) -> Optional[PasswdEntry] | List[PasswdEntry]: + """ + If uid is None: return a list of all passwd entries. + If uid is given: return the matching entry or None. + """ + entries = [] + with open('/etc/passwd', 'r') as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + parts = line.split(":") + if len(parts) != 7: + continue + + entry = PasswdEntry( + pw_name=parts[0], + pw_passwd=parts[1], + pw_uid=int(parts[2]), + pw_gid=int(parts[3]), + pw_gecos=parts[4], + pw_dir=parts[5], + pw_shell=parts[6], + ) + + # If searching for a specific UID, return immediately if found + if uid is not None and entry.pw_uid == uid: + return entry + + entries.append(entry) + + # uid given but not found + if uid is not None: + return None + + return entries def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list: """Return list of dynamically allocated users (see Debian Policy Manual)""" local_users = [] - for s_user in getpwall(): + for s_user in get_local_passwd_entries(): if s_user.pw_uid < min_uid: continue if s_user.pw_uid > max_uid: @@ -162,7 +210,9 @@ def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list: return local_users - def get_user_home_dir(user: str) -> str: """Return user's home directory""" - return getpwnam(user).pw_dir + for u in get_local_passwd_entries(): + if u.pw_name == user: + return u.pw_dir + raise KeyError diff --git a/smoketest/scripts/cli/test_service_ssh.py b/smoketest/scripts/cli/test_service_ssh.py index 6935464a74..ce119ba733 100755 --- a/smoketest/scripts/cli/test_service_ssh.py +++ b/smoketest/scripts/cli/test_service_ssh.py @@ -19,12 +19,11 @@ import re import unittest -from pwd import getpwall - from base_vyostest_shim import VyOSUnitTestSHIM from vyos.configsession import ConfigSessionError from vyos.defaults import config_files +from vyos.utils.auth import get_local_passwd_entries from vyos.utils.process import cmd from vyos.utils.process import is_systemd_service_running from vyos.utils.process import process_named_running @@ -311,7 +310,7 @@ def test_ssh_login(self): self.cli_commit() # After deletion the test user is not allowed to remain in /etc/passwd - usernames = [x[0] for x in getpwall()] + usernames = [x.pw_name for x in get_local_passwd_entries()] self.assertNotIn(test_user, usernames) def test_ssh_dynamic_protection(self): diff --git a/smoketest/scripts/cli/test_system_login.py b/smoketest/scripts/cli/test_system_login.py index b67f3f5464..77027ef01a 100755 --- a/smoketest/scripts/cli/test_system_login.py +++ b/smoketest/scripts/cli/test_system_login.py @@ -28,11 +28,11 @@ from gzip import GzipFile from subprocess import Popen from subprocess import PIPE -from pwd import getpwall from vyos.configsession import ConfigSessionError from vyos.configquery import ConfigTreeQuery from vyos.utils.auth import get_current_user +from vyos.utils.auth import get_local_passwd_entries from vyos.utils.process import cmd from vyos.utils.file import read_file from vyos.utils.file import write_file @@ -175,7 +175,7 @@ def tearDown(self): self.cli_commit() # After deletion, a user is not allowed to remain in /etc/passwd - usernames = [x[0] for x in getpwall()] + usernames = [x.pw_name for x in get_local_passwd_entries()] for user in users: self.assertNotIn(user, usernames) # always forward to base class diff --git a/src/conf_mode/system_login.py b/src/conf_mode/system_login.py index 2692f427fd..3cff7a8066 100755 --- a/src/conf_mode/system_login.py +++ b/src/conf_mode/system_login.py @@ -21,8 +21,6 @@ from copy import deepcopy from passlib.hosts import linux_context from psutil import users -from pwd import getpwall -from pwd import getpwuid from sys import exit from time import sleep @@ -38,6 +36,7 @@ from vyos.utils.auth import EPasswdStrength from vyos.utils.auth import evaluate_strength from vyos.utils.auth import get_current_user +from vyos.utils.auth import get_local_passwd_entries from vyos.utils.auth import get_local_users from vyos.utils.auth import get_user_home_dir from vyos.utils.auth import MIN_USER_UID @@ -136,7 +135,7 @@ def verify(login): raise ConfigError(f'Attempting to delete current user: {tmp}') if 'user' in login: - system_users = getpwall() + system_users = get_local_passwd_entries() for user, user_config in login['user'].items(): # Linux system users range up until UID 1000, we can not create a # VyOS CLI user which already exists as system user @@ -432,7 +431,7 @@ def apply(login): # retrieve current owner of home directory and adjust on demand dir_owner = None try: - dir_owner = getpwuid(os.stat(home_dir).st_uid).pw_name + dir_owner = get_local_passwd_entries(os.stat(home_dir).st_uid).pw_name except: pass diff --git a/src/op_mode/show_users.py b/src/op_mode/show_users.py index bccfaf9917..17ae6d30a9 100755 --- a/src/op_mode/show_users.py +++ b/src/op_mode/show_users.py @@ -13,15 +13,15 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . + import argparse -import pwd import struct import sys from time import ctime from tabulate import tabulate from vyos.config import Config - +from vyos.utils.auth import get_local_passwd_entries class UserInfo: def __init__(self, uid, name, user_type, is_locked, login_time, tty, host): @@ -79,7 +79,7 @@ def list_users(): vyos_users = cfg.list_effective_nodes('system login user') users = [] with open('/var/log/lastlog', 'rb') as lastlog_file: - for (name, _, uid, _, _, _, _) in pwd.getpwall(): + for (name, _, uid, _, _, _, _) in get_local_passwd_entries(): lastlog_info = decode_lastlog(lastlog_file, uid) if lastlog_info is None: continue diff --git a/src/tests/test_utils.py b/src/tests/test_utils.py index 12fda84d0e..9cf6f7a1e1 100644 --- a/src/tests/test_utils.py +++ b/src/tests/test_utils.py @@ -12,12 +12,8 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import pwd from unittest import TestCase -from vyos.utils import auth - - class TestVyOSUtils(TestCase): def test_key_mangling(self): from vyos.utils.dict import mangle_dict_keys @@ -41,44 +37,3 @@ def test_list_strip(self): self.assertEqual(list_strip(lst, rsb, right=True), ['a', 'b', 'c']) self.assertEqual(list_strip(lst, non), []) self.assertEqual(list_strip(sub, lst), []) - -class TestVyOSUtilsAuth(TestCase): - - def test_get_local_users_returns_existing_usernames(self): - # Returned users exist, skip list is excluded, and UIDs are in range - - all_users = set(s_user.pw_name for s_user in pwd.getpwall()) - local_users = auth.get_local_users() - - # All returned users must really exist - for user in local_users: - self.assertIn(user, all_users) - - # Nobody in the skip list - for skipped in auth.SYSTEM_USER_SKIP_LIST: - self.assertNotIn(skipped, local_users) - - # All are within UID range - for s_user in pwd.getpwall(): - if s_user.pw_name in local_users: - self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID) - self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID) - - def test_get_user_home_dir_for_real_user(self): - # User's homedir is a non-empty string for a valid user - - local_users = auth.get_local_users() - if local_users: - for user in local_users: - home_dir = auth.get_user_home_dir(user) - self.assertIsInstance(home_dir, str) - self.assertTrue(bool(home_dir)) # Should not be empty - else: - self.skipTest("No suitable non-system users found on this system") - - def test_get_user_home_dir_invalid_user(self): - # Raises KeyError for nonexistent username - - user = "__this_user_does_not_exist__" # Test using unlikely username - with self.assertRaises(KeyError): - auth.get_user_home_dir(user) diff --git a/src/tests/test_utils_auth.py b/src/tests/test_utils_auth.py new file mode 100644 index 0000000000..f3b52ab293 --- /dev/null +++ b/src/tests/test_utils_auth.py @@ -0,0 +1,75 @@ +# Copyright VyOS maintainers and contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 or later as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pwd +import unittest + +from vyos.utils import auth + +class TestVyOSUtilsAuth(unittest.TestCase): + def test_uid_root(self): + self.assertEqual(auth.get_local_passwd_entries(0).pw_name, 'root') + self.assertEqual(auth.get_local_passwd_entries(0).pw_uid, 0) + + def test_uid_daemon(self): + uid = None + for user in auth.get_local_passwd_entries(): + if user.pw_name == 'daemon': + uid = user.pw_uid + break + + self.assertEqual(auth.get_local_passwd_entries(uid).pw_name, 'daemon') + self.assertEqual(auth.get_local_passwd_entries(uid).pw_uid, uid) + + def test_uid_not_found(self): + self.assertEqual(auth.get_local_passwd_entries(5465487635), None) + + def test_get_local_users_returns_existing_usernames(self): + # Returned users exist, skip list is excluded, and UIDs are in range + + all_users = set(s_user.pw_name for s_user in pwd.getpwall()) + local_users = auth.get_local_users() + + # All returned users must really exist + for user in local_users: + self.assertIn(user, all_users) + + # Nobody in the skip list + for skipped in auth.SYSTEM_USER_SKIP_LIST: + self.assertNotIn(skipped, local_users) + + # All are within UID range + for s_user in pwd.getpwall(): + if s_user.pw_name in local_users: + self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID) + self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID) + + def test_get_user_home_dir_for_real_user(self): + # User's homedir is a non-empty string for a valid user + + local_users = auth.get_local_users() + if local_users: + for user in local_users: + home_dir = auth.get_user_home_dir(user) + self.assertIsInstance(home_dir, str) + self.assertTrue(bool(home_dir)) # Should not be empty + else: + self.skipTest("No suitable non-system users found on this system") + + def test_get_user_home_dir_invalid_user(self): + # Raises KeyError for nonexistent username + + user = "__this_user_does_not_exist__" # Test using unlikely username + with self.assertRaises(KeyError): + auth.get_user_home_dir(user)