diff --git a/requirements.txt b/requirements.txt index ab57a3d..62f7c5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ Pillow==9.3.0 Twisted==22.10.0 zope.interface==5.4.0 -pycryptodomex==3.12.0 +cryptography==41.0.7 diff --git a/setup.cfg b/setup.cfg index c2bc77a..75f3fd3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,7 +35,7 @@ packages = install_requires = Twisted Pillow - pycryptodomex + cryptography tests_require = pexpect diff --git a/vncdotool/rfb.py b/vncdotool/rfb.py index 54a2301..e9001f9 100644 --- a/vncdotool/rfb.py +++ b/vncdotool/rfb.py @@ -32,9 +32,9 @@ cast, ) -from Cryptodome.Cipher import AES, DES -from Cryptodome.Hash import MD5 -from Cryptodome.Util.number import bytes_to_long, long_to_bytes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.asymmetric import dh +from cryptography.hazmat.primitives import hashes from twisted.application import internet, service from twisted.internet import protocol from twisted.internet.interfaces import IConnector @@ -587,21 +587,28 @@ def _handleDHAuthCert(self, block: bytes) -> None: def _encryptArd(self) -> None: userStruct = f"{self.factory.username:\0<64}{self.factory.password:\0<64}" - s = bytes_to_long(os.urandom(512)) - g = self.generator - m = bytes_to_long(self.modulus) - sk = bytes_to_long(self.serverKey) + p = int.from_bytes(self.modulus, "big") + sk = int.from_bytes(self.serverKey, "big") + param_nums = dh.DHParameterNumbers(p=p, g=self.generator) + server_key = dh.DHPublicNumbers(sk, param_nums).public_key() - key = long_to_bytes(pow(g, s, m)) - shared = long_to_bytes(pow(sk, s, m)) + params = param_nums.parameters() + private_key = params.generate_private_key() + shared_key = private_key.exchange(server_key) - h = MD5.new() - h.update(shared) - keyDigest = h.digest() + h = hashes.Hash(hashes.MD5()) + h.update(shared_key) + key_digest = h.finalize() - cipher = AES.new(keyDigest, AES.MODE_ECB) - ciphertext = cipher.encrypt(userStruct.encode("utf-8")) - self.transport.write(ciphertext + key) + cipher = Cipher(algorithms.AES(key_digest), modes.ECB()) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(userStruct.encode("utf-8")) + ciphertext += encryptor.finalize() + + public_key = private_key.public_key() + y = public_key.public_numbers().y.to_bytes(self.keyLen, "big") + + self.transport.write(ciphertext + y) def ardRequestCredentials(self) -> None: if self.factory.username is None: @@ -612,8 +619,11 @@ def ardRequestCredentials(self) -> None: def sendPassword(self, password: str) -> None: """send password""" key = _vnc_des(password) - des = DES.new(key, DES.MODE_ECB) - response = des.encrypt(self._challenge) + # Triple-DES with the same 56-bit key repeated three times is + # equivalent to single-DES + des = Cipher(algorithms.TripleDES(key * 3), modes.ECB()) + encryptor = des.encryptor() + response = encryptor.update(self._challenge) + encryptor.finalize() self.transport.write(response) def _handleVNCAuthResult(self, block: bytes) -> None: