Skip to content

Commit

Permalink
Support mutual TLS using a certificate from a Windows cert store (#341)
Browse files Browse the repository at this point in the history
Add the ability to use a client certificate located in a Windows certificate store. Previously, the client certificate and private key had to be passed by filepath or file contents. With this change, certificates and keys stored on TPM devices can be used.
  • Loading branch information
graebm authored Mar 18, 2022
1 parent 3565074 commit afe4f59
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
23 changes: 23 additions & 0 deletions awscrt/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class TlsContextOptions:
'_pkcs11_private_key_label',
'_pkcs11_cert_file_path',
'_pkcs11_cert_file_contents',
'_windows_cert_store_path',
)

def __init__(self):
Expand Down Expand Up @@ -422,6 +423,27 @@ def create_client_with_mtls_pkcs12(pkcs12_filepath, pkcs12_password):
opt.pkcs12_password = pkcs12_password
return opt

@staticmethod
def create_client_with_mtls_windows_cert_store_path(cert_path):
"""
Create options configured for use with mutual TLS in client mode,
using a certificate in a Windows certificate store.
NOTE: This configuration only works on Windows devices.
Args:
cert_path (str): Path to certificate in a Windows certificate store.
The path must use backslashes and end with the certificate's thumbprint.
Example: ``CurrentUser\\MY\\A11F8A9B5DF5B98BA3508FBCA575D09570E0D2C6``
Returns:
TlsContextOptions
"""
assert isinstance(cert_path, str)
opt = TlsContextOptions()
opt._windows_cert_store_path = cert_path
return opt

@staticmethod
def create_server_from_path(cert_filepath, pk_filepath):
"""
Expand Down Expand Up @@ -556,6 +578,7 @@ def __init__(self, options):
options._pkcs11_private_key_label,
options._pkcs11_cert_file_path,
options._pkcs11_cert_file_contents,
options._windows_cert_store_path,
)

def new_connection_options(self):
Expand Down
16 changes: 9 additions & 7 deletions source/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,11 @@ PyObject *aws_py_client_tls_ctx_new(PyObject *self, PyObject *args) {
Py_ssize_t pkcs11_cert_file_path_len;
const char *pkcs11_cert_file_contents;
Py_ssize_t pkcs11_cert_file_contents_len;
const char *windows_cert_store_path;

if (!PyArg_ParseTuple(
args,
"bzz#zz#z#zzbOz#Oz#z#z#z#",
"bzz#zz#z#zzbOz#Oz#z#z#z#z",
&min_tls_version,
&ca_dirpath,
&ca_buffer,
Expand All @@ -451,7 +452,8 @@ PyObject *aws_py_client_tls_ctx_new(PyObject *self, PyObject *args) {
&pkcs11_cert_file_path,
&pkcs11_cert_file_path_len,
&pkcs11_cert_file_contents,
&pkcs11_cert_file_contents_len)) {
&pkcs11_cert_file_contents_len,
&windows_cert_store_path)) {
return NULL;
}

Expand Down Expand Up @@ -498,16 +500,16 @@ PyObject *aws_py_client_tls_ctx_new(PyObject *self, PyObject *args) {
}
} else if (pkcs12_filepath != NULL) {
/* mTLS with PKCS#12 */
#ifdef __APPLE__
struct aws_byte_cursor password = aws_byte_cursor_from_c_str(pkcs12_password);
if (aws_tls_ctx_options_init_client_mtls_pkcs12_from_path(
&ctx_options, allocator, pkcs12_filepath, &password)) {
return PyErr_AwsLastError();
}
#else
PyErr_SetString(PyExc_NotImplementedError, "PKCS#12 is currently only supported on Apple devices");
return NULL;
#endif
} else if (windows_cert_store_path != NULL) {
/* mTLS with certificate from a Windows certificate store */
if (aws_tls_ctx_options_init_client_mtls_from_system_path(&ctx_options, allocator, windows_cert_store_path)) {
return PyErr_AwsLastError();
}
} else {
/* no mTLS */
aws_tls_ctx_options_init_default_client(&ctx_options, allocator);
Expand Down
8 changes: 6 additions & 2 deletions test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,12 @@ def test_with_mtls_pkcs12(self):
opt = TlsContextOptions.create_client_with_mtls_pkcs12(
'test/resources/unittest.p12', '1234')
ctx = ClientTlsContext(opt)
except NotImplementedError:
raise unittest.SkipTest(f'PKCS#12 not supported on this platform ({sys.platform})')
except Exception as e:
if 'PLATFORM_NOT_SUPPORTED' in str(e):
raise unittest.SkipTest(f'PKCS#12 not supported on this platform ({sys.platform})')
else:
# well then this is a real error
raise e

def test_override_default_trust_store_dir(self):
opt = TlsContextOptions()
Expand Down

0 comments on commit afe4f59

Please sign in to comment.