Skip to content

fix(terraform): Skip evaluating each.value expressions in try functions #6766

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,18 @@ def terraform_try(*args: Any) -> Any:
"try evaluates all of its argument expressions in turn and returns the result of the first one that does not
produce any errors."
"""
accepted_exception = "'NoneType' object is not subscriptable"
arg_with_accepted_exception = ''
for arg in args:
try:
return evaluate(arg) if isinstance(arg, str) else arg
except Exception as e:
logging.warning(f"Error in evaluate_try of argument {arg} - {e}")
if str(e) == accepted_exception and not arg_with_accepted_exception:
arg_with_accepted_exception = arg
continue
if arg_with_accepted_exception:
return arg_with_accepted_exception
raise Exception(f"No argument can be evaluated for try of {args}")


Expand Down Expand Up @@ -369,9 +375,24 @@ def evaluate(input_str: str) -> Any:

# Don't use str.replace to make sure we replace just the first occurrence
input_str = f"{TRY_STR_REPLACEMENT}{input_str[3:]}"
evaluated = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec
input_str = replace_each_value_for_try(input_str)
try:
evaluated = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec
except Exception:
try_args = input_str.replace(TRY_STR_REPLACEMENT, "")[1:-1].split(',')
evaluated = terraform_try(*try_args)
else:
evaluated = eval(input_str, {"__builtins__": None}, SAFE_EVAL_DICT) # nosec
return evaluated if not isinstance(evaluated, str) else remove_unicode_null(evaluated)


def remove_unicode_null(input_str: str) -> str:
return input_str.replace("\u0000", "\\0")


def replace_each_value_for_try(input_str: str) -> str:
each_location = input_str.find('each.value')
if each_location >= 0:
comma_location = input_str.find(',', each_location)
input_str = input_str.replace(input_str[each_location:comma_location], "raise Exception()")
return input_str
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
resource "aws_lb_target_group" "example" {
name = "example-tg"
}

resource "aws_lb" "example" {
name = "example-lb"
}

resource "aws_lb_listener" "fail" {
for_each = { for idx, lr in var.listener_rules : tostring(idx) => lr }
load_balancer_arn = aws_lb.example.arn
port = try(each.value.port, var.default_port)
protocol = try(each.value.protocol, var.default_protocol)

default_action {
type = "forward"
target_group_arn = aws_lb_target_group.example.arn
}
}

resource "aws_lb_listener" "pass" {
for_each = { for idx, lr in var.listener_rules : tostring(idx) => lr }
load_balancer_arn = aws_lb.example.arn
port = try(each.value.port, var.default_port)
protocol = try(each.value.protocol, var.default_protocol)

default_action {
type = "redirect"
redirect {
port = "443"
protocol = "HTTPS"
status_code = "HTTP_301"
}
}
}

variable "listener_rules" {
default = [{
"port": "0",
"protocol": "HTTPS",
},
{
"port": "80",
"protocol": "HTTPS",
}]
}

variable "default_protocol" {
description = "Default protocol used across the listener and target group"
type = string
default = "HTTP"
}

variable "default_port" {
type = string
default = "80"
}
32 changes: 32 additions & 0 deletions tests/terraform/checks/resource/aws/test_ALBListenerHTTPS.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import unittest
from pathlib import Path

import hcl2

from checkov.common.models.enums import CheckResult
from checkov.terraform.checks.resource.aws.ALBListenerHTTPS import check
from checkov.runner_filter import RunnerFilter
from checkov.terraform.runner import Runner


class TestALBListenerHTTPS(unittest.TestCase):
Expand Down Expand Up @@ -94,6 +97,35 @@ def test_unknown_not_rendered(self):
result = check.scan_resource_conf(resource_conf)
self.assertEqual(CheckResult.UNKNOWN, result)

def test_try_function(self):
# given
test_files_dir = Path(__file__).parent / "example_ALBListenerHTTPS"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_lb_listener.pass"
}

failing_resources = {
"aws_lb_listener.fail",
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], 1)
self.assertEqual(summary["failed"], 1)
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == '__main__':
unittest.main()
13 changes: 13 additions & 0 deletions tests/terraform/graph/variable_rendering/test_string_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,19 @@ def test_try_then_merge_block(self):
result = evaluate_terraform(input_str)
self.assertEqual(expected, result)

def test_try_each_int (self):
# input_str = 'try(each.value,80)'
input_str = 'try(each.value.port,80)'
expected = 80
result = evaluate_terraform(input_str)
self.assertEqual(expected, result)

def test_try_each_str(self):
input_str = 'try(each.value.port,HTTP)'
expected = 'HTTP'
result = evaluate_terraform(input_str)
self.assertEqual(expected, result)


@pytest.mark.parametrize(
"origin_str,str_to_replace,new_value,expected",
Expand Down
Loading