From d5273dc8027dae1f2b60292eb584daec07dea490 Mon Sep 17 00:00:00 2001 From: Poeloe <22234727+Poeloe@users.noreply.github.com> Date: Thu, 19 Sep 2024 09:23:55 -0700 Subject: [PATCH] Refactor AuthLogPlugin Fixes #286 --- dissect/target/plugins/os/unix/log/auth.py | 282 +++++++++++++++++++-- tests/plugins/os/unix/log/test_auth.py | 204 ++++++++++++++- 2 files changed, 452 insertions(+), 34 deletions(-) diff --git a/dissect/target/plugins/os/unix/log/auth.py b/dissect/target/plugins/os/unix/log/auth.py index 721a2fcd3..d62ae20f9 100644 --- a/dissect/target/plugins/os/unix/log/auth.py +++ b/dissect/target/plugins/os/unix/log/auth.py @@ -1,39 +1,268 @@ import re +from datetime import datetime +from enum import Enum +from functools import lru_cache from itertools import chain -from typing import Iterator +from pathlib import Path +from typing import Iterator, Union +from dissect.target import Target from dissect.target.exceptions import UnsupportedPluginError -from dissect.target.helpers.record import TargetRecordDescriptor +from dissect.target.helpers.record import DynamicDescriptor, TargetRecordDescriptor from dissect.target.helpers.utils import year_rollover_helper from dissect.target.plugin import Plugin, export -AuthLogRecord = TargetRecordDescriptor( - "linux/log/auth", - [ - ("datetime", "ts"), - ("string", "message"), - ("path", "source"), - ], -) - _TS_REGEX = r"^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}" RE_TS = re.compile(_TS_REGEX) RE_TS_AND_HOSTNAME = re.compile(_TS_REGEX + r"\s\S+\s") +class AuthLogServicesEnum(str, Enum): + cron = "CRON" + su = "su" + sudo = "sudo" + sshd = "sshd" + systemd = "systemd" + systemd_logind = "systemd-logind" + pkexec = "pkexec" + + +class AuthLogRecordBuilder: + RECORD_NAME = "linux/log/auth" + # Generic regular expressions + IPV4_ADDRESS_REGEX = re.compile( + r"((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}" # First three octets + r"(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)" # Last octet + ) + PAM_UNIX_REGEX = re.compile( + r"pam_unix\([^\s]+:session\):\s(?Psession\s\w+) " # Session action, usually opened or closed + r"for\suser\s(?P[^\s\(]+)(?:\(uid=(?P\d+)\))?" # User may contain uid like: root(uid=0) + r"(?:\sby\s\(uid=(?P\d+)\))?$" # Opened action also contains this "by" addition + ) + USER_REGEX = re.compile(r"for ([^\s]+)") + # sudo regular expressions + SUDO_COMMAND_REGEX = re.compile( + r"TTY=(?P\w+\/\w+)\s;\s" # The TTY -> TTY=pts/0 ; + r"PWD=(?P[\/\w]+)\s;\s" # The current working directory -> PWD="/home/user" ; + r"USER=(?P\w+)\s;\s" # The effective user -> USER=root ; + r"COMMAND=(?P.+)$" # The command -> COMMAND=/usr/bin/whoami + ) + # su regular expressions + SU_BY_REGEX = re.compile(r"by\s([^\s]+)") + SU_ON_REGEX = re.compile(r"on\s([^\s]+)") + SU_COMMAND_REGEX = re.compile(r"'(.*?)'") + # pkexec regular expressions + PKEXEC_COMMAND_REGEX = re.compile( + r"(?P.*?):\sExecuting\scommand\s" # Starts with actual user -> user: + r"\[USER=(?P[^\]]+)\]\s" # The impersonated user -> [USER=root] + r"\[TTY=(?P[^\]]+)\]\s" # The tty -> [TTY=unknown] + r"\[CWD=(?P[^\]]+)\]\s" # Current working directory -> [CWD=/home/user] + r"\[COMMAND=(?P[^\]]+)\]" # Command performed -> [COMMAND=/usr/lib/example] + ) + # sshd regular expressions + SSHD_PORT_REGEX = re.compile(r"port\s(\d+)") + USER_REGEX = re.compile(r"for\s([^\s]+)") + # systemd-logind regular expressions + SYSTEMD_LOGIND_WATCHING_REGEX = re.compile( + r"(?PWatching\ssystem\sbuttons)\s" # Action is "Watching system buttons" + r"on\s(?P[^\s]+)\s" # The device the button is related to -> /dev/input/event0 + r"\((?P.*?)\)" # The device (button) name -> "(Power button)" + ) + + def __init__(self, target: Target): + self._create_event_descriptor = lru_cache(4096)(self._create_event_descriptor) + self.target = target + + def _parse_sshd_message(self, message: str) -> dict[str, Union[str, int]]: + """Parse message from sshd""" + additional_fields = {} + if ip_address := self.IPV4_ADDRESS_REGEX.search(message): + field_name = "host_ip" if "listening" in message else "remote_ip" + additional_fields[field_name] = ip_address.group(0) + if port := self.SSHD_PORT_REGEX.search(message): + additional_fields["port"] = int(port.group(1)) + if user := self.USER_REGEX.search(message): + additional_fields["user"] = user.group(1) + # Accepted publickey for test_user from 8.8.8.8 IP port 12345 ssh2: RSA SHA256:123456789asdfghjklöertzuio + if "Accepted publickey" in message: + ssh_protocol, encryption_algo, key_info = message.split()[-3:] + hash_algo, key_hash = key_info.split(":") + additional_fields["ssh_protocol"] = ssh_protocol.strip(":") + additional_fields["encryption_algorithm"] = encryption_algo + additional_fields["hash_algorithm"] = hash_algo + additional_fields["key_hash"] = key_hash + if (failed := "Failed" in message) or "Accepted" in message: + action_type = "failed" if failed else "accepted" + additional_fields["action"] = f"{action_type} authentication" + additional_fields["authentication_type"] = "password" if "password" in message else "publickey" + + return additional_fields + + def _parse_systemd_logind_message(self, message: str) -> dict[str, str]: + """Parse auth log message from systemd-logind""" + additional_fields = {} + # Example: Nov 14 07:14:09 ubuntu-1 systemd-logind[4]: Removed session 4. + if "Removed" in message: + additional_fields["action"] = "Removed" + additional_fields["session"] = message.split()[-1].strip(".") + elif "Watching" in message and (match := self.SYSTEMD_LOGIND_WATCHING_REGEX.search(message)): + for key, value in match.groupdict().items(): + additional_fields[key] = value + # Example: New session 4 of user sampleuser. + elif "New session" in message: + parts = message.removeprefix("New session ").split() + additional_fields["action"] = "New session" + additional_fields["session"] = parts[0] + additional_fields["user"] = parts[-1].strip(".") + # Example: Session 4 logged out. Waiting for processes to exit. + elif "logged out" in message: + session = message.removeprefix("Session ").split(maxsplit=1)[0] + additional_fields["action"] = "Logged out" + additional_fields["session"] = session + # Example: New seat seat0. + elif "New seat" in message: + seat = message.split()[-1].strip(".") + additional_fields["action"] = "New seat" + additional_fields["seat"] = seat + + return additional_fields + + def _parse_sudo_message(self, message: str) -> dict[str, str]: + """Parse auth log message from sudo""" + if not (match := self.SUDO_COMMAND_REGEX.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + additional_fields[key] = value + + return additional_fields + + def _parse_su_message(self, message: str) -> dict[str, str]: + additional_fields = {} + if user := self.USER_REGEX.search(message): + additional_fields["user"] = user.group(1) + if by := self.SU_BY_REGEX.search(message): + additional_fields["by"] = by.group(1) + if on := self.SU_ON_REGEX.search(message): + additional_fields["device"] = on.group(1) + if command := self.SU_COMMAND_REGEX.search(message): + additional_fields["command"] = command.group(1) + if (failed := "failed" in message) or "Successful" in message: + additional_fields["su_result"] = "failed" if failed else "success" + + return additional_fields + + def _parse_pkexec_message(self, message: str) -> dict[str, str]: + """Parse auth log message from pkexec""" + additional_fields = {} + if exec_cmd := self.PKEXEC_COMMAND_REGEX.search(message): + additional_fields["action"] = "executing command" + for key, value in exec_cmd.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + def _parse_pam_unix_message(self, message: str) -> dict[str, str]: + """Parse auth log message from pluggable authentication modules (PAM)""" + if not (match := self.PAM_UNIX_REGEX.search(message)): + return {} + + additional_fields = {} + for key, value in match.groupdict().items(): + if value and value.isdigit(): + value = int(value) + additional_fields[key] = value + + return additional_fields + + def _parse_additional_fields(self, service: str, message: str) -> dict[str, any]: + """Parse additional fields in the message based on the service""" + if service not in [item.value for item in AuthLogServicesEnum] and "pam_unix(" not in message: + self.target.log.debug("Service %s is not recognised, no additional fields could be parsed", service) + return {} + + additional_fields = {} + try: + if "pam_unix(" in message: + additional_fields.update(self._parse_pam_unix_message(message)) + elif service == AuthLogServicesEnum.sshd: + additional_fields.update(self._parse_sshd_message(message)) + elif service == AuthLogServicesEnum.sudo: + additional_fields.update(self._parse_sudo_message(message)) + elif service == AuthLogServicesEnum.su: + additional_fields.update(self._parse_su_message(message)) + elif service == AuthLogServicesEnum.systemd_logind: + additional_fields.update(self._parse_systemd_logind_message(message)) + elif service == AuthLogServicesEnum.pkexec: + additional_fields.update(self._parse_pkexec_message(message)) + except Exception as e: + self.target.log.warning( + "Parsing additional fields in message '%s' for service %s failed", message, service, exc_info=e + ) + self.target.log.debug("", exc_info=e) + + return additional_fields + + def build_record(self, ts: datetime, source: Path, service: str, pid: int, message: str) -> TargetRecordDescriptor: + """Builds an AuthLog event record""" + record_fields = [ + ("datetime", "ts"), + ("path", "source"), + ("string", "service"), + # PID should be string, since it can be "None" + ("string", "pid"), + ("string", "message"), + ] + + record_values = {} + record_values["ts"] = ts + record_values["source"] = source + record_values["service"] = service + record_values["pid"] = pid + record_values["message"] = message + record_values["_target"] = self.target + + for key, value in self._parse_additional_fields(service, message).items(): + record_type = "bytes" + if isinstance(value, list): + record_type = "string[]" + elif isinstance(value, int): + record_type = "varint" + elif isinstance(value, str): + record_type = "string" + + record_fields.append((record_type, key)) + record_values[key] = value + + # tuple conversion here is needed for lru_cache + desc = self._create_event_descriptor(tuple(record_fields)) + return desc(**record_values) + + def _create_event_descriptor(self, record_fields) -> TargetRecordDescriptor: + return TargetRecordDescriptor(self.RECORD_NAME, record_fields) + + class AuthPlugin(Plugin): + def __init__(self, target: Target): + super().__init__(target) + self.target + self._auth_log_builder = AuthLogRecordBuilder(target) + def check_compatible(self) -> None: var_log = self.target.fs.path("/var/log") if not any(var_log.glob("auth.log*")) and not any(var_log.glob("secure*")): raise UnsupportedPluginError("No auth log files found") - @export(record=[AuthLogRecord]) - def securelog(self) -> Iterator[AuthLogRecord]: + @export(record=DynamicDescriptor(["datetime", "path", "string"])) + def securelog(self) -> Iterator[any]: """Return contents of /var/log/auth.log* and /var/log/secure*.""" return self.authlog() - @export(record=[AuthLogRecord]) - def authlog(self) -> Iterator[AuthLogRecord]: + @export(record=DynamicDescriptor(["datetime", "path", "string"])) + def authlog(self) -> Iterator[any]: """Return contents of /var/log/auth.log* and /var/log/secure*.""" # Assuming no custom date_format template is set in syslog-ng or systemd (M d H:M:S) @@ -44,17 +273,20 @@ def authlog(self) -> Iterator[AuthLogRecord]: var_log = self.target.fs.path("/var/log") for auth_file in chain(var_log.glob("auth.log*"), var_log.glob("secure*")): - for ts, line in year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo): + for idx, (ts, line) in enumerate(year_rollover_helper(auth_file, RE_TS, "%b %d %H:%M:%S", tzinfo)): ts_and_hostname = re.search(RE_TS_AND_HOSTNAME, line) if not ts_and_hostname: - self.target.log.warning("No timstamp and hostname found on one of the lines in %s.", auth_file) - self.target.log.debug("Skipping this line: %s", line) + self.target.log.warning("No timestamp and hostname found on line %d for file %s.", idx, auth_file) + self.target.log.debug("Skipping line %d: %s", idx, line) continue - message = line.replace(ts_and_hostname.group(0), "").strip() - yield AuthLogRecord( - ts=ts, - message=message, - source=auth_file, - _target=self.target, - ) + info = line.replace(ts_and_hostname.group(0), "").strip() + service, _message = info.split(":", maxsplit=1) + message = _message.strip() + # Get the PID, if present. Example: CRON[1] --> pid=1 + pid = None + if "[" in service: + service, _pid = service.split("[")[:2] + pid = _pid.strip("]") + + yield self._auth_log_builder.build_record(ts, auth_file, service, pid, message) diff --git a/tests/plugins/os/unix/log/test_auth.py b/tests/plugins/os/unix/log/test_auth.py index f17fce280..b5b68f2bb 100644 --- a/tests/plugins/os/unix/log/test_auth.py +++ b/tests/plugins/os/unix/log/test_auth.py @@ -1,12 +1,15 @@ from datetime import datetime, timezone from io import BytesIO +from pathlib import Path +from typing import Union from unittest.mock import patch from zoneinfo import ZoneInfo +import pytest from flow.record.fieldtypes import datetime as dt from dissect.target.filesystem import VirtualFilesystem -from dissect.target.plugins.os.unix.log.auth import AuthLogRecord, AuthPlugin +from dissect.target.plugins.os.unix.log.auth import AuthPlugin from tests._utils import absolute_path @@ -26,9 +29,8 @@ def test_auth_plugin(target_unix, fs_unix: VirtualFilesystem): results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Europe/Amsterdam")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem): @@ -50,9 +52,8 @@ def test_auth_plugin_with_gz(target_unix, fs_unix: VirtualFilesystem): results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("Pacific/Honolulu")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem): @@ -74,9 +75,8 @@ def test_auth_plugin_with_bz(target_unix, fs_unix: VirtualFilesystem): results = list(target_unix.authlog()) assert len(results) == 10 - assert isinstance(results[0], type(AuthLogRecord())) assert results[-1].ts == dt(2022, 11, 14, 6, 39, 1, tzinfo=ZoneInfo("America/Nuuk")) - assert results[-1].message == "CRON[1]: pam_unix(cron:session): session opened for user root by (uid=0)" + assert results[-1].message == "pam_unix(cron:session): session opened for user root by (uid=0)" def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem): @@ -96,7 +96,193 @@ def test_auth_plugin_year_rollover(target_unix, fs_unix: VirtualFilesystem): assert len(results) == 2 results.reverse() - assert isinstance(results[0], type(AuthLogRecord())) - assert isinstance(results[1], type(AuthLogRecord())) assert results[0].ts == dt(2021, 12, 31, 3, 14, 0, tzinfo=ZoneInfo("Etc/UTC")) assert results[1].ts == dt(2022, 1, 1, 13, 37, 0, tzinfo=ZoneInfo("Etc/UTC")) + + +@pytest.mark.parametrize( + "message, results", + [ + pytest.param( + "Mar 29 10:43:01 ubuntu-1 sshd[1193]: Accepted password for test_user from 8.8.8.8 port 52942 ssh2", + { + "service": "sshd", + "pid": "1193", + "action": "accepted authentication", + "authentication_type": "password", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 52942, + }, + id="sshd: accepted password", + ), + pytest.param( + "Jun 4 22:14:15 ubuntu-1 sshd[41458]: Failed password for root from 8.8.8.8 port 22 ssh2", + { + "service": "sshd", + "pid": "41458", + "action": "failed authentication", + "authentication_type": "password", + "user": "root", + "remote_ip": "8.8.8.8", + "port": 22, + }, + id="sshd: failed password", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Accepted publickey for test_user " + "from 8.8.8.8 port 12345 ssh2: RSA SHA256:123456789asdfghjklöertzuio", + { + "service": "sshd", + "pid": "1361", + "action": "accepted authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + "ssh_protocol": "ssh2", + "encryption_algorithm": "RSA", + "hash_algorithm": "SHA256", + "key_hash": "123456789asdfghjklöertzuio", + }, + id="sshd: accepted publickey", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: Failed publickey for test_user from 8.8.8.8 port 12345 ssh2.", + { + "service": "sshd", + "pid": "1361", + "action": "failed authentication", + "authentication_type": "publickey", + "user": "test_user", + "remote_ip": "8.8.8.8", + "port": 12345, + }, + id="sshd: failed publickey", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 sshd[1291]: Server listening on 127.0.0.1 port 22.", + { + "service": "sshd", + "pid": "1291", + "host_ip": "127.0.0.1", + "port": 22, + }, + id="sshd: listening", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened for user test_user by (uid=0)", + { + "service": "sshd", + "pid": "1361", + "action": "session opened", + "user": "test_user", + "user_uid": None, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:08:09 ubuntu-1 sshd[1361]: pam_unix(sshd:session): session opened " + "for user root(uid=0) by (uid=0)", + { + "service": "sshd", + "pid": "1361", + "action": "session opened", + "user": "root", + "user_uid": 0, + "by_uid": 0, + }, + id="sshd: pam_unix", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: Watching system buttons " + "on /dev/input/event0 (Power Button)", + { + "service": "systemd-logind", + "pid": "1118", + "action": "Watching system buttons", + "device": "/dev/input/event0", + "device_name": "Power Button", + }, + id="systemd-logind: watching system buttons", + ), + pytest.param( + "Mar 27 13:06:56 ubuntu-1 systemd-logind[1118]: New seat seat0.", + { + "service": "systemd-logind", + "pid": "1118", + "action": "New seat", + "seat": "seat0", + }, + id="systemd-logind: new seat", + ), + pytest.param( + "Mar 27 13:10:08 ubuntu-1 sudo: ubuntu : TTY=pts/0 ; PWD=/home/test_user ; " + "USER=root ; COMMAND=/usr/bin/apt-key add -", + { + "service": "sudo", + "pid": None, + "tty": "pts/0", + "pwd": "/home/test_user", + "effective_user": "root", + "command": "/usr/bin/apt-key add -", + }, + id="sudo: command", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1521]: Successful su for user by root", + {"service": "su", "pid": "1521", "su_result": "success", "user": "user", "by": "root"}, + id="su: success", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 su[1531]: 'su root' failed for user by root", + { + "service": "su", + "pid": "1531", + "su_result": "failed", + "command": "su root", + "user": "user", + "by": "root", + }, + id="su: failed", + ), + pytest.param( + "Apr 3 12:32:23 ubuntu-1 pkexec[1531]: user: Executing command [USER=root] " + "[TTY=unknown] [CWD=/home/user] [COMMAND=/usr/lib/update-notifier/package-system-locked]", + { + "service": "pkexec", + "pid": "1531", + "action": "executing command", + "user": "user", + "effective_user": "root", + "tty": "unknown", + "cwd": "/home/user", + "command": "/usr/lib/update-notifier/package-system-locked", + }, + id="pkexec: executing command", + ), + pytest.param( + "Mar 27 13:17:01 ubuntu-1 CRON[2623]: pam_unix(cron:session): session closed for user root", + { + "service": "CRON", + "pid": "2623", + "action": "session closed", + "user": "root", + }, + id="cron: pam_unix", + ), + ], +) +def test_auth_plugin_additional_fields( + target_unix, fs_unix: VirtualFilesystem, tmp_path: Path, message: str, results: dict[str, Union[str, int]] +): + data_path = tmp_path / "auth.log" + data_path.write_text(message) + fs_unix.map_file("var/log/auth.log", data_path) + + target_unix.add_plugin(AuthPlugin) + record = list(target_unix.authlog())[0] + + for key, value in results.items(): + assert getattr(record, key) == value