diff --git a/dissect/target/plugins/os/unix/log/auth.py b/dissect/target/plugins/os/unix/log/auth.py index 393502e14..4be43260e 100644 --- a/dissect/target/plugins/os/unix/log/auth.py +++ b/dissect/target/plugins/os/unix/log/auth.py @@ -1,38 +1,311 @@ +from __future__ import annotations + import itertools import logging import re +from abc import ABC, abstractmethod +from datetime import datetime +from functools import lru_cache from itertools import chain -from typing import Iterator +from pathlib import Path +from typing import Any, Iterator +from dissect.target import Target from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers.fsutil import TargetPath, open_decompress +from dissect.target.helpers.fsutil import open_decompress from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor from dissect.target.helpers.utils import year_rollover_helper from dissect.target.plugin import Plugin, alias, export log = logging.getLogger(__name__) -_RE_TS = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}" -_RE_TS_ISO = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}" - -RE_TS = re.compile(_RE_TS) -RE_TS_ISO = re.compile(_RE_TS_ISO) +RE_TS = re.compile(r"^[A-Za-z]{3}\s*\d{1,2}\s\d{1,2}:\d{2}:\d{2}") +RE_TS_ISO = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}\+\d{2}:\d{2}") RE_LINE = re.compile( - rf"(?P{_RE_TS}|{_RE_TS_ISO})\s(?P\S+)\s(?P\S+?)(\[(?P\d+)\])?:\s(?P.+)$" + r""" + \d{2}:\d{2}\s # First match on the similar ending of the different timestamps + (?P\S+)\s # The hostname + (?P\S+?)(\[(?P\d+)\])?: # The service with optionally the PID between brackets + \s*(?P.+?)\s*$ # The log message stripped from spaces left and right + """, + re.VERBOSE, ) -AuthLogRecord = TargetRecordDescriptor( - "linux/log/auth", - [ - ("datetime", "ts"), - ("string", "message"), - ("path", "source"), - ], +# Generic regular expressions +RE_IPV4_ADDRESS = re.compile( + r""" + ((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3} # First three octets + (25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?) # Last octet + """, + re.VERBOSE, ) +RE_USER = re.compile(r"for ([^\s]+)") + + +class BaseService(ABC): + @classmethod + @abstractmethod + def parse(cls, message: str) -> dict[str, any]: + pass + + +class SudoService(BaseService): + """Parsing of sudo service messages in the auth log.""" + + RE_SUDO_COMMAND = re.compile( + r""" + TTY=(?P\w+\/\w+)\s;\s # The TTY -> TTY=pts/0 ; + PWD=(?P[\/\w]+)\s;\s # The current working directory -> PWD="/home/user" ; + USER=(?P\w+)\s;\s # The effective user -> USER=root ; + COMMAND=(?P.+)$ # The command -> COMMAND=/usr/bin/whoami + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + """Parse auth log message from sudo.""" + if not (match := cls.RE_SUDO_COMMAND.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + additional_fields[key] = value + + return additional_fields + + +class SshdService(BaseService): + """Class for parsing sshd messages in the auth log.""" + + RE_SSHD_PORTREGEX = re.compile(r"port\s(\d+)") + RE_USER = re.compile(r"for\s([^\s]+)") + + @classmethod + def parse(cls, message: str) -> dict[str, str | int]: + """Parse message from sshd""" + additional_fields = {} + if ip_address := RE_IPV4_ADDRESS.search(message): + field_name = "host_ip" if "listening" in message else "remote_ip" + additional_fields[field_name] = ip_address.group(0) + if port := cls.RE_SSHD_PORTREGEX.search(message): + additional_fields["port"] = int(port.group(1)) + if user := cls.RE_USER.search(message): + additional_fields["user"] = user.group(1) + # Accepted publickey for test_user from 8.8.8.8 IP port 12345 ssh2: RSA SHA256:123456789asdfghjklertzuio + if "Accepted publickey" in message: + ssh_protocol, encryption_algo, key_info = message.split()[-3:] + hash_algo, key_hash = key_info.split(":") + additional_fields["ssh_protocol"] = ssh_protocol.strip(":") + additional_fields["encryption_algorithm"] = encryption_algo + additional_fields["hash_algorithm"] = hash_algo + additional_fields["key_hash"] = key_hash + if (failed := "Failed" in message) or "Accepted" in message: + action_type = "failed" if failed else "accepted" + additional_fields["action"] = f"{action_type} authentication" + additional_fields["authentication_type"] = "password" if "password" in message else "publickey" + + return additional_fields + + +class SystemdLogindService(BaseService): + """Class for parsing systemd-logind messages in the auth log.""" + + RE_SYSTEMD_LOGIND_WATCHING = re.compile( + r""" + (?PWatching\ssystem\sbuttons)\s # Action is "Watching system buttons" + on\s(?P[^\s]+)\s # The device the button is related to -> /dev/input/event0 + \((?P.*?)\) # The device (button) name -> (Power button) + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str): + """Parse auth log message from systemd-logind.""" + additional_fields = {} + # Example: Nov 14 07:14:09 ubuntu-1 systemd-logind[4]: Removed session 4. + if "Removed" in message: + additional_fields["action"] = "removed session" + additional_fields["session"] = message.split()[-1].strip(".") + elif "Watching" in message and (match := cls.RE_SYSTEMD_LOGIND_WATCHING.search(message)): + for key, value in match.groupdict().items(): + additional_fields[key] = value + # Example: New session 4 of user sampleuser. + elif "New session" in message: + parts = message.removeprefix("New session ").split() + additional_fields["action"] = "new session" + additional_fields["session"] = parts[0] + additional_fields["user"] = parts[-1].strip(".") + # Example: Session 4 logged out. Waiting for processes to exit. + elif "logged out" in message: + session = message.removeprefix("Session ").split(maxsplit=1)[0] + additional_fields["action"] = "logged out session" + additional_fields["session"] = session + # Example: New seat seat0. + elif "New seat" in message: + seat = message.split()[-1].strip(".") + additional_fields["action"] = "new seat" + additional_fields["seat"] = seat + + return additional_fields + + +class SuService(BaseService): + """Class for parsing su messages in the auth log.""" + + RE_SU_BY = re.compile(r"by\s([^\s]+)") + RE_SU_ON = re.compile(r"on\s([^\s]+)") + RE_SU_COMMAND = re.compile(r"'(.*?)'") + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + additional_fields = {} + if user := RE_USER.search(message): + additional_fields["user"] = user.group(1) + if by := cls.RE_SU_BY.search(message): + additional_fields["by"] = by.group(1) + if on := cls.RE_SU_ON.search(message): + additional_fields["device"] = on.group(1) + if command := cls.RE_SU_COMMAND.search(message): + additional_fields["command"] = command.group(1) + if (failed := "failed" in message) or "Successful" in message: + additional_fields["su_result"] = "failed" if failed else "success" + + return additional_fields + + +class PkexecService(BaseService): + """Class for parsing pkexec messages in the auth log.""" + + RE_PKEXEC_COMMAND = re.compile( + r""" + (?P\S+?):\sExecuting\scommand\s # Starts with actual user -> user: + \[USER=(?P[^\]]+)\]\s # The impersonated user -> [USER=root] + \[TTY=(?P[^\]]+)\]\s # The tty -> [TTY=unknown] + \[CWD=(?P[^\]]+)\]\s # Current working directory -> [CWD=/home/user] + \[COMMAND=(?P[^\]]+)\] # Command -> [COMMAND=/usr/lib/example] + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message: str) -> dict[str, str]: + """Parse auth log message from pkexec""" + additional_fields = {} + if exec_cmd := cls.RE_PKEXEC_COMMAND.search(message): + additional_fields["action"] = "executing command" + for key, value in exec_cmd.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + +class PamUnixService(BaseService): + RE_PAM_UNIX = re.compile( + r""" + pam_unix\([^\s]+:session\):\s(?Psession\s\w+)\s # Session action, usually opened or closed + for\suser\s(?P[^\s\(]+)(?:\(uid=(?P\d+)\))? # User may contain uid like: root(uid=0) + (?:\sby\s\(uid=(?P\d+)\))?$ # Opened action also contains by + """, + re.VERBOSE, + ) + + @classmethod + def parse(cls, message): + """Parse auth log message from pluggable authentication modules (PAM).""" + if not (match := cls.RE_PAM_UNIX.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + +class AuthLogRecordBuilder: + """Class for dynamically creating auth log records.""" + + RECORD_NAME = "linux/log/auth" + SERVICES: dict[str, BaseService] = { + "su": SuService, + "sudo": SudoService, + "sshd": SshdService, + "systemd-logind": SystemdLogindService, + "pkexec": PkexecService, + } + + def __init__(self, target: Target): + self._create_event_descriptor = lru_cache(4096)(self._create_event_descriptor) + self.target = target + + def _parse_additional_fields(self, service: str | None, message: str) -> dict[str, Any]: + """Parse additional fields in the message based on the service.""" + if "pam_unix(" in message: + return PamUnixService.parse(message) + + if service not in self.SERVICES: + self.target.log.debug("Service %s is not recognised, no additional fields could be parsed", service) + return {} + + try: + return self.SERVICES[service].parse(message) + except Exception as e: + self.target.log.warning("Parsing additional fields in message '%s' for service %s failed", message, service) + self.target.log.debug("", exc_info=e) + raise e + + def build_record(self, ts: datetime, source: Path, line: str) -> TargetRecordDescriptor: + """Builds an ``AuthLog`` event record.""" + + record_fields = [ + ("datetime", "ts"), + ("path", "source"), + ("string", "service"), + ("varint", "pid"), + ("string", "message"), + ] + + record_values = { + "ts": ts, + "message": line, + "service": None, + "pid": None, + "source": source, + "_target": self.target, + } + + match = RE_LINE.search(line) + if match: + record_values.update(match.groupdict()) + + for key, value in self._parse_additional_fields(record_values["service"], line).items(): + record_type = "string" + if isinstance(value, int): + record_type = "varint" + + record_fields.append((record_type, key)) + record_values[key] = value + + # tuple conversion here is needed for lru_cache + desc = self._create_event_descriptor(tuple(record_fields)) + return desc(**record_values) + + def _create_event_descriptor(self, record_fields) -> TargetRecordDescriptor: + return TargetRecordDescriptor(self.RECORD_NAME, record_fields) class AuthPlugin(Plugin): - """Unix auth log plugin.""" + """Unix authentication log plugin.""" + + def __init__(self, target: Target): + super().__init__(target) + self._auth_log_builder = AuthLogRecordBuilder(target) def check_compatible(self) -> None: var_log = self.target.fs.path("/var/log") @@ -41,7 +314,7 @@ def check_compatible(self) -> None: @alias("securelog") @export(record=DynamicDescriptor(["datetime", "path", "string"])) - def authlog(self) -> Iterator[any]: + def authlog(self) -> Iterator[Any]: """Yield contents of ``/var/log/auth.log*`` and ``/var/log/secure*`` files. Order of returned events is not guaranteed to be chronological because of year @@ -69,7 +342,6 @@ def authlog(self) -> Iterator[any]: for auth_file in chain(var_log.glob("auth.log*"), var_log.glob("secure*")): if is_iso_fmt(auth_file): iterable = iso_readlines(auth_file) - else: iterable = year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo) @@ -77,9 +349,8 @@ def authlog(self) -> Iterator[any]: yield self._auth_log_builder.build_record(ts, auth_file, line) -def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: +def iso_readlines(file: Path) -> Iterator[tuple[datetime, str]]: """Iterator reading the provided auth log file in ISO format. Mimics ``year_rollover_helper`` behaviour.""" - with open_decompress(file, "rt") as fh: for line in fh: if not (match := RE_TS_ISO.match(line)): @@ -89,7 +360,6 @@ def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: try: ts = datetime.strptime(match[0], "%Y-%m-%dT%H:%M:%S.%f%z") - except ValueError as e: log.warning("Unable to parse ISO timestamp in line: %s", line) log.debug("", exc_info=e) @@ -98,6 +368,6 @@ def iso_readlines(file: TargetPath) -> Iterator[tuple[datetime, str]]: yield ts, line -def is_iso_fmt(file: TargetPath) -> bool: +def is_iso_fmt(file: Path) -> bool: """Determine if the provided auth log file uses new ISO format logging or not.""" return any(itertools.islice(iso_readlines(file), 0, 2)) diff --git a/tests/plugins/os/unix/log/test_auth.py b/tests/plugins/os/unix/log/test_auth.py index 9de617716..4bfb63646 100644 --- a/tests/plugins/os/unix/log/test_auth.py +++ b/tests/plugins/os/unix/log/test_auth.py @@ -1,8 +1,12 @@ +from __future__ import annotations + from datetime import datetime, timezone from io import BytesIO +from pathlib import Path from unittest.mock import patch from zoneinfo import ZoneInfo +import pytest from flow.record.fieldtypes import datetime as dt from dissect.target.filesystem import VirtualFilesystem @@ -27,9 +31,8 @@ def test_auth_plugin(target_unix: Target, fs_unix: VirtualFilesystem) -> None: results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Europe/Amsterdam")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -51,9 +54,8 @@ def test_auth_plugin_with_gz(target_unix: Target, fs_unix: VirtualFilesystem) -> results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Pacific/Honolulu")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -75,9 +77,8 @@ def test_auth_plugin_with_bz(target_unix: Target, fs_unix: VirtualFilesystem) -> results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("America/Nuuk")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesystem) -> None: @@ -97,12 +98,198 @@ def test_auth_plugin_year_rollover(target_unix: Target, fs_unix: VirtualFilesyst assert len(results) == 2 results.reverse() - assert isinstance(results[0], type(AuthLogRecord())) - assert isinstance(results[1], type(AuthLogRecord())) assert results[0].ts == dt(2021, 12, 31, 3, 14, 0, tzinfo=ZoneInfo("Etc/UTC")) assert results[1].ts == dt(2022, 1, 1, 13, 37, 0, tzinfo=ZoneInfo("Etc/UTC")) +@pytest.mark.parametrize( + "message, results", + [ + pytest.param( + "Mar 29 10:43:01 ubuntu-1 sshd[1193]: Accepted password for test_user from 8.8.8.8 port 52942 ssh2", + { + "service": "sshd", + "pid": 1193, + "action": "accepted authentication", + "authentication_type": "password", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 52942, + }, + id="sshd: accepted password", + ), + pytest.param( + "Jun 4 22:14:15 ubuntu-1 sshd[41458]: Failed password for root from 8.8.8.8 port 22 ssh2", + { + "service": "sshd", + "pid": 41458, + "action": "failed authentication", + "authentication_type": "password", + "user": "root", + "remote_ip": "8.8.8.8", + "port": 22, + }, + id="sshd: failed password", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Accepted publickey for test_user " + "from 8.8.8.8 port 12345 ssh2: RSA SHA256:123456789asdfghjklertzuio", + { + "service": "sshd", + "pid": 1361, + "action": "accepted authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + "ssh_protocol": "ssh2", + "encryption_algorithm": "RSA", + "hash_algorithm": "SHA256", + "key_hash": "123456789asdfghjklertzuio", + }, + id="sshd: accepted publickey", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Failed publickey for test_user from 8.8.8.8 port 12345 ssh2.", + { + "service": "sshd", + "pid": 1361, + "action": "failed authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + }, + id="sshd: failed publickey", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 sshd[1291]: Server listening on 127.0.0.1 port 22.", + { + "service": "sshd", + "pid": 1291, + "host_ip": "127.0.0.1", + "port": 22, + }, + id="sshd: listening", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened for user test_user by (uid=0)", + { + "service": "sshd", + "pid": 1361, + "action": "session opened", + "user": "test_user", + "user_uid": None, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened " + "for user root(uid=0) by (uid=0)", + { + "service": "sshd", + "pid": 1361, + "action": "session opened", + "user": "root", + "user_uid": 0, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: Watching system buttons " + "on /dev/input/event0 (Power Button)", + { + "service": "systemd-logind", + "pid": 1118, + "action": "Watching system buttons", + "device": "/dev/input/event0", + "device_name": "Power Button", + }, + id="systemd-logind: watching system buttons", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: New seat seat0.", + { + "service": "systemd-logind", + "pid": 1118, + "action": "new seat", + "seat": "seat0", + }, + id="systemd-logind: new seat", + ), + pytest.param( + "Mar 27 13:10:08 ubuntu-1 sudo: ubuntu : TTY=pts/0 ; PWD=/home/test_user ; " + "USER=root ; COMMAND=/usr/bin/apt-key add -", + { + "service": "sudo", + "pid": None, + "tty": "pts/0", + "pwd": "/home/test_user", + "effective_user": "root", + "command": "/usr/bin/apt-key add -", + }, + id="sudo: command", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1521]: Successful su for user by root", + {"service": "su", "pid": 1521, "su_result": "success", "user": "user", "by": "root"}, + id="su: success", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1531]: 'su root' failed for user by root", + { + "service": "su", + "pid": 1531, + "su_result": "failed", + "command": "su root", + "user": "user", + "by": "root", + }, + id="su: failed", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 pkexec[1531]: user: Executing command [USER=root] " + "[TTY=unknown] [CWD=/home/user] [COMMAND=/usr/lib/update-notifier/package-system-locked]", + { + "service": "pkexec", + "pid": 1531, + "action": "executing command", + "user": "user", + "effective_user": "root", + "tty": "unknown", + "cwd": "/home/user", + "command": "/usr/lib/update-notifier/package-system-locked", + }, + id="pkexec: executing command", + ), + pytest.param( + "Mar 27 13:17:01 ubuntu-1 CRON[2623]: pam_unix(cron:session): session closed for user root", + { + "service": "CRON", + "pid": 2623, + "action": "session closed", + "user": "root", + }, + id="cron: pam_unix", + ), + ], +) +def test_auth_plugin_additional_fields( + target_unix, fs_unix: VirtualFilesystem, tmp_path: Path, message: str, results: dict[str, str | int] +) -> None: + data_path = tmp_path / "auth.log" + data_path.write_text(message) + fs_unix.map_file("var/log/auth.log", data_path) + + target_unix.add_plugin(AuthPlugin) + record = list(target_unix.authlog())[0] + + for key, value in results.items(): + assert getattr(record, key) == value + + def test_auth_plugin_iso_date_format(target_unix: Target, fs_unix: VirtualFilesystem) -> None: """test if we correctly handle Ubuntu 24.04 ISO formatted dates."""