diff --git a/src/acat/azext_acat/commands.py b/src/acat/azext_acat/commands.py index 01ecdbb5777..660531730a9 100644 --- a/src/acat/azext_acat/commands.py +++ b/src/acat/azext_acat/commands.py @@ -12,47 +12,71 @@ def load_command_table(self, _): # pylint: disable=unused-argument - with self.command_group('acat') as g: + with self.command_group("acat"): from .custom import OnboardAcat - self.command_table["acat onboard"]=OnboardAcat(loader=self) - with self.command_group('acat report') as g: - + self.command_table["acat onboard"] = OnboardAcat(loader=self) + + with self.command_group("acat report"): + from .custom import ListAcatReport - self.command_table["acat report list"]=ListAcatReport(loader=self) - + + self.command_table["acat report list"] = ListAcatReport(loader=self) + from .custom import ShowAcatReport - self.command_table["acat report show"]=ShowAcatReport(loader=self) + + self.command_table["acat report show"] = ShowAcatReport(loader=self) from .custom import CreateAcatReport - self.command_table["acat report create"]=CreateAcatReport(loader=self) + + self.command_table["acat report create"] = CreateAcatReport(loader=self) from .custom import UpdateAcatReport - self.command_table["acat report create"]=UpdateAcatReport(loader=self) - + + self.command_table["acat report create"] = UpdateAcatReport(loader=self) + from .custom import DeleteAcatReport - self.command_table["acat report delete"]=DeleteAcatReport(loader=self) - + + self.command_table["acat report delete"] = DeleteAcatReport(loader=self) + from .custom import GetControlAssessment - self.command_table["acat report get-control-assessment"]=GetControlAssessment(loader=self) - + + self.command_table["acat report get-control-assessment"] = GetControlAssessment( + loader=self + ) + from .custom import DownloadAcatReport - self.command_table["acat report download"]=DownloadAcatReport(loader=self) - - with self.command_group('acat report webhook') as g: + + self.command_table["acat report download"] = DownloadAcatReport(loader=self) + + with self.command_group("acat report webhook"): from .custom import ListAcatReportWebhook - self.command_table["acat report webhook list"]=ListAcatReportWebhook(loader=self) - + + self.command_table["acat report webhook list"] = ListAcatReportWebhook( + loader=self + ) + from .custom import ShowAcatReportWebhook - self.command_table["acat report webhook show"]=ShowAcatReportWebhook(loader=self) - + + self.command_table["acat report webhook show"] = ShowAcatReportWebhook( + loader=self + ) + from .custom import CreateAcatReportWebhook - self.command_table["acat report webhook create"]=CreateAcatReportWebhook(loader=self) - + + self.command_table["acat report webhook create"] = CreateAcatReportWebhook( + loader=self + ) + from .custom import UpdateAcatReportWebhook - self.command_table["acat report webhook update"]=UpdateAcatReportWebhook(loader=self) - + + self.command_table["acat report webhook update"] = UpdateAcatReportWebhook( + loader=self + ) + from .custom import DeleteAcatReportWebhook - self.command_table["acat report webhook delete"]=DeleteAcatReportWebhook(loader=self) - \ No newline at end of file + + self.command_table["acat report webhook delete"] = DeleteAcatReportWebhook( + loader=self + ) diff --git a/src/acat/azext_acat/custom.py b/src/acat/azext_acat/custom.py index d4ee64a37d0..4b6963ecc1d 100644 --- a/src/acat/azext_acat/custom.py +++ b/src/acat/azext_acat/custom.py @@ -7,28 +7,71 @@ # pylint: disable=too-many-lines # pylint: disable=too-many-statements +# pylint: disable=protected-access +# pylint: disable=anomalous-backslash-in-string +import os from knack.log import get_logger +from azure.cli.core.aaz import register_client, AAZClientConfiguration +from azure.cli.core.commands import LongRunningOperation +from azure.cli.core.aaz._client import AAZMgmtClient +from .utils import ( + GetClosestFullHour, + GetLocalTimeZone, + ParseSubsFromResources, + saveArrayAsCsv, +) from .aaz.latest.app_compliance_automation import Onboard as _AcatOnboard from .aaz.latest.app_compliance_automation.report import List as _AcatListReport from .aaz.latest.app_compliance_automation.report import Show as _AcatShowReport from .aaz.latest.app_compliance_automation.report import Create as _AcatCreateReport from .aaz.latest.app_compliance_automation.report import Update as _AcatUpdateReport from .aaz.latest.app_compliance_automation.report import Delete as _AcatDeleteReport -from .aaz.latest.app_compliance_automation.report.snapshot import List as _AcatListSnapshot -from .aaz.latest.app_compliance_automation.report.snapshot import Download as _AcatDownloadSnapshot -from .aaz.latest.app_compliance_automation.report.webhook import List as _AcatListReportWebhook -from .aaz.latest.app_compliance_automation.report.webhook import Show as _AcatShowReportWebhook -from .aaz.latest.app_compliance_automation.report.webhook import Create as _AcatCreateReportWebhook -from .aaz.latest.app_compliance_automation.report.webhook import Update as _AcatUpdateReportWebhook -from .aaz.latest.app_compliance_automation.report.webhook import Delete as _AcatDeleteReportWebhook -from azure.cli.core.commands import LongRunningOperation -from .utils import * -import os +from .aaz.latest.app_compliance_automation.report.snapshot import ( + List as _AcatListSnapshot, +) +from .aaz.latest.app_compliance_automation.report.snapshot import ( + Download as _AcatDownloadSnapshot, +) +from .aaz.latest.app_compliance_automation.report.webhook import ( + List as _AcatListReportWebhook, +) +from .aaz.latest.app_compliance_automation.report.webhook import ( + Show as _AcatShowReportWebhook, +) +from .aaz.latest.app_compliance_automation.report.webhook import ( + Create as _AcatCreateReportWebhook, +) +from .aaz.latest.app_compliance_automation.report.webhook import ( + Update as _AcatUpdateReportWebhook, +) +from .aaz.latest.app_compliance_automation.report.webhook import ( + Delete as _AcatDeleteReportWebhook, +) logger = get_logger(__name__) +@register_client("DupAadTokenClient") +class AAZDupAadTokenClient(AAZMgmtClient): + @classmethod + def _build_configuration(cls, ctx, credential, **kwargs): + from azure.cli.core.auth.util import resource_to_scopes + from azure.core.pipeline.policies import HeadersPolicy + + token = "Bearer " + credential.get_token().token + headers_policy = HeadersPolicy(**kwargs) + headers_policy.add_header("x-ms-aad-user-token", token) + kwargs["headers_policy"] = headers_policy + + return AAZClientConfiguration( + credential=credential, + credential_scopes=resource_to_scopes( + ctx.cli_ctx.cloud.endpoints.active_directory_resource_id + ), + **kwargs + ) + class OnboardAcat(_AcatOnboard): class OnboardAcatWithDupAadToken(_AcatOnboard.ProviderActionsOnboard): @@ -38,12 +81,12 @@ def _execute_operations(self): self.pre_operations() yield self.OnboardAcatWithDupAadToken(ctx=self.ctx)() self.post_operations() - + class ListAcatReport(_AcatListReport): class ListAcatReportWithDupAadToken(_AcatListReport.ReportList): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.ListAcatReportWithDupAadToken(ctx=self.ctx)() @@ -51,9 +94,10 @@ def _execute_operations(self): @classmethod def _build_arguments_schema(cls, *args, **kwargs): - args_schema= super()._build_arguments_schema(*args, **kwargs) - - from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat + args_schema = super()._build_arguments_schema(*args, **kwargs) + + from azure.cli.core.aaz import AAZStrArg, AAZStrArgFormat + args_schema.report_creator_tenant_id._registered = False args_schema.tenant_id = AAZStrArg( options=["--tenant"], @@ -63,16 +107,16 @@ def _build_arguments_schema(cls, *args, **kwargs): ), ) return args_schema - + def pre_operations(self): args = self.ctx.args - args.report_creator_tenant_id=args.tenant_id - - + args.report_creator_tenant_id = args.tenant_id + + class ShowAcatReport(_AcatShowReport): class ShowAcatReportWithDupAadToken(_AcatShowReport.ReportGet): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.ShowAcatReportWithDupAadToken(ctx=self.ctx)() @@ -87,49 +131,51 @@ def _execute_operations(self): self.pre_operations() yield self.CreateAcatReportWithDupAadToken(ctx=self.ctx)() self.post_operations() - + @classmethod def _build_arguments_schema(cls, *args, **kwargs): args_schema = super()._build_arguments_schema(*args, **kwargs) from azure.cli.core.aaz import AAZDateTimeArg, AAZStrArg - - args_schema.time_zone._required=False - args_schema.time_zone._registered=False - args_schema.time_zone_with_default= AAZStrArg( + + args_schema.time_zone._required = False + args_schema.time_zone._registered = False + args_schema.time_zone_with_default = AAZStrArg( options=["--time-zone"], arg_group="Properties", - help="Report collection trigger time's time zone, the available list can be obtained by executing \"Get-TimeZone -ListAvailable\" in PowerShell. An example of valid timezone id is \"Pacific Standard Time\".", - default=GetLocalTimeZone() + help="""Report collection trigger time\'s time zone, the available list can be obtained by executing + "Get-TimeZone -ListAvailable" in PowerShell. An example of valid timezone id is "Pacific Standard Time".""", + default=GetLocalTimeZone(), ) - args_schema.trigger_time._required=False - args_schema.trigger_time._registered=False + args_schema.trigger_time._required = False + args_schema.trigger_time._registered = False args_schema.trigger_time_with_default = AAZDateTimeArg( options=["--trigger-time"], arg_group="Properties", help="Report collection trigger time.", - default=GetClosestFullHour() + default=GetClosestFullHour(), ) return args_schema - + def pre_operations(self): - cmd=OnboardAcat(cli_ctx=self.cli_ctx) - - poller=cmd._handler({ - "cmd": cmd, - "subscription_ids": ParseSubsFromResources(self.ctx.args.resources) - }) - LongRunningOperation( - cmd.cli_ctx, 'Starting {}'.format(cmd.name))(poller) - + cmd = OnboardAcat(cli_ctx=self.cli_ctx) + + poller = cmd._handler( + { + "cmd": cmd, + "subscription_ids": ParseSubsFromResources(self.ctx.args.resources), + } + ) + LongRunningOperation(cmd.cli_ctx, f"Starting {cmd.name}")(poller) + args = self.ctx.args - args.time_zone=args.time_zone_with_default - args.trigger_time=args.trigger_time_with_default - - + args.time_zone = args.time_zone_with_default + args.trigger_time = args.trigger_time_with_default + + class UpdateAcatReport(_AcatUpdateReport): class UpdateAcatReportWithDupAadToken(_AcatUpdateReport.ReportUpdate): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() yield self.UpdateAcatReportWithDupAadToken(ctx=self.ctx)() @@ -139,7 +185,7 @@ def _execute_operations(self): class DeleteAcatReport(_AcatDeleteReport): class DeleteAcatReportWithDupAadToken(_AcatDeleteReport.ReportDelete): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() yield self.DeleteAcatReportWithDupAadToken(ctx=self.ctx)() @@ -149,28 +195,29 @@ def _execute_operations(self): class DownloadAcatReport(_AcatDownloadSnapshot): class DownloadAcatReportWithDupAadToken(_AcatDownloadSnapshot.SnapshotDownload): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() if self.ctx.args.snapshot_name is None: - return "No snapshot found for report {}".format(self.ctx.args.report_name) + yield f"No snapshot found for report {self.ctx.args.report_name}" yield self.DownloadAcatReportWithDupAadToken(ctx=self.ctx)() self.post_operations() - + @classmethod def _build_arguments_schema(cls, *args, **kwargs): - from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat - args_schema= super()._build_arguments_schema(*args, **kwargs) - args_schema.snapshot_name._required=False - args_schema.snapshot_name._registered=False - - args_schema.path=AAZStrArg( + from azure.cli.core.aaz import AAZStrArg, AAZStrArgFormat + + args_schema = super()._build_arguments_schema(*args, **kwargs) + args_schema.snapshot_name._required = False + args_schema.snapshot_name._registered = False + + args_schema.path = AAZStrArg( options=["--path"], arg_group="Parameters", help="Path to the downloaded file", required=False, ) - args_schema.name=AAZStrArg( + args_schema.name = AAZStrArg( options=["--name"], arg_group="Parameters", help="Name of the downloaded file without postfix", @@ -185,55 +232,59 @@ def _build_arguments_schema(cls, *args, **kwargs): ), ) return args_schema - - + def _output(self, *args, **kwargs): result = self.deserialize_output(self.ctx.vars.instance, client_flatten=True) - downloadType=str(self.ctx.args["downloadType"]) - path=self.ctx.args["path"] or os.getcwd() - fname=self.ctx.args["name"] or downloadType - fullpath=os.path.join(path,fname)+ (".pdf" if downloadType=="CompliancePdfReport" else ".csv") + downloadType = str(self.ctx.args["downloadType"]) + path = self.ctx.args["path"] or os.getcwd() + fname = self.ctx.args["name"] or downloadType + fullpath = os.path.join(path, fname) + ( + ".pdf" if downloadType == "CompliancePdfReport" else ".csv" + ) - if(downloadType=="CompliancePdfReport"): + if downloadType == "CompliancePdfReport": import urllib.request - urllib.request.urlretrieve(result["compliancePdfReport"]["sasUri"], fullpath) - elif(downloadType=="ComplianceReport"): - saveArrayAsCsv(result["complianceReport"],fullpath) - elif(downloadType=="ResourceList"): - saveArrayAsCsv(result["resourceList"],fullpath) + + urllib.request.urlretrieve( + result["compliancePdfReport"]["sasUri"], fullpath + ) + elif downloadType == "ComplianceReport": + saveArrayAsCsv(result["complianceReport"], fullpath) + elif downloadType == "ResourceList": + saveArrayAsCsv(result["resourceList"], fullpath) else: - raise NameError("Unsupported download type {}".format(downloadType)) - return "File downloaded at {0}".format(fullpath) - + raise NameError(f"Unsupported download type {downloadType}") + return f"File downloaded at {fullpath}" + def pre_operations(self): args = self.ctx.args - args.report_creator_tenant_id=args.tenant_id - cmd=GetControlAssessment(cli_ctx=self.cli_ctx) - result=cmd._handler({ - "cmd":cmd, - "report_name":args.report_name, - "compliance_status":"all" - }) - snapshot_name= result["snapshotName"] if result is not None else None - args.snapshot_name=snapshot_name + args.report_creator_tenant_id = args.tenant_id + cmd = GetControlAssessment(cli_ctx=self.cli_ctx) + result = cmd._handler( + {"cmd": cmd, "report_name": args.report_name, "compliance_status": "all"} + ) + snapshot_name = result["snapshotName"] if result is not None else None + args.snapshot_name = snapshot_name + class GetControlAssessment(_AcatListSnapshot): class GetControlAssessmentWithDupAadToken(_AcatListSnapshot.SnapshotList): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.GetControlAssessmentWithDupAadToken(ctx=self.ctx)() self.post_operations() - + @classmethod def _build_arguments_schema(cls, *args, **kwargs): - args_schema= super()._build_arguments_schema(*args, **kwargs) - - from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat + args_schema = super()._build_arguments_schema(*args, **kwargs) + + from azure.cli.core.aaz import AAZStrArg, AAZStrArgFormat + args_schema.report_creator_tenant_id._registered = False - + args_schema.tenant_id = AAZStrArg( options=["--tenant"], help="The tenant id of the report creator.", @@ -244,50 +295,63 @@ def _build_arguments_schema(cls, *args, **kwargs): args_schema.compliance_status = AAZStrArg( options=["--compliance-status"], help="Compliance status.", - enum={"failed": "Failed", "succeeded": "Passed", - "na": "Not Applicable"}, + enum={"failed": "Failed", "succeeded": "Passed", "na": "Not Applicable"}, default="all", ) return args_schema - + def pre_operations(self): args = self.ctx.args - args.top=1 - args.skip_token="0" - args.report_creator_tenant_id=args.tenant_id - + args.top = 1 + args.skip_token = "0" + args.report_creator_tenant_id = args.tenant_id + def _output(self, *args, **kwargs): snapshots = self.deserialize_output( - self.ctx.vars.instance.value, client_flatten=True) + self.ctx.vars.instance.value, client_flatten=True + ) if len(snapshots) == 0: return "No snapshot found" - latestSnapshot=snapshots[0] + latestSnapshot = snapshots[0] if self.ctx.args.compliance_status == "all": return latestSnapshot - return [{ - "categoryName": category["categoryName"], - "categoryStatus":category["categoryStatus"], - "controlFamilies":[{ - "controlFamilyName": controlFamily["controlFamilyName"], - "controlFamilyStatus":controlFamily["controlFamilyStatus"], - "controls":[control for control in controlFamily["controls"] if control["controlStatus"] == self.ctx.args.compliance_status]} - for controlFamily in category["controlFamilies"]]} - for category in latestSnapshot] + return [ + { + "categoryName": category["categoryName"], + "categoryStatus": category["categoryStatus"], + "controlFamilies": [ + { + "controlFamilyName": controlFamily["controlFamilyName"], + "controlFamilyStatus": controlFamily["controlFamilyStatus"], + "controls": [ + control + for control in controlFamily["controls"] + if control["controlStatus"] + == self.ctx.args.compliance_status + ], + } + for controlFamily in category["controlFamilies"] + ], + } + for category in latestSnapshot + ] + class ListAcatReportWebhook(_AcatListReportWebhook): class ListAcatReportWebhookWithDupAadToken(_AcatListReportWebhook.WebhookList): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.ListAcatReportWebhookWithDupAadToken(ctx=self.ctx)() self.post_operations() - + @classmethod def _build_arguments_schema(cls, *args, **kwargs): - args_schema= super()._build_arguments_schema(*args, **kwargs) - - from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat + args_schema = super()._build_arguments_schema(*args, **kwargs) + + from azure.cli.core.aaz import AAZStrArg, AAZStrArgFormat + args_schema.report_creator_tenant_id._registered = False args_schema.tenant_id = AAZStrArg( options=["--tenant"], @@ -297,52 +361,54 @@ def _build_arguments_schema(cls, *args, **kwargs): ), ) return args_schema - + def pre_operations(self): args = self.ctx.args - args.report_creator_tenant_id=args.tenant_id - - + args.report_creator_tenant_id = args.tenant_id + + class ShowAcatReportWebhook(_AcatShowReportWebhook): class ShowAcatReportWebhookWithDupAadToken(_AcatShowReportWebhook.WebhookGet): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.ShowAcatReportWebhookWithDupAadToken(ctx=self.ctx)() self.post_operations() - - + + class CreateAcatReportWebhook(_AcatCreateReportWebhook): - class CreateAcatReportWebhookWithDupAadToken(_AcatCreateReportWebhook.WebhookCreateOrUpdate): + class CreateAcatReportWebhookWithDupAadToken( + _AcatCreateReportWebhook.WebhookCreateOrUpdate + ): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.CreateAcatReportWebhookWithDupAadToken(ctx=self.ctx)() self.post_operations() - + @classmethod def _build_arguments_schema(cls, *args, **kwargs): args_schema = super()._build_arguments_schema(*args, **kwargs) - from azure.cli.core.aaz import AAZListArg, AAZStrArg,AAZStrArgFormat - args_schema.content_type._required=False - args_schema.content_type._registered=False - args_schema.enable_ssl_verification._required=False - args_schema.enable_ssl_verification._registered=False - args_schema.events._required=False - args_schema.events._registered=False - args_schema.payload_url._required=False - args_schema.payload_url._registered=False - args_schema.send_all_events._required=False - args_schema.send_all_events._registered=False - args_schema.status._required=False - args_schema.status._registered=False - args_schema.update_webhook_key._required=False - args_schema.update_webhook_key._registered=False - args_schema.webhook_key._required=False - args_schema.webhook_key._registered=False + from azure.cli.core.aaz import AAZListArg, AAZStrArg, AAZStrArgFormat + args_schema.content_type._required = False + args_schema.content_type._registered = False + args_schema.enable_ssl_verification._required = False + args_schema.enable_ssl_verification._registered = False + args_schema.events._required = False + args_schema.events._registered = False + args_schema.payload_url._required = False + args_schema.payload_url._registered = False + args_schema.send_all_events._required = False + args_schema.send_all_events._registered = False + args_schema.status._required = False + args_schema.status._registered = False + args_schema.update_webhook_key._required = False + args_schema.update_webhook_key._registered = False + args_schema.webhook_key._required = False + args_schema.webhook_key._registered = False args_schema.content_type_with_default = AAZStrArg( options=["--content-type"], @@ -350,23 +416,23 @@ def _build_arguments_schema(cls, *args, **kwargs): help="content type", required=False, enum={"application/json": "application/json"}, - default="application/json" + default="application/json", ) args_schema.enable_ssl_verification_with_default = AAZStrArg( options=["--enable-ssl"], arg_group="Properties", help="whether to enable ssl verification", enum={"false": "false", "true": "true"}, - default="true" + default="true", ) args_schema.events_with_default = AAZListArg( options=["--events"], arg_group="Properties", help="under which event notification should be sent.", required=False, - default=[] + default=[], ) - args_schema.events_with_default.Element=AAZStrArg() + args_schema.events_with_default.Element = AAZStrArg() args_schema.payload_url_required = AAZStrArg( options=["--payload-url"], arg_group="Properties", @@ -383,14 +449,14 @@ def _build_arguments_schema(cls, *args, **kwargs): default="all", enum={"all": "true", "customize": "false"}, ) - + args_schema.status_with_default = AAZStrArg( options=["--disable"], arg_group="Properties", help="Webhook status.", enum={"false": "enable", "true": "disable"}, default="enalbe", - blank="disable" + blank="disable", ) args_schema.webhook_key_with_default = AAZStrArg( options=["--secret"], @@ -403,54 +469,54 @@ def _build_arguments_schema(cls, *args, **kwargs): ), ) return args_schema - + def pre_operations(self): from azure.cli.core.aaz.utils import assign_aaz_list_arg + args = self.ctx.args - args.content_type=args.content_type_with_default - args.enable_ssl_verification=args.enable_ssl_verification_with_default - args.events=assign_aaz_list_arg( - args.events, - args.events_with_default - ) - args.payload_url=args.payload_url_required - args.webhook_key=args.webhook_key_with_default - args.status=args.status_with_default - args.send_all_events=args.trigger_mode - - wKey=args.webhook_key.__str__() - hasValidWebhookKey=wKey!="Undefined" and wKey.__len__()>0 - args.update_webhook_key="true" if hasValidWebhookKey else "false" + args.content_type = args.content_type_with_default + args.enable_ssl_verification = args.enable_ssl_verification_with_default + args.events = assign_aaz_list_arg(args.events, args.events_with_default) + args.payload_url = args.payload_url_required + args.webhook_key = args.webhook_key_with_default + args.status = args.status_with_default + args.send_all_events = args.trigger_mode + + wKey = args.webhook_key.__str__() + hasValidWebhookKey = wKey != "Undefined" and wKey.__len__() > 0 + args.update_webhook_key = "true" if hasValidWebhookKey else "false" class UpdateAcatReportWebhook(_AcatUpdateReportWebhook): - class UpdateAcatReportWebhookWithDupAadToken(_AcatUpdateReportWebhook.WebhookUpdate): + class UpdateAcatReportWebhookWithDupAadToken( + _AcatUpdateReportWebhook.WebhookUpdate + ): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.UpdateAcatReportWebhookWithDupAadToken(ctx=self.ctx)() self.post_operations() - + @classmethod def _build_arguments_schema(cls, *args, **kwargs): args_schema = super()._build_arguments_schema(*args, **kwargs) - from azure.cli.core.aaz import AAZListArg, AAZStrArg,AAZStrArgFormat - args_schema.send_all_events._required=False - args_schema.send_all_events._registered=False - args_schema.webhook_key._required=False - args_schema.webhook_key._registered=False - args_schema.enable_ssl_verification._required=False - args_schema.enable_ssl_verification._registered=False - args_schema.events._required=False - args_schema.events._registered=False - args_schema.payload_url._required=False - args_schema.payload_url._registered=False - args_schema.status._required=False - args_schema.status._registered=False - args_schema.update_webhook_key._required=False - args_schema.update_webhook_key._registered=False + from azure.cli.core.aaz import AAZListArg, AAZStrArg, AAZStrArgFormat + args_schema.send_all_events._required = False + args_schema.send_all_events._registered = False + args_schema.webhook_key._required = False + args_schema.webhook_key._registered = False + args_schema.enable_ssl_verification._required = False + args_schema.enable_ssl_verification._registered = False + args_schema.events._required = False + args_schema.events._registered = False + args_schema.payload_url._required = False + args_schema.payload_url._registered = False + args_schema.status._required = False + args_schema.status._registered = False + args_schema.update_webhook_key._required = False + args_schema.update_webhook_key._registered = False args_schema.trigger_mode = AAZStrArg( options=["--trigger-mode"], @@ -465,7 +531,7 @@ def _build_arguments_schema(cls, *args, **kwargs): help="whether to enable ssl verification", enum={"false": "false", "true": "true"}, nullable=True, - default="true" + default="true", ) args_schema.events_with_default = AAZListArg( options=["--events"], @@ -473,8 +539,14 @@ def _build_arguments_schema(cls, *args, **kwargs): help="under which event notification should be sent.", nullable=True, ) - args_schema.events_with_default.Element=AAZStrArg( - enum={"assessment_failure": "assessment_failure", "generate_snapshot_failed": "generate_snapshot_failed", "generate_snapshot_success": "generate_snapshot_success", "report_configuration_changes": "report_configuration_changes", "report_deletion": "report_deletion"}, + args_schema.events_with_default.Element = AAZStrArg( + enum={ + "assessment_failure": "assessment_failure", + "generate_snapshot_failed": "generate_snapshot_failed", + "generate_snapshot_success": "generate_snapshot_success", + "report_configuration_changes": "report_configuration_changes", + "report_deletion": "report_deletion", + }, ) args_schema.status_nullable = AAZStrArg( options=["--disable"], @@ -505,32 +577,30 @@ def _build_arguments_schema(cls, *args, **kwargs): ), ) return args_schema - + def pre_operations(self): from azure.cli.core.aaz.utils import assign_aaz_list_arg + args = self.ctx.args - args.enable_ssl_verification=args.enable_ssl_verification_with_default - args.events=assign_aaz_list_arg( - args.events, - args.events_with_default - ) - args.payload_url=args.payload_url_nullable - args.webhook_key=args.webhook_key_with_default - args.status=args.status_nullable - args.send_all_events=args.trigger_mode - - wKey=args.webhook_key.__str__() - hasValidWebhookKey=wKey!="Undefined" and wKey.__len__()>0 - args.update_webhook_key="true" if hasValidWebhookKey else "false" - - - + args.enable_ssl_verification = args.enable_ssl_verification_with_default + args.events = assign_aaz_list_arg(args.events, args.events_with_default) + args.payload_url = args.payload_url_nullable + args.webhook_key = args.webhook_key_with_default + args.status = args.status_nullable + args.send_all_events = args.trigger_mode + + wKey = args.webhook_key.__str__() + hasValidWebhookKey = wKey != "Undefined" and wKey.__len__() > 0 + args.update_webhook_key = "true" if hasValidWebhookKey else "false" + + class DeleteAcatReportWebhook(_AcatDeleteReportWebhook): - class DeleteAcatReportWebhookWithDupAadToken(_AcatDeleteReportWebhook.WebhookDelete): + class DeleteAcatReportWebhookWithDupAadToken( + _AcatDeleteReportWebhook.WebhookDelete + ): CLIENT_TYPE = "DupAadTokenClient" - + def _execute_operations(self): self.pre_operations() self.DeleteAcatReportWebhookWithDupAadToken(ctx=self.ctx)() self.post_operations() - diff --git a/src/acat/azext_acat/utils.py b/src/acat/azext_acat/utils.py index 7cb02c327e5..c5584240aec 100644 --- a/src/acat/azext_acat/utils.py +++ b/src/acat/azext_acat/utils.py @@ -1,25 +1,6 @@ -from azure.cli.core.aaz import register_client,AAZClientConfiguration -from azure.cli.core.aaz._client import AAZMgmtClient import datetime -@register_client("DupAadTokenClient") -class AAZDupAadTokenClient(AAZMgmtClient): - @classmethod - def _build_configuration(cls, ctx, credential, **kwargs): - from azure.cli.core.auth.util import resource_to_scopes - from azure.core.pipeline.policies import HeadersPolicy - - token="Bearer "+credential.get_token().token - headers_policy=HeadersPolicy(**kwargs) - headers_policy.add_header("x-ms-aad-user-token",token) - kwargs["headers_policy"]=headers_policy - - return AAZClientConfiguration( - credential=credential, - credential_scopes=resource_to_scopes(ctx.cli_ctx.cloud.endpoints.active_directory_resource_id), - **kwargs - ) - + def ParseSubsFromResources(resourceList): """Parse subscriptions from resources""" subs = set() @@ -28,19 +9,25 @@ def ParseSubsFromResources(resourceList): subs.add(sub) return list(subs) + def GetLocalTimeZone(): """Get local time zone""" return str(datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo) + def GetClosestFullHour(): """Get closest full hour""" now = datetime.datetime.now() - return str(now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)) + return str( + now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1) + ) -def saveArrayAsCsv(arr,fullpath): + +def saveArrayAsCsv(arr, fullpath): import csv - with open(fullpath,"w") as out: - csv_writer=csv.writer(out,lineterminator="\n") + + with open(fullpath, "w", encoding='utf8') as out: + csv_writer = csv.writer(out, lineterminator="\n") csv_writer.writerow(arr[0].keys()) for obj in arr: - csv_writer.writerow(obj.values()) \ No newline at end of file + csv_writer.writerow(obj.values())