diff --git a/backend/api/utils/network.py b/backend/api/utils/network.py new file mode 100644 index 000000000..3333dac8e --- /dev/null +++ b/backend/api/utils/network.py @@ -0,0 +1,82 @@ +import socket +import ipaddress +from urllib.parse import urlparse +from django.core.exceptions import ValidationError + + +BLOCKED_NETWORKS = [ + ipaddress.ip_network("100.64.0.0/10"), # Carrier Grade NAT (used by Tailscale, Alibaba Cloud metadata) + ipaddress.ip_network("192.0.0.0/24"), # IETF Protocol Assignments + ipaddress.ip_network("198.18.0.0/15"), # Network Benchmark + ipaddress.ip_network("169.254.0.0/16"), # Link-Local / Cloud Metadata + ipaddress.ip_network("192.0.0.192/32"), # Oracle Cloud Metadata + ipaddress.ip_network("127.0.0.0/8"), # Loopback + ipaddress.ip_network("0.0.0.0/8"), # Current network + ipaddress.ip_network("240.0.0.0/4"), # Reserved + ipaddress.ip_network("::1/128"), # IPv6 Loopback + ipaddress.ip_network("fe80::/10"), # IPv6 Link-Local + ipaddress.ip_network("fc00::/7"), # IPv6 Unique Local Address + ipaddress.ip_network("224.0.0.0/4"), # Multicast (IPv4) + ipaddress.ip_network("ff00::/8"), # Multicast (IPv6) + ipaddress.ip_network("fec0::/10"), # IPv6 Site-Local (Deprecated) +] + +def is_ip_private(ip_str): + try: + ip = ipaddress.ip_address(ip_str) + + # Check standard private properties + if ( + ip.is_private + or ip.is_loopback + or ip.is_link_local + or ip.is_multicast + or ip.is_reserved + ): + return True + + # Check against additional blocked networks + for network in BLOCKED_NETWORKS: + if ip in network: + return True + + return False + except ValueError: + return False + +def validate_url_is_safe(url): + """ + Validates that a URL does not point to a private/internal IP address. + """ + if not url: + return + + # Add scheme if missing to allow urlparse to work correctly + if "://" not in url: + parse_url = f"https://{url}" + else: + parse_url = url + + try: + parsed = urlparse(parse_url) + hostname = parsed.hostname + if not hostname: + # If no hostname (e.g. just a string), treat it as hostname + hostname = url.split(":")[0].split("/")[0] + + # Resolve hostname + try: + ip_list = socket.gethostbyname_ex(hostname)[2] + except socket.gaierror: + raise ValidationError(f"Could not resolve hostname: {hostname}") + + for ip in ip_list: + if is_ip_private(ip): + raise ValidationError(f"URL resolves to a restricted IP address: {ip}") + + except Exception as e: + if isinstance(e, ValidationError): + raise + # Log the error if needed, but for now just raise validation error + raise ValidationError(f"Invalid URL or hostname: {str(e)}") + diff --git a/backend/api/utils/syncing/github/actions.py b/backend/api/utils/syncing/github/actions.py index 04057115c..00e621c67 100644 --- a/backend/api/utils/syncing/github/actions.py +++ b/backend/api/utils/syncing/github/actions.py @@ -8,6 +8,8 @@ import base64 import datetime from django.apps import apps +from django.conf import settings +from api.utils.network import validate_url_is_safe GITHUB_CLOUD_API_URL = "https://api.github.com" @@ -22,6 +24,9 @@ def normalize_api_host(api_host): if not api_host or api_host.strip() == "": api_host = GITHUB_CLOUD_API_URL + if settings.APP_HOST == "cloud": + validate_url_is_safe(api_host) + stripped_host = api_host.rstrip("/") if stripped_host == GITHUB_CLOUD_API_URL.rstrip("/"): @@ -43,7 +48,7 @@ def list_repos(credential_id): ) api_host = GITHUB_CLOUD_API_URL - if "host" in credential.credentials: + if "host_url" in credential.credentials: api_host = decrypt_asymmetric( credential.credentials["api_url"], sk.hex(), pk.hex() ) @@ -120,7 +125,7 @@ def list_environments(credential_id, owner, repo_name): ) api_host = GITHUB_CLOUD_API_URL - if "host" in credential.credentials: + if "host_url" in credential.credentials: api_host = decrypt_asymmetric( credential.credentials["api_url"], sk.hex(), pk.hex() ) diff --git a/backend/api/utils/syncing/gitlab/main.py b/backend/api/utils/syncing/gitlab/main.py index fa338dac4..33386ac3d 100644 --- a/backend/api/utils/syncing/gitlab/main.py +++ b/backend/api/utils/syncing/gitlab/main.py @@ -2,6 +2,8 @@ import re import urllib.parse import graphene +from django.conf import settings +from api.utils.network import validate_url_is_safe from api.utils.syncing.auth import get_credentials @@ -68,6 +70,9 @@ def get_gitlab_credentials(credential_id): host = credentials["gitlab_host"] token = credentials["gitlab_token"] + if settings.APP_HOST == "cloud": + validate_url_is_safe(host) + return host, token diff --git a/backend/api/utils/syncing/nomad/main.py b/backend/api/utils/syncing/nomad/main.py index e39868711..83b4ef82e 100644 --- a/backend/api/utils/syncing/nomad/main.py +++ b/backend/api/utils/syncing/nomad/main.py @@ -1,6 +1,8 @@ from api.utils.syncing.auth import get_credentials import requests import re +from django.conf import settings +from api.utils.network import validate_url_is_safe def get_nomad_token_info(credential_id): @@ -11,6 +13,9 @@ def get_nomad_token_info(credential_id): NOMAD_ADDR = credentials["nomad_addr"] NOMAD_TOKEN = credentials["nomad_token_secret"] + if settings.APP_HOST == "cloud": + validate_url_is_safe(NOMAD_ADDR) + session = requests.Session() session.headers.update( { @@ -59,6 +64,9 @@ def sync_nomad_secrets(secrets, credential_id, path, namespace="default"): NOMAD_ADDR = credentials["nomad_addr"] NOMAD_TOKEN = credentials["nomad_token_secret"] + if settings.APP_HOST == "cloud": + validate_url_is_safe(NOMAD_ADDR) + session = requests.Session() session.headers.update( { diff --git a/backend/api/utils/syncing/vault/main.py b/backend/api/utils/syncing/vault/main.py index dded9da1c..f12419c72 100644 --- a/backend/api/utils/syncing/vault/main.py +++ b/backend/api/utils/syncing/vault/main.py @@ -1,6 +1,8 @@ from api.utils.syncing.auth import get_credentials from hvac import Client as VaultClient, exceptions as hvac_exceptions import graphene +from django.conf import settings +from api.utils.network import validate_url_is_safe class VaultMountType(graphene.ObjectType): @@ -19,6 +21,9 @@ def authenticate_vault_client(credential_id): VAULT_SECRET_ID = credentials["vault_secret_id"] VAULT_NAMESPACE = credentials.get("vault_namespace", "") + if settings.APP_HOST == "cloud": + validate_url_is_safe(VAULT_ADDR) + client = VaultClient(url=VAULT_ADDR, namespace=VAULT_NAMESPACE) client.auth.approle.login(role_id=VAULT_ROLE_ID, secret_id=VAULT_SECRET_ID) return client diff --git a/backend/api/views/identities/aws/iam.py b/backend/api/views/identities/aws/iam.py index 2f7bb3bac..f29add821 100644 --- a/backend/api/views/identities/aws/iam.py +++ b/backend/api/views/identities/aws/iam.py @@ -115,6 +115,9 @@ def aws_iam_auth(request): if configured and "://" not in configured: configured = f"https://{configured}" + from api.utils.network import validate_url_is_safe + validate_url_is_safe(configured) + cfg_host = get_normalized_host(configured) header_host = (headers.get("Host") or headers.get("host") or "").lower() diff --git a/backend/backend/graphene/mutations/access.py b/backend/backend/graphene/mutations/access.py index 62176e60c..cefa43985 100644 --- a/backend/backend/graphene/mutations/access.py +++ b/backend/backend/graphene/mutations/access.py @@ -271,6 +271,10 @@ def mutate( # Store provider-specific configuration in a generic config field # Convert comma-separated trusted_principals to list for consistency trusted_list = [p.strip() for p in trusted_principals.split(",") if p.strip()] + + from api.utils.network import validate_url_is_safe + validate_url_is_safe(sts_endpoint) + config = { "trustedPrincipals": trusted_list, "signatureTtlSeconds": signature_ttl_seconds, @@ -353,6 +357,9 @@ def mutate( if signature_ttl_seconds is not None: config_updates["signatureTtlSeconds"] = signature_ttl_seconds if sts_endpoint is not None: + from api.utils.network import validate_url_is_safe + + validate_url_is_safe(sts_endpoint) config_updates["stsEndpoint"] = sts_endpoint if config_updates: diff --git a/backend/tests/api/utils/test_network.py b/backend/tests/api/utils/test_network.py new file mode 100644 index 000000000..cef4d181e --- /dev/null +++ b/backend/tests/api/utils/test_network.py @@ -0,0 +1,105 @@ +from unittest.mock import patch +import socket +import unittest +from django.core.exceptions import ValidationError +from api.utils.network import validate_url_is_safe + +class TestNetworkUtils(unittest.TestCase): + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_public_ip(self, mock_gethostbyname): + # Mock resolving "google.com" to a public IP + mock_gethostbyname.return_value = ('google.com', [], ['8.8.8.8']) + + # Should not raise exception + validate_url_is_safe("https://google.com") + validate_url_is_safe("google.com") + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_private_ip_rfc1918(self, mock_gethostbyname): + # Mock resolving "internal.corp" to a private IP (10.0.0.1) + mock_gethostbyname.return_value = ('internal.corp', [], ['10.0.0.1']) + + with self.assertRaises(ValidationError) as cm: + validate_url_is_safe("https://internal.corp") + self.assertIn("restricted IP address", str(cm.exception)) + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_localhost(self, mock_gethostbyname): + # Mock resolving "localhost" + mock_gethostbyname.return_value = ('localhost', [], ['127.0.0.1']) + + with self.assertRaises(ValidationError) as cm: + validate_url_is_safe("http://localhost:8000") + self.assertIn("restricted IP address", str(cm.exception)) + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_link_local_imds(self, mock_gethostbyname): + # Mock resolving a domain to AWS IMDS IP + mock_gethostbyname.return_value = ('metadata.aws', [], ['169.254.169.254']) + + with self.assertRaises(ValidationError) as cm: + validate_url_is_safe("http://metadata.aws") + self.assertIn("restricted IP address", str(cm.exception)) + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_carrier_grade_nat(self, mock_gethostbyname): + # Mock resolving to Carrier Grade NAT (100.64.0.0/10) + mock_gethostbyname.return_value = ('cgnat.isp', [], ['100.64.0.1']) + + with self.assertRaises(ValidationError) as cm: + validate_url_is_safe("http://cgnat.isp") + self.assertIn("restricted IP address", str(cm.exception)) + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_dns_rebinding_multiple_ips(self, mock_gethostbyname): + # Mock a domain resolving to BOTH a public IP and a private IP + # This simulates a potential DNS rebinding or multi-record setup + mock_gethostbyname.return_value = ('sketchy.com', [], ['8.8.8.8', '192.168.1.5']) + + with self.assertRaises(ValidationError) as cm: + validate_url_is_safe("https://ssrf-central-hax0r.com") + self.assertIn("restricted IP address", str(cm.exception)) + self.assertIn("192.168.1.5", str(cm.exception)) + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_invalid_hostname(self, mock_gethostbyname): + # Simulate DNS resolution failure + mock_gethostbyname.side_effect = socket.gaierror("Name or service not known") + + with self.assertRaises(ValidationError) as cm: + validate_url_is_safe("https://nonexistent.domain") + self.assertIn("Could not resolve hostname", str(cm.exception)) + + def test_validate_url_is_safe_direct_ip_input(self): + # Test providing IPs directly without DNS lookup + + # Private IP directly + with self.assertRaises(ValidationError): + validate_url_is_safe("http://192.168.1.1") + + # Loopback IP directly + with self.assertRaises(ValidationError): + validate_url_is_safe("http://127.0.0.1") + + # Link-local IP directly + with self.assertRaises(ValidationError): + validate_url_is_safe("http://169.254.169.254") + + @patch('socket.gethostbyname_ex') + def test_validate_url_is_safe_ipv6_ignored_by_gethostbyname_ex(self, mock_gethostbyname): + # NOTE: gethostbyname_ex only returns IPv4. + # If the utility is updated to handle IPv6, this test should be updated. + # For now, we verify standard behavior. + mock_gethostbyname.return_value = ('ipv4.only', [], ['1.1.1.1']) + validate_url_is_safe("https://ipv4.only") + + def test_validate_url_is_safe_ipv6_direct_input(self): + # Test blocking IPv6 addresses directly + with self.assertRaises(ValidationError): + validate_url_is_safe("http://[::1]") + + with self.assertRaises(ValidationError): + validate_url_is_safe("http://[fe80::1]") + + diff --git a/backend/tests/utils/syncing/test_github_actions.py b/backend/tests/utils/syncing/test_github_actions.py index 36332b9d7..7059b397a 100644 --- a/backend/tests/utils/syncing/test_github_actions.py +++ b/backend/tests/utils/syncing/test_github_actions.py @@ -6,9 +6,17 @@ list_repos, ) import base64 +import pytest from unittest.mock import patch, Mock, call +@pytest.fixture(autouse=True) +def mock_settings(): + with patch("api.utils.syncing.github.actions.settings") as mock_settings: + mock_settings.APP_HOST = "cloud" + yield mock_settings + + def get_mocked_response(url, *args, **kwargs): mock_response = Mock()