fix line number detection bug #254

Merged 2 commits on Aug 29, 2024
ansible_risk_insight/finder.py: 239 changes (206 additions, 33 deletions)
@@ -114,7 +114,12 @@ def find_module_name(data_block):
    return ""


-def get_task_blocks(fpath="", yaml_str="", task_dict_list=None):
+def get_task_blocks(
+    fpath="",
+    yaml_str="",
+    task_dict_list=None,
+    jsonpath_prefix="",
+):
    d = None
    yaml_lines = ""
    if yaml_str:
@@ -143,8 +148,9 @@ def get_task_blocks(fpath="", yaml_str="", task_dict_list=None):
    if not isinstance(d, list):
        return None, None
    tasks = []
-    for task_dict in d:
-        task_dict_loop = flatten_block_tasks(task_dict)
+    for i, task_dict in enumerate(d):
+        jsonpath = f"{jsonpath_prefix}.{i}"
+        task_dict_loop = flatten_block_tasks(task_dict=task_dict, jsonpath_prefix=jsonpath)
        tasks.extend(task_dict_loop)
    return tasks, yaml_lines
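Note (illustrative, not part of the diff): a minimal sketch of how the new jsonpath_prefix argument flows through get_task_blocks; the YAML content and the ".tasks" prefix are made-up values.

example_yaml = """
- name: install nginx
  ansible.builtin.package:
    name: nginx
- block:
    - name: start nginx
      ansible.builtin.service:
        name: nginx
        state: started
"""
tasks, yaml_lines = get_task_blocks(yaml_str=example_yaml, jsonpath_prefix=".tasks")
# each entry is now a (task_dict, jsonpath) tuple, e.g. ".tasks.0" and ".tasks.1.block.0"
for task_dict, jsonpath in tasks:
    print(jsonpath, task_dict.get("name"))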

@@ -159,7 +165,7 @@ def get_task_blocks(fpath="", yaml_str="", task_dict_list=None):
# - some_module2
# - some_module3
#
-def flatten_block_tasks(task_dict, module_defaults={}):
+def flatten_block_tasks(task_dict, jsonpath_prefix="", module_defaults={}):
    if task_dict is None:
        return []
    tasks = []
@@ -176,35 +182,202 @@ def flatten_block_tasks(task_dict, module_defaults={}):

    # load normal tasks first
    if "block" in task_dict:
+        task_jsonpath = jsonpath_prefix + ".block"
        tasks_in_block = task_dict.get("block", [])
        if isinstance(tasks_in_block, list):
-            for t_dict in tasks_in_block:
-                tasks_in_item = flatten_block_tasks(t_dict, _module_defaults)
+            for i, t_dict in enumerate(tasks_in_block):
+                _current_prefix = task_jsonpath + f".{i}"
+                tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults)
                tasks.extend(tasks_in_item)
        else:
-            tasks = [task_dict]
+            tasks = [(task_dict, task_jsonpath)]
    else:
-        tasks = [task_dict]
+        task_jsonpath = jsonpath_prefix
+        tasks = [(task_dict, task_jsonpath)]

    # then add "rescue" block
    if "rescue" in task_dict:
+        task_jsonpath = jsonpath_prefix + ".rescue"
        tasks_in_rescue = task_dict.get("rescue", [])
        if isinstance(tasks_in_rescue, list):
-            for t_dict in tasks_in_rescue:
-                tasks_in_item = flatten_block_tasks(t_dict, _module_defaults)
+            for i, t_dict in enumerate(tasks_in_rescue):
+                _current_prefix = task_jsonpath + f".{i}"
+                tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults)
                tasks.extend(tasks_in_item)

    # finally add "always" block
    if "always" in task_dict:
+        task_jsonpath = jsonpath_prefix + ".always"
        tasks_in_always = task_dict.get("always", [])
        if isinstance(tasks_in_always, list):
-            for t_dict in tasks_in_always:
-                tasks_in_item = flatten_block_tasks(t_dict, _module_defaults)
+            for i, t_dict in enumerate(tasks_in_always):
+                _current_prefix = task_jsonpath + f".{i}"
+                tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults)
                tasks.extend(tasks_in_item)

    return tasks
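Note (illustrative sketch, not part of the diff): with jsonpath_prefix, a block task is flattened into (task_dict, jsonpath) pairs, block children first, then rescue, then always; the prefix ".tasks.0" below is a made-up example.

block_task = {
    "block": [{"name": "a"}, {"name": "b"}],
    "rescue": [{"name": "on failure"}],
    "always": [{"name": "cleanup"}],
}
flat = flatten_block_tasks(block_task, jsonpath_prefix=".tasks.0")
# expected jsonpaths, in order:
#   .tasks.0.block.0, .tasks.0.block.1, .tasks.0.rescue.0, .tasks.0.always.0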


+def identify_lines_with_jsonpath(fpath: str = "", yaml_str: str = "", jsonpath: str = "") -> tuple[str, tuple[int, int]]:
+    if not jsonpath:
+        return None, None
+
+    d = None
+    yaml_lines = ""
+    if yaml_str:
+        try:
+            d = yaml.load(yaml_str, Loader=Loader)
+            yaml_lines = yaml_str
+        except Exception as e:
+            logger.debug("failed to load this yaml string to identify lines; {}".format(e.args[0]))
+            return None, None
+    elif fpath:
+        if not os.path.exists(fpath):
+            return None, None
+        with open(fpath, "r") as file:
+            try:
+                yaml_lines = file.read()
+                d = yaml.load(yaml_lines, Loader=Loader)
+            except Exception as e:
+                logger.debug("failed to load this yaml file to identify lines; {}".format(e.args[0]))
+                return None, None
+    if not d:
+        return None, None
+
+    path_parts = jsonpath.strip(".").split(".")
+    current_lines = yaml_lines
+    current_line_num = 1
+    line_num_tuple = None
+    for p in path_parts:
+        if p == "plays":
+            pass
+        elif p in ["pre_tasks", "tasks", "post_tasks", "handlers", "block", "rescue", "always"]:
+            blocks = find_child_yaml_block(current_lines, key=p, line_num_offset=current_line_num)
+            current_lines, line_num_tuple = blocks[0]
+            current_line_num = line_num_tuple[0]
+        else:
+            try:
+                p_num = int(p)
+                blocks = find_child_yaml_block(current_lines, line_num_offset=current_line_num)
+                current_lines, line_num_tuple = blocks[p_num]
+                current_line_num = line_num_tuple[0]
+            except Exception:
+                print(p)
+                pass
+    return current_lines, line_num_tuple
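Note (hedged usage sketch; the playbook path and jsonpath are invented): identify_lines_with_jsonpath resolves a task's jsonpath back to the YAML lines that define it.

lines, line_range = identify_lines_with_jsonpath(fpath="playbook.yml", jsonpath=".plays.0.tasks.1")
if line_range:
    begin, end = line_range  # 1-based line numbers of the second task in the first play
    print(begin, end)
    print(lines)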


+def find_child_yaml_block(yaml_str: str, key: str = "", line_num_offset: int = -1) -> list:
+    skip_condition_funcs = [lambda x: x.strip().startswith("#"), lambda x: x.strip() == "---"]
+
+    def match_condition_func(x):
+        if key:
+            return x.strip().startswith(f"{key}:")
+        else:
+            return x.strip().startswith("- ")
+
+    def get_indent_level(x):
+        return len(x) - len(x.lstrip())
+
+    top_level_indent = 100
+    for line in yaml_str.splitlines():
+        skip = False
+        for skip_cond_func in skip_condition_funcs:
+            if skip_cond_func(line):
+                skip = True
+                break
+        if skip:
+            continue
+        if match_condition_func(line):
+            current_indent = get_indent_level(line)
+            if current_indent < top_level_indent:
+                top_level_indent = current_indent
+    if top_level_indent == 100:
+        return []
+
+    blocks = []
+    line_buffer = []
+    isolated_line_buffer = []
+    buffer_begin = -1
+    if key:
+        for i, line in enumerate(yaml_str.splitlines()):
+            line_num = i + 1
+            current_indent = get_indent_level(line)
+            if current_indent == top_level_indent:
+                if line_buffer and not blocks:
+                    block_str = "\n".join(line_buffer)
+                    begin = buffer_begin
+                    end = line_num - 1
+                    if line_num_offset > 0:
+                        begin += line_num_offset - 1
+                        end += line_num_offset - 1
+                    line_num_tuple = (begin, end)
+                    blocks.append((block_str, line_num_tuple))
+                if match_condition_func(line):
+                    buffer_begin = line_num + 1
+            if buffer_begin > 0 and line_num >= buffer_begin:
+                line_buffer.append(line)
+        if line_buffer and not blocks:
+            block_str = "\n".join(line_buffer)
+            begin = buffer_begin
+            end = line_num
+            if line_num_offset > 0:
+                begin += line_num_offset - 1
+                end += line_num_offset - 1
+            line_num_tuple = (begin, end)
+            blocks.append((block_str, line_num_tuple))
+    else:
+        for i, line in enumerate(yaml_str.splitlines()):
+            line_num = i + 1
+            current_indent = get_indent_level(line)
+            if current_indent == top_level_indent:
+                new_block = False
+                if match_condition_func(line):
+                    skip = False
+                    for skip_cond_func in skip_condition_funcs:
+                        if skip_cond_func(line):
+                            skip = True
+                            break
+                    if not skip:
+                        new_block = True
+                if new_block:
+                    if line_buffer:
+                        block_str = ""
+                        if isolated_line_buffer:
+                            block_str += "\n".join(isolated_line_buffer)
+                            buffer_begin = 1
+                        block_str += "\n".join(line_buffer)
+                        begin = buffer_begin
+                        end = line_num - 1
+                        if line_num_offset > 0:
+                            begin += line_num_offset - 1
+                            end += line_num_offset - 1
+                        line_num_tuple = (begin, end)
+                        blocks.append((block_str, line_num_tuple))
+                        line_buffer = []
+                        isolated_line_buffer = []
+                    buffer_begin = line_num
+                    line_buffer.append(line)
+            else:
+                if buffer_begin < 0:
+                    isolated_line_buffer.append(line)
+                else:
+                    line_buffer.append(line)
+        if line_buffer:
+            block_str = ""
+            if isolated_line_buffer:
+                block_str += "\n".join(isolated_line_buffer)
+            block_str += "\n".join(line_buffer)
+            begin = buffer_begin
+            end = line_num
+            if line_num_offset > 0:
+                begin += line_num_offset - 1
+                end += line_num_offset - 1
+            line_num_tuple = (begin, end)
+            blocks.append((block_str, line_num_tuple))
+    return blocks
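Note (hedged sketch; the YAML snippet is invented): find_child_yaml_block splits a YAML string into child blocks plus their line ranges, which is how identify_lines_with_jsonpath walks a jsonpath one segment at a time.

tasks_yaml = """tasks:
  - name: first
    ansible.builtin.debug:
      msg: hello
  - name: second
    ansible.builtin.debug:
      msg: world
"""
blocks = find_child_yaml_block(tasks_yaml, key="tasks")
inner, line_range = blocks[0]  # the lines under "tasks:" and their (begin, end) range, here (2, 7)
items = find_child_yaml_block(inner, line_num_offset=line_range[0])
# items -> one (block_str, (begin, end)) tuple per "- " entry: ranges (2, 4) and (5, 7)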


def search_module_files(path, module_dir_paths=[]):
    file_list = []
    # must copy the input here; otherwise, the added items are kept forever
@@ -741,36 +914,36 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):

def update_line_with_space(new_line_content, old_line_content, leading_spaces=0):
    """
-    Returns the line of the input lines with mutation, having spaces
-    exactly same as input yaml lines
+    Returns the line of the input lines with mutation, having spaces
+    exactly same as input yaml lines
    """
-    new_line_content = new_line_content.lstrip(' ')
+    new_line_content = new_line_content.lstrip(" ")
    if not leading_spaces:
        leading_spaces = len(old_line_content) - len(old_line_content.lstrip())
-    return ' ' * leading_spaces + new_line_content
+    return " " * leading_spaces + new_line_content


def populate_new_data_list(data, line_number_list):
    """
-    Function to check diff in line between the
-    first mutated results, and then copy the in
-    between lines to mutated data lines
+    Function to check diff in line between the
+    first mutated results, and then copy the in
+    between lines to mutated data lines
    """
    input_line_number = 0
    for each in line_number_list:
        input_line_number = int(each.lstrip("L").split("-")[0])
        break
    temp_data = data.splitlines(keepends=True)
-    return temp_data[0:input_line_number - 1]
+    return temp_data[0 : input_line_number - 1]
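Note (hedged sketch; the "L4-5" entry format is inferred from the lstrip("L") parsing above): populate_new_data_list copies everything before the first mutated line.

data = "---\n- name: sample\n  hosts: all\n  tasks:\n    - ansible.builtin.debug: {}\n"
head = populate_new_data_list(data, ["L4-5"])
# head -> the first three lines of data, i.e. everything above line 4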


def check_and_add_diff_lines(start_line, stop_line, lines, data_copy):
    """
-    Function to check diff in line between the mutated results,
-    and then copy the in between lines to mutated data lines
+    Function to check diff in line between the mutated results,
+    and then copy the in between lines to mutated data lines
    """
    diff_in_line = stop_line - start_line
-    data_copy.append('\n')
+    data_copy.append("\n")
    for i in range(start_line, (start_line + diff_in_line) - 1):
        line = lines[i]
        data_copy.append(line)
Expand Down Expand Up @@ -803,7 +976,7 @@ def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy):
def update_the_yaml_target(file_path, line_number_list, new_content_list):
try:
# Read the original YAML file
with open(file_path, 'r') as file:
with open(file_path, "r") as file:
data = file.read()
yaml = FormattedYAML(
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
@@ -845,32 +1018,32 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
                if 0 <= line_number < len(lines):
                    # Preserve the original indentation
                    old_line_content = lines[line_number]
-                    if '---' in old_line_content:
+                    if "---" in old_line_content:
                        continue
                    if new_line_content in old_line_content:
                        leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
                        temp_content.append(new_line_content)
-                        new_line_content = new_line_content.lstrip(' ')
-                        lines[line_number] = ' ' * leading_spaces + new_line_content
+                        new_line_content = new_line_content.lstrip(" ")
+                        lines[line_number] = " " * leading_spaces + new_line_content
                        data_copy.append(lines[line_number])
                    else:
                        new_line_key = new_line_content.split(':')
                        new_key = new_line_key[0].strip(' ')
                        for k in range(start, end):
                            if k < len(lines):
-                                old_line_key = lines[k].split(':')
-                                if '---' in old_line_key[0]:
+                                old_line_key = lines[k].split(":")
+                                if "---" in old_line_key[0]:
                                    continue
                                old_key = old_line_key[0].strip(' ')
                                if '-' in old_line_key[0] and ':' not in lines[k] and '-' in new_key:
                                    # diff_in_lines = len(lines) - len(new_lines)
                                    leading_spaces = len(lines[k]) - len(lines[k].lstrip())
                                    if diff_in_lines > len(lines):
                                        for i in range(k, k + diff_in_lines):
-                                            if lines[i] == '\n':
+                                            if lines[i] == "\n":
                                                lines.pop(i - 1)
                                                break
-                                            elif i < len(lines) and ':' not in lines[i]:
+                                            elif i < len(lines) and ":" not in lines[i]:
                                                lines.pop(i)
                                            else:
                                                break
@@ -893,12 +1066,12 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
        # and copy the old content that's not updated by ARI mutation
        data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy)
        # Join the lines back to a single string
-        updated_data = ''.join(data_copy)
+        updated_data = "".join(data_copy)
        # Parse the updated YAML content to ensure it is valid
        updated_parsed_data = yaml.load(updated_data)
        # Write the updated YAML content back to the file
        if updated_parsed_data:
-            with open(file_path, 'w') as file:
+            with open(file_path, "w") as file:
                yaml.dump(updated_parsed_data, file)
    except Exception as ex:
        logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)