Skip to content

Commit

Permalink
Merge pull request in-toto#665 from lukpueh/refactor-schema
Browse files Browse the repository at this point in the history
Add backwards-compatible SCHEMA replacement
  • Loading branch information
lukpueh authored Dec 4, 2023
2 parents dd77439 + e7e9db9 commit 3d3038d
Show file tree
Hide file tree
Showing 14 changed files with 256 additions and 126 deletions.
130 changes: 116 additions & 14 deletions in_toto/formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,124 @@
See LICENSE for licensing information.
<Purpose>
Format schemas for in-toto metadata, based on securesystemslib.schema.
Helpers to validate API inputs and metadata model objects.
The schemas can be verified using the following methods inherited from
securesystemslib.schema:
"""
from copy import deepcopy
from re import fullmatch

in_toto.formats.<SCHEMA>.check_match(<object to verify>)
in_toto.formats.<SCHEMA>.matches(<object to verify>)
from securesystemslib.exceptions import FormatError
from securesystemslib.signer import Key, Signature

`check_match` raises a securesystemslib.exceptions.FormatError and `matches`
returns False if the verified object does not match the schema (True
otherwise).
from in_toto.models._signer import GPGKey, GPGSignature

"""
import securesystemslib.schema as ssl_schema

PARAMETER_DICTIONARY_KEY = ssl_schema.RegularExpression(r"[a-zA-Z0-9_-]+")
PARAMETER_DICTIONARY_SCHEMA = ssl_schema.DictOf(
key_schema=PARAMETER_DICTIONARY_KEY, value_schema=ssl_schema.AnyString()
)
def _err(arg, expected):
return FormatError(f"expected {expected}, got '{arg} ({type(arg)})'")


def _check_int(arg):
if not isinstance(arg, int):
raise _err(arg, "int")


def _check_str(arg):
if not isinstance(arg, str):
raise _err(arg, "str")


def _check_hex(arg):
_check_str(arg)
if fullmatch(r"^[0-9a-fA-F]+$", arg) is None:
raise _err(arg, "hex string")


def _check_list(arg):
if not isinstance(arg, list):
raise _err(arg, "list")


def _check_dict(arg):
if not isinstance(arg, dict):
raise _err(arg, "dict")


def _check_str_list(arg):
_check_list(arg)
for e in arg:
_check_str(e)


def _check_hex_list(arg):
_check_list(arg)
for e in arg:
_check_hex(e)


def _check_iso8601(arg):
"""Check iso8601 date format."""
_check_str(arg)
if fullmatch(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$", arg) is None:
raise _err(arg, "'YYYY-MM-DDTHH:MM:SSZ'")


def _check_hash_dict(arg):
"""Check artifact hash dict."""
_check_dict(arg)
for k, v in arg.items():
_check_str(k)
_check_hex(v)


def _check_parameter_dict(arg):
"""Check verifylib parameter dict."""
_check_dict(arg)
for k, v in arg.items():
_check_str(k)
if fullmatch(r"^[a-zA-Z0-9_-]+$", k) is None:
raise _err(arg, "'a-zA-Z0-9_-'")
_check_str(v)


def _check_signature(arg):
"""Check signature dict."""
_check_dict(arg)
# NOTE: `GPGSignature` and `Signature` serialization formats are incompatible
try:
GPGSignature.from_dict(arg)
except KeyError:
try:
Signature.from_dict(deepcopy(arg))
except KeyError as e:
raise _err(arg, "signature dict") from e


def _check_public_key(arg):
"""Check public key dict."""
_check_dict(arg)
# NOTE: `GPGKey` and `Key` serialization formats are incompatible
try:
GPGKey.from_dict(arg["keyid"], arg)
except (KeyError, TypeError):
try:
Key.from_dict(arg["keyid"], deepcopy(arg))
except KeyError as e:
raise _err(arg, "public key dict") from e


def _check_public_keys(arg):
"""Check dict of public key dicts."""
_check_dict(arg)
for k, v in arg.items():
_check_hex(k)
_check_public_key(v)


def _check_signing_key(arg):
"""Check legacy signing key dict format.
NOTE: signing key dict is deprecated, this check will be removed with it
"""
_check_public_key(arg)
if not arg["keyval"].get("private"):
raise _err(arg, "private key data")
3 changes: 2 additions & 1 deletion in_toto/in_toto_sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
sort_action_groups,
title_case_action_groups,
)
from in_toto.formats import _check_hex
from in_toto.models._signer import GPGSigner
from in_toto.models.link import FILENAME_FORMAT
from in_toto.models.metadata import Metadata
Expand Down Expand Up @@ -94,7 +95,7 @@ def _sign_and_dump_metadata(metadata, args):

