Skip to content

Commit

Permalink
refactored command group: acat report snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
HuiquanJiang-ms committed Jul 5, 2024
1 parent 8d80cc4 commit 94ff11c
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class List(AAZCommand):

def _handler(self, command_args):
super()._handler(command_args)
return self.build_paging(self._execute_operations, self._output)
# return self.build_paging(self._execute_operations, self._output)
self._execute_operations()
return self._output()

_args_schema = None

Expand Down
8 changes: 7 additions & 1 deletion src/acat/azext_acat/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ def load_command_table(self, _): # pylint: disable=unused-argument
self.command_table["acat report create"]=CreateAcatReport(loader=self)

from .custom import UpdateAcatReport
self.command_table["acat report create"]=CreateAcatReport(loader=self)
self.command_table["acat report create"]=UpdateAcatReport(loader=self)

from .custom import DeleteAcatReport
self.command_table["acat report delete"]=DeleteAcatReport(loader=self)

from .custom import GetControlAssessment
self.command_table["acat report get-control-assessment"]=GetControlAssessment(loader=self)

from .custom import DownloadAcatReport
self.command_table["acat report download"]=DownloadAcatReport(loader=self)

with self.command_group('acat report webhook') as g:

from .custom import ListAcatReportWebhook
Expand Down
142 changes: 141 additions & 1 deletion src/acat/azext_acat/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
from .aaz.latest.app_compliance_automation.report import Create as _AcatCreateReport
from .aaz.latest.app_compliance_automation.report import Update as _AcatUpdateReport
from .aaz.latest.app_compliance_automation.report import Delete as _AcatDeleteReport
from .aaz.latest.app_compliance_automation.report.snapshot import List as _AcatListSnapshot
from .aaz.latest.app_compliance_automation.report.snapshot import Download as _AcatDownloadSnapshot
from .aaz.latest.app_compliance_automation.report.webhook import List as _AcatListReportWebhook
from .aaz.latest.app_compliance_automation.report.webhook import Show as _AcatShowReportWebhook
from .aaz.latest.app_compliance_automation.report.webhook import Create as _AcatCreateReportWebhook
from .aaz.latest.app_compliance_automation.report.webhook import Update as _AcatUpdateReportWebhook
from .aaz.latest.app_compliance_automation.report.webhook import Delete as _AcatDeleteReportWebhook
from azure.cli.core.commands import LongRunningOperation
from .utils import *
import os

logger = get_logger(__name__)


Expand Down Expand Up @@ -140,7 +144,135 @@ def _execute_operations(self):
self.pre_operations()
yield self.DeleteAcatReportWithDupAadToken(ctx=self.ctx)()
self.post_operations()



class DownloadAcatReport(_AcatDownloadSnapshot):
class DownloadAcatReportWithDupAadToken(_AcatDownloadSnapshot.SnapshotDownload):
CLIENT_TYPE = "DupAadTokenClient"

def _execute_operations(self):
self.pre_operations()
if self.ctx.args.snapshot_name is None:
return "No snapshot found for report {}".format(self.ctx.args.report_name)
yield self.DownloadAcatReportWithDupAadToken(ctx=self.ctx)()
self.post_operations()

@classmethod
def _build_arguments_schema(cls, *args, **kwargs):
from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat
args_schema= super()._build_arguments_schema(*args, **kwargs)
args_schema.snapshot_name._required=False
args_schema.snapshot_name._registered=False

args_schema.path=AAZStrArg(
options=["--path"],
arg_group="Parameters",
help="Path to the downloaded file",
required=False,
)
args_schema.name=AAZStrArg(
options=["--name"],
arg_group="Parameters",
help="Name of the downloaded file without postfix",
required=False,
)
args_schema.report_creator_tenant_id._registered = False
args_schema.tenant_id = AAZStrArg(
options=["--tenant"],
help="The tenant id of the report creator.",
fmt=AAZStrArgFormat(
min_length=1,
),
)
return args_schema


def _output(self, *args, **kwargs):
result = self.deserialize_output(self.ctx.vars.instance, client_flatten=True)

downloadType=str(self.ctx.args["downloadType"])
path=self.ctx.args["path"] or os.getcwd()
fname=self.ctx.args["name"] or downloadType
fullpath=os.path.join(path,fname)+ (".pdf" if downloadType=="CompliancePdfReport" else ".csv")

