From 59a2b185222e330606690c95d43181ab6995c8de Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Tue, 21 Jan 2025 09:43:10 -0300 Subject: [PATCH 01/10] refactor: split cli methods to easy testing Signed-off-by: Felipe Zipitria --- APPROVED_TAGS | 97 ++++++++++++++ src/crs_linter/cli.py | 269 ++++++++++++++++++++------------------- src/crs_linter/linter.py | 13 +- tests/test_cli.py | 7 + 4 files changed, 255 insertions(+), 131 deletions(-) create mode 100644 APPROVED_TAGS create mode 100644 tests/test_cli.py diff --git a/APPROVED_TAGS b/APPROVED_TAGS new file mode 100644 index 0000000..5c48d8e --- /dev/null +++ b/APPROVED_TAGS @@ -0,0 +1,97 @@ +OWASP_CRS +PCI/12.1 +PCI/6.5.1 +PCI/6.5.10 +PCI/6.5.2 +PCI/6.5.4 +PCI/6.5.6 +anomaly-evaluation +application-multi +attack-deprecated-header +attack-disclosure +attack-fixation +attack-generic +attack-injection-generic +attack-injection-java +attack-injection-php +attack-lfi +attack-multipart-header +attack-protocol +attack-rce +attack-reputation-scanner +attack-rfi +attack-sqli +attack-ssrf +attack-xss +capec/1/180/77 +capec/1000/118/116 +capec/1000/118/116/54 +capec/1000/118/116/54/127 +capec/1000/118/224/541/310 +capec/1000/152 +capec/1000/152/137/15/460 +capec/1000/152/137/6 +capec/1000/152/175/253 +capec/1000/152/242 +capec/1000/152/242/63 +capec/1000/152/248 +capec/1000/152/248/136 +capec/1000/152/248/66 +capec/1000/152/248/88 +capec/1000/153/267 +capec/1000/210/272 +capec/1000/210/272/220 +capec/1000/210/272/220/273 +capec/1000/210/272/220/274 +capec/1000/210/272/220/33 +capec/1000/210/272/220/34 +capec/1000/225/122/17/650 +capec/1000/225/21/593/61 +capec/1000/225/664 +capec/1000/255/153 +capec/1000/255/153/126 +capec/1000/255/153/267 +capec/1000/255/153/267/120 +capec/1000/255/153/267/72 +capec/137/134 +capec/272/220 +header-allowlist +language-aspnet +language-java +language-javascript +language-ldap +language-multi +language-perl +language-php +language-powershell +language-ruby +language-shell +paranoia-level/1 +paranoia-level/2 +paranoia-level/3 +paranoia-level/4 +platform-apache +platform-db2 +platform-emc +platform-firebird +platform-frontbase +platform-hsqldb +platform-iis +platform-informix +platform-ingres +platform-interbase +platform-internet-explorer +platform-maxdb +platform-msaccess +platform-mssql +platform-multi +platform-mysql +platform-oracle +platform-pgsql +platform-sqlite +platform-sybase +platform-tomcat +platform-unix +platform-windows +reporting +xss-perf-disable diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index e7c2a0d..9bdc217 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import glob import logging import pathlib import sys @@ -6,8 +7,6 @@ import difflib import argparse import re -from dulwich import porcelain -from dulwich.repo import Repo from dulwich.contrib.release_robot import get_current_version, get_recent_tags from semver import Version @@ -15,6 +14,8 @@ oformat = "native" +logger = logging.getLogger(__name__) + def errmsg(msg): if oformat == "github": print("::error::%s" % (msg)) @@ -114,75 +115,103 @@ def generate_version_string(directory): v4.5.0-6-g872a90ab -> "4.6.0-dev" v4.5.0-0-abcd01234 -> "4.5.0" """ - - current_version = Version.parse(get_current_version(projdir=str(directory.resolve()))) - next_minor = current_version.bump_minor() + if not directory.is_dir(): + raise ValueError(f"Directory {directory} does not exist") + + current_version = get_current_version(projdir=str(directory.resolve())) + if current_version is None: + raise ValueError(f"Can't get current version from {directory}") + parsed_version = Version.parse(current_version) + next_minor = parsed_version.bump_minor() version = next_minor.replace(prerelease="dev") - print(version) return f"OWASP_CRS/{version}" - -def main(): - logger = logging.getLogger(__name__) - parser = argparse.ArgumentParser(description="CRS Rules Check tool") - parser.add_argument("-o", "--output", dest="output", help="Output format native[default]|github", required=False) - parser.add_argument("-d", "--directory", dest="directory", type=pathlib.Path, - help='Directory path to CRS git repository', required=False) - parser.add_argument("-r", "--rules", metavar='/path/to/coreruleset/*.conf', type=str, - nargs='*', help='Directory path to CRS rules', required=True, - action="append") - parser.add_argument("-t", "--tags-list", dest="tagslist", help="Path to file with permitted tags", required=True) - parser.add_argument("-v", "--version", dest="version", help="Version string", required=False) - args = parser.parse_args() - crspath = [] - for l in args.rules: - crspath += l - - if args.output is not None: - if args.output not in ["native", "github"]: - print("--output can be one of the 'native' or 'github'. Default value is 'native'") - sys.exit(1) - oformat = args.output - - if args.version is None: - # if no --version/-v was given, get version from git describe --tags output - crsversion = generate_version_string(args.directory) - else: - crsversion = args.version.strip() - # if no "OWASP_CRS/" prefix, append it - if not crsversion.startswith("OWASP_CRS/"): - crsversion = "OWASP_CRS/" + crsversion - - tags = [] +def get_tags_from_file(filename): try: - with open(args.tagslist, "r") as fp: + with open(filename, "r") as fp: tags = [l.strip() for l in fp.readlines()] # remove empty items, if any tags = list(filter(lambda x: len(x) > 0, tags)) except: - errmsg("Can't open tags list: %s" % args.tagslist) + errmsg(f"Can't open tags list: {filename}") sys.exit(1) - retval = 0 + return tags + +def get_crs_version(directory, version=None): + crs_version = "" + if version is None: + # if no --version/-v was given, get version from git describe --tags output + crs_version = generate_version_string(directory) + else: + crs_version = version.strip() + # if no "OWASP_CRS/" prefix, append it + if not crs_version.startswith("OWASP_CRS/"): + crs_version = "OWASP_CRS/" + crs_version + + return crs_version + +def check_indentation(filename, content): + error = False + + ### make a diff to check the indentations try: - flist = crspath - flist.sort() + with open(filename, 'r') as fp: + from_lines = fp.readlines() + if f.startswith("crs-setup.conf.example"): + from_lines = remove_comments("".join(from_lines)).split("\n") + from_lines = [l + "\n" for l in from_lines] except: - errmsg("Can't open files in given path!") - sys.exit(1) - - if len(flist) == 0: - errmsg("List of files is empty!") - sys.exit(1) + errmsg(" Can't open file for indent check: %s" % (f)) + error = True + + # virtual output + writer = msc_pyparser.MSCWriter(content) + writer.generate() + output = [] + for l in writer.output: + if l == "\n": + output.append("\n") + else: + output += [l + "\n" for l in l.split("\n")] - parsed_structs = {} - txvars = {} + if len(from_lines) < len(output): + from_lines.append("\n") + elif len(from_lines) > len(output): + output.append("\n") - for f in flist: + diff = difflib.unified_diff(from_lines, output) + if from_lines == output: + msg(" Indentation check ok.") + else: + errmsg(" Indentation check found error(s)") + error = True + + for d in diff: + d = d.strip("\n") + r = re.match(r"^@@ -(\d+),(\d+) \+\d+,\d+ @@$", d) + if r: + line1, line2 = [int(i) for i in r.groups()] + e = { + 'indent': 2, + 'file': filename, + 'title': "Indentation error", + 'line': line1, + 'endLine': line1 + line2, + 'message': "an indentation error has found" + } + errmsgf(e) + errmsg(d.strip("\n")) + + return error + +def read_files(filenames): + parsed = {} + for f in filenames: try: - with open(f, 'r') as inputfile: - data = inputfile.read() + with open(f, 'r') as file: + data = file.read() # modify the content of the file, if it is the "crs-setup.conf.example" if f.startswith("crs-setup.conf.example"): data = remove_comments(data) @@ -191,12 +220,12 @@ def main(): sys.exit(1) ### check file syntax - msg("Config file: %s" % (f)) + logger.info(f"Config file: {f}") try: mparser = msc_pyparser.MSCParser() mparser.parser.parse(data) - msg(" Parsing ok.") - parsed_structs[f] = mparser.configlines + logger.info("Parsing OK") + parsed[f] = mparser.configlines except Exception as e: err = e.args[1] if err['cause'] == "lexer": @@ -214,17 +243,46 @@ def main(): retval = 1 continue - msg("Checking parsed rules...") - crsver = "" - for f in parsed_structs.keys(): + return parsed +def parse_args(argv): + parser = argparse.ArgumentParser(description="CRS Rules Check tool") + parser.add_argument("-o", "--output", dest="output", help="Output format native[default]|github", required=False) + parser.add_argument("-d", "--directory", dest="directory", default=pathlib.Path("."), type=pathlib.Path, + help='Directory path to CRS git repository', required=True) + parser.add_argument("-r", "--rules", type=str, dest="crs_rules", nargs='*', + help='Directory path to CRS rules', required=True) + parser.add_argument("-t", "--tags-list", dest="tagslist", help="Path to file with permitted tags", required=True) + parser.add_argument("-v", "--version", dest="version", help="Version string", required=False) + + return parser.parse_args(argv) + +def main(argv): + retval = 0 + args = parse_args(argv) + + files = glob.glob(args.crs_rules[0]) + + if args.output is not None: + if args.output not in ["native", "github"]: + print("--output can be one of the 'native' or 'github'. Default value is 'native'") + sys.exit(1) + oformat = args.output + + crs_version = get_crs_version(args.directory, args.version) + tags = get_tags_from_file(args.tagslist) + parsed = read_files(files) + txvars = {} + + logger.info("Checking parsed rules...") + for f in parsed.keys(): msg(f) - c = Check(parsed_structs[f], f, txvars) + c = Check(parsed[f], f, txvars) ### check case usings c.check_ignore_case() if len(c.caseerror) == 0: - msg(" Ignore case check ok.") + logger.info(" Ignore case check ok.") else: errmsg(" Ignore case check found error(s)") for a in c.caseerror: @@ -232,7 +290,6 @@ def main(): a['file'] = f a['title'] = "Case check" errmsgf(a) - retval = 1 ### check action's order c.check_action_order() @@ -245,55 +302,10 @@ def main(): a['file'] = f a['title'] = 'Action order check' errmsgf(a) - retval = 1 - - ### make a diff to check the indentations - try: - with open(f, 'r') as fp: - fromlines = fp.readlines() - if f.startswith("crs-setup.conf.example"): - fromlines = remove_comments("".join(fromlines)).split("\n") - fromlines = [l + "\n" for l in fromlines] - except: - errmsg(" Can't open file for indent check: %s" % (f)) - retval = 1 - # virtual output - mwriter = msc_pyparser.MSCWriter(parsed_structs[f]) - mwriter.generate() - # mwriter.output.append("") - output = [] - for l in mwriter.output: - if l == "\n": - output.append("\n") - else: - output += [l + "\n" for l in l.split("\n")] - if len(fromlines) < len(output): - fromlines.append("\n") - elif len(fromlines) > len(output): - output.append("\n") - - diff = difflib.unified_diff(fromlines, output) - if fromlines == output: - msg(" Indentation check ok.") - else: - errmsg(" Indentation check found error(s)") + error = check_indentation(f, parsed[f]) + if error: retval = 1 - for d in diff: - d = d.strip("\n") - r = re.match(r"^@@ -(\d+),(\d+) \+\d+,\d+ @@$", d) - if r: - line1, line2 = [int(i) for i in r.groups()] - e = { - 'indent': 2, - 'file': f, - 'title': "Indentation error", - 'line': line1, - 'endLine': line1 + line2, - 'message': "an indentation error has found" - } - errmsgf(e) - errmsg(d.strip("\n")) ### check `ctl:auditLogParts=+E` right place in chained rules c.check_ctl_audit_log() @@ -306,7 +318,6 @@ def main(): a['file'] = f a['title'] = "'ctl:auditLogParts' isn't allowed in CRS" errmsgf(a) - retval = 1 ### collect TX variables # this method collects the TX variables, which set via a @@ -325,7 +336,6 @@ def main(): a['file'] = f a['title'] = "'id' is duplicated" errmsgf(a) - retval = 1 ### check PL consistency c.check_pl_consistency() @@ -338,7 +348,7 @@ def main(): a['file'] = f a['title'] = "wrong or missing paranoia-level/N tag" errmsgf(a) - retval = 1 + if len(c.plscores) == 0: msg(" PL anomaly_scores are correct.") else: @@ -348,7 +358,6 @@ def main(): a['file'] = f a['title'] = "wrong (inbound|outbout)_anomaly_score variable or value" errmsgf(a) - retval = 1 ### check existence of used TX variables c.check_tx_variable() @@ -361,7 +370,7 @@ def main(): a['file'] = f a['title'] = "unset TX variable" errmsgf(a) - retval = 1 + ### check new unlisted tags c.check_tags(tags) if len(c.newtags) == 0: @@ -373,7 +382,7 @@ def main(): a['file'] = f a['title'] = "new unlisted tag" errmsgf(a) - retval = 1 + ### check for t:lowercase in combination with (?i) in regex c.check_lowercase_ignorecase() if len(c.ignorecase) == 0: @@ -385,7 +394,7 @@ def main(): a['file'] = f a['title'] = "t:lowercase and (?i)" errmsgf(a) - retval = 1 + ### check for tag:'OWASP_CRS' c.check_crs_tag() if len(c.nocrstags) == 0: @@ -397,9 +406,9 @@ def main(): a['file'] = f a['title'] = "tag:OWASP_CRS is missing" errmsgf(a) - retval = 1 + ### check for ver action - c.check_ver_action(crsversion) + c.check_ver_action(crs_version) if len(c.noveract) == 0: msg(" No rule without correct ver action.") else: @@ -409,7 +418,6 @@ def main(): a['file'] = f a['title'] = "ver is missing / incorrect" errmsgf(a) - retval = 1 c.check_capture_action() if len(c.nocaptact) == 0: @@ -421,10 +429,14 @@ def main(): a['file'] = f a['title'] = "capture is missing" errmsgf(a) - retval = 1 - msg("End of checking parsed rules") - msg("Cumulated report about unused TX variables") + # set it once if there is an error + if c.is_error(): + retval = 1 + + logger.info("End of checking parsed rules") + + logger.info("Cumulated report about unused TX variables") has_unused = False for tk in txvars: if txvars[tk]['used'] == False: @@ -435,14 +447,13 @@ def main(): a['title'] = "unused TX variable" a['message'] = "unused variable: %s" % (tk) errmsgf(a) - retval = 1 has_unused = True - if has_unused == False: - msg(" No unused TX variable") + if not has_unused: + logger.info(" No unused TX variable") - sys.exit(retval) + return retval if __name__ == "__main__": - main() + sys.exit(main(sys.argv[1:])) diff --git a/src/crs_linter/linter.py b/src/crs_linter/linter.py index 4faee97..6ccb5a7 100755 --- a/src/crs_linter/linter.py +++ b/src/crs_linter/linter.py @@ -90,6 +90,11 @@ def __init__(self, data, filename=None, txvars={}): self.current_ruleid = 0 # holds the rule id self.curr_lineno = 0 # current line number self.chained = False # holds the chained flag + self.re_tx_var = re.compile(r"%\{\}") + self.filename = filename + + # Any of these variables below are used to store the errors + self.caseerror = [] # list of case mismatch errors self.orderacts = [] # list of ordered action errors self.auditlogparts = [] # list of wrong ctl:auditLogParts @@ -104,8 +109,12 @@ def __init__(self, data, filename=None, txvars={}): self.noveract = [] # list of rules without ver action or incorrect ver self.nocaptact = [] # list of rules which uses TX.N without previous 'capture' - self.re_tx_var = re.compile(r"%\{\}") - self.filename = filename + def is_error(self): + """ Returns True if any error is found """ + return len(self.caseerror) > 0 or len(self.orderacts) > 0 or len(self.auditlogparts) > 0 or len( + self.undef_txvars) > 0 or len(self.pltags) > 0 or len(self.plscores) > 0 or len(self.dupes) > 0 or len( + self.ids) > 0 or len(self.newtags) > 0 or len(self.ignorecase) > 0 or len(self.nocrstags) > 0 or len( + self.noveract) > 0 or len(self.nocaptact) > 0 def store_error(self, msg): # store the error msg in the list diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..66a7387 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,7 @@ +from crs_linter.cli import main + + +def test_cli(): + ret = main(["-v", "4.10.0", "-r", "../examples/*.conf", "-t", "./APPROVED_TAGS", "-d", "."]) + + assert ret == 0 From ef41e02efcc68544d7e06feabc0b06b7a067816d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felipe=20Zipitr=C3=ADa?= <3012076+fzipi@users.noreply.github.com> Date: Wed, 22 Jan 2025 09:40:58 -0300 Subject: [PATCH 02/10] Apply suggestions from code review Co-authored-by: Max Leske <250711+theseion@users.noreply.github.com> --- src/crs_linter/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index 9bdc217..24c5a27 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -146,7 +146,7 @@ def get_crs_version(directory, version=None): crs_version = generate_version_string(directory) else: crs_version = version.strip() - # if no "OWASP_CRS/" prefix, append it + # if no "OWASP_CRS/" prefix, prepend it if not crs_version.startswith("OWASP_CRS/"): crs_version = "OWASP_CRS/" + crs_version @@ -261,7 +261,7 @@ def main(argv): retval = 0 args = parse_args(argv) - files = glob.glob(args.crs_rules[0]) + files = glob.glob(args.crs_rules[0]) if args.output is not None: if args.output not in ["native", "github"]: From 577c88741a635bf4ed555e36e636c2495545e95b Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 09:56:39 -0300 Subject: [PATCH 03/10] fix: main cli invocation and test Signed-off-by: Felipe Zipitria --- src/crs_linter/cli.py | 7 ++++--- src/crs_linter/linter.py | 9 +++++---- tests/test_cli.py | 8 +++++++- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index 24c5a27..2066a5a 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -257,9 +257,9 @@ def parse_args(argv): return parser.parse_args(argv) -def main(argv): +def main(): retval = 0 - args = parse_args(argv) + args = parse_args(sys.argv) files = glob.glob(args.crs_rules[0]) @@ -456,4 +456,5 @@ def main(argv): if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + sys.argv[0] = re.sub(r"(-script\.pyw|\.exe)?$", "", sys.argv[0]) + sys.exit(main()) diff --git a/src/crs_linter/linter.py b/src/crs_linter/linter.py index 6ccb5a7..8e5e5ca 100755 --- a/src/crs_linter/linter.py +++ b/src/crs_linter/linter.py @@ -111,10 +111,11 @@ def __init__(self, data, filename=None, txvars={}): def is_error(self): """ Returns True if any error is found """ - return len(self.caseerror) > 0 or len(self.orderacts) > 0 or len(self.auditlogparts) > 0 or len( - self.undef_txvars) > 0 or len(self.pltags) > 0 or len(self.plscores) > 0 or len(self.dupes) > 0 or len( - self.ids) > 0 or len(self.newtags) > 0 or len(self.ignorecase) > 0 or len(self.nocrstags) > 0 or len( - self.noveract) > 0 or len(self.nocaptact) > 0 + error_vars = [self.caseerror, self.orderacts, self.auditlogparts, + self.undef_txvars, self.pltags, self.plscores, self.dupes, + self.ids, self.newtags, self.ignorecase, self.nocrstags, + self.noveract, self.nocaptact] + return any([len(var) > 0 for var in error_vars]) def store_error(self, msg): # store the error msg in the list diff --git a/tests/test_cli.py b/tests/test_cli.py index 66a7387..54e344c 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,7 +1,13 @@ +from pytest import mocker + from crs_linter.cli import main def test_cli(): - ret = main(["-v", "4.10.0", "-r", "../examples/*.conf", "-t", "./APPROVED_TAGS", "-d", "."]) + mocker.patch( + "sys.argv", + ["-v", "4.10.0", "-r", "../examples/*.conf", "-t", "./APPROVED_TAGS", "-d", "."] + ) + ret = main() assert ret == 0 From 7a237bc4c0436b50ba531c420b7601fa1ce5208c Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 10:02:31 -0300 Subject: [PATCH 04/10] fix: add mocker module Signed-off-by: Felipe Zipitria --- pyproject.toml | 3 ++- tests/test_cli.py | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d63c53d..9c5ad1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,8 @@ crs-linter = 'crs_linter.cli:main' [dependency-groups] dev = [ - "pytest >=8.1.1,<9" + "pytest >=8.1.1,<9", + "pytest-mock>=3.14.0", ] [tool.semantic_release] diff --git a/tests/test_cli.py b/tests/test_cli.py index 54e344c..98599f6 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,9 +1,7 @@ -from pytest import mocker - from crs_linter.cli import main -def test_cli(): +def test_cli(mocker): mocker.patch( "sys.argv", ["-v", "4.10.0", "-r", "../examples/*.conf", "-t", "./APPROVED_TAGS", "-d", "."] From dcbd4c015a3098718f36d6beea04ffe301d0aa92 Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 10:34:06 -0300 Subject: [PATCH 05/10] fix: add directory flag to test Signed-off-by: Felipe Zipitria --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b1f9619..bc34728 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -39,4 +39,4 @@ jobs: run: | curl -SLs https://github.com/coreruleset/coreruleset/archive/refs/tags/v${{ matrix.crs-version }}.tar.gz -o - | \ tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" --wildcards "*/util/APPROVED_TAGS" - uv run crs-linter --output=github -r crs-setup.conf.example -r rules/*.conf -t util/APPROVED_TAGS -v ${{ matrix.crs-version }} \ No newline at end of file + uv run crs-linter --output=github -r crs-setup.conf.example -d . -r rules/*.conf -t util/APPROVED_TAGS -v ${{ matrix.crs-version }} From cd0c6d879b2e42f287dbe60fe73f7e2892eb9471 Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 10:35:36 -0300 Subject: [PATCH 06/10] fix(ci): run only once Signed-off-by: Felipe Zipitria --- .github/workflows/test.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index bc34728..be36a9c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -1,7 +1,14 @@ --- name: Regression Tests -on: [push, pull_request] +on: + push: + branches: + - main + pull_request: + branches: + - main + jobs: regression: From 3da6a0c851bb67e374a30ce59b7e32478f0e6d02 Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 11:10:26 -0300 Subject: [PATCH 07/10] fix: args parsing Signed-off-by: Felipe Zipitria --- src/crs_linter/cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index 2066a5a..3fc69a2 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -159,7 +159,7 @@ def check_indentation(filename, content): try: with open(filename, 'r') as fp: from_lines = fp.readlines() - if f.startswith("crs-setup.conf.example"): + if filename.startswith("crs-setup.conf.example"): from_lines = remove_comments("".join(from_lines)).split("\n") from_lines = [l + "\n" for l in from_lines] except: @@ -259,7 +259,7 @@ def parse_args(argv): def main(): retval = 0 - args = parse_args(sys.argv) + args = parse_args(sys.argv[1:]) files = glob.glob(args.crs_rules[0]) From 87f2aecb81ac92c7f3ea5f019f68ad3adc35930a Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 16:19:34 -0300 Subject: [PATCH 08/10] fix: test mocks Signed-off-by: Felipe Zipitria --- pyproject.toml | 3 +-- src/crs_linter/cli.py | 10 ++++++---- tests/test_cli.py | 15 ++++++++++----- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9c5ad1c..d63c53d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,8 +36,7 @@ crs-linter = 'crs_linter.cli:main' [dependency-groups] dev = [ - "pytest >=8.1.1,<9", - "pytest-mock>=3.14.0", + "pytest >=8.1.1,<9" ] [tool.semantic_release] diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index 3fc69a2..ca49a84 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -246,12 +246,12 @@ def read_files(filenames): return parsed def parse_args(argv): - parser = argparse.ArgumentParser(description="CRS Rules Check tool") + parser = argparse.ArgumentParser(prog="crs-linter", description="CRS Rules Check tool") parser.add_argument("-o", "--output", dest="output", help="Output format native[default]|github", required=False) parser.add_argument("-d", "--directory", dest="directory", default=pathlib.Path("."), type=pathlib.Path, help='Directory path to CRS git repository', required=True) - parser.add_argument("-r", "--rules", type=str, dest="crs_rules", nargs='*', - help='Directory path to CRS rules', required=True) + parser.add_argument("-r", "--rules", type=str, dest="crs_rules", + help='CRS rules file to check. Can be used multiple times.', action='append', required=True) parser.add_argument("-t", "--tags-list", dest="tagslist", help="Path to file with permitted tags", required=True) parser.add_argument("-v", "--version", dest="version", help="Version string", required=False) @@ -261,7 +261,9 @@ def main(): retval = 0 args = parse_args(sys.argv[1:]) - files = glob.glob(args.crs_rules[0]) + files = [] + for r in args.crs_rules: + files.extend(glob.glob(r)) if args.output is not None: if args.output not in ["native", "github"]: diff --git a/tests/test_cli.py b/tests/test_cli.py index 98599f6..c761774 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,11 +1,16 @@ +import sys + from crs_linter.cli import main -def test_cli(mocker): - mocker.patch( - "sys.argv", - ["-v", "4.10.0", "-r", "../examples/*.conf", "-t", "./APPROVED_TAGS", "-d", "."] - ) +def test_cli(monkeypatch): + monkeypatch.setattr(sys, "argv", + ["crs-linter", "-v", "4.10.0", + "-r", "../examples/test1.conf", + "-r", "../examples/test?.conf", + "-t", "./APPROVED_TAGS", "-d", "." + ]) + ret = main() assert ret == 0 From 13863c397a142c96909d6aebdf84f7d72f6ffa54 Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Wed, 22 Jan 2025 16:52:08 -0300 Subject: [PATCH 09/10] fix: gha test Signed-off-by: Felipe Zipitria --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index be36a9c..4c1a9f2 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -46,4 +46,4 @@ jobs: run: | curl -SLs https://github.com/coreruleset/coreruleset/archive/refs/tags/v${{ matrix.crs-version }}.tar.gz -o - | \ tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" --wildcards "*/util/APPROVED_TAGS" - uv run crs-linter --output=github -r crs-setup.conf.example -d . -r rules/*.conf -t util/APPROVED_TAGS -v ${{ matrix.crs-version }} + uv run crs-linter --output=github -r crs-setup.conf.example -d . -r 'rules/*.conf' -t util/APPROVED_TAGS -v ${{ matrix.crs-version }} From 4477d11a0c7de0a6e32485ef88173b2b6221f792 Mon Sep 17 00:00:00 2001 From: Felipe Zipitria Date: Mon, 3 Feb 2025 13:39:18 -0300 Subject: [PATCH 10/10] fix: logger Signed-off-by: Felipe Zipitria --- .github/workflows/test.yml | 4 +- pyproject.toml | 3 +- src/crs_linter/cli.py | 195 ++++++++++--------------------------- src/crs_linter/logger.py | 43 ++++++++ 4 files changed, 100 insertions(+), 145 deletions(-) create mode 100644 src/crs_linter/logger.py diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4c1a9f2..1188ac1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -45,5 +45,5 @@ jobs: - name: "Run crs linter tests for ${{ matrix.crs-version }}" run: | curl -SLs https://github.com/coreruleset/coreruleset/archive/refs/tags/v${{ matrix.crs-version }}.tar.gz -o - | \ - tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" --wildcards "*/util/APPROVED_TAGS" - uv run crs-linter --output=github -r crs-setup.conf.example -d . -r 'rules/*.conf' -t util/APPROVED_TAGS -v ${{ matrix.crs-version }} + tar xzvf - --strip-components=1 --wildcards "*/rules/*" --wildcards "*/crs-setup.conf.example" + uv run crs-linter -o github -d . -r crs-setup.conf.example -r 'rules/*.conf' -t APPROVED_TAGS -v ${{ matrix.crs-version }} diff --git a/pyproject.toml b/pyproject.toml index d63c53d..66f3773 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,8 @@ packages = [ dependencies = [ "msc_pyparser >=1.2.1", "dulwich (>=0.22.7,<0.23.0)", - "semver (>=3.0.2,<4.0.0)" + "semver (>=3.0.2,<4.0.0)", + "github-action-utils>=1.1.0,<2.0.0", ] [project.scripts] diff --git a/src/crs_linter/cli.py b/src/crs_linter/cli.py index ca49a84..0eec9d2 100755 --- a/src/crs_linter/cli.py +++ b/src/crs_linter/cli.py @@ -11,40 +11,7 @@ from semver import Version from crs_linter.linter import Check - -oformat = "native" - -logger = logging.getLogger(__name__) - -def errmsg(msg): - if oformat == "github": - print("::error::%s" % (msg)) - else: - print(msg) - - -def errmsgf(msg): - if oformat == "github": - if 'message' in msg and msg['message'].strip() != "": - print("::error%sfile={file},line={line},endLine={endLine},title={title}:: {message}".format(**msg) % ( - msg['indent'] * " ")) - else: - print("::error%sfile={file},line={line},endLine={endLine},title={title}::".format(**msg) % ( - msg['indent'] * " ")) - else: - if 'message' in msg and msg['message'].strip() != "": - print("%sfile={file}, line={line}, endLine={endLine}, title={title}: {message}".format(**msg) % ( - msg['indent'] * " ")) - else: - print("%sfile={file}, line={line}, endLine={endLine}, title={title}".format(**msg) % (msg['indent'] * " ")) - - -def msg(msg): - if oformat == "github": - print("::debug::%s" % (msg)) - else: - print(msg) - +from crs_linter.logger import Logger def remove_comments(data): """ @@ -127,6 +94,7 @@ def generate_version_string(directory): return f"OWASP_CRS/{version}" + def get_tags_from_file(filename): try: with open(filename, "r") as fp: @@ -134,11 +102,12 @@ def get_tags_from_file(filename): # remove empty items, if any tags = list(filter(lambda x: len(x) > 0, tags)) except: - errmsg(f"Can't open tags list: {filename}") + logger.error(f"Can't open tags list: {filename}") sys.exit(1) return tags + def get_crs_version(directory, version=None): crs_version = "" if version is None: @@ -152,6 +121,7 @@ def get_crs_version(directory, version=None): return crs_version + def check_indentation(filename, content): error = False @@ -163,7 +133,7 @@ def check_indentation(filename, content): from_lines = remove_comments("".join(from_lines)).split("\n") from_lines = [l + "\n" for l in from_lines] except: - errmsg(" Can't open file for indent check: %s" % (f)) + logger.error(" Can't open file for indent check: %s" % (f)) error = True # virtual output @@ -183,9 +153,9 @@ def check_indentation(filename, content): diff = difflib.unified_diff(from_lines, output) if from_lines == output: - msg(" Indentation check ok.") + logger.debug(" Indentation check ok.") else: - errmsg(" Indentation check found error(s)") + logger.error(" Indentation check found error(s)") error = True for d in diff: @@ -202,11 +172,14 @@ def check_indentation(filename, content): 'message': "an indentation error has found" } errmsgf(e) - errmsg(d.strip("\n")) + logger.error(d.strip("\n")) return error + def read_files(filenames): + global logger + parsed = {} for f in filenames: try: @@ -216,7 +189,7 @@ def read_files(filenames): if f.startswith("crs-setup.conf.example"): data = remove_comments(data) except: - errmsg("Can't open file: %s" % f) + logger.error(f"Can't open file: {f}") sys.exit(1) ### check file syntax @@ -224,7 +197,7 @@ def read_files(filenames): try: mparser = msc_pyparser.MSCParser() mparser.parser.parse(data) - logger.info("Parsing OK") + logger.debug("Parsing OK") parsed[f] = mparser.configlines except Exception as e: err = e.args[1] @@ -232,24 +205,20 @@ def read_files(filenames): cause = "Lexer" else: cause = "Parser" - errmsg("Can't parse config file: %s" % (f)) - errmsgf({ - 'indent': 2, - 'file': f, - 'title': "%s error" % (cause), - 'line': err['line'], - 'endLine': err['line'], - 'message': "can't parse file"}) + logger.error(f"Can't parse config file: {f}", title=f"{cause} error", file=f, line=err['line'], endLine=err['line']) retval = 1 continue return parsed + def parse_args(argv): parser = argparse.ArgumentParser(prog="crs-linter", description="CRS Rules Check tool") - parser.add_argument("-o", "--output", dest="output", help="Output format native[default]|github", required=False) + parser.add_argument("-o", "--output", dest="output", default="native", + help="Output format", choices=["native", "github"], required=False) parser.add_argument("-d", "--directory", dest="directory", default=pathlib.Path("."), type=pathlib.Path, help='Directory path to CRS git repository', required=True) + parser.add_argument("--debug", dest="debug", help="Show debug information.", action='store_true') parser.add_argument("-r", "--rules", type=str, dest="crs_rules", help='CRS rules file to check. Can be used multiple times.', action='append', required=True) parser.add_argument("-t", "--tags-list", dest="tagslist", help="Path to file with permitted tags", required=True) @@ -258,6 +227,7 @@ def parse_args(argv): return parser.parse_args(argv) def main(): + global logger retval = 0 args = parse_args(sys.argv[1:]) @@ -265,11 +235,7 @@ def main(): for r in args.crs_rules: files.extend(glob.glob(r)) - if args.output is not None: - if args.output not in ["native", "github"]: - print("--output can be one of the 'native' or 'github'. Default value is 'native'") - sys.exit(1) - oformat = args.output + logger = Logger(output=args.output, debug=args.debug) crs_version = get_crs_version(args.directory, args.version) tags = get_tags_from_file(args.tagslist) @@ -278,32 +244,25 @@ def main(): logger.info("Checking parsed rules...") for f in parsed.keys(): - msg(f) + logger.debug(f) c = Check(parsed[f], f, txvars) ### check case usings c.check_ignore_case() if len(c.caseerror) == 0: - logger.info(" Ignore case check ok.") + logger.info("Ignore case check ok.") else: - errmsg(" Ignore case check found error(s)") + logger.error("Ignore case check found error(s)") for a in c.caseerror: - a['indent'] = 2 - a['file'] = f - a['title'] = "Case check" - errmsgf(a) + logger.error(file=f, title="Case check") ### check action's order c.check_action_order() if len(c.orderacts) == 0: - msg(" Action order check ok.") + logger.debug("Action order check ok.") else: - errmsg(" Action order check found error(s)") for a in c.orderacts: - a['indent'] = 2 - a['file'] = f - a['title'] = 'Action order check' - errmsgf(a) + logger.error("Action order check found error(s)", file=f, title='Action order check') error = check_indentation(f, parsed[f]) if error: @@ -312,14 +271,11 @@ def main(): ### check `ctl:auditLogParts=+E` right place in chained rules c.check_ctl_audit_log() if len(c.auditlogparts) == 0: - msg(" no 'ctl:auditLogParts' action found.") + logger.debug(" no 'ctl:auditLogParts' action found.") else: - errmsg(" Found 'ctl:auditLogParts' action") + logger.error() for a in c.auditlogparts: - a['indent'] = 2 - a['file'] = f - a['title'] = "'ctl:auditLogParts' isn't allowed in CRS" - errmsgf(a) + logger.error("Found 'ctl:auditLogParts' action", file=f, title="'ctl:auditLogParts' isn't allowed in CRS") ### collect TX variables # this method collects the TX variables, which set via a @@ -330,129 +286,84 @@ def main(): ### check duplicate ID's # c.dupes filled during the tx variable collected if len(c.dupes) == 0: - msg(" no duplicate id's") + logger.debug("No duplicate id's") else: - errmsg(" Found duplicated id('s)") - for a in c.dupes: - a['indent'] = 2 - a['file'] = f - a['title'] = "'id' is duplicated" - errmsgf(a) + logger.error("Found duplicated id('s)", file=f, title="'id' is duplicated") ### check PL consistency c.check_pl_consistency() if len(c.pltags) == 0: - msg(" paranoia-level tags are correct.") + logger.debug("Paranoia-level tags are correct.") else: - errmsg(" Found incorrect paranoia-level/N tag(s)") for a in c.pltags: - a['indent'] = 2 - a['file'] = f - a['title'] = "wrong or missing paranoia-level/N tag" - errmsgf(a) + logger.error("Found incorrect paranoia-level/N tag(s)", file=f, title="wrong or missing paranoia-level/N tag") if len(c.plscores) == 0: - msg(" PL anomaly_scores are correct.") + logger.debug("PL anomaly_scores are correct.") else: - errmsg(" Found incorrect (inbound|outbout)_anomaly_score value(s)") for a in c.plscores: - a['indent'] = 2 - a['file'] = f - a['title'] = "wrong (inbound|outbout)_anomaly_score variable or value" - errmsgf(a) + logger.error(" Found incorrect (inbound|outbout)_anomaly_score value(s)", file=f, title="wrong (inbound|outbout)_anomaly_score variable or value") ### check existence of used TX variables c.check_tx_variable() if len(c.undef_txvars) == 0: - msg(" All TX variables are set.") + logger.debug("All TX variables are set.") else: - errmsg(" There are one or more unset TX variables.") for a in c.undef_txvars: - a['indent'] = 2 - a['file'] = f - a['title'] = "unset TX variable" - errmsgf(a) + logger.error("There are one or more unset TX variables.", file=f, title="unset TX variable") ### check new unlisted tags c.check_tags(tags) if len(c.newtags) == 0: - msg(" No new tags added.") + logger.debug("No new tags added.") else: - errmsg(" There are one or more new tag(s).") - for a in c.newtags: - a['indent'] = 2 - a['file'] = f - a['title'] = "new unlisted tag" - errmsgf(a) + logger.error(" There are one or more new tag(s).", file=f, title="new unlisted tag") ### check for t:lowercase in combination with (?i) in regex c.check_lowercase_ignorecase() if len(c.ignorecase) == 0: - msg(" No t:lowercase and (?i) flag used.") + logger.debug("No t:lowercase and (?i) flag used.") else: - errmsg(" There are one or more combinations of t:lowercase and (?i) flag.") - for a in c.ignorecase: - a['indent'] = 2 - a['file'] = f - a['title'] = "t:lowercase and (?i)" - errmsgf(a) + logger.error("There are one or more combinations of t:lowercase and (?i) flag", file=f, title="t:lowercase and (?i)") ### check for tag:'OWASP_CRS' c.check_crs_tag() if len(c.nocrstags) == 0: - msg(" No rule without OWASP_CRS tag.") + logger.debug("No rule without OWASP_CRS tag.") else: - errmsg(" There are one or more rules without OWASP_CRS tag.") - for a in c.nocrstags: - a['indent'] = 2 - a['file'] = f - a['title'] = "tag:OWASP_CRS is missing" - errmsgf(a) + logger.error("There are one or more rules without OWASP_CRS tag", file=f, title="tag:OWASP_CRS is missing") ### check for ver action c.check_ver_action(crs_version) if len(c.noveract) == 0: - msg(" No rule without correct ver action.") + logger.debug("No rule without correct ver action.") else: - errmsg(" There are one or more rules without ver action.") - for a in c.noveract: - a['indent'] = 2 - a['file'] = f - a['title'] = "ver is missing / incorrect" - errmsgf(a) + logger.error("There are one or more rules without ver action.", file=f, title="ver is missing / incorrect") c.check_capture_action() if len(c.nocaptact) == 0: - msg(" No rule uses TX.N without capture action.") + logger.debug("No rule uses TX.N without capture action.") else: - errmsg(" There are one or more rules using TX.N without capture action.") - for a in c.nocaptact: - a['indent'] = 2 - a['file'] = f - a['title'] = "capture is missing" - errmsgf(a) + logger.error("There are one or more rules using TX.N without capture action.", file=f, title="capture is missing") # set it once if there is an error if c.is_error(): retval = 1 - logger.info("End of checking parsed rules") + logger.debug("End of checking parsed rules") - logger.info("Cumulated report about unused TX variables") + logger.debug("Cumulated report about unused TX variables") has_unused = False for tk in txvars: if txvars[tk]['used'] == False: if has_unused == False: - msg(" Unused TX variable(s):") + logger.debug("Unused TX variable(s):") a = txvars[tk] - a['indent'] = 2 - a['title'] = "unused TX variable" - a['message'] = "unused variable: %s" % (tk) - errmsgf(a) + logger.error(f"unused variable: {tk}", title="unused TX variable") has_unused = True if not has_unused: - logger.info(" No unused TX variable") + logger.debug("No unused TX variable") return retval diff --git a/src/crs_linter/logger.py b/src/crs_linter/logger.py new file mode 100644 index 0000000..dfa3577 --- /dev/null +++ b/src/crs_linter/logger.py @@ -0,0 +1,43 @@ +import logging + +import github_action_utils as gha_utils + + +class Logger: + def __init__(self, output="native", debug=False): + self.output = output + self.debugging = debug + level = logging.INFO + if self.output == "native": + self.logger = logging.getLogger() + if self.debugging: + level = logging.DEBUG + self.logger.setLevel(level) + else: + self.logger = gha_utils + + def debug(self, *args, **kwargs): + if self.debugging: + if self.output == "native": + self.logger.debug(*args) + else: + self.logger.debug(*args, **kwargs) + + pass + + def error(self, *args, **kwargs): + if self.output == "native": + self.logger.error(*args) + else: + self.logger.error(*args, **kwargs) + + def warning(self, *args, **kwargs): + if self.output == "native": + self.logger.warning(*args) + else: + self.logger.warning(*args, **kwargs) + + def info(self, *args, **kwargs): + if self.output == "native": + self.logger.info(*args, **kwargs) +