# Otherwise we sign with each passed keyid
for keyid in args.gpg:
securesystemslib.formats.KEYID_SCHEMA.check_match(keyid)
_check_hex(keyid)
signature = metadata.create_signature(
GPGSigner(keyid, homedir=args.gpg_home)
)
Expand Down
3 changes: 1 addition & 2 deletions in_toto/models/_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ def sign(self, payload: bytes) -> GPGSignature:
payload: The bytes to be signed.
Raises:
securesystemslib.exceptions.FormatError:
If the keyid was passed and does not match
securesystemslib.formats.KEYID_SCHEMA.
If the keyid is invalid.
ValueError: the gpg command failed to create a valid signature.
OSError: the gpg command is not present or non-executable.
securesystemslib.exceptions.UnsupportedLibraryError: the gpg command is
Expand Down
64 changes: 35 additions & 29 deletions in_toto/models/layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,21 @@
import securesystemslib.formats
import securesystemslib.gpg.functions
import securesystemslib.interface
import securesystemslib.schema
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta

import in_toto.exceptions
import in_toto.rulelib
from in_toto.formats import (
_check_hex,
_check_hex_list,
_check_int,
_check_iso8601,
_check_public_key,
_check_public_keys,
_check_str,
_check_str_list,
)
from in_toto.models.common import Signable, ValidationMixin

# Link metadata for sublayouts are expected to be found in a subdirectory
Expand Down Expand Up @@ -158,9 +167,9 @@ def set_relative_expiration(self, days=0, months=0, years=0):
securesystemslib.exceptions.FormatError: Arguments are not ints.
"""
securesystemslib.schema.Integer().check_match(days)
securesystemslib.schema.Integer().check_match(months)
securesystemslib.schema.Integer().check_match(years)
_check_int(days)
_check_int(months)
_check_int(years)

self.expires = (
datetime.today()
Expand Down Expand Up @@ -196,7 +205,7 @@ def get_step_by_name(self, step_name):
A Step object.
"""
securesystemslib.schema.AnyString().check_match(step_name)
_check_str(step_name)

for step in self.steps: # pragma: no branch
if step.name == step_name:
Expand All @@ -216,7 +225,7 @@ def remove_step_by_name(self, step_name):
securesystemslib.exceptions.FormatError: Argument is not a string.
"""
securesystemslib.schema.AnyString().check_match(step_name)
_check_str(step_name)

for step in self.steps:
if step.name == step_name:
Expand Down Expand Up @@ -251,7 +260,7 @@ def get_inspection_by_name(self, inspection_name):
An Inspection object.
"""
securesystemslib.schema.AnyString().check_match(inspection_name)
_check_str(inspection_name)

for inspection in self.inspect: # pragma: no branch
if inspection.name == inspection_name:
Expand All @@ -271,7 +280,7 @@ def remove_inspection_by_name(self, inspection_name):
securesystemslib.exceptions.FormatError: Argument is not a string.
"""
securesystemslib.schema.AnyString().check_match(inspection_name)
_check_str(inspection_name)

for inspection in self.inspect:
if inspection.name == inspection_name:
Expand All @@ -290,7 +299,7 @@ def add_functionary_key(self, key):
"""Adds key as functionary key to layout.
Arguments:
key: A public key. Format is securesystemslib.formats.ANY_PUBKEY_SCHEMA.
key: A public key.
Raises:
securesystemslib.exceptions.FormatError: Argument is malformed.
Expand All @@ -299,7 +308,7 @@ def add_functionary_key(self, key):
The added key.
"""
securesystemslib.formats.ANY_PUBKEY_SCHEMA.check_match(key)
_check_public_key(key)
keyid = key["keyid"]
self.keys[keyid] = key
return key
Expand All @@ -308,8 +317,7 @@ def add_functionary_key_from_path(self, key_path):
"""Loads key from disk and adds as functionary key to layout.
Arguments:
key_path: A path to a PEM-formatted RSA public key. Format is
securesystemslib.formats.PATH_SCHEMA.
key_path: A path to a PEM-formatted RSA public key.
Raises:
securesystemslib.exceptions.FormatError: Argument is malformed.
Expand All @@ -319,7 +327,7 @@ def add_functionary_key_from_path(self, key_path):
The added functionary public key.
"""
securesystemslib.formats.PATH_SCHEMA.check_match(key_path)
_check_str(key_path)
key = securesystemslib.interface.import_rsa_publickey_from_file(
key_path
)
Expand All @@ -345,9 +353,9 @@ def add_functionary_key_from_gpg_keyid(self, gpg_keyid, gpg_home=None):
The added key.
"""
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)
_check_hex(gpg_keyid)
if gpg_home: # pragma: no branch
securesystemslib.formats.PATH_SCHEMA.check_match(gpg_home)
_check_str(gpg_home)

