Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions ssg/controls.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,6 @@ class Level(ssg.entities.common.XCCDFEntity):
id (str): The unique identifier for the level.
inherits_from (str or None): The identifier of the level from which this level inherits, if any.

Args:
level_dict (dict): A dictionary containing the level data.

Returns:
Level: An instance of the Level class.
"""
Expand Down Expand Up @@ -805,7 +802,7 @@ def __init__(self, controls_dirs: List[str], env_yaml=None, existing_rules=None)
Initializes the Controls class.

Args:
controls_dir (str): The directory where control files are located.
controls_dirs (List[str]): The directory where control files are located.
env_yaml (str, optional): Path to the environment YAML file. Defaults to None.
existing_rules (dict, optional): Dictionary of existing rules. Defaults to None.
"""
Expand Down Expand Up @@ -972,7 +969,7 @@ def get_all_controls_dict(self, policy_id: str) -> Dict[str, Control]:
policy = self._get_policy(policy_id)
return policy.controls_by_id

def _get_policy(self, policy_id):
def _get_policy(self, policy_id) -> Policy:
"""
Retrieve a policy by its ID.

Expand All @@ -987,12 +984,12 @@ def _get_policy(self, policy_id):
"""
try:
policy = self.policies[policy_id]
except KeyError:
msg = "policy '%s' doesn't exist" % (policy_id)
raise ValueError(msg)
except KeyError as e:
msg = f"policy '{policy_id}' doesn't exist"
raise ValueError(msg) from e
return policy

def get_all_controls_of_level(self, policy_id, level_id):
def get_all_controls_of_level(self, policy_id: str, level_id: str) -> List[Control]:
"""
Retrieve all controls associated with a specific policy and level, including inherited levels.

Expand All @@ -1001,8 +998,8 @@ def get_all_controls_of_level(self, policy_id, level_id):
variables defined in lower levels.

Args:
policy_id (int): The unique identifier of the policy.
level_id (int): The unique identifier of the level within the policy.
policy_id (str): The unique identifier of the policy.
level_id (str): The unique identifier of the level within the policy.

Returns:
list: A list of controls that are eligible for the specified level, with variables
Expand All @@ -1012,7 +1009,7 @@ def get_all_controls_of_level(self, policy_id, level_id):
levels = policy.get_level_with_ancestors_sequence(level_id)
all_policy_controls = self.get_all_controls(policy_id)
eligible_controls = []
already_defined_variables = set()
already_defined_variables: Set[str] = set()
# we will go level by level, from top to bottom
# this is done to enable overriding of variables by higher levels
for lv in levels:
Expand All @@ -1036,7 +1033,7 @@ def _get_control_without_variables(variables_to_remove, control):
Remove specified variables from a control object.

Args:
variables_to_remove (list): A list of variable names to be removed from the control.
variables_to_remove (set): A set of variable names to be removed from the control.
control (object): The control object from which variables will be removed.

Returns:
Expand Down
48 changes: 26 additions & 22 deletions utils/controleval.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#!/usr/bin/python3
#!/usr/bin/env python3
import argparse
import collections
import json
import os
from typing import Dict, Optional, List, Set

import yaml

# NOTE: This is not to be confused with the https://pypi.org/project/ssg/
Expand Down Expand Up @@ -36,14 +38,14 @@
SSG_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))


def print_options(opts):
def print_options(opts) -> None:
if len(opts) > 0:
print("Available options are:\n - " + "\n - ".join(opts))
else:
print("The controls file is not written appropriately.")


def validate_args(ctrlmgr, args):
def validate_args(ctrlmgr: controls.ControlsManager, args: argparse.Namespace) -> None:
""" Validates that the appropriate args were given
and that they're valid entries in the control manager."""

Expand All @@ -63,7 +65,7 @@ def validate_args(ctrlmgr, args):
exit(1)


