Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,149 +110,143 @@
namespace = {}

# Import necessary modules directly in the namespace
# nosec B102 - These exec calls are necessary to import modules in the namespace
exec( # nosem: python.lang.security.audit.exec-detected.exec-detected
# nosem: python.lang.security.audit.exec-detected.exec-detected
exec( # nosec B102 - These exec calls are necessary to import modules in the namespace
'import os',
namespace,
)

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.exec-detected.exec-detected Warning

Detected the use of exec(). exec() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources.
# nosec B102 - These exec calls are necessary to import modules in the namespace
exec( # nosem: python.lang.security.audit.exec-detected.exec-detected
exec( # nosec B102 - These exec calls are necessary to import modules in the namespace
'import diagrams', namespace
)

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.exec-detected.exec-detected Warning

Detected the use of exec(). exec() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources.
# nosec B102 - These exec calls are necessary to import modules in the namespace
exec( # nosem: python.lang.security.audit.exec-detected.exec-detected
exec( # nosec B102 - These exec calls are necessary to import modules in the namespace
'from diagrams import Diagram, Cluster, Edge', namespace
) # nosem: python.lang.security.audit.exec-detected.exec-detected
# nosec B102 - These exec calls are necessary to import modules in the namespace
exec( # nosem: python.lang.security.audit.exec-detected.exec-detected
)
Comment on lines +120 to +122

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.exec-detected.exec-detected Warning

Detected the use of exec(). exec() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources.
exec( # nosec B102 - These exec calls are necessary to import modules in the namespace
"""from diagrams.saas.crm import *
from diagrams.saas.identity import *
from diagrams.saas.chat import *
from diagrams.saas.recommendation import *
from diagrams.saas.cdn import *
from diagrams.saas.communication import *
from diagrams.saas.media import *
from diagrams.saas.logging import *
from diagrams.saas.security import *
from diagrams.saas.social import *
from diagrams.saas.alerting import *
from diagrams.saas.analytics import *
from diagrams.saas.automation import *
from diagrams.saas.filesharing import *
from diagrams.onprem.vcs import *
from diagrams.onprem.database import *
from diagrams.onprem.gitops import *
from diagrams.onprem.workflow import *
from diagrams.onprem.etl import *
from diagrams.onprem.inmemory import *
from diagrams.onprem.identity import *
from diagrams.onprem.network import *
from diagrams.onprem.proxmox import *
from diagrams.onprem.cd import *
from diagrams.onprem.container import *
from diagrams.onprem.certificates import *
from diagrams.onprem.mlops import *
from diagrams.onprem.dns import *
from diagrams.onprem.compute import *
from diagrams.onprem.logging import *
from diagrams.onprem.registry import *
from diagrams.onprem.security import *
from diagrams.onprem.client import *
from diagrams.onprem.groupware import *
from diagrams.onprem.iac import *
from diagrams.onprem.analytics import *
from diagrams.onprem.messaging import *
from diagrams.onprem.tracing import *
from diagrams.onprem.ci import *
from diagrams.onprem.search import *
from diagrams.onprem.storage import *
from diagrams.onprem.auth import *
from diagrams.onprem.monitoring import *
from diagrams.onprem.aggregator import *
from diagrams.onprem.queue import *
from diagrams.gis.database import *
from diagrams.gis.cli import *
from diagrams.gis.server import *
from diagrams.gis.python import *
from diagrams.gis.organization import *
from diagrams.gis.cplusplus import *
from diagrams.gis.mobile import *
from diagrams.gis.javascript import *
from diagrams.gis.desktop import *
from diagrams.gis.ogc import *
from diagrams.gis.java import *
from diagrams.gis.routing import *
from diagrams.gis.data import *
from diagrams.gis.geocoding import *
from diagrams.gis.format import *
from diagrams.elastic.saas import *
from diagrams.elastic.observability import *
from diagrams.elastic.elasticsearch import *
from diagrams.elastic.orchestration import *
from diagrams.elastic.security import *
from diagrams.elastic.beats import *
from diagrams.elastic.enterprisesearch import *
from diagrams.elastic.agent import *
from diagrams.programming.runtime import *
from diagrams.programming.framework import *
from diagrams.programming.flowchart import *
from diagrams.programming.language import *
from diagrams.gcp.storage import *
from diagrams.generic.database import *
from diagrams.generic.blank import *
from diagrams.generic.network import *
from diagrams.generic.virtualization import *
from diagrams.generic.place import *
from diagrams.generic.device import *
from diagrams.generic.compute import *
from diagrams.generic.os import *
from diagrams.generic.storage import *
from diagrams.k8s.others import *
from diagrams.k8s.rbac import *
from diagrams.k8s.network import *
from diagrams.k8s.ecosystem import *
from diagrams.k8s.compute import *
from diagrams.k8s.chaos import *
from diagrams.k8s.infra import *
from diagrams.k8s.podconfig import *
from diagrams.k8s.controlplane import *
from diagrams.k8s.clusterconfig import *
from diagrams.k8s.storage import *
from diagrams.k8s.group import *
from diagrams.aws.cost import *
from diagrams.aws.ar import *
from diagrams.aws.general import *
from diagrams.aws.database import *
from diagrams.aws.management import *
from diagrams.aws.ml import *
from diagrams.aws.game import *
from diagrams.aws.enablement import *
from diagrams.aws.network import *
from diagrams.aws.quantum import *
from diagrams.aws.iot import *
from diagrams.aws.robotics import *
from diagrams.aws.migration import *
from diagrams.aws.mobile import *
from diagrams.aws.compute import *
from diagrams.aws.media import *
from diagrams.aws.engagement import *
from diagrams.aws.security import *
from diagrams.aws.devtools import *
from diagrams.aws.integration import *
from diagrams.aws.business import *
from diagrams.aws.analytics import *
from diagrams.aws.blockchain import *
from diagrams.aws.storage import *
from diagrams.aws.satellite import *
from diagrams.aws.enduser import *
""",
namespace,
)

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.exec-detected.exec-detected Warning

Detected the use of exec(). exec() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources.
# nosec B102 - These exec calls are necessary to import modules in the namespace
exec( # nosem: python.lang.security.audit.exec-detected.exec-detected
exec( # nosec B102 - These exec calls are necessary to import modules in the namespace
'from urllib.request import urlretrieve', namespace
) # nosem: python.lang.security.audit.exec-detected.exec-detected
)
Comment on lines +247 to +249

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.exec-detected.exec-detected Warning

Detected the use of exec(). exec() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources.

# Process the code to ensure show=False and set the output path
if 'with Diagram(' in code:
Expand Down Expand Up @@ -301,8 +295,7 @@
signal.alarm(timeout)

# Execute the code
# nosec B102 - This exec is necessary to run user-provided diagram code in a controlled environment
exec(code, namespace) # nosem: python.lang.security.audit.exec-detected.exec-detected
exec(code, namespace) # nosec B102 - This exec is necessary to run user-provided diagram code in a controlled environment

Check warning

Code scanning / Semgrep OSS

Semgrep Finding: python.lang.security.audit.exec-detected.exec-detected Warning

Detected the use of exec(). exec() can be dangerous if used to evaluate dynamic content. If this content can be input from outside the program, this may be a code injection vulnerability. Ensure evaluated content is not definable by external sources.

# Cancel the alarm
signal.alarm(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
"""
# Validate asset ID
validate_asset_id(asset_id)
return f"""
query = ( # nosec B608 - safe: asset_id is validated, length limit, no direct execution
f"""

Check warning

Code scanning / Bandit

Possible SQL injection vector through string-based query construction. Warning

Possible SQL injection vector through string-based query construction.
You are an AWS IoT SiteWise expert helping to analyze and visualize asset hierarchies.
Please analyze the asset hierarchy starting from asset ID: {asset_id}
Expand Down Expand Up @@ -88,7 +89,8 @@
If you encounter any errors, explain what information is missing and \
suggest alternative approaches.
"""
""")
return query


# Create the prompt using from_function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
# Validate input strings for injections
validate_string_for_injection(exploration_goal)
validate_string_for_injection(time_range)
return f"""
query = ( # nosec B608 - safe: exploration_goal, time_range are validated
f"""

Check warning

Code scanning / Bandit

Possible SQL injection vector through string-based query construction. Warning

Possible SQL injection vector through string-based query construction.
You are an AWS IoT SiteWise data analytics expert helping to explore \
industrial IoT data using the executeQuery API with correct view schemas \
and
Expand Down Expand Up @@ -528,7 +529,8 @@
Use the `execute_query` tool with these correct view names and \
column names to perform sophisticated data exploration and \
analytics on your IoT SiteWise data.
"""
""")
return query


# Create the prompt using from_function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
request_params['MaxResults'] = max_results

# Use the paginate_aws_response utility for consistent pagination
all_coverages, pagination_metadata = await paginate_aws_response(
all_coverages, pagination_metadata = await paginate_aws_response( # nosec B105: paginate_aws_response is used for pagination

Check notice

Code scanning / Bandit

Possible hardcoded password: 'NextPageToken' Note

Possible hardcoded password: 'NextPageToken'
ctx=ctx,
operation_name='GetReservationCoverage',
api_function=ce_client.get_reservation_coverage,
Expand Down Expand Up @@ -314,7 +314,7 @@
request_params['MaxResults'] = max_results

# Use the paginate_aws_response utility for consistent pagination
all_utilizations, pagination_metadata = await paginate_aws_response(
all_utilizations, pagination_metadata = await paginate_aws_response( # nosec B105: paginate_aws_response is used for pagination

Check notice

Code scanning / Bandit

Possible hardcoded password: 'NextPageToken' Note

Possible hardcoded password: 'NextPageToken'
ctx=ctx,
operation_name='GetReservationUtilization',
api_function=ce_client.get_reservation_utilization,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
request_params['Filter'] = parse_json(filter_expr, 'filter')

# Use the paginate_aws_response utility for consistent pagination
all_coverages, pagination_metadata = await paginate_aws_response(
all_coverages, pagination_metadata = await paginate_aws_response( # nosec B105: paginate_aws_response is used for pagination

Check notice

Code scanning / Bandit

Possible hardcoded password: 'NextToken' Note

Possible hardcoded password: 'NextToken'
ctx=ctx,
operation_name='GetSavingsPlansCoverage',
api_function=ce_client.get_savings_plans_coverage,
Expand Down Expand Up @@ -234,7 +234,7 @@
request_params['Filter'] = parse_json(filter_expr, 'filter')

# Use the paginate_aws_response utility for consistent pagination
all_utilizations, pagination_metadata = await paginate_aws_response(
all_utilizations, pagination_metadata = await paginate_aws_response( # nosec B105: paginate_aws_response is used for pagination

Check notice

Code scanning / Bandit

Possible hardcoded password: 'NextToken' Note

Possible hardcoded password: 'NextToken'
ctx=ctx,
operation_name='GetSavingsPlansUtilization',
api_function=ce_client.get_savings_plans_utilization,
Expand Down Expand Up @@ -428,7 +428,7 @@
request_params['MaxResults'] = 20 # Default

# Use the paginate_aws_response utility for consistent pagination
all_details, pagination_metadata = await paginate_aws_response(
all_details, pagination_metadata = await paginate_aws_response( # nosec B105: paginate_aws_response is used for pagination

Check notice

Code scanning / Bandit

Possible hardcoded password: 'NextToken' Note

Possible hardcoded password: 'NextToken'
ctx=ctx,
operation_name='GetSavingsPlansUtilizationDetails',
api_function=ce_client.get_savings_plans_utilization_details,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def create_safe_sql_statement(
if statement_type.upper() == 'CREATE':
return f'CREATE TABLE {table_name} ({", ".join(args)})'
elif statement_type.upper() == 'SELECT':
base_sql = f'SELECT {", ".join(args)} FROM {table_name}'
base_sql = f'SELECT {", ".join(args)} FROM {table_name}' # nosec B608 - safe: table name, column names are validated, data uses proper parameter binding
if limit is not None and isinstance(limit, int) and limit > 0:
base_sql += f' LIMIT {limit}'
return base_sql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import datetime
import json
import os
import subprocess
import subprocess # nosec B404: subprocess is used for security scanning
import tempfile
import uuid
from awslabs.ccapi_mcp_server.errors import ClientError
Expand All @@ -40,7 +40,7 @@
"""
try:
# Check if Checkov is available
subprocess.run(
subprocess.run( # nosec B603: uses shell=False, inputs are validated, safe file operations, only subprocess calls

Check notice

Code scanning / Bandit

Starting a process with a partial executable path Note

Starting a process with a partial executable path
['checkov', '--version'],
capture_output=True,
text=True,
Expand Down Expand Up @@ -136,7 +136,7 @@
}

# Run checkov with shell=False for security
process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
process = subprocess.run(cmd, capture_output=True, text=True, shell=False) # nosec B603: uses shell=False, inputs are validated, safe file operations, only subprocess calls

# Parse the output
if process.returncode == 0:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async def search_eks_troubleshoot_guide(
API_ENDPOINT,
json={'question': query},
auth=AWSSigV4(AWS_SERVICE, region=AWS_REGION),
timeout=30,
)
response.raise_for_status()
return response.text
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _get_token_client_credentials(self) -> Optional[str]:
logger.debug(f'Using scopes: {data["scope"]}')

logger.debug(f'Making token request to: {token_endpoint}')
response = requests.post(token_endpoint, headers=headers, data=data)
response = requests.post(token_endpoint, headers=headers, data=data, timeout=20)

if response.status_code != 200:
logger.error(f'Token request failed: {response.status_code} {response.text}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async def get_module_details(namespace: str, name: str, provider: str = 'aws') -
details_url = f'https://registry.terraform.io/v1/modules/{namespace}/{name}/{provider}'
logger.debug(f'Making API request to: {details_url}')

response = requests.get(details_url)
response = requests.get(details_url, timeout=30)
response.raise_for_status()

details = response.json()
Expand All @@ -77,7 +77,7 @@ async def get_module_details(namespace: str, name: str, provider: str = 'aws') -
versions_url = f'{details_url}/versions'
logger.debug(f'Making API request to get versions: {versions_url}')

versions_response = requests.get(versions_url)
versions_response = requests.get(versions_url, timeout=30)
logger.debug(f'Versions API response code: {versions_response.status_code}')

if versions_response.status_code == 200:
Expand Down Expand Up @@ -166,7 +166,7 @@ async def get_module_details(namespace: str, name: str, provider: str = 'aws') -
raw_readme_url = f'https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md'
logger.debug(f'Trying to fetch README from: {raw_readme_url}')

readme_response = requests.get(raw_readme_url)
readme_response = requests.get(raw_readme_url, timeout=30)
if readme_response.status_code == 200:
readme_content = readme_response.text
found_readme_branch = branch
Expand Down Expand Up @@ -267,7 +267,7 @@ async def get_specific_module_info(module_info: Dict[str, str]) -> Optional[Modu
try:
# First, check if the module exists
details_url = f'https://registry.terraform.io/v1/modules/{namespace}/{name}/{provider}'
response = requests.get(details_url)
response = requests.get(details_url, timeout=30)

if response.status_code != 200:
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ async def get_module_details(

logger.debug(f'Making API request to: {details_url}')

response = requests.get(details_url)
response = requests.get(details_url, timeout=30)
response.raise_for_status()

details = response.json()
Expand Down Expand Up @@ -303,7 +303,7 @@ async def get_module_details(
raw_readme_url = f'https://raw.githubusercontent.com/{owner}/{repo}/{branch}/README.md'
logger.debug(f'Trying to fetch README from: {raw_readme_url}')

readme_response = requests.get(raw_readme_url)
readme_response = requests.get(raw_readme_url, timeout=30)
if readme_response.status_code == 200:
readme_content = readme_response.text
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def get_github_release_details(owner: str, repo: str) -> Dict[str, Any]:
logger.debug(f'Making request to GitHub releases API: {release_url}')

try:
response = requests.get(release_url)
response = requests.get(release_url, timeout=30)
logger.debug(f'GitHub releases API response code: {response.status_code}')

if response.status_code == 200:
Expand Down Expand Up @@ -102,7 +102,7 @@ async def get_github_release_details(owner: str, repo: str) -> Dict[str, Any]:
logger.debug(f'No releases found, trying tags: {tags_url}')

try:
response = requests.get(tags_url)
response = requests.get(tags_url, timeout=30)
logger.debug(f'GitHub tags API response code: {response.status_code}')

if response.status_code == 200 and response.json():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
def download_pdf(url):
"""Download PDF from URL and return as bytes."""
print(f'Downloading PDF from {url}...')
response = requests.get(url)
response = requests.get(url, timeout=30)
response.raise_for_status() # Raise an exception for HTTP errors
return response.content

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ async def check_access_analyzer(region: str, session: boto3.Session, ctx: Contex
analyzer_arn, analyzer_client, ctx
)

except Exception:
except Exception as e:
await ctx.warning(f"Error getting findings count for analyzer {analyzer_arn}: {e}")
findings_count = "Error"
else:
findings_count = "Unknown (No ARN)"
Expand Down Expand Up @@ -191,7 +192,8 @@ async def check_security_hub(region: str, session: boto3.Session, ctx: Context)
}
)
except Exception:
pass
# Skip processing this standard if there's an error
continue

return {
"enabled": True,
Expand Down
Loading