key = securesystemslib.gpg.functions.export_pubkey(
gpg_keyid, homedir=gpg_home
Expand All @@ -358,8 +366,7 @@ def add_functionary_keys_from_paths(self, key_path_list):
"""Loads keys from disk and adds as functionary keys to layout.
Arguments:
key_path_list: A list of paths to PEM-formatted RSA public keys. Format
of each path is securesystemslib.formats.PATH_SCHEMA.
key_path_list: A list of paths to PEM-formatted RSA public keys.
Raises:
securesystemslib.exceptions.FormatError: Argument is malformed.
Expand All @@ -370,7 +377,7 @@ def add_functionary_keys_from_paths(self, key_path_list):
keys and keys as values.
"""
securesystemslib.formats.PATHS_SCHEMA.check_match(key_path_list)
_check_str_list(key_path_list)
key_dict = {}
for key_path in key_path_list:
key = self.add_functionary_key_from_path(key_path)
Expand Down Expand Up @@ -400,7 +407,7 @@ def add_functionary_keys_from_gpg_keyids(
keys and keys as values.
"""
securesystemslib.formats.KEYIDS_SCHEMA.check_match(gpg_keyid_list)
_check_hex_list(gpg_keyid_list)
key_dict = {}
for gpg_keyid in gpg_keyid_list:
key = self.add_functionary_key_from_gpg_keyid(gpg_keyid, gpg_home)
Expand All @@ -419,12 +426,11 @@ def _validate_expires(self):
"""Private method to verify if the expiration field has the right format
and can be parsed."""
try:
# We do both 'parse' and 'check_match' because the format check does not
# We do both 'parse' and '_check_iso8601' because the format check does not
# detect bogus dates (e.g. Jan 35th) and parse can do more formats.
parse(self.expires)
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(
self.expires
)
_check_iso8601(self.expires)

except Exception as e:
raise securesystemslib.exceptions.FormatError(
"Malformed date string in layout. Exception: {}".format(e)
Expand All @@ -441,7 +447,7 @@ def _validate_readme(self):

def _validate_keys(self):
"""Private method to ensure that the keys contained are right."""
securesystemslib.formats.ANY_PUBKEY_DICT_SCHEMA.check_match(self.keys)
_check_public_keys(self.keys)

def _validate_steps_and_inspections(self):
"""Private method to verify that the list of steps and inspections are
Expand Down Expand Up @@ -529,7 +535,7 @@ def add_material_rule_from_string(self, rule_string):
securesystemslib.exceptions.FormatError: Argument is malformed.
"""
securesystemslib.schema.AnyString().check_match(rule_string)
_check_str(rule_string)
rule_list = shlex.split(rule_string)

# Raises format error if the parsed rule_string is not a valid rule
Expand All @@ -547,7 +553,7 @@ def add_product_rule_from_string(self, rule_string):
securesystemslib.exceptions.FormatError: Argument is malformed.
"""
securesystemslib.schema.AnyString().check_match(rule_string)
_check_str(rule_string)
rule_list = shlex.split(rule_string)

# Raises format error if the parsed rule_string is not a valid rule
Expand Down Expand Up @@ -637,7 +643,7 @@ def set_expected_command_from_string(self, command_string):
securesystemslib.exceptions.FormatError: Argument is malformed.
"""
securesystemslib.schema.AnyString().check_match(command_string)
_check_str(command_string)
self.expected_command = shlex.split(command_string)

def _validate_type(self):
Expand All @@ -664,7 +670,7 @@ def _validate_pubkeys(self):
)

for keyid in self.pubkeys:
securesystemslib.formats.KEYID_SCHEMA.check_match(keyid)
_check_hex(keyid)

def _validate_expected_command(self):
"""Private method to check that the expected_command is proper."""
Expand Down Expand Up @@ -724,7 +730,7 @@ def set_run_from_string(self, command_string):
securesystemslib.exceptions.FormatError: Argument is malformed.
"""
securesystemslib.schema.AnyString().check_match(command_string)
_check_str(command_string)
self.run = shlex.split(command_string)

def _validate_type(self):
Expand Down
Loading

0 comments on commit 3d3038d

Please sign in to comment.