def get_available_products():
def get_available_products() -> List[str]:
products_dir = os.path.join(SSG_ROOT, "products")
try:
return os.listdir(products_dir)
Expand All @@ -72,15 +74,15 @@ def get_available_products():
exit(1)


def validate_product(product):
def validate_product(product: str) -> None:
products = get_available_products()
if product not in products:
print(f"Error: Product '{product}' is not valid.")
print_options(products)
exit(1)


def get_parameter_from_yaml(yaml_file: str, section: str) -> list:
def get_parameter_from_yaml(yaml_file: str, section: str) -> Optional[List]:
with open(yaml_file, 'r') as file:
try:
yaml_content = yaml.safe_load(file)
Expand All @@ -89,30 +91,32 @@ def get_parameter_from_yaml(yaml_file: str, section: str) -> list:
print(e)


def get_controls_from_profiles(controls: list, profiles_files: list, used_controls: set) -> set:
def get_controls_from_profiles(controls: List[controls.Control], profiles_files: List[str],
used_controls: Set) -> Set[str]:
for file in profiles_files:
selections = get_parameter_from_yaml(file, 'selections')
for selection in selections:
if any(selection.startswith(control) for control in controls):
used_controls.add(selection.split(':')[0])
if selections:
for selection in selections:
if any(selection.startswith(control) for control in controls):
used_controls.add(selection.split(':')[0])
return used_controls


def get_controls_used_by_products(ctrls_mgr: controls.ControlsManager, products: list) -> list:
used_controls = set()
controls = ctrls_mgr.policies.keys()
def get_policies_used_by_products(ctrls_mgr: controls.ControlsManager, products: List[str]) -> Set[str]:
used_policies = set()
policies = ctrls_mgr.policies.keys()
for product in products:
profiles_files = get_product_profiles_files(product)
used_controls = get_controls_from_profiles(controls, profiles_files, used_controls)
return used_controls
used_policies = get_controls_from_profiles(policies, profiles_files, used_policies)
return used_policies


def get_policy_levels(ctrls_mgr: object, control_id: str) -> list:
def get_policy_levels(ctrls_mgr: controls.ControlsManager, control_id: str) -> List[str]:
policy = ctrls_mgr._get_policy(control_id)
return policy.levels_by_id.keys()


def get_product_dir(product):
def get_product_dir(product) -> str:
validate_product(product)
return os.path.join(SSG_ROOT, "products", product)

Expand All @@ -122,7 +126,7 @@ def get_product_profiles_files(product: str) -> list:
return ssg.products.get_profile_files_from_root(product_yaml, product_yaml)


def get_product_yaml(product):
def get_product_yaml(product) -> str:
product_dir = get_product_dir(product)
product_yml = os.path.join(product_dir, "product.yml")
if os.path.exists(product_yml):
Expand All @@ -131,12 +135,12 @@ def get_product_yaml(product):
exit(1)


def load_product_yaml(product: str) -> yaml:
def load_product_yaml(product: str) -> ssg.products.Product:
product_yaml = get_product_yaml(product)
return ssg.products.load_product_yaml(product_yaml)


