
Commit

fix issue around role/taskfile detection (#255)
Signed-off-by: hirokuni-kitahara <[email protected]>
hirokuni-kitahara authored Sep 2, 2024
1 parent 56e2307 commit d2ed516
Showing 8 changed files with 93 additions and 59 deletions.
39 changes: 20 additions & 19 deletions ansible_risk_insight/cli/__init__.py
@@ -26,7 +26,7 @@
get_role_metadata,
split_name_and_version,
)
from ..finder import list_scan_target, update_the_yaml_target
from ..finder import list_scan_target, update_the_yaml_target, get_yml_list
import ansible_risk_insight.logger as logger


@@ -72,9 +72,7 @@ def __init__(self):
help="if true, do scanning per playbook, role or taskfile (this reduces memory usage while scanning)",
)
parser.add_argument(
"--fix",
action="store_true",
help="if true, fix the scanned playbook after performing the inpline replace with ARI suggestions"
"--fix", action="store_true", help="if true, fix the scanned playbook after performing the inpline replace with ARI suggestions"
)
parser.add_argument(
"--task-num-threshold",
@@ -230,46 +228,46 @@ def run(self):
logger.debug("ARI suggestion file path: %s", ari_suggestion_file_path)
with open(ari_suggestion_file_path) as f:
ari_suggestion_data = json.load(f)
targets = ari_suggestion_data['targets']
targets = ari_suggestion_data["targets"]
for i in reversed(range(len(targets))):
logger.debug("Nodes dir number: %s", i)
nodes = targets[i]['nodes']
nodes = targets[i]["nodes"]
line_number_list = []
mutated_yaml_list = []
target_file_path = ''
temp_file_path = ''
target_file_path = ""
temp_file_path = ""
for j in range(1, len(nodes)):
node_rules = nodes[j]['rules']
node_rules = nodes[j]["rules"]
for k in reversed(range(len(node_rules))): # loop through from rule 11, as that has the mutation
w007_rule = node_rules[k]
if (w007_rule['rule']['rule_id']).lower() == 'w007':
if not w007_rule.get('verdict') and w007_rule:
if (w007_rule["rule"]["rule_id"]).lower() == "w007":
if not w007_rule.get("verdict") and w007_rule:
break
mutated_yaml = w007_rule['detail']['mutated_yaml']
if mutated_yaml == '':
mutated_yaml = w007_rule["detail"]["mutated_yaml"]
if mutated_yaml == "":
break
temp_data = index_data[each]
if w007_rule['file'][0] not in temp_data:
target_file_path = os.path.join(args.target_name, temp_data, w007_rule['file'][0])
if temp_file_path != '' and target_file_path != temp_file_path:
if w007_rule["file"][0] not in temp_data:
target_file_path = os.path.join(args.target_name, temp_data, w007_rule["file"][0])
if temp_file_path != "" and target_file_path != temp_file_path:
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
line_number_list = []
mutated_yaml_list = []
mutated_yaml_list.append(mutated_yaml)
temp_file_path = target_file_path
else:
target_file_path = os.path.join(args.target_name, temp_data)
if temp_file_path != '' and target_file_path != temp_file_path:
if temp_file_path != "" and target_file_path != temp_file_path:
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
line_number_list = []
mutated_yaml_list = []
mutated_yaml_list.append(mutated_yaml)
temp_file_path = target_file_path
line_number = w007_rule['file'][1]
line_number = w007_rule["file"][1]
line_number_list.append(line_number)
break # w007 rule with mutated yaml is processed, breaking out of iteration
try:
if target_file_path == '' or not mutated_yaml_list or not line_number_list:
if target_file_path == "" or not mutated_yaml_list or not line_number_list:
continue
update_the_yaml_target(target_file_path, line_number_list, mutated_yaml_list)
except Exception as ex:
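The --fix branch above walks the ari-suggestions JSON written by an earlier scan: for each target it visits the nodes, takes the W007 rule result that carries a mutated_yaml, groups the mutations by file, and finally applies them with update_the_yaml_target. A minimal sketch of the data shape this loop expects; only the fields the code actually reads are shown, and the concrete values are illustrative:

# Illustrative shape of the ari-suggestions data consumed by the --fix loop above.
ari_suggestion_data = {
    "targets": [
        {
            "nodes": [
                {},  # index 0 is skipped; the loop starts at nodes[1]
                {
                    "rules": [
                        {
                            "rule": {"rule_id": "W007"},
                            "verdict": True,
                            "detail": {"mutated_yaml": "- ansible.builtin.package:\n    name: nginx\n"},
                            "file": ["tasks/main.yml", 12],  # [relative file path, line number]
                        },
                    ],
                },
            ],
        },
    ],
}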
@@ -280,6 +278,8 @@ def run(self):
root_install = not args.skip_install
if not silent and not pretty:
print("Start scanning")
_yml_list = get_yml_list(target_name)
yaml_label_list = [(x["filepath"], x["label"], x["role_info"]) for x in _yml_list]
c.evaluate(
type=args.target_type,
name=target_name,
@@ -294,6 +294,7 @@
include_test_contents=args.include_tests,
load_all_taskfiles=load_all_taskfiles,
save_only_rule_result=save_only_rule_result,
yaml_label_list=yaml_label_list,
objects=args.objects,
out_dir=args.out_dir,
)
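The new yaml_label_list argument hands the evaluator a pre-computed classification of every YAML file in the target. Each element is a (filepath, label, role_info) tuple built from get_yml_list; a hypothetical example of what that list might contain (paths are illustrative, and only the role_info key used later, "path", is shown):

# Hypothetical result of get_yml_list(target_name); the tuple layout matches the
# comprehension above, and the labels follow the ones checked later
# ("playbook", "taskfile", "others").
_yml_list = [
    {"filepath": "site.yml", "label": "playbook", "role_info": None},
    {"filepath": "roles/web/tasks/main.yml", "label": "taskfile", "role_info": {"path": "roles/web"}},
    {"filepath": "group_vars/all.yml", "label": "others", "role_info": None},
]
yaml_label_list = [(x["filepath"], x["label"], x["role_info"]) for x in _yml_list]
# -> [("site.yml", "playbook", None),
#     ("roles/web/tasks/main.yml", "taskfile", {"path": "roles/web"}),
#     ("group_vars/all.yml", "others", None)]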
35 changes: 21 additions & 14 deletions ansible_risk_insight/finder.py
@@ -37,6 +37,7 @@

fqcn_module_name_re = re.compile(r"^[a-z0-9_]+\.[a-z0-9_]+\.[a-z0-9_]+$")
module_name_re = re.compile(r"^[a-z0-9_.]+$")
taskfile_path_in_role_re = r".*roles/[^/]+/(tasks|handlers)/.*\.ya?ml"

module_dir_patterns = [
"library",
@@ -417,7 +418,7 @@ def find_module_dirs(role_root_dir):
return module_dirs


def search_taskfiles_for_playbooks(path, taskfile_dir_paths=[]):
def search_taskfiles_for_playbooks(path, taskfile_dir_paths: list = []):
# must copy the input here; otherwise, the added items are kept forever
search_targets = [p for p in taskfile_dir_paths]
for playbook_taskfile_dir_pattern in playbook_taskfile_dir_patterns:
@@ -428,7 +429,7 @@ def search_taskfiles_for_playbooks(path, taskfile_dir_paths=[]):
found = safe_glob(patterns, recursive=True)
for f in found:
# taskfiles in role will be loaded when the role is loaded, so skip
if "/roles/" in f:
if re.match(taskfile_path_in_role_re, f):
continue
# if it is a playbook, skip it
if could_be_playbook(f):
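The new taskfile_path_in_role_re tightens the old substring test: instead of skipping every path that merely contains "/roles/", only YAML files sitting under a role's tasks/ or handlers/ directory are treated as role-owned and skipped here. A small illustration of the difference (paths are made up):

import re

taskfile_path_in_role_re = r".*roles/[^/]+/(tasks|handlers)/.*\.ya?ml"

# Skipped both before and after the change: real role taskfiles/handlers.
assert re.match(taskfile_path_in_role_re, "proj/roles/web/tasks/install.yml")
assert re.match(taskfile_path_in_role_re, "proj/roles/web/handlers/main.yaml")

# Previously skipped by the plain '"/roles/" in f' check, now kept:
# YAML files under a "roles" directory that are not a role's tasks/handlers files.
assert not re.match(taskfile_path_in_role_re, "proj/roles/web/vars/main.yml")
assert not re.match(taskfile_path_in_role_re, "proj/playbooks/roles/included_tasks.yml")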
@@ -701,6 +702,12 @@ def get_role_info_from_path(fpath: str):

_path = relative_path.rsplit(_target, 1)[0]
role_name = _path.split("/")[-1]

# if the path is something like "xxxx/roles/tasks"
# it is not an actual role, so skip it
if role_name == p.strip("/"):
continue

role_path = os.path.join(parent_dir, _path)
found = True
break
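The added check in get_role_info_from_path guards against layouts such as xxxx/roles/tasks/main.yml, where tasks/ sits directly under roles/ with no real role in between: splitting the path at the tasks segment would leave "roles" itself as the supposed role name. A worked example of that path arithmetic, assuming _target here is the "/tasks/" separator being matched:

# Illustrative input: a tasks directory directly under "roles/", no actual role between them.
relative_path = "project/roles/tasks/main.yml"
_target = "/tasks/"  # assumed value of the separator used in the surrounding loop

_path = relative_path.rsplit(_target, 1)[0]  # -> "project/roles"
role_name = _path.split("/")[-1]             # -> "roles"

# "roles" is just the roles parent directory, not a role name,
# so the new condition skips this candidate instead of reporting a phantom role.
print(role_name)  # roles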
@@ -951,8 +958,8 @@ def check_and_add_diff_lines(start_line, stop_line, lines, data_copy):

def check_diff_and_copy_olddata_to_newdata(line_number_list, lines, new_data):
"""
Function to find the old lines which weren't mutated by ARI rules,
it need to be copied to new content as is
Function to find the old lines which weren't mutated by ARI rules,
it need to be copied to new content as is
"""
if line_number_list and isinstance(line_number_list, list):
new_content_last_set = line_number_list[-1]
@@ -965,12 +972,12 @@ def check_diff_and_copy_olddata_to_newdata(line_number_list, lines, new_data):

def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy):
"""
Function to get the leading space for the new ARI mutated line, with
its equivaltent old line with space similar to the old line
Function to get the leading space for the new ARI mutated line, with
its equivaltent old line with space similar to the old line
"""
line_with_adjusted_space = update_line_with_space(new_line, old_line, leading_spaces)
data_copy.append(line_with_adjusted_space)
return ''
return ""


def update_the_yaml_target(file_path, line_number_list, new_content_list):
@@ -1005,7 +1012,7 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
temp_content = []
start = start_line_number - 1
end = stop_line_number - 1
data_copy.append('\n')
data_copy.append("\n")
for i in range(start, end):
line_number = i
if len(lines) == i:
@@ -1027,15 +1034,15 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
lines[line_number] = " " * leading_spaces + new_line_content
data_copy.append(lines[line_number])
else:
new_line_key = new_line_content.split(':')
new_key = new_line_key[0].strip(' ')
new_line_key = new_line_content.split(":")
new_key = new_line_key[0].strip(" ")
for k in range(start, end):
if k < len(lines):
old_line_key = lines[k].split(":")
if "---" in old_line_key[0]:
continue
old_key = old_line_key[0].strip(' ')
if '-' in old_line_key[0] and ':' not in lines[k] and '-' in new_key:
old_key = old_line_key[0].strip(" ")
if "-" in old_line_key[0] and ":" not in lines[k] and "-" in new_key:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
@@ -1052,10 +1059,10 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
elif old_key == new_key:
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
break
elif old_key.rstrip('\n') == new_key:
elif old_key.rstrip("\n") == new_key:
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
break
elif old_key.rstrip('\n') in new_key.split('.'):
elif old_key.rstrip("\n") in new_key.split("."):
new_line_content = update_and_append_new_line(new_line_content, lines[k], 0, data_copy)
break
if new_line_content: # if there wasn't a match with old line, so this seems updated by ARI and added to w/o any change
62 changes: 46 additions & 16 deletions ansible_risk_insight/model_loader.py
@@ -394,7 +394,7 @@ def load_files(path, basedir="", yaml_label_list=None, role_name="", collection_
return []

files = []
for (fpath, label, role_info) in yaml_label_list:
for fpath, label, role_info in yaml_label_list:
if not fpath:
continue
if not label:
@@ -486,7 +486,7 @@ def load_play(
if task_blocks is None:
continue
last_task_line_num = -1
for (task_dict, task_jsonpath) in task_blocks:
for task_dict, task_jsonpath in task_blocks:
task_loading["total"] += 1
i = task_count
error = None
@@ -533,7 +533,7 @@ def load_play(
if task_blocks is None:
continue
last_task_line_num = -1
for (task_dict, task_jsonpath) in task_blocks:
for task_dict, task_jsonpath in task_blocks:
i = task_count
task_loading["total"] += 1
error = None
@@ -580,7 +580,7 @@ def load_play(
if task_blocks is None:
continue
last_task_line_num = -1
for (task_dict, task_jsonpath) in task_blocks:
for task_dict, task_jsonpath in task_blocks:
i = task_count
task_loading["total"] += 1
error = None
@@ -627,7 +627,7 @@ def load_play(
if task_blocks is None:
continue
last_task_line_num = -1
for (task_dict, task_jsonpath) in task_blocks:
for task_dict, task_jsonpath in task_blocks:
i = task_count
task_loading["total"] += 1
error = None
@@ -872,10 +872,11 @@ def load_playbooks(
patterns.append(os.path.join(path, "tests/**/*.ya?ml"))
patterns.append(os.path.join(path, "molecule/**/*.ya?ml"))
candidates = safe_glob(patterns, recursive=True)
candidates = [(c, False) for c in candidates]

# add files if yaml_label_list is given
if yaml_label_list:
for (fpath, label, role_info) in yaml_label_list:
for fpath, label, role_info in yaml_label_list:
if label == "playbook":
# if it is a playbook in role, it should be scanned by load_role
if role_info:
@@ -885,17 +886,21 @@ def load_playbooks(
if not _fpath.startswith(basedir):
_fpath = os.path.join(basedir, fpath)
if _fpath not in candidates:
candidates.append(_fpath)
candidates.append((_fpath, True))

playbooks = []
playbook_names = []
candidates = sorted(list(set(candidates)))
for fpath in candidates:
candidates = sorted(candidates, key=lambda x: x[0])
loaded = set()
for fpath, from_list in candidates:
if fpath in loaded:
continue

if could_be_playbook(fpath=fpath) and could_be_playbook_detail(fpath=fpath):
relative_path = ""
if fpath.startswith(path):
relative_path = fpath[len(path) :]
if "/roles/" in relative_path:
if "/roles/" in relative_path and not from_list:
continue
p = None
try:
@@ -920,6 +925,7 @@
else:
playbooks.append(p.defined_in)
playbook_names.append(p.defined_in)
loaded.add(fpath)
if not load_children:
playbooks = sorted(playbooks)
return playbooks
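candidates in load_playbooks are now (path, from_list) pairs: glob-discovered files keep the old rule of being ignored when their path runs through /roles/, while files that yaml_label_list explicitly labels as playbooks (and that belong to no role) are loaded even from such paths, with the loaded set preventing duplicates. A condensed, self-contained restatement of that decision; the paths are illustrative and the real code also checks could_be_playbook and strips the scan root from the path:

candidates = [
    ("site.yml", False),                 # found by glob -> loaded as before
    ("src/roles/bootstrap.yml", False),  # glob hit under a "roles" dir -> skipped
    ("src/roles/bootstrap.yml", True),   # explicitly labelled "playbook", no role_info -> loaded
]

loaded = set()
for fpath, from_list in sorted(candidates, key=lambda x: x[0]):
    if fpath in loaded:
        continue
    if "/roles/" in fpath and not from_list:
        continue
    loaded.add(fpath)

print(sorted(loaded))  # ['site.yml', 'src/roles/bootstrap.yml']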
@@ -1192,10 +1198,33 @@ def load_roles(
if found_roles:
roles_dir_path = found_roles[0]

def is_role_dir(found_dirs: list) -> bool:
# From ansible role doc
# if none of the following dirs are found, we don't treat it as a role dir
role_dir_patterns = set(
[
"tasks",
"handlers",
"templates",
"files",
"vars",
"defaults",
"meta",
"library",
"module_utils",
"lookup_plugins",
]
)
return len(role_dir_patterns.intersection(set(found_dirs))) > 0

role_dirs = []
if roles_dir_path:
dirs = os.listdir(roles_dir_path)
role_dirs = [os.path.join(roles_dir_path, dir_name) for dir_name in dirs]
dirs = sorted(os.listdir(roles_dir_path))
for dir_name in dirs:
candidate = os.path.join(roles_dir_path, dir_name)
dirs_in_cand = os.listdir(candidate)
if is_role_dir(dirs_in_cand):
role_dirs.append(candidate)

if include_test_contents:
test_targets_dir = os.path.join(path, "tests/integration/targets")
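With is_role_dir in place, a subdirectory of roles/ is only treated as a role when it contains at least one of the standard role directories, so stray folders under roles/ (docs, shared files, or a bare tasks directory) no longer turn into phantom roles. A small, self-contained example of that filter; the directory contents are made up:

role_dir_patterns = {
    "tasks", "handlers", "templates", "files", "vars",
    "defaults", "meta", "library", "module_utils", "lookup_plugins",
}

# Pretend listing of each candidate directory under roles/ (illustrative).
dirs_by_candidate = {
    "roles/web": ["tasks", "defaults", "README.md"],
    "roles/docs": ["images", "index.md"],
    "roles/tasks": ["main.yml"],
}

role_dirs = [
    candidate
    for candidate, entries in sorted(dirs_by_candidate.items())
    if role_dir_patterns & set(entries)  # same intersection test as is_role_dir
]
print(role_dirs)  # ['roles/web']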
@@ -1215,7 +1244,7 @@ def load_roles(

# add role dirs if yaml_label_list is given
if yaml_label_list:
for (fpath, label, role_info) in yaml_label_list:
for fpath, label, role_info in yaml_label_list:
if role_info and isinstance(role_info, dict):
role_path = role_info.get("path", "")
_role_path = role_path
@@ -1734,12 +1763,10 @@ def load_taskfiles(path, basedir="", yaml_label_list=None, load_children=True):
return []

taskfile_paths = search_taskfiles_for_playbooks(path)
if len(taskfile_paths) == 0:
return []

# add files if yaml_label_list is given
if yaml_label_list:
for (fpath, label, role_info) in yaml_label_list:
for fpath, label, role_info in yaml_label_list:
if label == "taskfile":
# if it is a taskfile in role, it should be scanned by load_role
if role_info:
Expand All @@ -1751,6 +1778,9 @@ def load_taskfiles(path, basedir="", yaml_label_list=None, load_children=True):
if _fpath not in taskfile_paths:
taskfile_paths.append(_fpath)

if len(taskfile_paths) == 0:
return []

taskfiles = []
for taskfile_path in taskfile_paths:
try:
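In load_taskfiles the empty-list early return has moved below the yaml_label_list merge, so taskfiles known only from the label list are no longer dropped whenever search_taskfiles_for_playbooks finds nothing by itself. A reduced sketch of the old and new control flow; plain lists stand in for the two discovery sources and the helper names are placeholders:

def old_behaviour(found_by_glob, from_label_list):
    taskfile_paths = list(found_by_glob)
    if not taskfile_paths:        # bailed out before merging the label-list entries
        return []
    taskfile_paths += from_label_list
    return taskfile_paths

def new_behaviour(found_by_glob, from_label_list):
    taskfile_paths = list(found_by_glob)
    taskfile_paths += from_label_list
    if not taskfile_paths:        # bail out only when both sources are empty
        return []
    return taskfile_paths

labelled = ["playbooks/tasks/setup.yml"]  # taskfile known only via yaml_label_list
print(old_behaviour([], labelled))        # []
print(new_behaviour([], labelled))        # ['playbooks/tasks/setup.yml']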
2 changes: 1 addition & 1 deletion ansible_risk_insight/parser.py
@@ -360,7 +360,7 @@ def run(self, load_data=None, load_json_path="", collection_name_of_project=""):
try:
label = "others"
if ld.yaml_label_list:
for (_fpath, _label, _) in ld.yaml_label_list:
for _fpath, _label, _ in ld.yaml_label_list:
if _fpath == file_path:
label = _label
f = load_file(
2 changes: 1 addition & 1 deletion ansible_risk_insight/ram_generator.py
@@ -95,7 +95,7 @@ def run(self):
if self._parallel:
joblib.Parallel(n_jobs=-1)(joblib.delayed(self.scan)(i, num, _type, _name) for (i, num, _type, _name) in input_list)
else:
for (i, num, _type, _name) in input_list:
for i, num, _type, _name in input_list:
self.scan(i, num, _type, _name)

def scan(self, i, num, type, name):
2 changes: 1 addition & 1 deletion ansible_risk_insight/rules/sample_rule.py
@@ -33,7 +33,7 @@ class SampleRule(Rule):
name: str = "EchoTaskContent"
version: str = "v0.0.1"
severity: Severity = Severity.NONE
tags: tuple = ("sample")
tags: tuple = "sample"

def match(self, ctx: AnsibleRunContext) -> bool:
# specify targets to be checked
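The tags change in sample_rule.py reflects a Python detail: parentheses without a trailing comma do not create a tuple, so ("sample") was already just the string "sample", and the new form simply drops the redundant parentheses. A one-element tuple would need the trailing comma:

print(type(("sample")))   # <class 'str'>   - parentheses alone do not make a tuple
print(type(("sample",)))  # <class 'tuple'> - the trailing comma does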