if(downloadType=="CompliancePdfReport"):
import urllib.request
urllib.request.urlretrieve(result["compliancePdfReport"]["sasUri"], fullpath)
elif(downloadType=="ComplianceReport"):
saveArrayAsCsv(result["complianceReport"],fullpath)
elif(downloadType=="ResourceList"):
saveArrayAsCsv(result["resourceList"],fullpath)
else:
raise NameError("Unsupported download type {}".format(downloadType))
return "File downloaded at {0}".format(fullpath)

def pre_operations(self):
args = self.ctx.args
args.report_creator_tenant_id=args.tenant_id
cmd=GetControlAssessment(cli_ctx=self.cli_ctx)
result=cmd._handler({
"cmd":cmd,
"report_name":args.report_name,
"compliance_status":"all"
})
snapshot_name= result["snapshotName"] if result is not None else None
args.snapshot_name=snapshot_name

class GetControlAssessment(_AcatListSnapshot):
class GetControlAssessmentWithDupAadToken(_AcatListSnapshot.SnapshotList):
CLIENT_TYPE = "DupAadTokenClient"

def _execute_operations(self):
self.pre_operations()
self.GetControlAssessmentWithDupAadToken(ctx=self.ctx)()
self.post_operations()

@classmethod
def _build_arguments_schema(cls, *args, **kwargs):
args_schema= super()._build_arguments_schema(*args, **kwargs)

from azure.cli.core.aaz import AAZStrArg,AAZStrArgFormat
args_schema.report_creator_tenant_id._registered = False

args_schema.tenant_id = AAZStrArg(
options=["--tenant"],
help="The tenant id of the report creator.",
fmt=AAZStrArgFormat(
min_length=1,
),
)
args_schema.compliance_status = AAZStrArg(
options=["--compliance-status"],
help="Compliance status.",
enum={"failed": "Failed", "succeeded": "Passed",
"na": "Not Applicable"},
default="all",
)
return args_schema

def pre_operations(self):
args = self.ctx.args
args.top=1
args.skip_token="0"
args.report_creator_tenant_id=args.tenant_id

def _output(self, *args, **kwargs):
snapshots = self.deserialize_output(
self.ctx.vars.instance.value, client_flatten=True)
if len(snapshots) == 0:
return "No snapshot found"
latestSnapshot=snapshots[0] #["complianceResults"][0]["categories"]
if self.ctx.args.compliance_status == "all":
return latestSnapshot
return [{
"categoryName": category["categoryName"],
"categoryStatus":category["categoryStatus"],
"controlFamilies":[{
"controlFamilyName": controlFamily["controlFamilyName"],
"controlFamilyStatus":controlFamily["controlFamilyStatus"],
"controls":[control for control in controlFamily["controls"] if control["controlStatus"] == self.ctx.args.compliance_status]}
for controlFamily in category["controlFamilies"]]}
for category in latestSnapshot]

class ListAcatReportWebhook(_AcatListReportWebhook):
class ListAcatReportWebhookWithDupAadToken(_AcatListReportWebhook.WebhookList):
Expand Down Expand Up @@ -351,6 +483,14 @@ def _build_arguments_schema(cls, *args, **kwargs):
nullable=True,
enum={"Disabled": "Disabled", "Enabled": "Enabled"},
)
# args_schema.status_with_default = AAZStrArg(
# options=["--disable"],
# arg_group="Properties",
# help="Webhook status.",
# enum={"false": "enable", "true": "disable"},
# default="enalbe",
# blank="disable"
# )
args_schema.payload_url_nullable = AAZStrArg(
options=["--payload-url"],
arg_group="Properties",
Expand Down
8 changes: 8 additions & 0 deletions src/acat/azext_acat/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,11 @@ def GetClosestFullHour():
"""Get closest full hour"""
now = datetime.datetime.now()
return str(now.replace(minute=0, second=0, microsecond=0) + datetime.timedelta(hours=1))

def saveArrayAsCsv(arr,fullpath):
import csv
with open(fullpath,"w") as out:
csv_writer=csv.writer(out,lineterminator="\n")
csv_writer.writerow(arr[0].keys())
for obj in arr:
csv_writer.writerow(obj.values())

0 comments on commit 94ff11c

Please sign in to comment.