Skip to content

Commit

Permalink
runlib: add signer kwarg to run and record methods
Browse files Browse the repository at this point in the history
Adds optional signer (`securesystemslib.signer.Signer`) arg to runlib's
run/record functions, as alternative way of signing resulting link
metadata, instead of using signing_key, gpg_keyid, or use_default_gpg.

A deprecation warning is added for `signing_key` only, because `signer`
can be used as backwards compatible replacement, and the patch is part
of a series of patches to prepare for the planned removal of
securesystemslib legacy modules `interface` and `keys`, needed by
`signing_key`.

gpg related arguments are not yet deprecated, as the related
implementation in the securesystemslib Signer API is not compatible
(it uses a different public key metadata format).

Note: The patch aims to be minimally invasive, and thus barely refactors
any of the existing signing argument handling in the relevant functions.
Although, it was tempting to simplify the code, it turned out harder
than thought, and therefor not worth the effort, given that these
arguments are bound to deprecation.

Closes in-toto#532

Signed-off-by: Lukas Puehringer <[email protected]>
  • Loading branch information
lukpueh committed Nov 20, 2023
1 parent f27f0aa commit fb96ed0
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 30 deletions.
128 changes: 98 additions & 30 deletions in_toto/runlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import securesystemslib.formats
import securesystemslib.gpg
import securesystemslib.hash
from securesystemslib.signer import Signature, SSlibSigner
from securesystemslib.signer import Key, Signature, Signer, SSlibSigner

import in_toto.exceptions
import in_toto.settings
Expand Down Expand Up @@ -401,6 +401,26 @@ def _check_match_signing_key(signing_key):
)


def _check_signer(signer):
if not isinstance(signer, Signer):
raise ValueError("signer must be a Signer instance")

if not (
hasattr(signer, "public_key") and isinstance(signer.public_key, Key)
):
# TODO: add `public_key` to `Signer` interface upstream
# see secure-systems-lab/securesystemslib#605
raise ValueError("only Signer instances with public key supported")


def _require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default):
if not any([signer, signing_key, gpg_keyid, gpg_use_default]):
raise ValueError(
"Pass either a signer, a signing key, a gpg keyid or set"
" gpg_use_default to True!"
)