def load_controls_manager(controls_dir: str, product: str) -> object:
def load_controls_manager(controls_dir: str, product: str) -> controls.ControlsManager:
product_yaml = load_product_yaml(product)
product_controls_dir = os.path.join(product_yaml['product_dir'], 'controls')
ctrls_mgr = controls.ControlsManager([controls_dir, product_controls_dir],
Expand All @@ -145,13 +149,13 @@ def load_controls_manager(controls_dir: str, product: str) -> object:
return ctrls_mgr


def get_formatted_name(text_name):
def get_formatted_name(text_name: str) -> str:
for special_char in '-. ':
text_name = text_name.replace(special_char, '_')
return text_name


def count_implicit_status(ctrls, status_count):
def count_implicit_status(ctrls: List[controls.Control], status_count: Dict[str, int]) -> Dict[str, int]:
automated = status_count[controls.Status.AUTOMATED]
documentation = status_count[controls.Status.DOCUMENTATION]
inherently_met = status_count[controls.Status.INHERENTLY_MET]
Expand Down
58 changes: 44 additions & 14 deletions utils/controleval_metrics.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#!/usr/bin/python3
#!/usr/bin/env python3
import argparse
import os
import sys

from prometheus_client import CollectorRegistry, Gauge, generate_latest, write_to_textfile
from utils.controleval import (
count_controls_by_status,
count_rules_and_vars,
get_controls_used_by_products,
get_policies_used_by_products,
get_policy_levels,
load_controls_manager
load_controls_manager,
load_product_yaml
)

try:
Expand All @@ -34,7 +37,7 @@ def create_prometheus_content_metric(policy_id: str, registry: CollectorRegistry


def append_prometheus_content_metric(
metric: object, level: str, content_type: str, value: float) -> Gauge:
metric: Gauge, level: str, content_type: str, value: float) -> Gauge:
metric.labels(level=level, content_type=content_type).set(value)
return metric

Expand All @@ -47,7 +50,7 @@ def create_prometheus_policy_metric(policy_id: str, registry: CollectorRegistry)


def append_prometheus_policy_metric(
metric: object, level: str, status: str, value: float) -> Gauge:
metric: Gauge, level: str, status: str, value: float) -> Gauge:
metric.labels(level=level, status=status).set(value)
return metric

Expand Down Expand Up @@ -77,23 +80,48 @@ def get_prometheus_metrics_registry(
return registry


def prometheus(args):
ctrls_mgr = load_controls_manager(args.controls_dir, args.products[0])
used_controls = get_controls_used_by_products(ctrls_mgr, args.products)
registry = get_prometheus_metrics_registry(used_controls, ctrls_mgr)
def prometheus(args: argparse.Namespace) -> None:
"""
Generate Prometheus metrics for control policies across products.

Creates a single ControlsManager with all controls directories (root + all products)
to ensure all policies are visible. Product-specific controls override root controls
following the same precedence as the build system.
"""
registry = CollectorRegistry()

# Build a single ControlsManager with all control directories
all_controls_dirs = [args.controls_dir]
for product in sorted(args.products):
product_yaml = load_product_yaml(product)
product_controls_dir = os.path.join(product_yaml['product_dir'], 'controls')
all_controls_dirs.append(product_controls_dir)

# Use the first product's yaml for env_yaml (required by ControlsManager)
# This is arbitrary since we're loading all controls
first_product_yaml = load_product_yaml(sorted(args.products)[0])
ctrls_mgr = controls.ControlsManager(all_controls_dirs, dict(first_product_yaml))
ctrls_mgr.load()

# Find all policies used across all products
used_policies = get_policies_used_by_products(ctrls_mgr, args.products)

for policy_id in sorted(used_policies):
registry = get_prometheus_metrics(ctrls_mgr, policy_id, registry)

if args.output_file:
write_to_textfile(args.output_file, registry)
else:
metrics = generate_latest(registry)
print(metrics.decode('utf-8'))


subcmds = dict(
subcommands = dict(
prometheus=prometheus
)


def parse_arguments():
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Tool used to evaluate control files",
epilog="Usage example: utils/controleval.py prometheus -p rhel9")
Expand All @@ -108,16 +136,18 @@ def parse_arguments():
help="calculate and return benchmarks metrics in Prometheus format")
prometheus_parser.add_argument(
'-p', '--products', nargs='+', required=True,
help="list of products to process the respective controls files")
help=("list of products to process the respective controls files. "
"Metrics are aggregated across all specified products by building "
"a single ControlsManager from all control directories"))
prometheus_parser.add_argument(
'-f', '--output-file',
help="save policy metrics in a file instead of showing in stdout")
return parser.parse_args()


def main():
def main() -> None:
args = parse_arguments()
subcmds[args.subcmd](args)
subcommands[args.subcmd](args)


if __name__ == "__main__":
Expand Down
Loading