Skip to content

Commit

Permalink
fix line number detection bug (#254)
Browse files Browse the repository at this point in the history
* fix line number detection bug

Signed-off-by: hirokuni-kitahara <[email protected]>

* add testdata for line number test

Signed-off-by: hirokuni-kitahara <[email protected]>

---------

Signed-off-by: hirokuni-kitahara <[email protected]>
  • Loading branch information
hirokuni-kitahara authored Aug 29, 2024
1 parent 69f1917 commit 56e2307
Show file tree
Hide file tree
Showing 5 changed files with 316 additions and 48 deletions.
239 changes: 206 additions & 33 deletions ansible_risk_insight/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ def find_module_name(data_block):
return ""


def get_task_blocks(fpath="", yaml_str="", task_dict_list=None):
def get_task_blocks(
fpath="",
yaml_str="",
task_dict_list=None,
jsonpath_prefix="",
):
d = None
yaml_lines = ""
if yaml_str:
Expand Down Expand Up @@ -143,8 +148,9 @@ def get_task_blocks(fpath="", yaml_str="", task_dict_list=None):
if not isinstance(d, list):
return None, None
tasks = []
for task_dict in d:
task_dict_loop = flatten_block_tasks(task_dict)
for i, task_dict in enumerate(d):
jsonpath = f"{jsonpath_prefix}.{i}"
task_dict_loop = flatten_block_tasks(task_dict=task_dict, jsonpath_prefix=jsonpath)
tasks.extend(task_dict_loop)
return tasks, yaml_lines

Expand All @@ -159,7 +165,7 @@ def get_task_blocks(fpath="", yaml_str="", task_dict_list=None):
# - some_module2
# - some_module3
#
def flatten_block_tasks(task_dict, module_defaults={}):
def flatten_block_tasks(task_dict, jsonpath_prefix="", module_defaults={}):
if task_dict is None:
return []
tasks = []
Expand All @@ -176,35 +182,202 @@ def flatten_block_tasks(task_dict, module_defaults={}):

# load normal tasks first
if "block" in task_dict:
task_jsonpath = jsonpath_prefix + ".block"
tasks_in_block = task_dict.get("block", [])
if isinstance(tasks_in_block, list):
for t_dict in tasks_in_block:
tasks_in_item = flatten_block_tasks(t_dict, _module_defaults)
for i, t_dict in enumerate(tasks_in_block):
_current_prefix = task_jsonpath + f".{i}"
tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults)
tasks.extend(tasks_in_item)
else:
tasks = [task_dict]
tasks = [(task_dict, task_jsonpath)]
else:
tasks = [task_dict]
task_jsonpath = jsonpath_prefix
tasks = [(task_dict, task_jsonpath)]

# then add "rescue" block
if "rescue" in task_dict:
task_jsonpath = jsonpath_prefix + ".rescue"
tasks_in_rescue = task_dict.get("rescue", [])
if isinstance(tasks_in_rescue, list):
for t_dict in tasks_in_rescue:
tasks_in_item = flatten_block_tasks(t_dict, _module_defaults)
for i, t_dict in enumerate(tasks_in_rescue):
_current_prefix = task_jsonpath + f".{i}"
tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults)
tasks.extend(tasks_in_item)

# finally add "always" block
if "always" in task_dict:
task_jsonpath = jsonpath_prefix + ".always"
tasks_in_always = task_dict.get("always", [])
if isinstance(tasks_in_always, list):
for t_dict in tasks_in_always:
tasks_in_item = flatten_block_tasks(t_dict, _module_defaults)
for i, t_dict in enumerate(tasks_in_always):
_current_prefix = task_jsonpath + f".{i}"
tasks_in_item = flatten_block_tasks(t_dict, _current_prefix, _module_defaults)
tasks.extend(tasks_in_item)

return tasks


def identify_lines_with_jsonpath(fpath: str = "", yaml_str: str = "", jsonpath: str = "") -> tuple[str, tuple[int, int]]:
if not jsonpath:
return None, None

d = None
yaml_lines = ""
if yaml_str:
try:
d = yaml.load(yaml_str, Loader=Loader)
yaml_lines = yaml_str
except Exception as e:
logger.debug("failed to load this yaml string to identify lines; {}".format(e.args[0]))
return None, None
elif fpath:
if not os.path.exists(fpath):
return None, None
with open(fpath, "r") as file:
try:
yaml_lines = file.read()
d = yaml.load(yaml_lines, Loader=Loader)
except Exception as e:
logger.debug("failed to load this yaml file to identify lines; {}".format(e.args[0]))
return None, None
if not d:
return None, None

