Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 56 additions & 6 deletions python/vyos/utils/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import re
import string

from enum import StrEnum
from dataclasses import dataclass
from decimal import Decimal
from pwd import getpwall
from pwd import getpwnam
from enum import StrEnum
from typing import List
from typing import Optional

from vyos.utils.process import cmd

# Minimum UID used when adding system users
Expand Down Expand Up @@ -146,12 +148,58 @@ def get_current_user() -> str:
current_user = os.environ['USER']
return current_user

@dataclass
class PasswdEntry:
pw_name: str
pw_passwd: str
pw_uid: int
pw_gid: int
pw_gecos: str
pw_dir: str
pw_shell: str

def get_local_passwd_entries(uid: Optional[int] = None) -> Optional[PasswdEntry] | List[PasswdEntry]:
"""
If uid is None: return a list of all passwd entries.
If uid is given: return the matching entry or None.
"""
entries = []
with open('/etc/passwd', 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
parts = line.split(":")
if len(parts) != 7:
continue

entry = PasswdEntry(
pw_name=parts[0],
pw_passwd=parts[1],
pw_uid=int(parts[2]),
pw_gid=int(parts[3]),
pw_gecos=parts[4],
pw_dir=parts[5],
pw_shell=parts[6],
)

# If searching for a specific UID, return immediately if found
if uid is not None and entry.pw_uid == uid:
return entry

entries.append(entry)

# uid given but not found
if uid is not None:
return None

return entries

def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list:
"""Return list of dynamically allocated users (see Debian Policy Manual)"""
local_users = []

for s_user in getpwall():
for s_user in get_local_passwd_entries():
if s_user.pw_uid < min_uid:
continue
if s_user.pw_uid > max_uid:
Expand All @@ -162,7 +210,9 @@ def get_local_users(min_uid=MIN_USER_UID, max_uid=MAX_USER_UID) -> list:

return local_users


def get_user_home_dir(user: str) -> str:
"""Return user's home directory"""
return getpwnam(user).pw_dir
for u in get_local_passwd_entries():
if u.pw_name == user:
return u.pw_dir
raise KeyError
5 changes: 2 additions & 3 deletions smoketest/scripts/cli/test_service_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
import re
import unittest

from pwd import getpwall

from base_vyostest_shim import VyOSUnitTestSHIM

from vyos.configsession import ConfigSessionError
from vyos.defaults import config_files
from vyos.utils.auth import get_local_passwd_entries
from vyos.utils.process import cmd
from vyos.utils.process import is_systemd_service_running
from vyos.utils.process import process_named_running
Expand Down Expand Up @@ -311,7 +310,7 @@ def test_ssh_login(self):
self.cli_commit()

# After deletion the test user is not allowed to remain in /etc/passwd
usernames = [x[0] for x in getpwall()]
usernames = [x.pw_name for x in get_local_passwd_entries()]
self.assertNotIn(test_user, usernames)

def test_ssh_dynamic_protection(self):
Expand Down
4 changes: 2 additions & 2 deletions smoketest/scripts/cli/test_system_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
from gzip import GzipFile
from subprocess import Popen
from subprocess import PIPE
from pwd import getpwall

from vyos.configsession import ConfigSessionError
from vyos.configquery import ConfigTreeQuery
from vyos.utils.auth import get_current_user
from vyos.utils.auth import get_local_passwd_entries
from vyos.utils.process import cmd
from vyos.utils.file import read_file
from vyos.utils.file import write_file
Expand Down Expand Up @@ -175,7 +175,7 @@ def tearDown(self):
self.cli_commit()

# After deletion, a user is not allowed to remain in /etc/passwd
usernames = [x[0] for x in getpwall()]
usernames = [x.pw_name for x in get_local_passwd_entries()]
for user in users:
self.assertNotIn(user, usernames)
# always forward to base class
Expand Down
7 changes: 3 additions & 4 deletions src/conf_mode/system_login.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
from copy import deepcopy
from passlib.hosts import linux_context
from psutil import users
from pwd import getpwall
from pwd import getpwuid
from sys import exit
from time import sleep

Expand All @@ -38,6 +36,7 @@
from vyos.utils.auth import EPasswdStrength
from vyos.utils.auth import evaluate_strength
from vyos.utils.auth import get_current_user
from vyos.utils.auth import get_local_passwd_entries
from vyos.utils.auth import get_local_users
from vyos.utils.auth import get_user_home_dir
from vyos.utils.auth import MIN_USER_UID
Expand Down Expand Up @@ -136,7 +135,7 @@ def verify(login):
raise ConfigError(f'Attempting to delete current user: {tmp}')

if 'user' in login:
system_users = getpwall()
system_users = get_local_passwd_entries()
for user, user_config in login['user'].items():
# Linux system users range up until UID 1000, we can not create a
# VyOS CLI user which already exists as system user
Expand Down Expand Up @@ -432,7 +431,7 @@ def apply(login):
# retrieve current owner of home directory and adjust on demand
dir_owner = None
try:
dir_owner = getpwuid(os.stat(home_dir).st_uid).pw_name
dir_owner = get_local_passwd_entries(os.stat(home_dir).st_uid).pw_name
except:
pass

Expand Down
6 changes: 3 additions & 3 deletions src/op_mode/show_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import argparse
import pwd
import struct
import sys
from time import ctime

from tabulate import tabulate
from vyos.config import Config

from vyos.utils.auth import get_local_passwd_entries

class UserInfo:
def __init__(self, uid, name, user_type, is_locked, login_time, tty, host):
Expand Down Expand Up @@ -79,7 +79,7 @@ def list_users():
vyos_users = cfg.list_effective_nodes('system login user')
users = []
with open('/var/log/lastlog', 'rb') as lastlog_file:
for (name, _, uid, _, _, _, _) in pwd.getpwall():
for (name, _, uid, _, _, _, _) in get_local_passwd_entries():
lastlog_info = decode_lastlog(lastlog_file, uid)
if lastlog_info is None:
continue
Expand Down
45 changes: 0 additions & 45 deletions src/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import pwd
from unittest import TestCase

from vyos.utils import auth


class TestVyOSUtils(TestCase):
def test_key_mangling(self):
from vyos.utils.dict import mangle_dict_keys
Expand All @@ -41,44 +37,3 @@ def test_list_strip(self):
self.assertEqual(list_strip(lst, rsb, right=True), ['a', 'b', 'c'])
self.assertEqual(list_strip(lst, non), [])
self.assertEqual(list_strip(sub, lst), [])

class TestVyOSUtilsAuth(TestCase):

def test_get_local_users_returns_existing_usernames(self):
# Returned users exist, skip list is excluded, and UIDs are in range

all_users = set(s_user.pw_name for s_user in pwd.getpwall())
local_users = auth.get_local_users()

# All returned users must really exist
for user in local_users:
self.assertIn(user, all_users)

# Nobody in the skip list
for skipped in auth.SYSTEM_USER_SKIP_LIST:
self.assertNotIn(skipped, local_users)

# All are within UID range
for s_user in pwd.getpwall():
if s_user.pw_name in local_users:
self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID)
self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID)

