Skip to content

Commit

Permalink
Merge pull request #2 from PyryL/week3
Browse files Browse the repository at this point in the history
Week3
  • Loading branch information
PyryL committed Nov 18, 2023
2 parents f28481f + a01b60d commit 39804e1
Show file tree
Hide file tree
Showing 27 changed files with 581 additions and 106 deletions.
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ include =

omit =
tests/**/*.py
**/__init__.py
main.py
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ Implementation of [CRYSTALS-Kyber](https://pq-crystals.org/kyber/index.shtml) en

* [Requirements specification](docs/requirements.md)
* [Implementation]()
* [Testing]()
* [Testing](docs/tests.md)
* [Usage guide](docs/usage.md)

### Weekly reports

* [Week 1](docs/week-1.md)
* [Week 2](docs/week-2.md)
* [Week 3](docs/week-3.md)
82 changes: 82 additions & 0 deletions cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import argparse
from base64 import b64encode
from kyber.ccakem import ccakem_generate_keys, ccakem_encrypt, ccakem_decrypt
from kyber.constants import k, n

class CLI:
def __init__(self) -> None:
self._parser = argparse.ArgumentParser()
subparsers = self._parser.add_subparsers(title="command", required=True)

keygen_parser = subparsers.add_parser("keygen")
keygen_parser.set_defaults(command="keygen")
keygen_parser.add_argument("outfile")

pubkey_parser = subparsers.add_parser("pubkey", description="extract public key from private key")
pubkey_parser.set_defaults(command="pubkey")
pubkey_parser.add_argument("privkeyfile", help="file that contains the private key")
pubkey_parser.add_argument("--output", "-o", metavar="FILE", help="file to write the public key (default: stdout)")

encrypt_parser = subparsers.add_parser("encrypt", description="encrypt 32-byte random shared secret")
encrypt_parser.set_defaults(command="encrypt")
encrypt_parser.add_argument("--key", "-k", metavar="FILE", help="file that contains public key", required=True)
encrypt_parser.add_argument("--secret", "-s", metavar="FILE", help="file to write the shared secret", required=True)
encrypt_parser.add_argument("--cipher", "-c", metavar="FILE", help="file to write the ciphertext (default: stdout)")

decrypt_parser = subparsers.add_parser("decrypt", description="decrypt 32-byte shared secret from ciphertext")
decrypt_parser.set_defaults(command="decrypt")
decrypt_parser.add_argument("--key", "-k", metavar="FILE", help="file that contains private key", required=True)
decrypt_parser.add_argument("--output", "-o", metavar="FILE", help="file to write the shared secret (default: stdout)")
decrypt_parser.add_argument("cipherfile", help="file that contains the ciphertext")

def parse(self) -> None:
args = self._parser.parse_args()
handlers = {
"keygen": self._handle_keygen,
"pubkey": self._handle_pubkey,
"encrypt": self._handle_encrypt,
"decrypt": self._handle_decrypt,
}
handlers[args.command](args)

def _handle_keygen(self, arguments) -> None:
private_key, _ = ccakem_generate_keys()
with open(arguments.outfile, "wb") as file:
file.write(private_key)

def _handle_pubkey(self, arguments) -> None:
with open(arguments.privkeyfile, "rb") as file:
private_key = file.read()
public_key = private_key[12*k*n//8 : 24*k*n//8+32]
if arguments.output is None:
print(b64encode(public_key).decode("utf-8"))
else:
with open(arguments.output, "wb") as file:
file.write(public_key)

def _handle_encrypt(self, arguments) -> None:
with open(arguments.key, "rb") as file:
public_key = file.read()
ciphertext, shared_secret = ccakem_encrypt(public_key)
with open(arguments.secret, "wb") as file:
file.write(shared_secret)
if arguments.cipher is None:
print(b64encode(ciphertext).decode("utf-8"))
else:
with open(arguments.cipher, "wb") as file:
file.write(ciphertext)

def _handle_decrypt(self, arguments) -> None:
with open(arguments.key, "rb") as file:
private_key = file.read()
with open(arguments.cipherfile, "rb") as file:
ciphertext = file.read()
shared_secret = ccakem_decrypt(ciphertext, private_key)
if arguments.output is None:
print(b64encode(shared_secret).decode("utf-8"))
else:
with open(arguments.output, "wb") as file:
file.write(shared_secret)

if __name__ == "__main__":
CLI().parse()
55 changes: 55 additions & 0 deletions docs/tests.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Tests

All functions and methods are tested individually with sample inputs. In addition to testing with correct inputs, code is tested to fail with invalid inputs. Outputs are tested to be in correct type and to match all criteria.

In addition to unit tests, Kyber is also tested with some intergration tests. That is, a function and its inverse function are called consecutively and the output is checked to equal the original input.

Current test report:

```
tests/test_byte_conversion.py ........ [ 15%]
tests/test_cbd.py .... [ 22%]
tests/test_ccakem.py .... [ 30%]
tests/test_compression.py ...... [ 41%]
tests/test_decrypt.py ... [ 47%]
tests/test_encoding.py ........ [ 62%]
tests/test_encrypt.py ... [ 67%]
tests/test_encryption.py . [ 69%]
tests/test_key_generation.py .. [ 73%]
tests/test_modulo.py .. [ 77%]
tests/test_parse.py ... [ 83%]
tests/test_pseudo_random.py ........ [ 98%]
tests/test_round.py . [100%]
=============== 53 passed in 0.33s ================
```

Tests can be run with `poetry run invoke test`.

## Test coverage

[![codecov](https://codecov.io/gh/PyryL/kyber/graph/badge.svg?token=MXM7CFK9YQ)](https://codecov.io/gh/PyryL/kyber)

Current test coverage report:

```
Name Stmts Miss Branch BrPart Cover
------------------------------------------------------------------
kyber/ccakem.py 34 0 2 0 100%
kyber/constants.py 14 0 0 0 100%
kyber/encryption/decrypt.py 26 0 6 0 100%
kyber/encryption/encrypt.py 57 0 12 0 100%
kyber/encryption/keygen.py 35 0 8 0 100%
kyber/utils/byte_conversion.py 23 0 12 0 100%
kyber/utils/cbd.py 16 0 6 0 100%
kyber/utils/compression.py 23 0 8 0 100%
kyber/utils/encoding.py 36 0 22 0 100%
kyber/utils/modulo.py 17 0 8 0 100%
kyber/utils/parse.py 22 0 6 0 100%
kyber/utils/pseudo_random.py 23 0 0 0 100%
kyber/utils/round.py 5 0 2 0 100%
------------------------------------------------------------------
TOTAL 331 0 92 0 100%
```

For more detailed report, run `poetry run invoke coverage-report` and then open `htmlcov/index.html`.
20 changes: 19 additions & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,25 @@ poetry install

Currently `kyber` provides three main functions that can be used directly from Python code. A sample usage is included in `main.py`.

At the moment there is no GUI or CLI available.
Kyber can also be used via command-line interface that can be accessed with `poetry run python cli.py`. It has four subcommands: `keygen`, `pubkey`, `encrypt` and `decrypt`. Run any subcommand with `-h` flag to get help. Below is a usage example:

```
# Alice
poetry run python cli.py keygen private.txt
poetry run python cli.py pubkey --output public.txt private.txt
# Alice sends her public.txt file to Bob
# Bob
poetry run python cli.py encrypt --key alice_public.txt --secret secret.txt --cipher cipher.txt
# Bob sends his cipher.txt file to Alice
# Alice
poetry run python cli.py decrypt --key private.txt --output secret.txt bob_cipher.txt
```

In the first line Alice generates herself a private key. On the second line she generates a public key matching the freshly-generated private key, after which she sends this public key to Bob. On the third line Bob encrypts a random shared secret with Alice's public key, after which he sends the ciphertext to Alice. On the last line Alice decrypts the ciphertext with her private key. At the end, both Alice and Bob have a file called `secret.txt` that contain the same shared secret.

### Tests

Expand Down
9 changes: 9 additions & 0 deletions docs/week-3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Week 3

_13. – 19.11.2023_

This week I continued where I left off last week. First, I implemented encoding in a way that the outputs of all relevant functions are bytes (instead of polynomial matrices, for example). Then I implemented CPAPKE section of the official specification document which is the part that makes Kyber key-encapsulation mechanism and not asymmetric encryption algorithm. I have kept constantly adding more tests to keep the coverage at 100% and pylint score has also been steady at around 9.2.

Next week I will start implementing my own data structures. The first will be polynomial ring, to which I should specify at least multiplication and addition operations.

Total working time this week: 7.5 hours.
66 changes: 66 additions & 0 deletions kyber/ccakem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from random import randbytes
from kyber.encryption import generate_keys, Encrypt, Decrypt
from kyber.utils.pseudo_random import H, G, kdf
from kyber.constants import k, n, du, dv

def ccakem_generate_keys() -> tuple[bytes, bytes]:
"""
Generates a new keypair.
:returns (private_key, public_key) tuple
"""

z = randbytes(32)
sk, pk = generate_keys()
sk = sk + pk + H(pk) + z

assert len(pk) == 12 * k * n//8 + 32
assert len(sk) == 24 * k * n//8 + 96

return (
sk, # private key
pk # public key
)

def ccakem_encrypt(public_key: bytes) -> tuple[bytes, bytes]:
"""
Takes public key as input and returns (ciphertext, shered_secret) as a tuple.
Shared secret is 32 bytes in length.
"""

assert len(public_key) == 12 * k * n//8 + 32

m = H(randbytes(32))
Kr = G(m + H(public_key))
K, r = Kr[:32], Kr[32:]
c = Encrypt(public_key, m, r).encrypt()
K = kdf(K + H(c), 32)

return (
c, # ciphertext
K # shared secret
)

def ccakem_decrypt(ciphertext: bytes, private_key: bytes) -> bytes:
"""
Decrypts the given ciphertext with the private key.
:returns Decrypted 32-byte shared secret.
"""

assert len(ciphertext) == du * k * n//8 + dv * n//8
assert len(private_key) == 24 * k * n//8 + 96

sk = private_key[: 12*k*n//8]
pk = private_key[12*k*n//8 : 24*k*n//8+32]
h = private_key[24*k*n//8+32 : 24*k*n//8+64]
z = private_key[24*k*n//8+64 :]

assert h == H(pk)

m = Decrypt(sk, ciphertext).decrypt()
Kr = G(m + h)
K, r = Kr[:32], Kr[32:]
c = Encrypt(pk, m, r).encrypt()

if c == ciphertext:
return kdf(K + H(c), 32)
return kdf(z + H(c), 32)
58 changes: 0 additions & 58 deletions kyber/encrypt.py

This file was deleted.

3 changes: 3 additions & 0 deletions kyber/encryption/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from kyber.encryption.keygen import generate_keys
from kyber.encryption.encrypt import Encrypt
from kyber.encryption.decrypt import Decrypt
20 changes: 16 additions & 4 deletions kyber/decrypt.py → kyber/encryption/decrypt.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
import numpy as np
from numpy.polynomial.polynomial import Polynomial
from kyber.utils.compression import compress
from kyber.utils.encoding import encode
from kyber.utils.compression import compress, decompress
from kyber.utils.encoding import encode, decode
from kyber.utils.modulo import polmod
from kyber.constants import n, k, du, dv

class Decrypt:
def __init__(self, private_key, ciphertext) -> None:
self._sk = private_key
self._c = ciphertext
if len(self._sk) != 32*12*k:
raise ValueError()
if len(self._c) != du*k*n//8 + dv*n//8:
raise ValueError()

def decrypt(self) -> bytes:
"""
Decrypts the given ciphertext with the given private key.
:returns Decrypted 32-bit shared secret
"""

s = self._sk
u, v = self._c
s = np.array(decode(self._sk, 12))

u, v = self._c[:du*k*n//8], self._c[du*k*n//8:]

u = decode(u, du)
v = decode(v, dv)[0]

u = np.array([decompress(pol, du) for pol in u])
v = decompress(v, dv)

m: Polynomial = v - np.matmul(s.T, u)
m = polmod(m)
Expand Down
Loading

0 comments on commit 39804e1

Please sign in to comment.