def in_toto_run(
name,
material_list,
Expand All @@ -420,14 +440,15 @@ def in_toto_run(
metadata_directory=None,
use_dsse=False,
timeout=in_toto.settings.LINK_CMD_EXEC_TIMEOUT,
signer=None,
):
"""Performs a supply chain step or inspection generating link metadata.
Executes link_cmd_args, recording paths and hashes of files before and after
command execution (aka. artifacts) in a link metadata file. The metadata is
signed with the passed signing_key, a gpg key identified by its ID, or the
default gpg key. If multiple key arguments are passed, only one key is used
in above order of precedence. The resulting link file is written to
signed with the passed signer, signing_key, a gpg key identified by its ID, or
the default gpg key. If multiple key arguments are passed, only one key is
used in above order of precedence. The resulting link file is written to
``STEP-NAME.KEYID-PREFIX.link``. If no key argument is passed the link
metadata is neither signed nor written to disk.
Expand All @@ -450,6 +471,9 @@ def in_toto_run(
signing_key (optional): A key used to sign the resulting link metadata. The
format is securesystemslib.formats.KEY_SCHEMA.
.. deprecated:: 2.2.0
Please pass a ``signer`` instead.
gpg_keyid (optional): A keyid used to identify a local gpg key used to sign
the resulting link metadata.
Expand Down Expand Up @@ -484,9 +508,12 @@ def in_toto_run(
use_dsse (optional): A boolean indicating if DSSE should be used to
generate metadata.
timeout (optional): An integer indicating the max timeout in seconds
timeout (optional): An integer indicating the max timeout in seconds
for this command. Default is 10 seconds.
signer (optional): A securesystemslib Signer instance used to
sign the resulting link metadata.
Raises:
securesystemslib.exceptions.FormatError: Passed arguments are malformed.
Expand Down Expand Up @@ -524,8 +551,12 @@ def in_toto_run(
LOG.info("Running '%s'...", name)

# Check key formats to fail early
if signer:
_check_signer(signer)

if signing_key:
_check_match_signing_key(signing_key)

if gpg_keyid:
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)

Expand Down Expand Up @@ -593,8 +624,10 @@ def in_toto_run(
LOG.info("Generating link metadata using Metablock...")
link_metadata = Metablock(signed=link, compact_json=compact_json)

signer = None
if signing_key:
if signer:
LOG.info("Signing link metadata using passed signer...")

elif signing_key:
LOG.info("Signing link metadata using passed key...")
signer = SSlibSigner(signing_key)

Expand Down Expand Up @@ -635,12 +668,13 @@ def in_toto_record_start(
normalize_line_endings=False,
lstrip_paths=None,
use_dsse=False,
signer=None,
):
"""Generates preliminary link metadata.
Records paths and hashes of materials in a preliminary link metadata file.
The metadata is signed with the passed signing_key, a gpg key identified by
its ID, or the default gpg key. If multiple key arguments are passed, only
Records paths and hashes of materials in a preliminary link metadata file. The
metadata is signed with the passed signer, signing_key, a gpg key identified
by its ID, or the default gpg key. If multiple key arguments are passed, only
one key is used in above order of precedence. At least one key argument must
be passed. The resulting link file is written to
``.STEP-NAME.KEYID-PREFIX.link-unfinished``.
Expand All @@ -658,6 +692,9 @@ def in_toto_record_start(
signing_key (optional): A key used to sign the resulting link metadata. The
format is securesystemslib.formats.KEY_SCHEMA.
.. deprecated:: 2.2.0
Please pass a ``signer`` instead.
gpg_keyid (optional): A keyid used to identify a local gpg key used to sign
the resulting link metadata.
Expand Down Expand Up @@ -686,6 +723,9 @@ def in_toto_record_start(
use_dsse (optional): A boolean indicating if DSSE should be used to
generate metadata.
signer (optional): A securesystemslib Signer instance used to
sign the resulting link metadata.
Raises:
securesystemslib.exceptions.FormatError: Passed arguments are malformed.
Expand Down Expand Up @@ -715,20 +755,20 @@ def in_toto_record_start(
Writes preliminary link metadata file to disk.
"""
# pylint: disable=too-many-locals
# pylint: disable=too-many-locals,too-many-branches

LOG.info("Start recording '%s'...", step_name)

# Fail if there is no signing key arg at all
if not signing_key and not gpg_keyid and not gpg_use_default:
raise ValueError(
"Pass either a signing key, a gpg keyid or set"
" gpg_use_default to True!"
)
_require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default)

# Check key formats to fail early
if signer:
_check_signer(signer)

if signing_key:
_check_match_signing_key(signing_key)

if gpg_keyid:
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)

Expand Down Expand Up @@ -771,7 +811,10 @@ def in_toto_record_start(
LOG.info("Generating link metadata using Metablock...")
link_metadata = Metablock(signed=link)

if signing_key:
if signer:
LOG.info("Signing link metadata using passed signer...")

elif signing_key:
LOG.info("Signing link metadata using passed key...")
signer = SSlibSigner(signing_key)

Expand Down Expand Up @@ -810,16 +853,17 @@ def in_toto_record_stop(
command=None,
byproducts=None,
environment=None,
signer=None,
):
"""Finalizes preliminary link metadata generated with in_toto_record_start.
Loads preliminary link metadata file, verifies its signature, and records
paths and hashes as products, thus finalizing the link metadata. The metadata
is signed with the passed signing_key, a gpg key identified by its ID, or the
default gpg key. If multiple key arguments are passed, only one key is used
in above order of precedence. At least one key argument must be passed and it
must be the same as the one used to sign the preliminary link metadata file.
The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``.
is signed with the passed signer, signing_key, a gpg key identified by its ID,
or the default gpg key. If multiple key arguments are passed, only one key is
used in above order of precedence. At least one key argument must be passed
and it must be the same as the one used to sign the preliminary link metadata
file. The resulting link file is written to ``STEP-NAME.KEYID-PREFIX.link``.
Use this function together with in_toto_record_start as an alternative to
in_toto_run, in order to provide evidence for supply chain steps that cannot
Expand All @@ -834,6 +878,9 @@ def in_toto_record_stop(
signing_key (optional): A key used to sign the resulting link metadata. The
format is securesystemslib.formats.KEY_SCHEMA.
.. deprecated:: 2.2.0
Please pass a ``signer`` instead.
gpg_keyid (optional): A keyid used to identify a local gpg key used to sign
the resulting link metadata.
Expand Down Expand Up @@ -876,6 +923,9 @@ def in_toto_record_stop(
"workdir": "<CWD when executing link command>"
}
signer (optional): A securesystemslib Signer instance used to
sign the resulting link metadata.
Raises:
securesystemslib.exceptions.FormatError: Passed arguments are malformed.
Expand Down Expand Up @@ -913,14 +963,14 @@ def in_toto_record_stop(
LOG.info("Stop recording '%s'...", step_name)

# Check that we have something to sign and if the formats are right
if not signing_key and not gpg_keyid and not gpg_use_default:
raise ValueError(
"Pass either a signing key, a gpg keyid or set"
" gpg_use_default to True"
)
_require_signing_arg(signer, signing_key, gpg_keyid, gpg_use_default)

if signer:
_check_signer(signer)

if signing_key:
_check_match_signing_key(signing_key)

if gpg_keyid:
securesystemslib.formats.KEYID_SCHEMA.check_match(gpg_keyid)

Expand All @@ -935,7 +985,12 @@ def in_toto_record_stop(

# Load preliminary link file
# If we have a signing key we can use the keyid to construct the name
if signing_key:
if signer:
unfinished_fn = UNFINISHED_FILENAME_FORMAT.format(
step_name=step_name, keyid=signer.public_key.keyid
)

elif signing_key:
unfinished_fn = UNFINISHED_FILENAME_FORMAT.format(
step_name=step_name, keyid=signing_key["keyid"]
)
Expand Down Expand Up @@ -972,7 +1027,13 @@ def in_toto_record_stop(

# The file must have been signed by the same key
# If we have a signing_key we use it for verification as well
if signing_key:
if signer:
LOG.info("Verifying preliminary link signature using passed signer...")
keyid = signer.public_key.keyid
verification_key = signer.public_key.to_dict()
verification_key["keyid"] = keyid

elif signing_key:
LOG.info(
"Verifying preliminary link signature using passed signing key..."
)
Expand Down Expand Up @@ -1042,7 +1103,14 @@ def in_toto_record_stop(
LOG.info("Generating link metadata using DSSE...")
link_metadata = Envelope.from_signable(link)

if signing_key:
if signer:
LOG.info(
"Updating signature with signer '{:.8}...'...".format(
signer.public_key.keyid
)
)

elif signing_key:
LOG.info("Updating signature with key '{:.8}...'...".format(keyid))
signer = SSlibSigner(signing_key)

Expand Down
90 changes: 90 additions & 0 deletions tests/test_runlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
import securesystemslib.formats
from securesystemslib.interface import (
generate_and_write_unencrypted_rsa_keypair,
import_ed25519_privatekey_from_file,
import_ed25519_publickey_from_file,
import_rsa_privatekey_from_file,
import_rsa_publickey_from_file,
)
from securesystemslib.signer import CryptoSigner, Signer

import in_toto.exceptions
import in_toto.settings
Expand Down Expand Up @@ -1398,5 +1401,92 @@ def test_check(self):
)


class TestSigner(unittest.TestCase, TmpDirMixin):
"""Test signer argument in runlib API functions (run, record)."""

@classmethod
def setUpClass(cls):
cls.set_up_test_dir() # teardown is called implicitly
keys = Path(__file__).parent / "demo_files"

rsa = "alice"
rsa_priv = import_rsa_privatekey_from_file(str(keys / rsa))
cls.rsa_pub = import_rsa_publickey_from_file(str(keys / f"{rsa}.pub"))
cls.rsa_signer = CryptoSigner.from_securesystemslib_key(rsa_priv)

ed = "danny"
ed_priv = import_ed25519_privatekey_from_file(str(keys / ed))
cls.ed_pub = import_ed25519_publickey_from_file(str(keys / f"{ed}.pub"))
cls.ed_signer = CryptoSigner.from_securesystemslib_key(ed_priv)

def test_run(self):
# Successfully create, sign and verify link
link = in_toto_run("foo", [], [], [], signer=self.rsa_signer)
self.assertIsNone(link.verify_signature(self.rsa_pub))

# Fail with wrong verification key
with self.assertRaises(SignatureVerificationError):
link.verify_signature(self.ed_pub)

# Fail with bad signer arg
class NoSigner:
pass

with self.assertRaises(ValueError):
link = in_toto_run("foo", [], [], [], signer=NoSigner())

# Fail with incompatible signers
class BadSigner(Signer):
"""Signer implementation w/o public_key attribute.
secure-systems-lab/securesystemslib#605
"""

@classmethod
def from_priv_key_uri(
cls,
priv_key_uri,
public_key,
secrets_handler=None,
):
pass

def sign(self, payload):
pass

# Fail with missing public_key attribute.
bad_signer = BadSigner()
with self.assertRaises(ValueError):
link = in_toto_run("foo", [], [], [], signer=bad_signer)

class NoKey:
pass

# Fail with wrong tpe on public_key attribute
bad_signer.public_key = ( # pylint: disable=attribute-defined-outside-init
NoKey()
)
with self.assertRaises(ValueError):
link = in_toto_run("foo", [], [], [], signer=bad_signer)

def test_record(self):
# Successfully create, sign and verify link
in_toto_record_start("bar", [], signer=self.ed_signer)
in_toto_record_stop("bar", [], signer=self.ed_signer)
link_name = FILENAME_FORMAT.format(
step_name="bar", keyid=self.ed_signer.public_key.keyid
)
link = Metablock.load(link_name)
self.assertIsNone(link.verify_signature(self.ed_pub))

# Fail with wrong verification key
with self.assertRaises(SignatureVerificationError):
link.verify_signature(self.rsa_pub)

# Fail with different signer in start and stop
in_toto_record_start("baz", [], signer=self.ed_signer)
with self.assertRaises(OSError):
in_toto_record_stop("baz", [], signer=self.rsa_signer)


if __name__ == "__main__":
unittest.main()

0 comments on commit fb96ed0

Please sign in to comment.