Skip to content

Commit

Permalink
{Containerapp} az containerapp env certificate list/upload: Refacto…
Browse files Browse the repository at this point in the history
…r command with decorator (#7295)
  • Loading branch information
Greedygre committed Feb 21, 2024
1 parent 6ba8d1e commit 0b645c3
Show file tree
Hide file tree
Showing 6 changed files with 20,946 additions and 7,594 deletions.
1 change: 1 addition & 0 deletions src/containerapp/azext_containerapp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def load_command_table(self, _):

with self.command_group('containerapp env certificate') as g:
g.custom_command('create', 'create_managed_certificate', is_preview=True)
g.custom_command('upload', 'upload_certificate')
g.custom_command('list', 'list_certificates', is_preview=True)
g.custom_command('delete', 'delete_certificate', confirmation=True, exception_handler=ex_handler_factory(), is_preview=True)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any, Dict
from knack.log import get_logger

from azure.cli.command_modules.containerapp._utils import certificate_matches, certificate_location_matches, \
load_cert_file, generate_randomized_cert_name
from azure.cli.command_modules.containerapp.base_resource import BaseResource
from azure.cli.core.azclierror import MutuallyExclusiveArgumentError, ValidationError
from azure.cli.core.commands import AzCliCommand
from knack.prompting import prompt_y_n
from knack.util import CLIError
from msrestazure.tools import is_valid_resource_id, parse_resource_id

from ._constants import PRIVATE_CERTIFICATE_RT, MANAGED_CERTIFICATE_RT, CHECK_CERTIFICATE_NAME_AVAILABILITY_TYPE, \
NAME_ALREADY_EXISTS, NAME_INVALID
from ._client_factory import handle_non_404_exception, handle_raw_exception
from ._models import ContainerAppCertificateEnvelope as ContainerAppCertificateEnvelopeModel

logger = get_logger(__name__)


class ContainerappEnvCertificateDecorator(BaseResource):
def get_argument_location(self):
return self.get_param("location")

def get_argument_thumbprint(self):
return self.get_param("thumbprint")

def set_argument_thumbprint(self, thumbprint):
return self.set_param("thumbprint", thumbprint)

def get_argument_certificate(self):
return self.get_param("certificate")

def check_cert_name_availability(self, resource_group_name, name, cert_name):
name_availability_request = {"name": cert_name, "type": CHECK_CERTIFICATE_NAME_AVAILABILITY_TYPE}
try:
return self.client.check_name_availability(self.cmd, resource_group_name, name, name_availability_request)
except CLIError as e:
handle_raw_exception(e)

def get_private_certificates(self, certificate_name=None):
if certificate_name:
try:
r = self.client.show_certificate(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name(), certificate_name)
return [r] if certificate_matches(r, self.get_argument_location(), self.get_argument_thumbprint()) else []
except Exception as e:
handle_non_404_exception(e)
return []
else:
try:
r = self.client.list_certificates(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name())
return list(filter(lambda c: certificate_matches(c, self.get_argument_location(), self.get_argument_thumbprint()), r))
except Exception as e:
handle_raw_exception(e)

def get_managed_certificates(self, certificate_name=None):
if certificate_name:
try:
r = self.client.show_managed_certificate(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name(), certificate_name)
return [r] if certificate_location_matches(r, self.get_argument_location()) else []
except Exception as e:
handle_non_404_exception(e)
return []
else:
try:
r = self.client.list_managed_certificates(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name())
return list(filter(lambda c: certificate_location_matches(c, self.get_argument_location()), r))
except Exception as e:
handle_raw_exception(e)


class ContainerappEnvCertificateListDecorator(ContainerappEnvCertificateDecorator):
def list(self):
if self.get_argument_certificate() and is_valid_resource_id(self.get_argument_certificate()):
certificate_name = parse_resource_id(self.get_argument_certificate())["resource_name"]
else:
certificate_name = self.get_argument_certificate()

return self.get_private_certificates(certificate_name)


class ContainerappEnvCertificateUploadDecorator(ContainerappEnvCertificateDecorator):
def __init__(self, cmd: AzCliCommand, client: Any, raw_parameters: Dict, models: str):
super().__init__(cmd, client, raw_parameters, models)
self.certificate = ContainerAppCertificateEnvelopeModel
self.cert_name = None

def get_argument_certificate_name(self):
return self.get_param("certificate_name")

def get_argument_certificate_file(self):
return self.get_param("certificate_file")

def get_argument_certificate_password(self):
return self.get_param("certificate_password")

def get_argument_prompt(self):
return self.get_param("prompt")

def construct_payload(self):
self.set_up_certificate_value()

self.set_up_cert_name()
if self.get_argument_certificate_password():
self.certificate["properties"]["password"] = self.get_argument_certificate_password()
self.set_up_certificate_location()

def set_up_cert_name(self):
if self.get_argument_certificate_name():
name_availability = self.check_cert_name_availability(self.get_argument_resource_group_name(), self.get_argument_name(), self.get_argument_certificate_name())
if not name_availability["nameAvailable"]:
if name_availability["reason"] == NAME_ALREADY_EXISTS:
msg = '{}. If continue with this name, it will be overwritten by the new certificate file.\nOverwrite?'
overwrite = True
if self.get_argument_prompt():
overwrite = prompt_y_n(msg.format(name_availability["message"]))
else:
logger.warning('{}. It will be overwritten by the new certificate file.'.format(
name_availability["message"]))
if overwrite:
self.cert_name = self.get_argument_certificate_name()
else:
raise ValidationError(name_availability["message"])
else:
self.cert_name = self.get_argument_certificate_name()

while not self.cert_name:
random_name = generate_randomized_cert_name(self.get_argument_thumbprint(), self.get_argument_name(), self.get_argument_resource_group_name())
check_result = self.check_cert_name_availability(self.get_argument_resource_group_name(), self.get_argument_name(), random_name)
if check_result["nameAvailable"]:
self.cert_name = random_name
elif not check_result["nameAvailable"] and (check_result["reason"] == NAME_INVALID):
raise ValidationError(check_result["message"])

def set_up_certificate_value(self):
if self.get_argument_certificate_file():
blob, thumbprint = load_cert_file(self.get_argument_certificate_file(),
self.get_argument_certificate_password())
self.certificate["properties"]["value"] = blob
self.set_argument_thumbprint(thumbprint)

def set_up_certificate_location(self):
self.certificate["location"] = self.get_argument_location()

if not self.certificate["location"]:
try:
managed_env = self.client.show(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name())
self.certificate["location"] = managed_env["location"]
except Exception as e:
handle_raw_exception(e)

def create_or_update(self):
try:
r = self.client.create_or_update_certificate(self.cmd, self.get_argument_resource_group_name(), self.get_argument_name(), self.cert_name, self.certificate)
return r
except Exception as e:
handle_raw_exception(e)


# Decorators for preview Feature
class ContainerappPreviewEnvCertificateListDecorator(ContainerappEnvCertificateListDecorator):
def get_argument_managed_certificates_only(self):
return self.get_param("managed_certificates_only")

def get_argument_private_key_certificates_only(self):
return self.get_param("private_key_certificates_only")

def validate_arguments(self):
if self.get_argument_managed_certificates_only() and self.get_argument_private_key_certificates_only():
raise MutuallyExclusiveArgumentError(
"Use either '--managed-certificates-only' or '--private-key-certificates-only'.")
if self.get_argument_managed_certificates_only() and self.get_argument_thumbprint():
raise MutuallyExclusiveArgumentError("'--thumbprint' not supported for managed certificates.")

def list(self):
if self.get_argument_certificate() and is_valid_resource_id(self.get_argument_certificate()):
certificate_name = parse_resource_id(self.get_argument_certificate())["resource_name"]
certificate_type = parse_resource_id(self.get_argument_certificate())["resource_type"]
else:
certificate_name = self.get_argument_certificate()
certificate_type = PRIVATE_CERTIFICATE_RT if self.get_argument_private_key_certificates_only() or self.get_argument_thumbprint() else (MANAGED_CERTIFICATE_RT if self.get_argument_managed_certificates_only() else None)

if certificate_type == MANAGED_CERTIFICATE_RT:
return self.get_managed_certificates(certificate_name)
if certificate_type == PRIVATE_CERTIFICATE_RT:
return self.get_private_certificates(certificate_name)
managed_certs = self.get_managed_certificates(certificate_name)
private_certs = self.get_private_certificates(certificate_name)
return managed_certs + private_certs
31 changes: 29 additions & 2 deletions src/containerapp/azext_containerapp/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
from msrestazure.tools import parse_resource_id, is_valid_resource_id
from msrest.exceptions import DeserializationError

from .containerapp_env_certificate_decorator import ContainerappPreviewEnvCertificateListDecorator, \
ContainerappEnvCertificateUploadDecorator
from .connected_env_decorator import ConnectedEnvironmentDecorator, ConnectedEnvironmentCreateDecorator
from .containerapp_job_decorator import ContainerAppJobPreviewCreateDecorator
from .containerapp_env_decorator import ContainerappEnvPreviewCreateDecorator, ContainerappEnvPreviewUpdateDecorator
Expand Down Expand Up @@ -1248,8 +1250,33 @@ def create_managed_certificate(cmd, name, resource_group_name, hostname, validat


def list_certificates(cmd, name, resource_group_name, location=None, certificate=None, thumbprint=None, managed_certificates_only=False, private_key_certificates_only=False):
from azure.cli.command_modules.containerapp.custom import list_certificates_logic
return list_certificates_logic(cmd, name, resource_group_name, location, certificate, thumbprint, managed_certificates_only=managed_certificates_only, private_key_certificates_only=private_key_certificates_only)
raw_parameters = locals()

containerapp_env_certificate_list_decorator = ContainerappPreviewEnvCertificateListDecorator(
cmd=cmd,
client=ManagedEnvironmentPreviewClient,
raw_parameters=raw_parameters,
models=CONTAINER_APPS_SDK_MODELS
)
containerapp_env_certificate_list_decorator.validate_subscription_registered(CONTAINER_APPS_RP)
containerapp_env_certificate_list_decorator.validate_arguments()

return containerapp_env_certificate_list_decorator.list()


def upload_certificate(cmd, name, resource_group_name, certificate_file, certificate_name=None, certificate_password=None, location=None, prompt=False):
raw_parameters = locals()

containerapp_env_certificate_upload_decorator = ContainerappEnvCertificateUploadDecorator(
cmd=cmd,
client=ManagedEnvironmentPreviewClient,
raw_parameters=raw_parameters,
models=CONTAINER_APPS_SDK_MODELS
)
containerapp_env_certificate_upload_decorator.validate_subscription_registered(CONTAINER_APPS_RP)
containerapp_env_certificate_upload_decorator.construct_payload()

return containerapp_env_certificate_upload_decorator.create_or_update()


def delete_certificate(cmd, resource_group_name, name, location=None, certificate=None, thumbprint=None):
Expand Down
Loading

0 comments on commit 0b645c3

Please sign in to comment.