Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/linux_mcp_server/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ class CommandGroup(BaseModel):
# === Services ===
"list_services": CommandGroup(
commands={
"default": CommandSpec(args=("systemctl", "list-units", "--type=service", "--all", "--no-pager")),
"default": CommandSpec(
args=("systemctl", "list-units", "--type=service", "--all", "--no-pager", "--output=json")
),
}
),
"running_services": CommandGroup(
Expand All @@ -103,12 +105,14 @@ class CommandGroup(BaseModel):
),
"service_status": CommandGroup(
commands={
"default": CommandSpec(args=("systemctl", "status", "{service_name}", "--no-pager", "--full")),
"default": CommandSpec(args=("systemctl", "show", "{service_name}", "--no-pager")),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really the same thing at all ... systemctl show returns the complete expanded list of properties which includes some status information, but systemctl status provides the most relevant information included information about things like cgroups, and journalctl logs that aren't in the systemctl show.

show also eats a lot more tokens of context.

$ systemctl show avahi-daemon.service --no-pager  | wc -c
8729
$ systemctl status avahi-daemon.service --no-pager  | wc -c
1830

(divide by 3 or so to get tokens.) From my perspective, returning structured output has some advantages we get extra validation, it works better if someone is trying to use the MCP in "code mode", and models are good at reading JSON - better than humans. but we still need to try and focus the result on the most relevant information or we are going to get bad perfomance.

[This is an advantage of guarded command execution - the model itself can figure out what information it needs and write commands to get just that.]

}
),
"service_logs": CommandGroup(
commands={
"default": CommandSpec(args=("journalctl", "-u", "{service_name}", "-n", "{lines}", "--no-pager")),
"default": CommandSpec(
args=("journalctl", "-u", "{service_name}", "-n", "{lines}", "--no-pager", "--output=json")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #275 - it uses entries: list[str] for log entries rather than JSON form, and that made sense to me - models are trained on human readable logs, and the JSON output from systemd logs have a lot of noise in them.

We don't want to expand:

Feb 22 15:56:09 fedora chronyd[1146]: Selected source 72.30.35.89 (2.fedora.pool.ntp.org)

to:

{
        "__SEQNUM_ID" : "9f9a06c9ff7f4d21aa0235bff47c87ad",
        "_SELINUX_CONTEXT" : "system_u:system_r:chronyd_t:s0",
        "_SYSTEMD_INVOCATION_ID" : "b62f1616c8b9453c980910f96f651610",
        "_HOSTNAME" : "fedora",
        "SYSLOG_FACILITY" : "3",
        "__REALTIME_TIMESTAMP" : "1771793899017757",
        "_COMM" : "chronyd",
        "SYSLOG_PID" : "1146",
        "_SYSTEMD_SLICE" : "system.slice",
        "_PID" : "1146",
        "SYSLOG_TIMESTAMP" : "Feb 22 15:58:19 ",
        "_BOOT_ID" : "60ef1101e7f34a23a1c486b7e72913ce",
        "__CURSOR" : "s=9f9a06c9ff7f4d21aa0235bff47c87ad;i=116ab3f;b=60ef1101e7f34a23a1c486b7e72913ce;m=728f9c938;t=64b6fe9f7561d;x=bfa5e635d89c531a",
        "_GID" : "992",
        "_EXE" : "/usr/bin/chronyd",
        "_MACHINE_ID" : "bbe3408bfdf24dae909eb91f836a87f4",
        "_CMDLINE" : "/usr/sbin/chronyd -n -F 2",
        "_RUNTIME_SCOPE" : "system",
        "_TRANSPORT" : "syslog",
        "__MONOTONIC_TIMESTAMP" : "30752229688",
        "_CAP_EFFECTIVE" : "2000400",
        "_SOURCE_REALTIME_TIMESTAMP" : "1771793899017664",
        "SYSLOG_IDENTIFIER" : "chronyd",
        "MESSAGE" : "Selected source 44.190.5.123 (2.fedora.pool.ntp.org)",
        "_UID" : "994",
        "PRIORITY" : "6",
        "_SYSTEMD_CGROUP" : "/system.slice/chronyd.service",
        "__SEQNUM" : "18262847",
        "_SYSTEMD_UNIT" : "chronyd.service"
}

),
}
),
# === Network ===
Expand Down
17 changes: 17 additions & 0 deletions src/linux_mcp_server/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,23 @@ def parse_service_count(stdout: str) -> int:
return count


def parse_systemctl_show(stdout: str) -> dict[str, str]:
"""Parse systemctl show output into key-value pairs.

Args:
stdout: Raw output from systemctl show command.

Returns:
Dictionary of key-value pairs.
"""
result: dict[str, str] = {}
for line in stdout.strip().split("\n"):
if "=" in line:
key, value = line.split("=", 1)
result[key.strip()] = value.strip()
return result