path_parts = jsonpath.strip(".").split(".")
current_lines = yaml_lines
current_line_num = 1
line_num_tuple = None
for p in path_parts:
if p == "plays":
pass
elif p in ["pre_tasks", "tasks", "post_tasks", "handlers", "block", "rescue", "always"]:
blocks = find_child_yaml_block(current_lines, key=p, line_num_offset=current_line_num)
current_lines, line_num_tuple = blocks[0]
current_line_num = line_num_tuple[0]
else:
try:
p_num = int(p)
blocks = find_child_yaml_block(current_lines, line_num_offset=current_line_num)
current_lines, line_num_tuple = blocks[p_num]
current_line_num = line_num_tuple[0]
except Exception:
print(p)
pass
return current_lines, line_num_tuple


def find_child_yaml_block(yaml_str: str, key: str = "", line_num_offset: int = -1) -> list:
skip_condition_funcs = [lambda x: x.strip().startswith("#"), lambda x: x.strip() == "---"]

def match_condition_func(x):
if key:
return x.strip().startswith(f"{key}:")
else:
return x.strip().startswith("- ")

def get_indent_level(x):
return len(x) - len(x.lstrip())

top_level_indent = 100
for line in yaml_str.splitlines():
skip = False
for skip_cond_func in skip_condition_funcs:
if skip_cond_func(line):
skip = True
break
if skip:
continue
if match_condition_func(line):
current_indent = get_indent_level(line)
if current_indent < top_level_indent:
top_level_indent = current_indent
if top_level_indent == 100:
return []

blocks = []
line_buffer = []
isolated_line_buffer = []
buffer_begin = -1
if key:
for i, line in enumerate(yaml_str.splitlines()):
line_num = i + 1
current_indent = get_indent_level(line)
if current_indent == top_level_indent:
if line_buffer and not blocks:
block_str = "\n".join(line_buffer)
begin = buffer_begin
end = line_num - 1
if line_num_offset > 0:
begin += line_num_offset - 1
end += line_num_offset - 1
line_num_tuple = (begin, end)
blocks.append((block_str, line_num_tuple))
if match_condition_func(line):
buffer_begin = line_num + 1
if buffer_begin > 0 and line_num >= buffer_begin:
line_buffer.append(line)
if line_buffer and not blocks:
block_str = "\n".join(line_buffer)
begin = buffer_begin
end = line_num
if line_num_offset > 0:
begin += line_num_offset - 1
end += line_num_offset - 1
line_num_tuple = (begin, end)
blocks.append((block_str, line_num_tuple))
else:
for i, line in enumerate(yaml_str.splitlines()):
line_num = i + 1
current_indent = get_indent_level(line)
if current_indent == top_level_indent:
new_block = False
if match_condition_func(line):
skip = False
for skip_cond_func in skip_condition_funcs:
if skip_cond_func(line):
skip = True
break
if not skip:
new_block = True
if new_block:
if line_buffer:
block_str = ""
if isolated_line_buffer:
block_str += "\n".join(isolated_line_buffer)
buffer_begin = 1
block_str += "\n".join(line_buffer)
begin = buffer_begin
end = line_num - 1
if line_num_offset > 0:
begin += line_num_offset - 1
end += line_num_offset - 1
line_num_tuple = (begin, end)
blocks.append((block_str, line_num_tuple))
line_buffer = []
isolated_line_buffer = []
buffer_begin = line_num
line_buffer.append(line)
else:
if buffer_begin < 0:
isolated_line_buffer.append(line)
else:
line_buffer.append(line)
if line_buffer:
block_str = ""
if isolated_line_buffer:
block_str += "\n".join(isolated_line_buffer)
block_str += "\n".join(line_buffer)
begin = buffer_begin
end = line_num
if line_num_offset > 0:
begin += line_num_offset - 1
end += line_num_offset - 1
line_num_tuple = (begin, end)
blocks.append((block_str, line_num_tuple))
return blocks


def search_module_files(path, module_dir_paths=[]):
file_list = []
# must copy the input here; otherwise, the added items are kept forever
Expand Down Expand Up @@ -741,36 +914,36 @@ def list_scan_target(root_dir: str, task_num_threshold: int = -1):

