From 94ff11cac7149fcdc457fd9a6beed68fccbadc5a Mon Sep 17 00:00:00 2001 From: huiquanjiang Date: Fri, 5 Jul 2024 15:56:49 +0800 Subject: [PATCH] refactored command group: acat report snapshot --- .../report/snapshot/_list.py | 4 +- src/acat/azext_acat/commands.py | 8 +- src/acat/azext_acat/custom.py | 142 +++++++++++++++++- src/acat/azext_acat/utils.py | 8 + 4 files changed, 159 insertions(+), 3 deletions(-) diff --git a/src/acat/azext_acat/aaz/latest/app_compliance_automation/report/snapshot/_list.py b/src/acat/azext_acat/aaz/latest/app_compliance_automation/report/snapshot/_list.py index eaad61a48ad..dcc02f92189 100644 --- a/src/acat/azext_acat/aaz/latest/app_compliance_automation/report/snapshot/_list.py +++ b/src/acat/azext_acat/aaz/latest/app_compliance_automation/report/snapshot/_list.py @@ -29,7 +29,9 @@ class List(AAZCommand): def _handler(self, command_args): super()._handler(command_args) - return self.build_paging(self._execute_operations, self._output) + # return self.build_paging(self._execute_operations, self._output) + self._execute_operations() + return self._output() _args_schema = None diff --git a/src/acat/azext_acat/commands.py b/src/acat/azext_acat/commands.py index 31570698b3f..01ecdbb5777 100644 --- a/src/acat/azext_acat/commands.py +++ b/src/acat/azext_acat/commands.py @@ -28,11 +28,17 @@ def load_command_table(self, _): # pylint: disable=unused-argument self.command_table["acat report create"]=CreateAcatReport(loader=self) from .custom import UpdateAcatReport - self.command_table["acat report create"]=CreateAcatReport(loader=self) + self.command_table["acat report create"]=UpdateAcatReport(loader=self) from .custom import DeleteAcatReport self.command_table["acat report delete"]=DeleteAcatReport(loader=self) + from .custom import GetControlAssessment + self.command_table["acat report get-control-assessment"]=GetControlAssessment(loader=self) + + from .custom import DownloadAcatReport + self.command_table["acat report download"]=DownloadAcatReport(loader=self) + with self.command_group('acat report webhook') as g: from .custom import ListAcatReportWebhook diff --git a/src/acat/azext_acat/custom.py b/src/acat/azext_acat/custom.py index 0b848b38805..62483fe41d6 100644 --- a/src/acat/azext_acat/custom.py +++ b/src/acat/azext_acat/custom.py @@ -15,6 +15,8 @@ from .aaz.latest.app_compliance_automation.report import Create as _AcatCreateReport from .aaz.latest.app_compliance_automation.report import Update as _AcatUpdateReport from .aaz.latest.app_compliance_automation.report import Delete as _AcatDeleteReport +from .aaz.latest.app_compliance_automation.report.snapshot import List as _AcatListSnapshot +from .aaz.latest.app_compliance_automation.report.snapshot import Download as _AcatDownloadSnapshot from .aaz.latest.app_compliance_automation.report.webhook import List as _AcatListReportWebhook from .aaz.latest.app_compliance_automation.report.webhook import Show as _AcatShowReportWebhook from .aaz.latest.app_compliance_automation.report.webhook import Create as _AcatCreateReportWebhook @@ -22,6 +24,8 @@ from .aaz.latest.app_compliance_automation.report.webhook import Delete as _AcatDeleteReportWebhook from azure.cli.core.commands import LongRunningOperation from .utils import * +import os + logger = get_logger(__name__) @@ -140,7 +144,135 @@ def _execute_operations(self): self.pre_operations() yield self.DeleteAcatReportWithDupAadToken(ctx=self.ctx)() self.post_operations() - + + +class DownloadAcatReport(_AcatDownloadSnapshot): + class DownloadAcatReportWithDupAadToken(_AcatDownloadSnapshot.SnapshotDownload): + CLIENT_TYPE = "DupAadTokenClient" + + def _execute_operations(self): + self.pre_operations() + if self.ctx.args.snapshot_name is None: + return "No snapshot found for report {}".format(self.ctx.args.report_name) + yield self.DownloadAcatReportWithDupAadToken(ctx=self.ctx)() + self.post_operations() + + @classmethod + def _build_arguments_schema(cls, *args, **kwargs): + from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat + args_schema= super()._build_arguments_schema(*args, **kwargs) + args_schema.snapshot_name._required=False + args_schema.snapshot_name._registered=False + + args_schema.path=AAZStrArg( + options=["--path"], + arg_group="Parameters", + help="Path to the downloaded file", + required=False, + ) + args_schema.name=AAZStrArg( + options=["--name"], + arg_group="Parameters", + help="Name of the downloaded file without postfix", + required=False, + ) + args_schema.report_creator_tenant_id._registered = False + args_schema.tenant_id = AAZStrArg( + options=["--tenant"], + help="The tenant id of the report creator.", + fmt=AAZStrArgFormat( + min_length=1, + ), + ) + return args_schema + + + def _output(self, *args, **kwargs): + result = self.deserialize_output(self.ctx.vars.instance, client_flatten=True) + + downloadType=str(self.ctx.args["downloadType"]) + path=self.ctx.args["path"] or os.getcwd() + fname=self.ctx.args["name"] or downloadType + fullpath=os.path.join(path,fname)+ (".pdf" if downloadType=="CompliancePdfReport" else ".csv") + + if(downloadType=="CompliancePdfReport"): + import urllib.request + urllib.request.urlretrieve(result["compliancePdfReport"]["sasUri"], fullpath) + elif(downloadType=="ComplianceReport"): + saveArrayAsCsv(result["complianceReport"],fullpath) + elif(downloadType=="ResourceList"): + saveArrayAsCsv(result["resourceList"],fullpath) + else: + raise NameError("Unsupported download type {}".format(downloadType)) + return "File downloaded at {0}".format(fullpath) + + def pre_operations(self): + args = self.ctx.args + args.report_creator_tenant_id=args.tenant_id + cmd=GetControlAssessment(cli_ctx=self.cli_ctx) + result=cmd._handler({ + "cmd":cmd, + "report_name":args.report_name, + "compliance_status":"all" + }) + snapshot_name= result["snapshotName"] if result is not None else None + args.snapshot_name=snapshot_name + +class GetControlAssessment(_AcatListSnapshot): + class GetControlAssessmentWithDupAadToken(_AcatListSnapshot.SnapshotList): + CLIENT_TYPE = "DupAadTokenClient" + + def _execute_operations(self): + self.pre_operations() + self.GetControlAssessmentWithDupAadToken(ctx=self.ctx)() + self.post_operations() + + @classmethod + def _build_arguments_schema(cls, *args, **kwargs): + args_schema= super()._build_arguments_schema(*args, **kwargs) + + from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat + args_schema.report_creator_tenant_id._registered = False + + args_schema.tenant_id = AAZStrArg( + options=["--tenant"], + help="The tenant id of the report creator.", + fmt=AAZStrArgFormat( + min_length=1, + ), + ) + args_schema.compliance_status = AAZStrArg( + options=["--compliance-status"], + help="Compliance status.", + enum={"failed": "Failed", "succeeded": "Passed", + "na": "Not Applicable"}, + default="all", + ) + return args_schema + + def pre_operations(self): + args = self.ctx.args + args.top=1 + args.skip_token="0" + args.report_creator_tenant_id=args.tenant_id + + def _output(self, *args, **kwargs): + snapshots = self.deserialize_output( + self.ctx.vars.instance.value, client_flatten=True) + if len(snapshots) == 0: + return "No snapshot found" + latestSnapshot=snapshots[0] #["complianceResults"][0]["categories"] + if self.ctx.args.compliance_status == "all": + return latestSnapshot + return [{ + "categoryName": category["categoryName"], + "categoryStatus":category["categoryStatus"], + "controlFamilies":[{ + "controlFamilyName": controlFamily["controlFamilyName"], + "controlFamilyStatus":controlFamily["controlFamilyStatus"], + "controls":[control for control in controlFamily["controls"] if control["controlStatus"] == self.ctx.args.compliance_status]} + for controlFamily in category["controlFamilies"]]} + for category in latestSnapshot] class ListAcatReportWebhook(_AcatListReportWebhook): class ListAcatReportWebhookWithDupAadToken(_AcatListReportWebhook.WebhookList): @@ -351,6 +483,14 @@ def _build_arguments_schema(cls, *args, **kwargs): nullable=True, enum={"Disabled": "Disabled", "Enabled": "Enabled"}, ) + # args_schema.status_with_default = AAZStrArg( + # options=["--disable"], + # arg_group="Properties", + # help="Webhook status.", + # enum={"false": "enable", "true": "disable"}, + # default="enalbe", + # blank="disable" + # ) args_schema.payload_url_nullable = AAZStrArg( options=["--payload-url"], arg_group="Properties", diff --git a/src/acat/azext_acat/utils.py b/src/acat/azext_acat/utils.py index 7b3e7ffd617..7cb02c327e5 100644 --- a/src/acat/azext_acat/utils.py +++ b/src/acat/azext_acat/utils.py @@ -36,3 +36,11 @@ def GetClosestFullHour(): """Get closest full hour""" now = datetime.datetime.now() return str(now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1)) + +def saveArrayAsCsv(arr,fullpath): + import csv + with open(fullpath,"w") as out: + csv_writer=csv.writer(out,lineterminator="\n") + csv_writer.writerow(arr[0].keys()) + for obj in arr: + csv_writer.writerow(obj.values()) \ No newline at end of file