diff --git a/.coveragerc b/.coveragerc index a6c5302..5cbd393 100644 --- a/.coveragerc +++ b/.coveragerc @@ -6,4 +6,5 @@ include = omit = tests/**/*.py + **/__init__.py main.py diff --git a/README.md b/README.md index 8ff0092..2974723 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,11 @@ Implementation of [CRYSTALS-Kyber](https://pq-crystals.org/kyber/index.shtml) en * [Requirements specification](docs/requirements.md) * [Implementation]() -* [Testing]() +* [Testing](docs/tests.md) * [Usage guide](docs/usage.md) ### Weekly reports * [Week 1](docs/week-1.md) * [Week 2](docs/week-2.md) +* [Week 3](docs/week-3.md) diff --git a/cli.py b/cli.py new file mode 100644 index 0000000..3c5dfcb --- /dev/null +++ b/cli.py @@ -0,0 +1,82 @@ +import argparse +from base64 import b64encode +from kyber.ccakem import ccakem_generate_keys, ccakem_encrypt, ccakem_decrypt +from kyber.constants import k, n + +class CLI: + def __init__(self) -> None: + self._parser = argparse.ArgumentParser() + subparsers = self._parser.add_subparsers(title="command", required=True) + + keygen_parser = subparsers.add_parser("keygen") + keygen_parser.set_defaults(command="keygen") + keygen_parser.add_argument("outfile") + + pubkey_parser = subparsers.add_parser("pubkey", description="extract public key from private key") + pubkey_parser.set_defaults(command="pubkey") + pubkey_parser.add_argument("privkeyfile", help="file that contains the private key") + pubkey_parser.add_argument("--output", "-o", metavar="FILE", help="file to write the public key (default: stdout)") + + encrypt_parser = subparsers.add_parser("encrypt", description="encrypt 32-byte random shared secret") + encrypt_parser.set_defaults(command="encrypt") + encrypt_parser.add_argument("--key", "-k", metavar="FILE", help="file that contains public key", required=True) + encrypt_parser.add_argument("--secret", "-s", metavar="FILE", help="file to write the shared secret", required=True) + encrypt_parser.add_argument("--cipher", "-c", metavar="FILE", help="file to write the ciphertext (default: stdout)") + + decrypt_parser = subparsers.add_parser("decrypt", description="decrypt 32-byte shared secret from ciphertext") + decrypt_parser.set_defaults(command="decrypt") + decrypt_parser.add_argument("--key", "-k", metavar="FILE", help="file that contains private key", required=True) + decrypt_parser.add_argument("--output", "-o", metavar="FILE", help="file to write the shared secret (default: stdout)") + decrypt_parser.add_argument("cipherfile", help="file that contains the ciphertext") + + def parse(self) -> None: + args = self._parser.parse_args() + handlers = { + "keygen": self._handle_keygen, + "pubkey": self._handle_pubkey, + "encrypt": self._handle_encrypt, + "decrypt": self._handle_decrypt, + } + handlers[args.command](args) + + def _handle_keygen(self, arguments) -> None: + private_key, _ = ccakem_generate_keys() + with open(arguments.outfile, "wb") as file: + file.write(private_key) + + def _handle_pubkey(self, arguments) -> None: + with open(arguments.privkeyfile, "rb") as file: + private_key = file.read() + public_key = private_key[12*k*n//8 : 24*k*n//8+32] + if arguments.output is None: + print(b64encode(public_key).decode("utf-8")) + else: + with open(arguments.output, "wb") as file: + file.write(public_key) + + def _handle_encrypt(self, arguments) -> None: + with open(arguments.key, "rb") as file: + public_key = file.read() + ciphertext, shared_secret = ccakem_encrypt(public_key) + with open(arguments.secret, "wb") as file: + file.write(shared_secret) + if arguments.cipher is None: + print(b64encode(ciphertext).decode("utf-8")) + else: + with open(arguments.cipher, "wb") as file: + file.write(ciphertext) + + def _handle_decrypt(self, arguments) -> None: + with open(arguments.key, "rb") as file: + private_key = file.read() + with open(arguments.cipherfile, "rb") as file: + ciphertext = file.read() + shared_secret = ccakem_decrypt(ciphertext, private_key) + if arguments.output is None: + print(b64encode(shared_secret).decode("utf-8")) + else: + with open(arguments.output, "wb") as file: + file.write(shared_secret) + +if __name__ == "__main__": + CLI().parse() diff --git a/docs/tests.md b/docs/tests.md new file mode 100644 index 0000000..4adb789 --- /dev/null +++ b/docs/tests.md @@ -0,0 +1,55 @@ +# Tests + +All functions and methods are tested individually with sample inputs. In addition to testing with correct inputs, code is tested to fail with invalid inputs. Outputs are tested to be in correct type and to match all criteria. + +In addition to unit tests, Kyber is also tested with some intergration tests. That is, a function and its inverse function are called consecutively and the output is checked to equal the original input. + +Current test report: + +``` +tests/test_byte_conversion.py ........ [ 15%] +tests/test_cbd.py .... [ 22%] +tests/test_ccakem.py .... [ 30%] +tests/test_compression.py ...... [ 41%] +tests/test_decrypt.py ... [ 47%] +tests/test_encoding.py ........ [ 62%] +tests/test_encrypt.py ... [ 67%] +tests/test_encryption.py . [ 69%] +tests/test_key_generation.py .. [ 73%] +tests/test_modulo.py .. [ 77%] +tests/test_parse.py ... [ 83%] +tests/test_pseudo_random.py ........ [ 98%] +tests/test_round.py . [100%] + +=============== 53 passed in 0.33s ================ +``` + +Tests can be run with `poetry run invoke test`. + +## Test coverage + +[![codecov](https://codecov.io/gh/PyryL/kyber/graph/badge.svg?token=MXM7CFK9YQ)](https://codecov.io/gh/PyryL/kyber) + +Current test coverage report: + +``` +Name Stmts Miss Branch BrPart Cover +------------------------------------------------------------------ +kyber/ccakem.py 34 0 2 0 100% +kyber/constants.py 14 0 0 0 100% +kyber/encryption/decrypt.py 26 0 6 0 100% +kyber/encryption/encrypt.py 57 0 12 0 100% +kyber/encryption/keygen.py 35 0 8 0 100% +kyber/utils/byte_conversion.py 23 0 12 0 100% +kyber/utils/cbd.py 16 0 6 0 100% +kyber/utils/compression.py 23 0 8 0 100% +kyber/utils/encoding.py 36 0 22 0 100% +kyber/utils/modulo.py 17 0 8 0 100% +kyber/utils/parse.py 22 0 6 0 100% +kyber/utils/pseudo_random.py 23 0 0 0 100% +kyber/utils/round.py 5 0 2 0 100% +------------------------------------------------------------------ +TOTAL 331 0 92 0 100% +``` + +For more detailed report, run `poetry run invoke coverage-report` and then open `htmlcov/index.html`. diff --git a/docs/usage.md b/docs/usage.md index 8bf8809..052ef2e 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -12,7 +12,25 @@ poetry install Currently `kyber` provides three main functions that can be used directly from Python code. A sample usage is included in `main.py`. -At the moment there is no GUI or CLI available. +Kyber can also be used via command-line interface that can be accessed with `poetry run python cli.py`. It has four subcommands: `keygen`, `pubkey`, `encrypt` and `decrypt`. Run any subcommand with `-h` flag to get help. Below is a usage example: + +``` +# Alice +poetry run python cli.py keygen private.txt +poetry run python cli.py pubkey --output public.txt private.txt + +# Alice sends her public.txt file to Bob + +# Bob +poetry run python cli.py encrypt --key alice_public.txt --secret secret.txt --cipher cipher.txt + +# Bob sends his cipher.txt file to Alice + +# Alice +poetry run python cli.py decrypt --key private.txt --output secret.txt bob_cipher.txt +``` + +In the first line Alice generates herself a private key. On the second line she generates a public key matching the freshly-generated private key, after which she sends this public key to Bob. On the third line Bob encrypts a random shared secret with Alice's public key, after which he sends the ciphertext to Alice. On the last line Alice decrypts the ciphertext with her private key. At the end, both Alice and Bob have a file called `secret.txt` that contain the same shared secret. ### Tests diff --git a/docs/week-3.md b/docs/week-3.md new file mode 100644 index 0000000..e320afa --- /dev/null +++ b/docs/week-3.md @@ -0,0 +1,9 @@ +# Week 3 + +_13. – 19.11.2023_ + +This week I continued where I left off last week. First, I implemented encoding in a way that the outputs of all relevant functions are bytes (instead of polynomial matrices, for example). Then I implemented CPAPKE section of the official specification document which is the part that makes Kyber key-encapsulation mechanism and not asymmetric encryption algorithm. I have kept constantly adding more tests to keep the coverage at 100% and pylint score has also been steady at around 9.2. + +Next week I will start implementing my own data structures. The first will be polynomial ring, to which I should specify at least multiplication and addition operations. + +Total working time this week: 7.5 hours. diff --git a/kyber/ccakem.py b/kyber/ccakem.py new file mode 100644 index 0000000..2752f01 --- /dev/null +++ b/kyber/ccakem.py @@ -0,0 +1,66 @@ +from random import randbytes +from kyber.encryption import generate_keys, Encrypt, Decrypt +from kyber.utils.pseudo_random import H, G, kdf +from kyber.constants import k, n, du, dv + +def ccakem_generate_keys() -> tuple[bytes, bytes]: + """ + Generates a new keypair. + :returns (private_key, public_key) tuple + """ + + z = randbytes(32) + sk, pk = generate_keys() + sk = sk + pk + H(pk) + z + + assert len(pk) == 12 * k * n//8 + 32 + assert len(sk) == 24 * k * n//8 + 96 + + return ( + sk, # private key + pk # public key + ) + +def ccakem_encrypt(public_key: bytes) -> tuple[bytes, bytes]: + """ + Takes public key as input and returns (ciphertext, shered_secret) as a tuple. + Shared secret is 32 bytes in length. + """ + + assert len(public_key) == 12 * k * n//8 + 32 + + m = H(randbytes(32)) + Kr = G(m + H(public_key)) + K, r = Kr[:32], Kr[32:] + c = Encrypt(public_key, m, r).encrypt() + K = kdf(K + H(c), 32) + + return ( + c, # ciphertext + K # shared secret + ) + +def ccakem_decrypt(ciphertext: bytes, private_key: bytes) -> bytes: + """ + Decrypts the given ciphertext with the private key. + :returns Decrypted 32-byte shared secret. + """ + + assert len(ciphertext) == du * k * n//8 + dv * n//8 + assert len(private_key) == 24 * k * n//8 + 96 + + sk = private_key[: 12*k*n//8] + pk = private_key[12*k*n//8 : 24*k*n//8+32] + h = private_key[24*k*n//8+32 : 24*k*n//8+64] + z = private_key[24*k*n//8+64 :] + + assert h == H(pk) + + m = Decrypt(sk, ciphertext).decrypt() + Kr = G(m + h) + K, r = Kr[:32], Kr[32:] + c = Encrypt(pk, m, r).encrypt() + + if c == ciphertext: + return kdf(K + H(c), 32) + return kdf(z + H(c), 32) diff --git a/kyber/encrypt.py b/kyber/encrypt.py deleted file mode 100644 index bb83b29..0000000 --- a/kyber/encrypt.py +++ /dev/null @@ -1,58 +0,0 @@ -from random import randbytes -import numpy as np -from numpy.polynomial.polynomial import Polynomial -from kyber.utils.cbd import cbd -from kyber.utils.pseudo_random import prf -from kyber.utils.modulo import polmod -from kyber.utils.compression import decompress -from kyber.utils.encoding import decode -from kyber.constants import k, eta1, eta2 -from kyber.utils.byte_conversion import int_to_bytes - -class Encrypt: - def __init__(self, public_key: bytes) -> None: - self._pk = public_key - self._m = randbytes(32) - self._r = randbytes(32) - - @property - def secret(self) -> bytes: - return self._m - - def encrypt(self): - """ - Encrypts 32-bit random shared secret. - :returns Ciphertext - """ - - pk = self._pk - m = self._m - rb = self._r - assert len(m) == 32 - assert len(rb) == 32 - - A, t = pk - - N = 0 - r = np.empty((k, ), Polynomial) - for i in range(k): - r[i] = cbd(prf(rb, int_to_bytes(N)), eta1) - r[i] = polmod(r[i]) - N += 1 - - e1 = np.empty((k, ), Polynomial) - for i in range(k): - e1[i] = cbd(prf(rb, int_to_bytes(N)), eta2) - e1[i] = polmod(e1[i]) - N += 1 - - e2 = cbd(prf(rb, int_to_bytes(N)), eta2) - e2 = polmod(e2) - - u = np.matmul(A.T, r) + e1 - v = np.matmul(t.T, r) + e2 + decompress(decode(m, 1), 1) - - u = [polmod(item) for item in u] - v = polmod(v) - - return (u, v) diff --git a/kyber/encryption/__init__.py b/kyber/encryption/__init__.py new file mode 100644 index 0000000..0cbab35 --- /dev/null +++ b/kyber/encryption/__init__.py @@ -0,0 +1,3 @@ +from kyber.encryption.keygen import generate_keys +from kyber.encryption.encrypt import Encrypt +from kyber.encryption.decrypt import Decrypt diff --git a/kyber/decrypt.py b/kyber/encryption/decrypt.py similarity index 51% rename from kyber/decrypt.py rename to kyber/encryption/decrypt.py index 3ed1ee7..cdca206 100644 --- a/kyber/decrypt.py +++ b/kyber/encryption/decrypt.py @@ -1,13 +1,18 @@ import numpy as np from numpy.polynomial.polynomial import Polynomial -from kyber.utils.compression import compress -from kyber.utils.encoding import encode +from kyber.utils.compression import compress, decompress +from kyber.utils.encoding import encode, decode from kyber.utils.modulo import polmod +from kyber.constants import n, k, du, dv class Decrypt: def __init__(self, private_key, ciphertext) -> None: self._sk = private_key self._c = ciphertext + if len(self._sk) != 32*12*k: + raise ValueError() + if len(self._c) != du*k*n//8 + dv*n//8: + raise ValueError() def decrypt(self) -> bytes: """ @@ -15,8 +20,15 @@ def decrypt(self) -> bytes: :returns Decrypted 32-bit shared secret """ - s = self._sk - u, v = self._c + s = np.array(decode(self._sk, 12)) + + u, v = self._c[:du*k*n//8], self._c[du*k*n//8:] + + u = decode(u, du) + v = decode(v, dv)[0] + + u = np.array([decompress(pol, du) for pol in u]) + v = decompress(v, dv) m: Polynomial = v - np.matmul(s.T, u) m = polmod(m) diff --git a/kyber/encryption/encrypt.py b/kyber/encryption/encrypt.py new file mode 100644 index 0000000..fbcb215 --- /dev/null +++ b/kyber/encryption/encrypt.py @@ -0,0 +1,77 @@ +from random import randbytes +import numpy as np +from numpy.polynomial.polynomial import Polynomial +from kyber.utils.cbd import cbd +from kyber.utils.pseudo_random import prf +from kyber.utils.modulo import matmod, polmod +from kyber.utils.compression import compress, decompress +from kyber.utils.encoding import encode, decode +from kyber.constants import k, eta1, eta2, n, du, dv +from kyber.utils.byte_conversion import int_to_bytes +from kyber.utils.parse import parse +from kyber.utils.pseudo_random import xof + +class Encrypt: + def __init__(self, public_key: bytes, m: bytes = None, r: bytes = None) -> None: + self._pk = public_key + self._m = m if m is not None else randbytes(32) + self._r = r if r is not None else randbytes(32) + assert len(self._m) == 32 + assert len(self._r) == 32 + if len(self._pk) != 12 * k * int(n/8) + 32: + raise ValueError() + + @property + def secret(self) -> bytes: + """The 32-bit shared secret that was encrypted.""" + return self._m + + def encrypt(self): + """ + Encrypts 32-bit random shared secret. + :returns Ciphertext + """ + + m = self._m + rb = self._r + + t, rho = self._pk[:-32], self._pk[-32:] + t = np.array(decode(t, 12)) + + A = np.empty((k, k), Polynomial) + for i in range(k): + for j in range(k): + A[i][j] = parse(xof(rho, int_to_bytes(i), int_to_bytes(j))) + + N = 0 + r = np.empty((k, ), Polynomial) + for i in range(k): + r[i] = cbd(prf(rb, int_to_bytes(N)), eta1) + r[i] = polmod(r[i]) + N += 1 + + e1 = np.empty((k, ), Polynomial) + for i in range(k): + e1[i] = cbd(prf(rb, int_to_bytes(N)), eta2) + e1[i] = polmod(e1[i]) + N += 1 + + e2 = cbd(prf(rb, int_to_bytes(N)), eta2) + e2 = polmod(e2) + + u = np.matmul(A.T, r) + e1 + v = np.matmul(t.T, r) + e2 + decompress(decode(m, 1)[0], 1) + + u = matmod(u) + v = polmod(v) + + u = compress(u, du) + v = compress([v], dv) + + u = encode(u, du) + v = encode(v, dv) + + assert len(u) == du * k * n//8 + assert len(v) == dv * n//8 + + return u + v diff --git a/kyber/keygen.py b/kyber/encryption/keygen.py similarity index 61% rename from kyber/keygen.py rename to kyber/encryption/keygen.py index aaa0673..3e3c99b 100644 --- a/kyber/keygen.py +++ b/kyber/encryption/keygen.py @@ -1,11 +1,13 @@ -from random import randbytes, randint +from random import randbytes import numpy as np from numpy.polynomial.polynomial import Polynomial -from kyber.constants import k, eta1, q -from kyber.utils.pseudo_random import prf, G +from kyber.constants import k, eta1 +from kyber.utils.pseudo_random import prf, G, xof from kyber.utils.cbd import cbd from kyber.utils.modulo import matmod, polmod from kyber.utils.byte_conversion import int_to_bytes +from kyber.utils.encoding import encode +from kyber.utils.parse import parse def generate_keys() -> tuple: """ @@ -14,12 +16,12 @@ def generate_keys() -> tuple: """ d = randbytes(32) - sigma = G(d)[32:] + rho, sigma = G(d)[:32], G(d)[32:] A = np.empty((k, k), Polynomial) for i in range(k): for j in range(k): - A[i][j] = Polynomial([randint(0, q-1) for _ in range(256)]) + A[i][j] = parse(xof(rho, int_to_bytes(i), int_to_bytes(j))) N = 0 s = np.empty((k, ), Polynomial) @@ -34,10 +36,15 @@ def generate_keys() -> tuple: e[i] = polmod(e[i]) N += 1 - t = np.matmul(A, s) + e + t = np.matmul(A, s) + e # t is a polynomial matrix with shape (k, ) t = matmod(t) + s: bytes = encode(s, 12) + t: bytes = encode(t, 12) + assert len(s) == 32*12*k + assert len(t) == 32*12*k + return ( s, # private key - (A, t) # public key + t+rho # public key ) diff --git a/kyber/utils/cbd.py b/kyber/utils/cbd.py index 4b0fdef..60abbee 100644 --- a/kyber/utils/cbd.py +++ b/kyber/utils/cbd.py @@ -3,7 +3,8 @@ def cbd(b: bytes, eta: int) -> Polynomial: """ - Deterministically creates and returns a polynomial (degree 255) from the given byte array (length 64*eta). + Deterministically creates and returns a polynomial (degree 255) + from the given byte array (length 64*eta). """ if len(b) != 64*eta: diff --git a/kyber/utils/compression.py b/kyber/utils/compression.py index a646ab3..6dc47c3 100644 --- a/kyber/utils/compression.py +++ b/kyber/utils/compression.py @@ -22,12 +22,21 @@ def decompress(pol: Polynomial, d: int) -> Polynomial: return Polynomial([decompress_int(c, d) for c in pol.coef]) def compress_int(x: int, d: int) -> int: + """ + Performs compression to a single integer + by reducing it to range `0...2**d-1` (inclusive). + """ assert d < ceil(log2(q)) result = normal_round((2**d / q) * x) % (2**d) - assert 0 <= result and result <= 2**d-1 + assert 0 <= result <= 2**d-1 return result def decompress_int(x: int, d: int) -> int: + """ + Performs decompression to a single integer + by multiplying it by `q/(2**d)`. + :param x The integer to be decompressed, in range `0...2**d-1` (inclusive). + """ assert d < ceil(log2(q)) if x < 0 or x > 2**d-1: raise ValueError() diff --git a/kyber/utils/encoding.py b/kyber/utils/encoding.py index 67e4004..2aaa533 100644 --- a/kyber/utils/encoding.py +++ b/kyber/utils/encoding.py @@ -32,17 +32,21 @@ def encode(pols: list[Polynomial], l: int) -> bytes: assert len(result) == 32*l*len(pols) return bytes(result) -def decode(b: bytes, l: int) -> Polynomial: +def decode(b: bytes, l: int) -> list[Polynomial]: """ - Converts the given byte array (length `32*l`) into a polynomial (degree 255) + Converts the given byte array (length `32*l*x` for some integer x) into + a list of polynomials (length x, each degree 255) in which each coefficient is in range `0...2**l-1` (inclusive). """ - if len(b) != 32*l: + if len(b) % 32*l != 0: raise ValueError() - bits = bytes_to_bits(b) - f = np.empty((256, )) - for i in range(256): - f[i] = sum(bits[i*l+j]*2**j for j in range(l)) # accesses each bit exactly once - assert 0 <= f[i] and f[i] <= 2**l-1 - return Polynomial(f) + result = [] + for t in range(len(b) // (32*l)): + bits = bytes_to_bits(b[32*l*t : 32*l*(t+1)]) + f = np.empty((256, )) + for i in range(256): + f[i] = sum(bits[i*l+j]*2**j for j in range(l)) # accesses each bit exactly once + assert 0 <= f[i] and f[i] <= 2**l-1 + result.append(Polynomial(f)) + return result diff --git a/kyber/utils/parse.py b/kyber/utils/parse.py new file mode 100644 index 0000000..1f320bb --- /dev/null +++ b/kyber/utils/parse.py @@ -0,0 +1,30 @@ +from math import floor +from typing import Generator +from numpy.polynomial.polynomial import Polynomial +import numpy as np +from kyber.constants import n, q + +def byte_to_int(b: bytes) -> int: + """Returns the unsigned integer that the given big-endian byte array represents.""" + return int.from_bytes(b) + +def parse(stream: Generator[bytes, None, None]) -> Polynomial: + """ + Deterministically creates a polynomial (degree n-1, each coefficient in + range `0...4095` inclusive) from the given bytestream. + """ + + i, j = 0, 0 + a = np.empty((n, )) + while j < n: + b1, b2, b3 = byte_to_int(next(stream)), byte_to_int(next(stream)), byte_to_int(next(stream)) + d1 = b1 + 256 * (b2 % 16) + d2 = floor(b2 / 16) + 16 * b3 + if d1 < q: + a[j] = d1 + j += 1 + if d2 < q and j < n: + a[j] = d2 + j += 1 + i += 3 + return Polynomial(a) diff --git a/kyber/utils/pseudo_random.py b/kyber/utils/pseudo_random.py index d06cb2b..bc1e3ab 100644 --- a/kyber/utils/pseudo_random.py +++ b/kyber/utils/pseudo_random.py @@ -1,4 +1,5 @@ -from Crypto.Hash import SHAKE256, SHA3_512 +from typing import Generator +from Crypto.Hash import SHAKE256, SHA3_512, SHAKE128, SHA3_256 def prf(s: bytes, b: bytes) -> bytes: """ @@ -12,6 +13,12 @@ def prf(s: bytes, b: bytes) -> bytes: shake.update(s + b) return shake.read(128) +def kdf(b: bytes, l: int) -> bytes: + """Deterministically generate and return `l` pseudo-random bytes from the given seed.""" + shake = SHAKE256.new() + shake.update(b) + return shake.read(l) + def G(b: bytes) -> bytes: """ Deterministically returns 64 pseudo-random bytes based on the given byte array. @@ -22,3 +29,16 @@ def G(b: bytes) -> bytes: h = SHA3_512.new() h.update(b) return h.digest() + +def xof(p: bytearray, i: bytes, j: bytes) -> Generator[bytes, None, None]: + """Generator that yields a single pseudo-random byte at a time based on the given inputs.""" + shake = SHAKE128.new() + shake.update(p + i + j) + while True: + yield shake.read(1) + +def H(b: bytes) -> bytes: + """Deterministically returns 32 pseudo-random bytes.""" + h = SHA3_256.new() + h.update(b) + return h.digest() diff --git a/main.py b/main.py index c50075f..8cbb41b 100644 --- a/main.py +++ b/main.py @@ -1,20 +1,10 @@ -from kyber.keygen import generate_keys -from kyber.encrypt import Encrypt -from kyber.decrypt import Decrypt +from kyber.ccakem import ccakem_generate_keys, ccakem_encrypt, ccakem_decrypt def main(): - # generate keypair - private_key, public_key = generate_keys() + private_key, public_key = ccakem_generate_keys() + ciphertext, shared_secret1 = ccakem_encrypt(public_key) + shared_secret2 = ccakem_decrypt(ciphertext, private_key) - # encrypt - encrypter = Encrypt(public_key) - ciphertext = encrypter.encrypt() - shared_secret1 = encrypter.secret - - # decrypt - shared_secret2 = Decrypt(private_key, ciphertext).decrypt() - - # analyse assert shared_secret1 == shared_secret2 assert len(shared_secret1) == 32 print("shared secret", shared_secret1.hex()) diff --git a/tasks.py b/tasks.py index f618c75..acc40fc 100644 --- a/tasks.py +++ b/tasks.py @@ -6,7 +6,7 @@ def test(ctx): @task def coverage(ctx): - ctx.run("coverage run -m pytest tests/", pty=True) + ctx.run("coverage run -m pytest --ignore tests/test_integration.py tests/", pty=True) @task(coverage) def coverage_report(ctx): diff --git a/tests/test_ccakem.py b/tests/test_ccakem.py new file mode 100644 index 0000000..e8c0a5d --- /dev/null +++ b/tests/test_ccakem.py @@ -0,0 +1,41 @@ +import unittest +from random import seed, randbytes +from kyber.ccakem import ccakem_generate_keys, ccakem_encrypt, ccakem_decrypt +from kyber.utils.pseudo_random import H +from kyber.constants import k, n, du, dv + +class TestCCAKEM(unittest.TestCase): + def test_ccakem_key_generation(self): + private_key, public_key = ccakem_generate_keys() + # check that keys have correct types and lengths + self.assertEqual(type(private_key), bytes) + self.assertEqual(type(public_key), bytes) + self.assertEqual(len(private_key), 24 * k * n//8 + 96) + self.assertEqual(len(public_key), 12 * k * n//8 + 32) + # check that public key is concatenated inside private key + private_key_section = private_key[12*k*n//8 : 24*k*n//8+32] + self.assertEqual(private_key_section, public_key) + + def test_ccakem_encrypt(self): + _, public_key = ccakem_generate_keys() + ciphertext, shared_secret = ccakem_encrypt(public_key) + self.assertEqual(type(ciphertext), bytes) + self.assertEqual(type(shared_secret), bytes) + self.assertEqual(len(ciphertext), du * k * n//8 + dv * n//8) + self.assertEqual(len(shared_secret), 32) + + def test_ccakem_decrypt(self): + # create seemingly valid private key by forming it from valid components + seed(42) + ciphertext = randbytes(du * k * n//8 + dv * n//8) + public_key = randbytes(12 * k * n//8 + 32) + private_key = randbytes(12*k*n//8) + public_key + H(public_key) + randbytes(32) + shared_secret = ccakem_decrypt(ciphertext, private_key) + self.assertEqual(type(shared_secret), bytes) + self.assertEqual(len(shared_secret), 32) + + def test_ccakem_integration(self): + private_key, public_key = ccakem_generate_keys() + ciphertext, shared_secret1 = ccakem_encrypt(public_key) + shared_secret2 = ccakem_decrypt(ciphertext, private_key) + self.assertEqual(shared_secret1, shared_secret2) diff --git a/tests/test_decrypt.py b/tests/test_decrypt.py new file mode 100644 index 0000000..b608df5 --- /dev/null +++ b/tests/test_decrypt.py @@ -0,0 +1,29 @@ +import unittest +from random import seed, randbytes +from kyber.encryption import Decrypt +from kyber.constants import k, n, du, dv + +class TestDecrypt(unittest.TestCase): + def setUp(self): + seed(42) + + def test_decryption_outputs_valid_shared_secret(self): + private_key = randbytes(32*12*k) + ciphertext = randbytes(du*k*n//8 + dv*n//8) + shared_secret = Decrypt(private_key, ciphertext).decrypt() + self.assertEqual(type(shared_secret), bytes) + self.assertEqual(len(shared_secret), 32) + + def test_decryption_raises_with_invalid_private_key(self): + # this private key is one byte too long + invalid_private_key = randbytes(32*12*k + 1) + valid_ciphertext = randbytes(du*k*n//8 + dv*n//8) + with self.assertRaises(ValueError): + Decrypt(invalid_private_key, valid_ciphertext) + + def test_decryption_raises_with_invalid_ciphertext(self): + # this ciphertext is one byte too short + valid_private_key = randbytes(32*12*k) + invalid_ciphertext = randbytes(du*k*n//8 + dv*n//8 - 1) + with self.assertRaises(ValueError): + Decrypt(valid_private_key, invalid_ciphertext) diff --git a/tests/test_encoding.py b/tests/test_encoding.py index fb0b35c..0199d5a 100644 --- a/tests/test_encoding.py +++ b/tests/test_encoding.py @@ -12,17 +12,17 @@ def setUp(self): self.polynomial2 = Polynomial([randint(0, 1) for _ in range(256)]) def test_encoding_symmetry(self): - polynomial = decode(self.data, self.l) - restored_data = encode([polynomial], self.l) + polynomials = decode(self.data, self.l) + restored_data = encode(polynomials, self.l) self.assertEqual(self.data, restored_data) def test_decode_coefficients(self): - polynomial = decode(self.data, self.l) + polynomial = decode(self.data, self.l)[0] for c in polynomial.coef: self.assertTrue(0 <= int(c) or int(c) <= 2**self.l-1) def test_decode_degree(self): - polynomial = decode(self.data, self.l) + polynomial = decode(self.data, self.l)[0] self.assertEqual(len(polynomial.coef), 256) def test_decode_raises_with_invalid_argument_length(self): diff --git a/tests/test_encrypt.py b/tests/test_encrypt.py new file mode 100644 index 0000000..89b3af1 --- /dev/null +++ b/tests/test_encrypt.py @@ -0,0 +1,23 @@ +import unittest +from random import seed, randbytes +from kyber.encryption import Encrypt +from kyber.constants import k, n, du, dv + +class TestEncrypt(unittest.TestCase): + def test_encryption_raises_with_invalid_input(self): + # this public key is one byte too short + seed(42) + invalid_public_key = randbytes(12 * k * n//8 + 31) + with self.assertRaises(ValueError): + Encrypt(invalid_public_key) + + def test_encryption_generates_valid_shared_secret(self): + seed(42) + encrypter = Encrypt(randbytes(12 * k * n//8 + 32)) + self.assertEqual(len(encrypter.secret), 32) + + def test_encryption_outputs_valid_ciphertext(self): + seed(42) + ciphertext = Encrypt(randbytes(12 * k * n//8 + 32)).encrypt() + self.assertEqual(type(ciphertext), bytes) + self.assertEqual(len(ciphertext), du*k*n//8 + dv*n//8) diff --git a/tests/test_integration.py b/tests/test_encryption.py similarity index 82% rename from tests/test_integration.py rename to tests/test_encryption.py index 4697c8b..be71363 100644 --- a/tests/test_integration.py +++ b/tests/test_encryption.py @@ -1,7 +1,5 @@ import unittest -from kyber.keygen import generate_keys -from kyber.encrypt import Encrypt -from kyber.decrypt import Decrypt +from kyber.encryption import generate_keys, Encrypt, Decrypt class TestIntegration(unittest.TestCase): def test_encryption_symmetry(self): diff --git a/tests/test_key_generation.py b/tests/test_key_generation.py new file mode 100644 index 0000000..8cd757e --- /dev/null +++ b/tests/test_key_generation.py @@ -0,0 +1,15 @@ +import unittest +from kyber.encryption import generate_keys +from kyber.constants import k, n + +class TestKeyGeneration(unittest.TestCase): + def test_key_generation_output_characteristics(self): + private_key, public_key = generate_keys() + self.assertEqual(len(private_key), 12 * k * n//8) + self.assertEqual(len(public_key), 12 * k * n//8 + 32) + + def test_key_generation_outputs_differ(self): + # two subsequent calls should output completely different keypairs + keypair1, keypair2 = generate_keys(), generate_keys() + self.assertNotEqual(keypair1[0], keypair2[0]) + self.assertNotEqual(keypair1[1], keypair2[1]) diff --git a/tests/test_parse.py b/tests/test_parse.py new file mode 100644 index 0000000..631a593 --- /dev/null +++ b/tests/test_parse.py @@ -0,0 +1,27 @@ +import unittest +from kyber.utils.parse import parse +from typing import Generator + +def sample_generator(seed: int = 0) -> Generator[bytes, None, None]: + """A generator that yields one byte at a time `seed, seed+1, ..., 255, 0, 1, ...`.""" + i = seed + while True: + yield bytes([i % 256]) + i += 1 + +class TestParse(unittest.TestCase): + def test_parse_outputted_polynomial_characteristics(self): + pol = parse(sample_generator()) + self.assertEqual(len(pol.coef), 256) + for c in pol.coef: + self.assertTrue(0 <= c and c <= 4095) + + def test_parse_outputs_same_polynomial_with_same_input(self): + polynomial1 = parse(sample_generator()) + polynomial2 = parse(sample_generator()) + self.assertEqual(polynomial1, polynomial2) + + def test_parse_outputs_different_polynomials_with_different_inputs(self): + polynomial1 = parse(sample_generator()) + polynomial2 = parse(sample_generator(42)) + self.assertNotEqual(polynomial1, polynomial2) diff --git a/tests/test_pseudo_random.py b/tests/test_pseudo_random.py index 908fb62..23b452c 100644 --- a/tests/test_pseudo_random.py +++ b/tests/test_pseudo_random.py @@ -1,6 +1,6 @@ import unittest from random import seed, randbytes -from kyber.utils.pseudo_random import prf, G +from kyber.utils.pseudo_random import prf, G, xof class TestPseudoRandom(unittest.TestCase): def setUp(self): @@ -39,3 +39,18 @@ def test_g_returns_different_results_with_different_arguments(self): argument1, argument2 = randbytes(9), randbytes(10) result1, result2 = G(argument1), G(argument2) self.assertNotEqual(result1, result2) + + + def test_xof_returns_same_bytes_with_same_arguments(self): + arguments = (randbytes(32), randbytes(1), randbytes(1)) + generator1 = xof(arguments[0], arguments[1], arguments[2]) + generator2 = xof(arguments[0], arguments[1], arguments[2]) + for i in range(1000): + self.assertEqual(next(generator1), next(generator2), f"outputs differ at index {i}") + + def test_xof_returns_different_bytes_with_different_arguments(self): + generator1 = xof(randbytes(32), randbytes(1), randbytes(1)) + generator2 = xof(randbytes(32), randbytes(1), randbytes(1)) + output1 = [next(generator1) for _ in range(1000)] + output2 = [next(generator2) for _ in range(1000)] + self.assertNotEqual(output1, output2)