def parse_directory_listing(
stdout: str,
sort_by: str,
Expand Down
72 changes: 40 additions & 32 deletions src/linux_mcp_server/tools/services.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
"""Service management tools."""

import json
import typing as t

from fastmcp.exceptions import ToolError
from mcp.types import ToolAnnotations
from pydantic import Field

from linux_mcp_server.audit import log_tool_call
from linux_mcp_server.commands import get_command
from linux_mcp_server.formatters import format_service_logs
from linux_mcp_server.formatters import format_service_status
from linux_mcp_server.formatters import format_services_list
from linux_mcp_server.parsers import parse_service_count
from linux_mcp_server.parsers import parse_systemctl_show
from linux_mcp_server.server import mcp
from linux_mcp_server.utils.decorators import disallow_local_execution_in_containers
from linux_mcp_server.utils.types import Host
Expand All @@ -27,27 +26,25 @@
@disallow_local_execution_in_containers
async def list_services(
host: Host = None,
) -> str:
) -> list[dict[str, str]]:
"""List all systemd services.

Retrieves all systemd service units with their load state, active state,
sub-state, and description. Also includes a count of currently running services.
sub-state, and description.

Returns:
list[dict[str, str]]: A list of dictionaries containing information about each service.

Raises:
ToolError: If an error occurs while listing services.
"""
cmd = get_command("list_services")
returncode, stdout, stderr = await cmd.run(host=host)

if returncode != 0:
return f"Error listing services: {stderr}"

# Get running services count
running_cmd = get_command("running_services")
returncode_summary, stdout_summary, _ = await running_cmd.run(host=host)
raise ToolError(f"Error listing services: {stderr}")

running_count = None
if returncode_summary == 0:
running_count = parse_service_count(stdout_summary)

return format_services_list(stdout, running_count)
return t.cast(list[dict[str, str]], json.loads(stdout))


@mcp.tool(
Expand All @@ -67,27 +64,34 @@ async def get_service_status(
),
],
host: Host = None,
) -> str:
) -> dict[str, str]:
"""Get status of a specific systemd service.

Retrieves detailed service information including active/enabled state,
main PID, memory usage, CPU time, and recent log entries from the journal.
main PID, memory usage, etc.

Returns:
A dictionary containing the service status information.

Raises:
ToolError: If there was an error getting the service status.
"""
# Ensure service name has .service suffix if not present
if not service_name.endswith(".service") and "." not in service_name:
service_name = f"{service_name}.service"

cmd = get_command("service_status")
_, stdout, stderr = await cmd.run(host=host, service_name=service_name)
returncode, stdout, stderr = await cmd.run(host=host, service_name=service_name)

# Note: systemctl status returns non-zero for inactive services, but that's expected
if not stdout and stderr:
# Service not found
if "not found" in stderr.lower() or "could not be found" in stderr.lower():
return f"Service '{service_name}' not found on this system."
return f"Error getting service status: {stderr}"
if returncode != 0:
raise ToolError(f"Error getting service status: {stderr}")

status = parse_systemctl_show(stdout)

if status.get("LoadState") == "not-found":
raise ToolError(f"Service '{service_name}' not found on this system.")

return format_service_status(stdout, service_name)
return status


@mcp.tool(
Expand All @@ -108,11 +112,17 @@ async def get_service_logs(
],
lines: t.Annotated[int, Field(description="Number of log lines to retrieve.", ge=1, le=10_000)] = 50,
host: Host = None,
) -> str:
) -> list[dict[str, str]]:
"""Get recent logs for a specific systemd service.

Retrieves journal entries for the specified service unit, including
timestamps, priority levels, and log messages.

Raises:
ToolError: If an error occurs while retrieving logs.

Returns:
list[dict[str, str]]: A list of dictionaries containing log entries.
"""
# Ensure service name has .service suffix if not present
if not service_name.endswith(".service") and "." not in service_name:
Expand All @@ -122,11 +132,9 @@ async def get_service_logs(
returncode, stdout, stderr = await cmd.run(host=host, service_name=service_name, lines=lines)

if returncode != 0:
if "not found" in stderr.lower() or "no entries" in stderr.lower():
return f"No logs found for service '{service_name}'. The service may not exist or has no log entries."
return f"Error getting service logs: {stderr}"
raise ToolError(f"Error getting service logs: {stderr}")

if is_empty_output(stdout):
return f"No log entries found for service '{service_name}'."
raise ToolError(f"No log entries found for service '{service_name}'.")

return format_service_logs(stdout, service_name, lines)
return t.cast(list[dict[str, str]], json.loads(stdout))
33 changes: 33 additions & 0 deletions tests/parsers/test_parse_systemctl_show.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""Tests for parse_systemctl_show"""

import pytest

from linux_mcp_server.parsers import parse_systemctl_show


@pytest.mark.parametrize(
"stdout, expected",
[
(
"""
ActiveState=active
SubState=running
LoadState=loaded
""",
{
"ActiveState": "active",
"SubState": "running",
"LoadState": "loaded",
},
),
(
"""
Field=value
EmptyField=
""",
{"Field": "value", "EmptyField": ""},
),
],
)
def test_parse_systemctl_show(stdout, expected):
assert parse_systemctl_show(stdout) == expected
Loading
Loading