def test_get_user_home_dir_for_real_user(self):
# User's homedir is a non-empty string for a valid user

local_users = auth.get_local_users()
if local_users:
for user in local_users:
home_dir = auth.get_user_home_dir(user)
self.assertIsInstance(home_dir, str)
self.assertTrue(bool(home_dir)) # Should not be empty
else:
self.skipTest("No suitable non-system users found on this system")

def test_get_user_home_dir_invalid_user(self):
# Raises KeyError for nonexistent username

user = "__this_user_does_not_exist__" # Test using unlikely username
with self.assertRaises(KeyError):
auth.get_user_home_dir(user)
75 changes: 75 additions & 0 deletions src/tests/test_utils_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright VyOS maintainers and contributors <[email protected]>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import pwd
import unittest

from vyos.utils import auth

class TestVyOSUtilsAuth(unittest.TestCase):
def test_uid_root(self):
self.assertEqual(auth.get_local_passwd_entries(0).pw_name, 'root')
self.assertEqual(auth.get_local_passwd_entries(0).pw_uid, 0)

def test_uid_daemon(self):
uid = None
for user in auth.get_local_passwd_entries():
if user.pw_name == 'daemon':
uid = user.pw_uid
break

self.assertEqual(auth.get_local_passwd_entries(uid).pw_name, 'daemon')
self.assertEqual(auth.get_local_passwd_entries(uid).pw_uid, uid)

def test_uid_not_found(self):
self.assertEqual(auth.get_local_passwd_entries(5465487635), None)

def test_get_local_users_returns_existing_usernames(self):
# Returned users exist, skip list is excluded, and UIDs are in range

all_users = set(s_user.pw_name for s_user in pwd.getpwall())
local_users = auth.get_local_users()

# All returned users must really exist
for user in local_users:
self.assertIn(user, all_users)

# Nobody in the skip list
for skipped in auth.SYSTEM_USER_SKIP_LIST:
self.assertNotIn(skipped, local_users)

# All are within UID range
for s_user in pwd.getpwall():
if s_user.pw_name in local_users:
self.assertGreaterEqual(s_user.pw_uid, auth.MIN_USER_UID)
self.assertLessEqual(s_user.pw_uid, auth.MAX_USER_UID)

def test_get_user_home_dir_for_real_user(self):
# User's homedir is a non-empty string for a valid user

local_users = auth.get_local_users()
if local_users:
for user in local_users:
home_dir = auth.get_user_home_dir(user)
self.assertIsInstance(home_dir, str)
self.assertTrue(bool(home_dir)) # Should not be empty
else:
self.skipTest("No suitable non-system users found on this system")

def test_get_user_home_dir_invalid_user(self):
# Raises KeyError for nonexistent username

user = "__this_user_does_not_exist__" # Test using unlikely username
with self.assertRaises(KeyError):
auth.get_user_home_dir(user)
Loading