def update_line_with_space(new_line_content, old_line_content, leading_spaces=0):
"""
Returns the line of the input lines with mutation, having spaces
exactly same as input yaml lines
Returns the line of the input lines with mutation, having spaces
exactly same as input yaml lines
"""
new_line_content = new_line_content.lstrip(' ')
new_line_content = new_line_content.lstrip(" ")
if not leading_spaces:
leading_spaces = len(old_line_content) - len(old_line_content.lstrip())
return ' ' * leading_spaces + new_line_content
return " " * leading_spaces + new_line_content


def populate_new_data_list(data, line_number_list):
"""
Function to check diff in line between the
first mutated results, and then copy the in
between lines to mutated data lines
Function to check diff in line between the
first mutated results, and then copy the in
between lines to mutated data lines
"""
input_line_number = 0
for each in line_number_list:
input_line_number = int(each.lstrip("L").split("-")[0])
break
temp_data = data.splitlines(keepends=True)
return temp_data[0:input_line_number - 1]
return temp_data[0 : input_line_number - 1]


def check_and_add_diff_lines(start_line, stop_line, lines, data_copy):
"""
Function to check diff in line between the mutated results,
and then copy the in between lines to mutated data lines
Function to check diff in line between the mutated results,
and then copy the in between lines to mutated data lines
"""
diff_in_line = stop_line - start_line
data_copy.append('\n')
data_copy.append("\n")
for i in range(start_line, (start_line + diff_in_line) - 1):
line = lines[i]
data_copy.append(line)
Expand Down Expand Up @@ -803,7 +976,7 @@ def update_and_append_new_line(new_line, old_line, leading_spaces, data_copy):
def update_the_yaml_target(file_path, line_number_list, new_content_list):
try:
# Read the original YAML file
with open(file_path, 'r') as file:
with open(file_path, "r") as file:
data = file.read()
yaml = FormattedYAML(
# Ansible only uses YAML 1.1, but others files should use newer 1.2 (ruamel.yaml defaults to 1.2)
Expand Down Expand Up @@ -845,32 +1018,32 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
if 0 <= line_number < len(lines):
# Preserve the original indentation
old_line_content = lines[line_number]
if '---' in old_line_content:
if "---" in old_line_content:
continue
if new_line_content in old_line_content:
leading_spaces = len(lines[line_number]) - len(lines[line_number].lstrip())
temp_content.append(new_line_content)
new_line_content = new_line_content.lstrip(' ')
lines[line_number] = ' ' * leading_spaces + new_line_content
new_line_content = new_line_content.lstrip(" ")
lines[line_number] = " " * leading_spaces + new_line_content
data_copy.append(lines[line_number])
else:
new_line_key = new_line_content.split(':')
new_key = new_line_key[0].strip(' ')
for k in range(start, end):
if k < len(lines):
old_line_key = lines[k].split(':')
if '---' in old_line_key[0]:
old_line_key = lines[k].split(":")
if "---" in old_line_key[0]:
continue
old_key = old_line_key[0].strip(' ')
if '-' in old_line_key[0] and ':' not in lines[k] and '-' in new_key:
# diff_in_lines = len(lines) - len(new_lines)
leading_spaces = len(lines[k]) - len(lines[k].lstrip())
if diff_in_lines > len(lines):
for i in range(k, k + diff_in_lines):
if lines[i] == '\n':
if lines[i] == "\n":
lines.pop(i - 1)
break
elif i < len(lines) and ':' not in lines[i]:
elif i < len(lines) and ":" not in lines[i]:
lines.pop(i)
else:
break
Expand All @@ -893,12 +1066,12 @@ def update_the_yaml_target(file_path, line_number_list, new_content_list):
# and copy the old content that's not updated by ARI mutation
data_copy = check_diff_and_copy_olddata_to_newdata(line_number_list, lines, data_copy)
# Join the lines back to a single string
updated_data = ''.join(data_copy)
updated_data = "".join(data_copy)
# Parse the updated YAML content to ensure it is valid
updated_parsed_data = yaml.load(updated_data)
# Write the updated YAML content back to the file
if updated_parsed_data:
with open(file_path, 'w') as file:
with open(file_path, "w") as file:
yaml.dump(updated_parsed_data, file)
except Exception as ex:
logger.warning("YAML LINES: ARI fix update yaml by lines failed for file: '%s', with error: '%s'", file_path, ex)
Loading

0 comments on commit 56e2307

Please sign in to comment.