1919identifying misconfigurations and providing actionable recommendations.
2020"""
2121
22+ import asyncio
2223import logging
2324from typing import Any , Dict , List , Optional
2425
2526from awslabs .ecs_mcp_server .api .resource_management import ecs_api_operation
2627
2728logger = logging .getLogger (__name__ )
2829
30+ # Documentation cache to avoid repeated API calls
31+ _documentation_cache : Dict [str , Dict [str , Any ]] = {}
32+
33+
34+ async def _fetch_aws_documentation (
35+ search_query : str , timeout : float = 5.0
36+ ) -> Optional [Dict [str , Any ]]:
37+ """
38+ Fetch AWS documentation for a given search query.
39+
40+ This function uses the AWS Knowledge MCP Server proxy to fetch relevant
41+ documentation. It implements caching to avoid repeated API calls and
42+ handles failures gracefully.
43+
44+ Args:
45+ search_query: Search query for AWS documentation
46+ timeout: Timeout in seconds for the API call (default: 5.0)
47+
48+ Returns:
49+ Dictionary with documentation data or None if fetch fails:
50+ {
51+ "summary": str, # Brief summary of the documentation
52+ "url": str, # Source URL
53+ "last_updated": str # Last updated date (if available)
54+ }
55+ """
56+ # Check cache first
57+ if search_query in _documentation_cache :
58+ logger .debug (f"Using cached documentation for query: { search_query } " )
59+ return _documentation_cache [search_query ]
60+
61+ try :
62+ # Import AWS Knowledge proxy tools dynamically to avoid circular imports
63+ # and to handle cases where the proxy might not be available
64+ try :
65+ from awslabs .ecs_mcp_server .modules import aws_knowledge_proxy # noqa: F401
66+ except ImportError :
67+ logger .warning ("AWS Knowledge proxy not available, skipping documentation fetch" )
68+ return None
69+
70+ # Note: In a real implementation, we would call the AWS Knowledge API here
71+ # For now, we'll return None to indicate graceful degradation
72+ # The actual implementation would use the MCP server's tool calling mechanism
73+
74+ logger .debug (f"AWS Knowledge API call would be made for: { search_query } " )
75+
76+ # Graceful degradation - return None if API is not available
77+ # This allows the security analysis to continue without documentation
78+ return None
79+
80+ except asyncio .TimeoutError :
81+ logger .warning (f"Timeout fetching documentation for query: { search_query } " )
82+ return None
83+ except Exception as e :
84+ logger .error (f"Error fetching documentation for query '{ search_query } ': { e } " )
85+ return None
86+
2987
3088async def analyze_ecs_security (
3189 cluster_names : List [str ],
3290 regions : Optional [List [str ]] = None ,
91+ include_aws_docs : bool = False ,
3392) -> Dict [str , Any ]:
3493 """
3594 Main entry point for ECS security analysis.
3695
3796 Args:
3897 cluster_names: List of cluster names to analyze (required)
3998 regions: Optional list of regions (default: ["us-east-1"])
99+ include_aws_docs: Whether to fetch AWS documentation for recommendations (default: False)
40100
41101 Returns:
42102 Dictionary with analysis results and summary
@@ -79,8 +139,8 @@ async def analyze_ecs_security(
79139 }
80140
81141 # Analyze security
82- analyzer = SecurityAnalyzer (cluster_name , region )
83- result = analyzer .analyze (combined_data )
142+ analyzer = SecurityAnalyzer (cluster_name , region , include_aws_docs )
143+ result = await analyzer .analyze (combined_data )
84144
85145 all_results .append (result )
86146 except Exception as e :
@@ -294,16 +354,18 @@ async def collect_capacity_providers(self, cluster_name: str) -> Dict[str, Any]:
294354class SecurityAnalyzer :
295355 """Security analysis engine for ECS resources."""
296356
297- def __init__ (self , cluster_name : str , region : str ):
357+ def __init__ (self , cluster_name : str , region : str , include_aws_docs : bool = False ):
298358 """
299359 Initialize SecurityAnalyzer.
300360
301361 Args:
302362 cluster_name: Name of the cluster being analyzed
303363 region: AWS region
364+ include_aws_docs: Whether to fetch AWS documentation for recommendations
304365 """
305366 self .cluster_name = cluster_name
306367 self .region = region
368+ self .include_aws_docs = include_aws_docs
307369 self .recommendations = []
308370
309371 def _add_recommendation (
@@ -317,6 +379,7 @@ def _add_recommendation(
317379 remediation_steps : List [str ],
318380 documentation_links : List [str ],
319381 resource_type : str = "Cluster" ,
382+ doc_search_query : Optional [str ] = None ,
320383 ) -> None :
321384 """
322385 Add a security recommendation with consistent structure.
@@ -331,24 +394,30 @@ def _add_recommendation(
331394 remediation_steps: List of CLI commands or steps
332395 documentation_links: List of AWS documentation URLs
333396 resource_type: Type of resource (default: Cluster)
397+ doc_search_query: Optional search query for AWS documentation
334398 """
335- self .recommendations .append (
336- {
337- "title" : title ,
338- "severity" : severity ,
339- "category" : category ,
340- "resource" : resource ,
341- "resource_type" : resource_type ,
342- "cluster_name" : self .cluster_name ,
343- "region" : self .region ,
344- "issue" : issue ,
345- "recommendation" : recommendation ,
346- "remediation_steps" : remediation_steps ,
347- "documentation_links" : documentation_links ,
348- }
349- )
399+ rec = {
400+ "title" : title ,
401+ "severity" : severity ,
402+ "category" : category ,
403+ "resource" : resource ,
404+ "resource_type" : resource_type ,
405+ "cluster_name" : self .cluster_name ,
406+ "region" : self .region ,
407+ "issue" : issue ,
408+ "recommendation" : recommendation ,
409+ "remediation_steps" : remediation_steps ,
410+ "documentation_links" : documentation_links ,
411+ }
412+
413+ # Add AWS documentation field if enabled and query provided
414+ if self .include_aws_docs and doc_search_query :
415+ # Store the search query for later parallel fetching
416+ rec ["_doc_search_query" ] = doc_search_query
417+
418+ self .recommendations .append (rec )
350419
351- def analyze (self , ecs_data : Dict [str , Any ]) -> Dict [str , Any ]:
420+ async def analyze (self , ecs_data : Dict [str , Any ]) -> Dict [str , Any ]:
352421 """
353422 Main analysis orchestrator.
354423
@@ -381,6 +450,10 @@ def analyze(self, ecs_data: Dict[str, Any]) -> Dict[str, Any]:
381450 self ._analyze_enhanced_cluster_security (container_instances )
382451 self ._analyze_capacity_providers (capacity_providers )
383452
453+ # Fetch AWS documentation in parallel if enabled
454+ if self .include_aws_docs :
455+ await self ._fetch_documentation_parallel ()
456+
384457 # Generate summary
385458 summary = self ._generate_summary ()
386459
@@ -392,6 +465,70 @@ def analyze(self, ecs_data: Dict[str, Any]) -> Dict[str, Any]:
392465 "summary" : summary ,
393466 }
394467
468+ async def _fetch_documentation_parallel (self ) -> None :
469+ """
470+ Fetch AWS documentation for recommendations in parallel.
471+
472+ Only fetches for High and Medium severity recommendations to optimize
473+ performance. Uses asyncio.gather() with timeout handling and limits
474+ concurrent requests to avoid rate limits.
475+ """
476+ # Filter recommendations that need documentation
477+ # Only fetch for High/Medium severity
478+ recs_needing_docs = [
479+ (i , rec )
480+ for i , rec in enumerate (self .recommendations )
481+ if rec .get ("severity" ) in ["High" , "Medium" ] and "_doc_search_query" in rec
482+ ]
483+
484+ if not recs_needing_docs :
485+ return
486+
487+ logger .info (
488+ f"Fetching AWS documentation for { len (recs_needing_docs )} "
489+ f"High/Medium severity recommendations"
490+ )
491+
492+ # Limit concurrent requests to avoid rate limits (max 5 concurrent)
493+ semaphore = asyncio .Semaphore (5 )
494+
495+ async def fetch_with_semaphore (index : int , rec : Dict [str , Any ]) -> None :
496+ """Fetch documentation with semaphore to limit concurrency."""
497+ async with semaphore :
498+ search_query = rec ["_doc_search_query" ]
499+ try :
500+ # Fetch with timeout
501+ doc_data = await asyncio .wait_for (
502+ _fetch_aws_documentation (search_query ), timeout = 5.0
503+ )
504+
505+ if doc_data :
506+ # Add documentation to recommendation
507+ self .recommendations [index ]["aws_documentation" ] = doc_data
508+ logger .debug (f"Fetched documentation for: { rec ['title' ]} " )
509+ else :
510+ logger .debug (f"No documentation found for: { rec ['title' ]} " )
511+
512+ # Remove the temporary search query field
513+ del self .recommendations [index ]["_doc_search_query" ]
514+
515+ except asyncio .TimeoutError :
516+ logger .warning (f"Timeout fetching documentation for: { rec ['title' ]} " )
517+ # Remove the temporary search query field
518+ if "_doc_search_query" in self .recommendations [index ]:
519+ del self .recommendations [index ]["_doc_search_query" ]
520+ except Exception as e :
521+ logger .error (f"Error fetching documentation for { rec ['title' ]} : { e } " )
522+ # Remove the temporary search query field
523+ if "_doc_search_query" in self .recommendations [index ]:
524+ del self .recommendations [index ]["_doc_search_query" ]
525+
526+ # Fetch all documentation in parallel with semaphore limiting concurrency
527+ await asyncio .gather (
528+ * [fetch_with_semaphore (i , rec ) for i , rec in recs_needing_docs ],
529+ return_exceptions = True ,
530+ )
531+
395532 def _analyze_cluster_security (self , cluster : Dict [str , Any ]) -> None :
396533 """
397534 Analyze cluster-level security.
0 commit comments