Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-676645: Add connector support for keypair authentication using hardware security modules or cloud key managers #1276

Closed
arsatiki opened this issue Oct 12, 2022 · 7 comments · Fixed by #1477

Comments

@arsatiki
Copy link

arsatiki commented Oct 12, 2022

What is the current behavior?

Current Snowflake keypair authentication requires that the user has the private key in their possession as a file. However, Hardware Security Modules (e.g. Smartcards, Yubikeys, …) and Cloud Key Managers do not expose the private key to the user. Instead, they provide cryptographic operations as API functionality. I can, for example, ask the HSM to sign a certain string using the private key.

What is the desired behavior?

I would like to be able to use any cryptographic device or service capable of RSA-signatures for keypair authentication. More specifically, I want an interface against which I can implement my custom signing flow.

How would this improve snowflake-connector-python?

It would provide more options for secure logins. In our case, we would use Azure Key Vault to store keys and allow services with Managed Identity to read those keys. This lets our service accounts log in without exposing passwords or private keys at any phase.

References, Other Background

As a proof of concept, I have implemented a version of keypair authentication that abstracts signing and public key operations into a separate class. An instance of this class can be provided to the Snowflake connector to sign the token.

Here's an example. I will provide a full pull request soon.

class AzureKeyVaultManager(KeyManager):
    def __init__(self, kvclient, keyname):
        self._kvclient = kvclient
        self._keyname = keyname

    def public_key(self):
        pubkey = self._kvclient.get_key(self._keyname)
        e = int.from_bytes(pubkey.key.e, 'big')
        n = int.from_bytes(pubkey.key.n, 'big')
        rsakey = RSAPublicNumbers(e, n)
        return rsakey.public_key()

    def sign(self, message):
        cc = self._kvclient.get_cryptography_client(self._keyname)
        hash = sha256(message).digest()
        result = cc.sign(SignatureAlgorithm.rs256, hash)
        return result.signature


    ctx = snowflake.connector.connect(
        user='[email protected]',
        key_manager=AzureKeyVaultManager(kc, 'testkey'),
        authenticator='KMS',
        account='foo.west-europe.azure',
        warehouse='compute_wh',
        database='misc',
    )

Please note that this approach would not add new dependencies into the connector. The user of the connector library is responsible for providing the class, customized to their particular usecase.

@github-actions github-actions bot changed the title Add connector support for keypair authentication using hardware security modules or cloud key managers SNOW-676645: Add connector support for keypair authentication using hardware security modules or cloud key managers Oct 12, 2022
@malthe
Copy link
Contributor

malthe commented Mar 14, 2023

@arsatiki any update on this?

I see that in the current codebase, authentication is locked down to an internal set of classes so this key_manager would be a great addition. We're keen to use Azure Key Vault as well for the same reasons, avoiding exposing the private key to users or systems.

@malthe
Copy link
Contributor

malthe commented Mar 14, 2023

In the Java JDBC driver, there's a similar issue.

There's a pull request open for that issue, with a very similar interface:

/** Interface for customer signer implementations for key pair authentication. */
public interface PrivateKeySigner {
  /**
   * Returns a signature for the given input.
   *
   * <p>The signature must be compatible with the "RS256" JWT signing algorithm, a.k.a.
   * "RSASSA-PKCS1-v1_5 using SHA-256"
   */
  byte[] sign(byte[] input);

  /** Returns the public key associated with the private key used by the sign() method. */
  PublicKey publicKey();
}

@arsatiki perhaps using the term PrivateKeySigner (although private key really is implied) is better than KeyManager – because it's really not about the management of keys, it's about signing bytes and providing a public key fingerprint.

@arsatiki
Copy link
Author

Hello!

Unfortunately I've been doing network and security related stuff for the past few months and haven't had time to do the necessary clean up. But if you are interested in the code I can publish the work-in-progress code

@malthe
Copy link
Contributor

malthe commented Mar 14, 2023

Please do @arsatiki – I should be able to clean it up and finish it.

@arsatiki
Copy link
Author

@malthe
Copy link
Contributor

malthe commented Jun 10, 2023

An alternative interface would be to allow specifying an external program to be responsible for providing the JWT token.

This would also enable the use of HSM- or cloud-based key managers for the SnowSQL CLI.

@Srinu3366
Copy link

Thanks @malthe for posting a sample implementation for Azure. It helped me implement similar approach for AWS KMS. I am posting my example here in case if it helps anyone.

class AwsManagedKey(RSAPrivateKey):

    _algorithm_mapping = {
        hashes.SHA256: 'RSASSA_PKCS1_V1_5_SHA_256',
        hashes.SHA384: 'RSASSA_PKCS1_V1_5_SHA_384',
        hashes.SHA512: 'RSASSA_PKCS1_V1_5_SHA_512',
    }

    def __init__(self, kms_client: BaseClient, aws_key_arn: str):
        self._kms_client = kms_client
        self._aws_key_arn = aws_key_arn

    def decrypt(self, ciphertext: bytes, padding: AsymmetricPadding) -> bytes:
        raise NotImplementedError()

    @property
    def key_size(self) -> int:
        raise NotImplementedError()

    def public_key(self) -> RSAPublicKey:
        public_key_response = self._kms_client.get_public_key(KeyId=self._aws_key_arn)

        public_key = serialization.load_der_public_key(
            public_key_response['PublicKey'],
            backend=default_backend()
        )
        return public_key

    def sign(self, data: bytes, padding: AsymmetricPadding,
             algorithm: typing.Union[asym_utils.Prehashed, hashes.HashAlgorithm]) -> bytes:

        if not isinstance(padding, PKCS1v15):
            raise ValueError(f"Unsupported padding: {padding.name}")

        sig_algorithm = self._algorithm_mapping.get(type(algorithm))
        if not sig_algorithm:
            raise ValueError(f"Unsupported hash algorithm: {algorithm.name}")

        response = self._kms_client.sign(
            KeyId=self._aws_key_arn,
            Message=data,
            MessageType="RAW",
            SigningAlgorithm=sig_algorithm
        )
        return response.get("Signature")

    def private_numbers(self) -> RSAPrivateNumbers:
        raise NotImplementedError()

    def private_bytes(self, *args) -> bytes:
        raise NotImplementedError()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants