Skip to content

Commit

Permalink
Make both pyright and mypy happy
Browse files Browse the repository at this point in the history
  • Loading branch information
nwiltsie committed Mar 1, 2024
1 parent 0d02c85 commit 3ade4e4
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 58 deletions.
6 changes: 3 additions & 3 deletions run-nextflow-tests/configtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def from_file(cls: Type[T], pipeline: Path, filepath: Path) -> T:
result.filepath = filepath.resolve()
return result

def replace_results(self, updated_results):
def replace_results(self: T, updated_results) -> T:
"Return another test object with updated results and filepath."
regenerated_test = dataclasses.replace(
self,
Expand Down Expand Up @@ -178,7 +178,7 @@ def _run_test(self):
print(config_output)
raise

def print_diffs(self, other: Type[T]):
def print_diffs(self, other: T):
"Print the diff results to the console."
diff_process = subprocess.run(
["diff", self.filepath, other.filepath],
Expand Down Expand Up @@ -260,7 +260,7 @@ def mark_for_archive(self):
outfile.write(f"archive_key={key}\n")
outfile.write(f"archive_path={self.filepath}\n")

def recompute_results(self) -> T:
def recompute_results(self: T) -> T:
"Compare the results."
result = self._run_test()

Expand Down
131 changes: 76 additions & 55 deletions run-nextflow-tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import itertools
import re
import json
from typing import List
from typing import List, Any


ESCAPE_RE = re.compile(r"([^\\])\\([ =:])")
Expand Down Expand Up @@ -66,70 +66,91 @@ def diff_json(alpha, beta):
return results


def parse_value(value_str: str):
def _parse_list_value(value_str: str) -> List[Any]:
"Parse a list-like value."
value = []
stack = []

assert value_str[0] == "["
assert value_str[-1] == "]"

list_str = value_str[1:-1]

index = 0
first_index = 0
for index, character in enumerate(list_str):
if character == "{":
stack.append("}")
elif character == "(":
stack.append(")")
elif character in ("}", ")"):
assert stack[-1] == character
stack.pop()

elif character == "," and not stack:
# Do not include the comma
value.append(parse_value(list_str[first_index:index]))
first_index = index + 1

assert not stack

if index > first_index:
value.append(parse_value(list_str[first_index:]))

return value


def _parse_dict_value(value_str: str) -> dict:
"Parse a dictionary-like value."
value = {}

assert value_str[0] == "{"
assert value_str[-1] == "}"

for token in value_str[1:-1].split(", "):
try:
token_key, token_value = token.split("\\=", maxsplit=1)
except ValueError:
print(f"The bad value is `{value_str}`")
print(f"The specific token is `{token}`")
raise

value[parse_value(token_key)] = parse_value(token_value)

return value


def parse_value(value_str: str) -> Any:
"Parse a value."
# pylint: disable=too-many-branches,too-many-return-statements
try:
if CLOSURE_RE.match(value_str):
return "closure()"
except TypeError:
print(value_str)
raise

value: Any = None

# Mask any memory addresses
if POINTER_RE.match(value_str):
return POINTER_RE.sub(r"\1dec0ded", value_str)

if value_str and value_str[0] == "[" and value_str[-1] == "]":
value = []
stack = []

list_str = value_str[1:-1]

index = 0
first_index = 0
for index, character in enumerate(list_str):
if character == "{":
stack.append("}")
elif character == "(":
stack.append(")")
elif character in ("}", ")"):
assert stack[-1] == character
stack.pop()

elif character == "," and not stack:
# Do not include the comma
value.append(parse_value(list_str[first_index:index]))
first_index = index + 1
value_str = POINTER_RE.sub(r"\1dec0ded", value_str)

assert not stack
elif value_str and value_str[0] == "[" and value_str[-1] == "]":
value = _parse_list_value(value_str)

if index > first_index:
value.append(parse_value(list_str[first_index:]))
elif value_str and value_str[0] == "{" and value_str[-1] == "}":
value = _parse_dict_value(value_str)

return value
elif value_str == "true":
value = True

if value_str and value_str[0] == "{" and value_str[-1] == "}":
value = {}
for token in value_str[1:-1].split(", "):
try:
token_key, token_value = token.split("\\=", maxsplit=1)
except ValueError:
print(f"The bad value is `{value_str}`")
print(f"The specific token is `{token}`")
raise
elif value_str == "false":
value = False

value[parse_value(token_key)] = parse_value(token_value)

return value

if value_str == "true":
return True

if value_str == "false":
return False
else:
value = ESCAPE_RE.sub(r"\1\2", value_str.strip())

return ESCAPE_RE.sub(r"\1\2", value_str.strip())
return value


def parse_config(config_str: str, dated_fields: List[str]) -> dict:
Expand All @@ -149,18 +170,18 @@ def assign_value(closure, key, value):

assign_value(closure[local_key], remainder, value)

config = {}
config: dict[str, Any] = {}

for line in config_str.splitlines():
line = line.strip()
if not line:
continue

try:
key, value = param_re.match(line).groups()
except AttributeError:
print(f"The offending line is `{line}`")
raise
param_match = param_re.match(line)
if not param_match:
raise ValueError(f"The offending line is `{line}`")

key, value = param_match.groups()

escaped_key = ESCAPE_RE.sub(r"\1\2", key)
if escaped_key in dated_fields:
Expand Down

0 comments on commit 3ade4e4

Please sign in to comment.