From 505d0f5a69f0e2de1c43ffb04c0f8d9b5f865a48 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Thu, 23 Oct 2025 17:53:58 -0500 Subject: [PATCH 01/11] fix: fixing exclude region issues --- .../_endpoint_discovery_retry_policy.py | 5 +- .../azure/cosmos/_global_endpoint_manager.py | 19 +-- .../azure/cosmos/_location_cache.py | 109 +++++++++------- .../cosmos/_service_request_retry_policy.py | 15 ++- .../aio/_global_endpoint_manager_async.py | 18 +-- .../azure-cosmos/tests/test_location_cache.py | 118 ++++++++++++++++++ .../test_service_request_retry_policy.py | 95 ++++++++++++++ ...test_service_request_retry_policy_async.py | 95 ++++++++++++++ 8 files changed, 407 insertions(+), 67 deletions(-) create mode 100644 sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py create mode 100644 sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py index aa7bc67d2137..f113efaafc42 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_endpoint_discovery_retry_policy.py @@ -71,15 +71,16 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument self.failover_retry_count += 1 if self.request.location_endpoint_to_route: + context = self.__class__.__name__ if _OperationType.IsReadOnlyOperation(self.request.operation_type): # Mark current read endpoint as unavailable self.global_endpoint_manager.mark_endpoint_unavailable_for_read( self.request.location_endpoint_to_route, - True) + True, context) else: self.global_endpoint_manager.mark_endpoint_unavailable_for_write( self.request.location_endpoint_to_route, - True) + True, context) # set the refresh_needed flag to ensure that endpoint list is # refreshed with new writable and readable locations diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py index a01fae347caa..5997c2d4965e 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py @@ -74,11 +74,11 @@ def _resolve_service_endpoint( ) -> str: return self.location_cache.resolve_service_endpoint(request) - def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache): - self.location_cache.mark_endpoint_unavailable_for_read(endpoint, refresh_cache) + def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache, context: str): + self.location_cache.mark_endpoint_unavailable_for_read(endpoint, refresh_cache, context) - def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache): - self.location_cache.mark_endpoint_unavailable_for_write(endpoint, refresh_cache) + def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache, context: str): + self.location_cache.mark_endpoint_unavailable_for_write(endpoint, refresh_cache, context) def get_ordered_write_locations(self): return self.location_cache.get_ordered_write_locations() @@ -96,14 +96,15 @@ def force_refresh_on_startup(self, database_account): def update_location_cache(self): self.location_cache.update_location_cache() - def _mark_endpoint_unavailable(self, endpoint: str): + def _mark_endpoint_unavailable(self, endpoint: str, context: str): """Marks an endpoint as unavailable for the appropriate operations. :param str endpoint: The endpoint to mark as unavailable. + :param str context: The context for marking the endpoint as unavailable. """ write_endpoints = self.location_cache.get_all_write_endpoints() - self.mark_endpoint_unavailable_for_read(endpoint, False) + self.mark_endpoint_unavailable_for_read(endpoint, False, context) if endpoint in write_endpoints: - self.mark_endpoint_unavailable_for_write(endpoint, False) + self.mark_endpoint_unavailable_for_write(endpoint, False, context) def refresh_endpoint_list(self, database_account, **kwargs): if current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms: @@ -159,7 +160,7 @@ def _GetDatabaseAccount(self, **kwargs) -> Tuple[DatabaseAccount, str]: self.location_cache.mark_endpoint_available(locational_endpoint) return database_account, locational_endpoint except (exceptions.CosmosHttpResponseError, AzureError): - self._mark_endpoint_unavailable(locational_endpoint) + self._mark_endpoint_unavailable(locational_endpoint, "_GetDatabaseAccount") raise def _endpoints_health_check(self, **kwargs): @@ -194,7 +195,7 @@ def _endpoints_health_check(self, **kwargs): success_count += 1 self.location_cache.mark_endpoint_available(endpoint) except (exceptions.CosmosHttpResponseError, AzureError): - self._mark_endpoint_unavailable(endpoint) + self._mark_endpoint_unavailable(endpoint, "_endpoints_health_check") finally: # after the health check for that endpoint setting the timeouts back to their original values diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 09ea16200afa..8b65fbeb72a0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -150,11 +150,11 @@ def get_write_regional_routing_context(self): def get_read_regional_routing_context(self): return self.get_read_regional_routing_contexts()[0].get_primary() - def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache): - self.mark_endpoint_unavailable(endpoint, EndpointOperationType.ReadType, refresh_cache) + def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache, context="Unknown"): + self.mark_endpoint_unavailable(endpoint, EndpointOperationType.ReadType, refresh_cache, context) - def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache): - self.mark_endpoint_unavailable(endpoint, EndpointOperationType.WriteType, refresh_cache) + def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache, context="Unknown"): + self.mark_endpoint_unavailable(endpoint, EndpointOperationType.WriteType, refresh_cache, context) def perform_on_database_account_read(self, database_account): self.update_location_cache( @@ -185,42 +185,60 @@ def _get_configured_excluded_locations(self, request: RequestObject) -> list[str excluded_locations = list(self.connection_policy.ExcludedLocations) else: excluded_locations = [] - for excluded_location in request.excluded_locations_circuit_breaker: - if excluded_location not in excluded_locations: - excluded_locations.append(excluded_location) + return excluded_locations def _get_applicable_read_regional_routing_contexts(self, request: RequestObject) -> list[RegionalRoutingContext]: - # Get configured excluded locations - excluded_locations = self._get_configured_excluded_locations(request) + return self._get_applicable_regional_routing_contexts( + request, + self.get_read_regional_routing_contexts(), + self.account_locations_by_read_endpoints, + self.get_write_regional_routing_contexts()[0] # Fallback to primary write region + ) - # If excluded locations were configured, return filtered regional endpoints by excluded locations. - if excluded_locations: - return _get_applicable_regional_routing_contexts( - self.get_read_regional_routing_contexts(), - self.account_locations_by_read_endpoints, - self.get_write_regional_routing_contexts()[0], - excluded_locations, - request.resource_type) + def _get_applicable_write_regional_routing_contexts(self, request: RequestObject) -> list[RegionalRoutingContext]: + return self._get_applicable_regional_routing_contexts( + request, + self.get_write_regional_routing_contexts(), + self.account_locations_by_write_endpoints, + self.default_regional_routing_context # Fallback to default global endpoint + ) - # Else, return all regional endpoints - return self.get_read_regional_routing_contexts() + def _get_applicable_regional_routing_contexts( + self, + request: RequestObject, + regional_routing_contexts: list[RegionalRoutingContext], + location_name_by_endpoint: Mapping[str, str], + fallback_regional_routing_context: RegionalRoutingContext + ) -> list[RegionalRoutingContext]: + user_excluded_locations = self._get_configured_excluded_locations(request) + circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] + + if not user_excluded_locations and not circuit_breaker_excluded_locations: + return regional_routing_contexts + + applicable_contexts = [] + last_resort_contexts = [] + + for context in regional_routing_contexts: + location = location_name_by_endpoint.get(context.get_primary()) + if location in user_excluded_locations: + # For metadata calls, user-excluded locations are added at the end as a last resort. + if base.IsMasterResource(request.resource_type): + last_resort_contexts.append(context) + elif location in circuit_breaker_excluded_locations: + last_resort_contexts.append(context) + else: + applicable_contexts.append(context) - def _get_applicable_write_regional_routing_contexts(self, request: RequestObject) -> list[RegionalRoutingContext]: - # Get configured excluded locations - excluded_locations = self._get_configured_excluded_locations(request) + # Append circuit-breaker-excluded locations (and user-excluded for metadata) to be used as a last resort. + applicable_contexts.extend(last_resort_contexts) - # If excluded locations were configured, return filtered regional endpoints by excluded locations. - if excluded_locations: - return _get_applicable_regional_routing_contexts( - self.get_write_regional_routing_contexts(), - self.account_locations_by_write_endpoints, - self.default_regional_routing_context, - excluded_locations, - request.resource_type) + # If all preferred locations are excluded, use the fallback endpoint. + if not applicable_contexts: + applicable_contexts.append(fallback_regional_routing_context) - # Else, return all regional endpoints - return self.get_write_regional_routing_contexts() + return applicable_contexts def resolve_service_endpoint(self, request): if request.location_endpoint_to_route: @@ -238,14 +256,17 @@ def resolve_service_endpoint(self, request): # For non-document resource types in case of client can use multiple write locations # or when client cannot use multiple write locations, flip-flop between the # first and the second writable region in DatabaseAccount (for manual failover) - if self.connection_policy.EnableEndpointDiscovery and self.account_write_locations: - location_index = min(location_index % 2, len(self.account_write_locations) - 1) - write_location = self.account_write_locations[location_index] - if (self.account_write_regional_routing_contexts_by_location - and write_location in self.account_write_regional_routing_contexts_by_location): - write_regional_routing_context = ( - self.account_write_regional_routing_contexts_by_location)[write_location] - return write_regional_routing_context.get_primary() + if self.connection_policy.EnableEndpointDiscovery: + # Get the list of applicable write locations, which respects excluded locations. + applicable_contexts = self._get_applicable_write_regional_routing_contexts(request) + if not applicable_contexts: + # if all write locations are excluded, fall back to the default endpoint + return self.default_regional_routing_context.get_primary() + + # For single-master writes, flip-flop between the first and second *applicable* + # regions for manual failover. + index = min(location_index % 2, len(applicable_contexts) - 1) + return applicable_contexts[index].get_primary() # if endpoint discovery is off for reads it should use passed in endpoint return self.default_regional_routing_context.get_primary() @@ -317,10 +338,12 @@ def is_endpoint_unavailable(self, endpoint: str, expected_available_operation: s return True def mark_endpoint_unavailable( - self, unavailable_endpoint: str, unavailable_operation_type: EndpointOperationType, refresh_cache: bool): - logger.warning("Marking %s unavailable for %s ", + self, unavailable_endpoint: str, unavailable_operation_type: EndpointOperationType, refresh_cache: bool, + context: str): + logger.warning("Marking %s unavailable for %s. Source: %s", unavailable_endpoint, - unavailable_operation_type) + unavailable_operation_type, + context) unavailability_info = ( self.location_unavailability_info_by_endpoint[unavailable_endpoint] if unavailable_endpoint in self.location_unavailability_info_by_endpoint diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py index 5f8f3ae9ff72..07aa2fd63549 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_service_request_retry_policy.py @@ -20,11 +20,17 @@ def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, self.failover_retry_count = 0 self.connection_policy = connection_policy self.request = args[0] if args else None + if self.request: if _OperationType.IsReadOnlyOperation(self.request.operation_type): - self.total_retries = len(self.global_endpoint_manager.location_cache.read_regional_routing_contexts) + self.total_retries = len( + self.global_endpoint_manager.location_cache._get_applicable_read_regional_routing_contexts( + self.request)) else: - self.total_retries = len(self.global_endpoint_manager.location_cache.write_regional_routing_contexts) + self.total_retries = len( + self.global_endpoint_manager.location_cache._get_applicable_write_regional_routing_contexts( + self.request)) + def ShouldRetry(self): # pylint: disable=too-many-return-statements """Returns true if the request should retry based on preferred regions and retries already done. @@ -72,10 +78,11 @@ def resolve_next_region_service_endpoint(self): return self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper) def mark_endpoint_unavailable(self, unavailable_endpoint): + context = self.__class__.__name__ if _OperationType.IsReadOnlyOperation(self.request.operation_type): - self.global_endpoint_manager.mark_endpoint_unavailable_for_read(unavailable_endpoint, True) + self.global_endpoint_manager.mark_endpoint_unavailable_for_read(unavailable_endpoint, True, context) else: - self.global_endpoint_manager.mark_endpoint_unavailable_for_write(unavailable_endpoint, True) + self.global_endpoint_manager.mark_endpoint_unavailable_for_write(unavailable_endpoint, True, context) def update_location_cache(self): self.global_endpoint_manager.update_location_cache() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py index ebe3f150d4d8..db907841bac3 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py @@ -77,11 +77,11 @@ def _resolve_service_endpoint( ) -> str: return self.location_cache.resolve_service_endpoint(request) - def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache): - self.location_cache.mark_endpoint_unavailable_for_read(endpoint, refresh_cache) + def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache, context: str): + self.location_cache.mark_endpoint_unavailable_for_read(endpoint, refresh_cache, context) - def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache): - self.location_cache.mark_endpoint_unavailable_for_write(endpoint, refresh_cache) + def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache, context: str): + self.location_cache.mark_endpoint_unavailable_for_write(endpoint, refresh_cache, context) def get_ordered_write_locations(self): return self.location_cache.get_ordered_write_locations() @@ -100,14 +100,14 @@ async def force_refresh_on_startup(self, database_account): def update_location_cache(self): self.location_cache.update_location_cache() - def _mark_endpoint_unavailable(self, endpoint: str): + def _mark_endpoint_unavailable(self, endpoint: str, context: str): """Marks an endpoint as unavailable for the appropriate operations. :param str endpoint: The endpoint to mark as unavailable. """ write_endpoints = self.location_cache.get_all_write_endpoints() - self.mark_endpoint_unavailable_for_read(endpoint, False) + self.mark_endpoint_unavailable_for_read(endpoint, False, context) if endpoint in write_endpoints: - self.mark_endpoint_unavailable_for_write(endpoint, False) + self.mark_endpoint_unavailable_for_write(endpoint, False, context) async def refresh_endpoint_list(self, database_account, **kwargs): if self.refresh_task and self.refresh_task.done(): @@ -151,7 +151,7 @@ async def _database_account_check(self, endpoint: str, **kwargs: dict[str, Any]) await self.client._GetDatabaseAccountCheck(endpoint, **kwargs) self.location_cache.mark_endpoint_available(endpoint) except (exceptions.CosmosHttpResponseError, AzureError): - self._mark_endpoint_unavailable(endpoint) + self._mark_endpoint_unavailable(endpoint,"_database_account_check") async def _endpoints_health_check(self, **kwargs): """Gets the database account for each endpoint. @@ -200,7 +200,7 @@ async def _GetDatabaseAccount(self, **kwargs) -> Tuple[DatabaseAccount, str]: self.location_cache.mark_endpoint_available(locational_endpoint) return database_account, locational_endpoint except (exceptions.CosmosHttpResponseError, AzureError): - self._mark_endpoint_unavailable(locational_endpoint) + self._mark_endpoint_unavailable(locational_endpoint,"_GetDatabaseAccount") raise async def _GetDatabaseAccountStub(self, endpoint, **kwargs): diff --git a/sdk/cosmos/azure-cosmos/tests/test_location_cache.py b/sdk/cosmos/azure-cosmos/tests/test_location_cache.py index 684b63375637..6c88ea126f8d 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_location_cache.py +++ b/sdk/cosmos/azure-cosmos/tests/test_location_cache.py @@ -7,6 +7,7 @@ import pytest from azure.cosmos import documents +from azure.cosmos._service_request_retry_policy import ServiceRequestRetryPolicy from azure.cosmos.documents import DatabaseAccount, _OperationType from azure.cosmos.http_constants import ResourceType @@ -355,5 +356,122 @@ def test_resolve_endpoint_unavailable_and_excluded_on_request(self): # The fallback for write is the default_endpoint. assert write_doc_resolved == default_endpoint + def test_resolve_endpoint_respects_excluded_regions_when_use_preferred_locations_is_false(self): + # This test demonstrates the bug where setting use_preferred_locations to False + # causes the excluded_locations list to be ignored. + + # 1. Setup: LocationCache with multiple locations enabled. + lc = refresh_location_cache(preferred_locations=[], use_multiple_write_locations=True) + db_acc = create_database_account(enable_multiple_writable_locations=True) + lc.perform_on_database_account_read(db_acc) + + # 2. Create a write request. + write_request = RequestObject(ResourceType.Document, _OperationType.Create, None) + + # 3. Set use_preferred_locations to False and exclude the first write location. + write_request.use_preferred_locations = False + write_request.excluded_locations = [location1_name] + + # 4. Resolve the endpoint. + # With the fix, the excluded_locations list is respected. + # It should resolve to the next available write location, which is location2. + resolved_endpoint = lc.resolve_service_endpoint(write_request) + + # 5. Assert the correct behavior for the write request. + assert resolved_endpoint == location2_endpoint + + # 6. Repeat for a read request. + read_request = RequestObject(ResourceType.Document, _OperationType.Read, None) + read_request.use_preferred_locations = False + read_request.excluded_locations = [location1_name] + + # It should resolve to the next available read location, which is location2. + resolved_endpoint = lc.resolve_service_endpoint(read_request) + + # Assert the correct behavior. + assert resolved_endpoint == location2_endpoint + + def test_regional_fallback_when_primary_is_excluded(self): + # This test simulates a scenario where the primary preferred region is excluded + # by the user, and the secondary is excluded by the circuit breaker. + # The expected behavior is to fall back to the circuit-breaker-excluded region + # as a last resort, instead of the global endpoint. + + # 1. Setup: LocationCache with two preferred write locations. + preferred_locations = [location1_name, location2_name] + lc = refresh_location_cache(preferred_locations, use_multiple_write_locations=True) + db_acc = create_database_account(enable_multiple_writable_locations=True) + lc.perform_on_database_account_read(db_acc) + + # 2. Create a write request. + write_request = RequestObject(ResourceType.Document, _OperationType.Create, None) + + # 3. Exclude the primary region by user and the secondary by circuit breaker. + write_request.excluded_locations = [location1_name] + write_request.excluded_locations_circuit_breaker = [location2_name] + + # 4. Resolve the endpoint. + # With the fix, the user-excluded location is filtered out, and the + # circuit-breaker-excluded location is moved to the end of the list. + # Since it's the only one left, it should be selected. + resolved_endpoint = lc.resolve_service_endpoint(write_request) + + # 5. Assert that the resolved endpoint is the circuit-breaker-excluded one, not the global default. + assert resolved_endpoint == location2_endpoint + + def test_write_fallback_to_global_after_regional_retries_exhausted(self): + # This test simulates the client pipeline falling back to the global endpoint for writes + # after all preferred regional endpoints have been tried and failed. + + # 1. Setup: LocationCache with two preferred write locations. + preferred_locations = [location1_name, location2_name] + lc = refresh_location_cache(preferred_locations, use_multiple_write_locations=True) + db_acc = create_database_account(enable_multiple_writable_locations=True) + lc.perform_on_database_account_read(db_acc) + + # Mock the GlobalEndpointManager to use our LocationCache and forward calls. + mock_gem = unittest.mock.Mock() + mock_gem.location_cache = lc + # Simulate resolving to the next preferred location on the first retry. + mock_gem.resolve_service_endpoint_for_partition.side_effect = [location2_endpoint] + mock_gem.mark_endpoint_unavailable_for_write = lc.mark_endpoint_unavailable_for_write + + # Mock ConnectionPolicy and pk_range_wrapper + mock_connection_policy = unittest.mock.Mock() + mock_connection_policy.EnableEndpointDiscovery = True + mock_pk_range_wrapper = unittest.mock.Mock() + + # 2. Initial Request: The client resolves the first endpoint. + write_request = RequestObject(ResourceType.Document, _OperationType.Create, None) + resolved_endpoint = lc.resolve_service_endpoint(write_request) + assert resolved_endpoint == location1_endpoint + + # 3. First Failure and Retry: The request to location1 fails. The retry policy is invoked. + write_request.location_endpoint_to_route = location1_endpoint # Simulate request was sent here + retry_policy = ServiceRequestRetryPolicy(mock_connection_policy, mock_gem, mock_pk_range_wrapper, write_request) + + # The policy should decide to retry and route to the next endpoint (location2). + should_retry = retry_policy.ShouldRetry() + assert should_retry is True + assert write_request.location_endpoint_to_route == location2_endpoint + assert lc.is_endpoint_unavailable(location1_endpoint, "Write") is True + + # 4. Second Failure and Exhaustion: The request to location2 also fails. + should_retry_again = retry_policy.ShouldRetry() + + # The policy has now exhausted all regional retries and should return False. + assert should_retry_again is False + assert lc.is_endpoint_unavailable(location2_endpoint, "Write") is True + + # 5. Fallback to Global: After the retry policy gives up, the client clears the regional + # routing preference to make a final attempt at the global endpoint. + write_request.clear_route_to_location() + assert write_request.use_preferred_locations is None + + # A final call to resolve the endpoint with the modified request will cause the + # LocationCache to return the default global endpoint. + final_endpoint = lc.resolve_service_endpoint(write_request) + assert final_endpoint == default_endpoint + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py new file mode 100644 index 000000000000..cc6cd7611a46 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +from azure.cosmos import DatabaseProxy +import unittest +import uuid +import pytest +from azure.core.exceptions import ServiceRequestError +import azure.cosmos.cosmos_client as cosmos_client +import test_config +from azure.cosmos import PartitionKey +from _fault_injection_transport import FaultInjectionTransport +from azure.cosmos.documents import _OperationType, ConnectionPolicy + + +@pytest.mark.cosmosSplit +class TestServiceRequestRetryPolicies(unittest.TestCase): + """Test cases for the read_items API.""" + + created_db: DatabaseProxy = None + client: cosmos_client.CosmosClient = None + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + configs = test_config.TestConfig + TEST_DATABASE_ID = configs.TEST_DATABASE_ID + + @classmethod + def setUpClass(cls): + cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey) + cls.database = cls.client.get_database_client(cls.TEST_DATABASE_ID) + + + def test_write_failover_to_global_with_service_request_error(self): + container = self.database.create_container("service_request_mrr_test_" + str(uuid.uuid4()), + PartitionKey(path="/id")) + + # 1. Get write regions and ensure there are atleast 2 for this test. + endpoint_manager = self.client.client_connection._global_endpoint_manager + db_account = self.client.get_database_account() + endpoint_manager.refresh_endpoint_list(db_account) + + write_locations_map = endpoint_manager.location_cache.account_locations_by_write_endpoints + + if len(write_locations_map) < 2: + pytest.skip("This test requires an account with 2 or more write regions.") + + write_endpoints = list(write_locations_map.keys()) + write_locations = list(write_locations_map.values()) + + account_name = self.host.split(".")[0].split("//")[1] + region_to_fail_slug = write_endpoints[1].split('/')[2].replace(f"{account_name}-", "").split('.')[0] + + region_to_exclude = write_locations[0] # Use the display name for ExcludedLocations + + # 2. Set up a client with one region excluded and a fault injection transport. + policy = ConnectionPolicy() # Create a new policy object for this test + policy.EnableEndpointDiscovery = True + policy.UseMultipleWriteLocations = True + policy.ExcludedLocations = [region_to_exclude] + fault_injection_transport = FaultInjectionTransport() + + client_with_faults = cosmos_client.CosmosClient( + self.host, + self.masterKey, + connection_policy=policy, + transport=fault_injection_transport, + + ) + container_with_faults = client_with_faults.get_database_client(self.database.id).get_container_client( + container.id) + + # 3. Configure fault injection to fail requests to the second write region with a ServiceRequestError. + error_to_inject = ServiceRequestError(message="Simulated Service Request Error") + + def predicate(request): + # Fail if it's a create item operation and the host matches the region we want to fail. + is_create = FaultInjectionTransport.predicate_is_operation_type(request, _OperationType.Create) + is_target_region = region_to_fail_slug in request.url + return is_create and is_target_region + + def fault_action(_): + raise error_to_inject + + fault_injection_transport.add_fault(predicate, fault_action) + + # 4. Execute a write operation. It should fail with ServiceRequestError as no regions are available. + with self.assertRaises(ServiceRequestError) as context: + container_with_faults.create_item(body={'id': 'failover_test_id', 'pk': 'pk_value'}) + + self.assertIn("Simulated Service Request Error", str(context.exception)) + + +if __name__ == "__main__": + unittest.main() + diff --git a/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py new file mode 100644 index 000000000000..c5038428ea8a --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. +from azure.cosmos import DatabaseProxy +import unittest +import uuid +import pytest +from azure.core.exceptions import ServiceRequestError +from azure.cosmos.aio import CosmosClient, DatabaseProxy +import test_config +from azure.cosmos import PartitionKey +from _fault_injection_transport import FaultInjectionTransport +from azure.cosmos.documents import _OperationType, ConnectionPolicy + + +@pytest.mark.cosmosSplit +class TestServiceRequestRetryPoliciesAsync(unittest.TestCase): + """Test cases for the read_items API.""" + + created_db: DatabaseProxy = None + client: CosmosClient = None + host = test_config.TestConfig.host + masterKey = test_config.TestConfig.masterKey + configs = test_config.TestConfig + TEST_DATABASE_ID = configs.TEST_DATABASE_ID + + async def asyncSetUp(self): + self.client = CosmosClient(self.host, self.masterKey) + self.database = self.client.get_database_client(self.TEST_DATABASE_ID) + self.container = await self.database.create_container("service_request_mrr_test_" + str(uuid.uuid4()), + PartitionKey(path="/id")) + + async def asyncTearDown(self): + await self.client.close() + + async def test_write_failover_to_global_with_service_request_error_async(self): + # 1. Get write regions and ensure there are at least 2 for this test. + endpoint_manager = self.client.client_connection._global_endpoint_manager + db_account = await self.client._get_database_account() + endpoint_manager.refresh_endpoint_list(db_account) + + write_locations_map = endpoint_manager.location_cache.account_locations_by_write_endpoints + + if len(write_locations_map) < 2: + pytest.skip("This test requires an account with 2 or more write regions.") + + write_endpoints = list(write_locations_map.keys()) + write_locations = list(write_locations_map.values()) + + account_name = self.host.split(".")[0].split("//")[1] + region_to_fail_slug = write_endpoints[1].split('/')[2].replace(f"{account_name}-", "").split('.')[0] + + region_to_exclude = write_locations[0] # Use the display name for ExcludedLocations + + # 2. Set up a client with one region excluded and a fault injection transport. + policy = ConnectionPolicy() # Create a new policy object for this test + policy.EnableEndpointDiscovery = True + policy.UseMultipleWriteLocations = True + policy.ExcludedLocations = [region_to_exclude] + fault_injection_transport = FaultInjectionTransport() + + async with CosmosClient( + self.host, + self.masterKey, + connection_policy=policy, + transport=fault_injection_transport, + + ) as client_with_faults: + container_with_faults = client_with_faults.get_database_client(self.database.id).get_container_client( + self.container.id) + + # 3. Configure fault injection to fail requests to the second write region with a ServiceRequestError. + error_to_inject = ServiceRequestError(message="Simulated Service Request Error") + + def predicate(request): + # Fail if it's a create item operation and the host matches the region we want to fail. + is_create = FaultInjectionTransport.predicate_is_operation_type(request, _OperationType.Create) + is_target_region = region_to_fail_slug in request.url + return is_create and is_target_region + + def fault_action(_): + raise error_to_inject + + fault_injection_transport.add_fault(predicate, fault_action) + + # 4. Execute a write operation. It should fail with ServiceRequestError as no regions are available. + with self.assertRaises(ServiceRequestError) as context: + await container_with_faults.create_item(body={'id': 'failover_test_id', 'pk': 'pk_value'}) + + self.assertIn("Simulated Service Request Error", str(context.exception)) + + +if __name__ == "__main__": + unittest.main() + From 4c21d28b3c92ea426a337c14cf82e9b87bebb405 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Thu, 23 Oct 2025 18:40:04 -0500 Subject: [PATCH 02/11] updated CHANGELOG.md --- sdk/cosmos/azure-cosmos/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 32108d9d59ad..0cabcd9345b6 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -7,8 +7,10 @@ #### Breaking Changes #### Bugs Fixed +* Fixed bug where customer provided excluded region was not always being honored during certain transient failures. See [PR 43602](https://github.com/Azure/azure-sdk-for-python/pull/43602) #### Other Changes +* Enhanced logging to ensure when a region is marked unavailable we have the proper context. See [PR 43602](https://github.com/Azure/azure-sdk-for-python/pull/43602) ### 4.14.0 (2025-10-13) This version and all future versions will require Python 3.9+. From 59dda34fe71f75a9a16623d4072ab30078fe3472 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Fri, 24 Oct 2025 15:23:20 -0500 Subject: [PATCH 03/11] fix: fixing bug in the implementation --- .../azure/cosmos/_location_cache.py | 128 +++++++++--------- .../azure-cosmos/tests/test_location_cache.py | 1 + 2 files changed, 62 insertions(+), 67 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 8b65fbeb72a0..aa8808c85e4d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -90,26 +90,39 @@ def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[Re location_name_by_endpoint: Mapping[str, str], fall_back_regional_routing_context: RegionalRoutingContext, exclude_location_list: list[str], + circuit_breaker_exclude_list: list[str], resource_type: str) -> list[RegionalRoutingContext]: # filter endpoints by excluded locations applicable_regional_routing_contexts = [] - excluded_regional_routing_contexts = [] + user_excluded_regional_routing_contexts = [] for regional_routing_context in regional_routing_contexts: if location_name_by_endpoint.get(regional_routing_context.get_primary()) not in exclude_location_list: applicable_regional_routing_contexts.append(regional_routing_context) else: - excluded_regional_routing_contexts.append(regional_routing_context) + user_excluded_regional_routing_contexts.append(regional_routing_context) + + # Now, filter by circuit breaker exclusions, moving them to the end of the list + final_applicable_contexts = [] + circuit_breaker_excluded_contexts = [] + for regional_routing_context in applicable_regional_routing_contexts: + if location_name_by_endpoint.get(regional_routing_context.get_primary()) in circuit_breaker_exclude_list: + circuit_breaker_excluded_contexts.append(regional_routing_context) + else: + final_applicable_contexts.append(regional_routing_context) + + # Add circuit breaker excluded locations as a last resort + final_applicable_contexts.extend(circuit_breaker_excluded_contexts) # Preserves the excluded locations at the end of the list, because for the metadata API calls, excluded locations # are not preferred, but all endpoints must be used. if base.IsMasterResource(resource_type): - applicable_regional_routing_contexts.extend(excluded_regional_routing_contexts) + applicable_regional_routing_contexts.extend(user_excluded_regional_routing_contexts) # If all preferred locations are excluded, use the fallback endpoint. - if not applicable_regional_routing_contexts: - applicable_regional_routing_contexts.append(fall_back_regional_routing_context) + if not final_applicable_contexts: + final_applicable_contexts.append(fall_back_regional_routing_context) - return applicable_regional_routing_contexts + return final_applicable_contexts class LocationCache(object): # pylint: disable=too-many-public-methods,too-many-instance-attributes @@ -185,60 +198,44 @@ def _get_configured_excluded_locations(self, request: RequestObject) -> list[str excluded_locations = list(self.connection_policy.ExcludedLocations) else: excluded_locations = [] - + #for excluded_location in request.excluded_locations_circuit_breaker: + #if excluded_location not in excluded_locations: + #excluded_locations.append(excluded_location) return excluded_locations def _get_applicable_read_regional_routing_contexts(self, request: RequestObject) -> list[RegionalRoutingContext]: - return self._get_applicable_regional_routing_contexts( - request, - self.get_read_regional_routing_contexts(), - self.account_locations_by_read_endpoints, - self.get_write_regional_routing_contexts()[0] # Fallback to primary write region - ) + # Get configured excluded locations + excluded_locations = self._get_configured_excluded_locations(request) + + # If excluded locations were configured, return filtered regional endpoints by excluded locations. + if excluded_locations or request.excluded_locations_circuit_breaker: + return _get_applicable_regional_routing_contexts( + self.get_read_regional_routing_contexts(), + self.account_locations_by_read_endpoints, + self.get_write_regional_routing_contexts()[0], + excluded_locations, + request.excluded_locations_circuit_breaker, + request.resource_type) + + # Else, return all regional endpoints + return self.get_read_regional_routing_contexts() def _get_applicable_write_regional_routing_contexts(self, request: RequestObject) -> list[RegionalRoutingContext]: - return self._get_applicable_regional_routing_contexts( - request, - self.get_write_regional_routing_contexts(), - self.account_locations_by_write_endpoints, - self.default_regional_routing_context # Fallback to default global endpoint - ) - - def _get_applicable_regional_routing_contexts( - self, - request: RequestObject, - regional_routing_contexts: list[RegionalRoutingContext], - location_name_by_endpoint: Mapping[str, str], - fallback_regional_routing_context: RegionalRoutingContext - ) -> list[RegionalRoutingContext]: - user_excluded_locations = self._get_configured_excluded_locations(request) - circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] - - if not user_excluded_locations and not circuit_breaker_excluded_locations: - return regional_routing_contexts - - applicable_contexts = [] - last_resort_contexts = [] - - for context in regional_routing_contexts: - location = location_name_by_endpoint.get(context.get_primary()) - if location in user_excluded_locations: - # For metadata calls, user-excluded locations are added at the end as a last resort. - if base.IsMasterResource(request.resource_type): - last_resort_contexts.append(context) - elif location in circuit_breaker_excluded_locations: - last_resort_contexts.append(context) - else: - applicable_contexts.append(context) - - # Append circuit-breaker-excluded locations (and user-excluded for metadata) to be used as a last resort. - applicable_contexts.extend(last_resort_contexts) - - # If all preferred locations are excluded, use the fallback endpoint. - if not applicable_contexts: - applicable_contexts.append(fallback_regional_routing_context) - - return applicable_contexts + # Get configured excluded locations + excluded_locations = self._get_configured_excluded_locations(request) + + # If excluded locations were configured, return filtered regional endpoints by excluded locations. + if excluded_locations or request.excluded_locations_circuit_breaker: + return _get_applicable_regional_routing_contexts( + self.get_write_regional_routing_contexts(), + self.account_locations_by_write_endpoints, + self.default_regional_routing_context, + excluded_locations, + request.excluded_locations_circuit_breaker, + request.resource_type) + + # Else, return all regional endpoints + return self.get_write_regional_routing_contexts() def resolve_service_endpoint(self, request): if request.location_endpoint_to_route: @@ -256,17 +253,14 @@ def resolve_service_endpoint(self, request): # For non-document resource types in case of client can use multiple write locations # or when client cannot use multiple write locations, flip-flop between the # first and the second writable region in DatabaseAccount (for manual failover) - if self.connection_policy.EnableEndpointDiscovery: - # Get the list of applicable write locations, which respects excluded locations. - applicable_contexts = self._get_applicable_write_regional_routing_contexts(request) - if not applicable_contexts: - # if all write locations are excluded, fall back to the default endpoint - return self.default_regional_routing_context.get_primary() - - # For single-master writes, flip-flop between the first and second *applicable* - # regions for manual failover. - index = min(location_index % 2, len(applicable_contexts) - 1) - return applicable_contexts[index].get_primary() + if self.connection_policy.EnableEndpointDiscovery and self.account_write_locations: + location_index = min(location_index % 2, len(self.account_write_locations) - 1) + write_location = self.account_write_locations[location_index] + if (self.account_write_regional_routing_contexts_by_location + and write_location in self.account_write_regional_routing_contexts_by_location): + write_regional_routing_context = ( + self.account_write_regional_routing_contexts_by_location)[write_location] + return write_regional_routing_context.get_primary() # if endpoint discovery is off for reads it should use passed in endpoint return self.default_regional_routing_context.get_primary() @@ -504,4 +498,4 @@ def GetLocationalEndpoint(default_endpoint, location_name): ) return locational_endpoint - return None + return None \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/tests/test_location_cache.py b/sdk/cosmos/azure-cosmos/tests/test_location_cache.py index 6c88ea126f8d..f80f8648def8 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_location_cache.py +++ b/sdk/cosmos/azure-cosmos/tests/test_location_cache.py @@ -3,6 +3,7 @@ import time import unittest +import unittest.mock from typing import Mapping, Any import pytest From a2a2e4266f43b33fdfa2004099a89a430b99766e Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Fri, 24 Oct 2025 17:14:56 -0500 Subject: [PATCH 04/11] fix: refactoring --- .../azure/cosmos/_location_cache.py | 51 ++++++++++++------- .../azure-cosmos/tests/test_location_cache.py | 18 +++---- .../test_service_request_retry_policy.py | 4 +- ...test_service_request_retry_policy_async.py | 4 +- 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index aa8808c85e4d..9cc8eda7775a 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -245,28 +245,43 @@ def resolve_service_endpoint(self, request): use_preferred_locations = ( request.use_preferred_locations if request.use_preferred_locations is not None else True ) - - if not use_preferred_locations or ( - documents._OperationType.IsWriteOperation(request.operation_type) - and not self.can_use_multiple_write_locations_for_request(request) - ): - # For non-document resource types in case of client can use multiple write locations - # or when client cannot use multiple write locations, flip-flop between the - # first and the second writable region in DatabaseAccount (for manual failover) - if self.connection_policy.EnableEndpointDiscovery and self.account_write_locations: - location_index = min(location_index % 2, len(self.account_write_locations) - 1) - write_location = self.account_write_locations[location_index] - if (self.account_write_regional_routing_contexts_by_location - and write_location in self.account_write_regional_routing_contexts_by_location): - write_regional_routing_context = ( - self.account_write_regional_routing_contexts_by_location)[write_location] - return write_regional_routing_context.get_primary() - # if endpoint discovery is off for reads it should use passed in endpoint + is_write = documents._OperationType.IsWriteOperation(request.operation_type) + + # For write operations on a single-write-location account, we MUST use the primary write region, + # regardless of any other preference. + if is_write and not self.can_use_multiple_write_locations_for_request(request): + primary_write_location = self.account_write_locations[0] + return self.account_write_regional_routing_contexts_by_location[primary_write_location].get_primary() + + # This block handles any operation where the request explicitly disables preferred locations. + if not use_preferred_locations: + # When not using preferred locations, we use the full list of account locations, + # respecting their original order, while filtering out excluded locations for this request. + all_contexts_by_loc = (self.account_write_regional_routing_contexts_by_location if is_write + else self.account_read_regional_routing_contexts_by_location) + ordered_locations = self.account_write_locations if is_write else self.account_read_locations + + excluded_locations = self._get_configured_excluded_locations(request) + circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] + + applicable_contexts = [] + for loc_name in ordered_locations: + if (loc_name not in excluded_locations + and loc_name not in circuit_breaker_excluded_locations + and loc_name in all_contexts_by_loc): + applicable_contexts.append(all_contexts_by_loc[loc_name]) + + if self.connection_policy.EnableEndpointDiscovery and applicable_contexts: + effective_index = location_index % len(applicable_contexts) + return applicable_contexts[effective_index].get_primary() + + # If no applicable regional endpoints are found, or discovery is off, use the global default. return self.default_regional_routing_context.get_primary() + # This is the default path for multi-region accounts using preferred locations. regional_routing_contexts = ( self._get_applicable_write_regional_routing_contexts(request) - if documents._OperationType.IsWriteOperation(request.operation_type) + if is_write else self._get_applicable_read_regional_routing_contexts(request) ) regional_routing_context = regional_routing_contexts[location_index % len(regional_routing_contexts)] diff --git a/sdk/cosmos/azure-cosmos/tests/test_location_cache.py b/sdk/cosmos/azure-cosmos/tests/test_location_cache.py index f80f8648def8..090717519222 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_location_cache.py +++ b/sdk/cosmos/azure-cosmos/tests/test_location_cache.py @@ -358,8 +358,6 @@ def test_resolve_endpoint_unavailable_and_excluded_on_request(self): assert write_doc_resolved == default_endpoint def test_resolve_endpoint_respects_excluded_regions_when_use_preferred_locations_is_false(self): - # This test demonstrates the bug where setting use_preferred_locations to False - # causes the excluded_locations list to be ignored. # 1. Setup: LocationCache with multiple locations enabled. lc = refresh_location_cache(preferred_locations=[], use_multiple_write_locations=True) @@ -412,8 +410,8 @@ def test_regional_fallback_when_primary_is_excluded(self): write_request.excluded_locations_circuit_breaker = [location2_name] # 4. Resolve the endpoint. - # With the fix, the user-excluded location is filtered out, and the - # circuit-breaker-excluded location is moved to the end of the list. + # the user-excluded location should be filtered out, and the + # circuit-breaker-excluded location moved to the end of the list. # Since it's the only one left, it should be selected. resolved_endpoint = lc.resolve_service_endpoint(write_request) @@ -421,8 +419,8 @@ def test_regional_fallback_when_primary_is_excluded(self): assert resolved_endpoint == location2_endpoint def test_write_fallback_to_global_after_regional_retries_exhausted(self): - # This test simulates the client pipeline falling back to the global endpoint for writes - # after all preferred regional endpoints have been tried and failed. + # This test simulates the client pipeline retrying preferred locations for writes + # after all of them have been tried and marked as unavailable. # 1. Setup: LocationCache with two preferred write locations. preferred_locations = [location1_name, location2_name] @@ -467,12 +465,12 @@ def test_write_fallback_to_global_after_regional_retries_exhausted(self): # 5. Fallback to Global: After the retry policy gives up, the client clears the regional # routing preference to make a final attempt at the global endpoint. write_request.clear_route_to_location() - assert write_request.use_preferred_locations is None + write_request.use_preferred_locations = False - # A final call to resolve the endpoint with the modified request will cause the - # LocationCache to return the default global endpoint. + # A final call to resolve the endpoint should now return the first preferred location, + # even though it's marked as unavailable, as a last resort. final_endpoint = lc.resolve_service_endpoint(write_request) - assert final_endpoint == default_endpoint + assert final_endpoint == location1_endpoint if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py index cc6cd7611a46..da600a065075 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py +++ b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy.py @@ -13,7 +13,7 @@ from azure.cosmos.documents import _OperationType, ConnectionPolicy -@pytest.mark.cosmosSplit +@pytest.mark.cosmosMultiRegion class TestServiceRequestRetryPolicies(unittest.TestCase): """Test cases for the read_items API.""" @@ -34,7 +34,7 @@ def test_write_failover_to_global_with_service_request_error(self): container = self.database.create_container("service_request_mrr_test_" + str(uuid.uuid4()), PartitionKey(path="/id")) - # 1. Get write regions and ensure there are atleast 2 for this test. + # 1. Get write regions and ensure there are at least 2 for this test. endpoint_manager = self.client.client_connection._global_endpoint_manager db_account = self.client.get_database_account() endpoint_manager.refresh_endpoint_list(db_account) diff --git a/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py index c5038428ea8a..516619ea9ba4 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_service_request_retry_policy_async.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- # The MIT License (MIT) # Copyright (c) Microsoft Corporation. All rights reserved. -from azure.cosmos import DatabaseProxy + import unittest import uuid import pytest @@ -13,7 +13,7 @@ from azure.cosmos.documents import _OperationType, ConnectionPolicy -@pytest.mark.cosmosSplit +@pytest.mark.cosmosMultiRegion class TestServiceRequestRetryPoliciesAsync(unittest.TestCase): """Test cases for the read_items API.""" From 6d8d22aba1d8cd2e82b9c3401d3ca4ada1972fb6 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Sat, 25 Oct 2025 12:43:23 -0500 Subject: [PATCH 05/11] fix: fixing tests --- .../azure/cosmos/_location_cache.py | 53 +++++++------------ 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 9cc8eda7775a..2df50d473ccc 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -116,7 +116,7 @@ def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[Re # Preserves the excluded locations at the end of the list, because for the metadata API calls, excluded locations # are not preferred, but all endpoints must be used. if base.IsMasterResource(resource_type): - applicable_regional_routing_contexts.extend(user_excluded_regional_routing_contexts) + final_applicable_contexts.extend(user_excluded_regional_routing_contexts) # If all preferred locations are excluded, use the fallback endpoint. if not final_applicable_contexts: @@ -245,43 +245,28 @@ def resolve_service_endpoint(self, request): use_preferred_locations = ( request.use_preferred_locations if request.use_preferred_locations is not None else True ) - is_write = documents._OperationType.IsWriteOperation(request.operation_type) - - # For write operations on a single-write-location account, we MUST use the primary write region, - # regardless of any other preference. - if is_write and not self.can_use_multiple_write_locations_for_request(request): - primary_write_location = self.account_write_locations[0] - return self.account_write_regional_routing_contexts_by_location[primary_write_location].get_primary() - - # This block handles any operation where the request explicitly disables preferred locations. - if not use_preferred_locations: - # When not using preferred locations, we use the full list of account locations, - # respecting their original order, while filtering out excluded locations for this request. - all_contexts_by_loc = (self.account_write_regional_routing_contexts_by_location if is_write - else self.account_read_regional_routing_contexts_by_location) - ordered_locations = self.account_write_locations if is_write else self.account_read_locations - - excluded_locations = self._get_configured_excluded_locations(request) - circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] - - applicable_contexts = [] - for loc_name in ordered_locations: - if (loc_name not in excluded_locations - and loc_name not in circuit_breaker_excluded_locations - and loc_name in all_contexts_by_loc): - applicable_contexts.append(all_contexts_by_loc[loc_name]) - - if self.connection_policy.EnableEndpointDiscovery and applicable_contexts: - effective_index = location_index % len(applicable_contexts) - return applicable_contexts[effective_index].get_primary() - - # If no applicable regional endpoints are found, or discovery is off, use the global default. + + if not use_preferred_locations or ( + documents._OperationType.IsWriteOperation(request.operation_type) + and not self.can_use_multiple_write_locations_for_request(request) + ): + # For non-document resource types in case of client can use multiple write locations + # or when client cannot use multiple write locations, flip-flop between the + # first and the second writable region in DatabaseAccount (for manual failover) + if self.connection_policy.EnableEndpointDiscovery and self.account_write_locations: + location_index = min(location_index % 2, len(self.account_write_locations) - 1) + write_location = self.account_write_locations[location_index] + if (self.account_write_regional_routing_contexts_by_location + and write_location in self.account_write_regional_routing_contexts_by_location): + write_regional_routing_context = ( + self.account_write_regional_routing_contexts_by_location)[write_location] + return write_regional_routing_context.get_primary() + # if endpoint discovery is off for reads it should use passed in endpoint return self.default_regional_routing_context.get_primary() - # This is the default path for multi-region accounts using preferred locations. regional_routing_contexts = ( self._get_applicable_write_regional_routing_contexts(request) - if is_write + if documents._OperationType.IsWriteOperation(request.operation_type) else self._get_applicable_read_regional_routing_contexts(request) ) regional_routing_context = regional_routing_contexts[location_index % len(regional_routing_contexts)] From 9be8a2d1a78286deaac963cfa77e5f7b52b1d214 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Sat, 25 Oct 2025 12:44:40 -0500 Subject: [PATCH 06/11] fix: fixing pylint --- sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 2df50d473ccc..1f424ec57bae 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -106,9 +106,9 @@ def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[Re circuit_breaker_excluded_contexts = [] for regional_routing_context in applicable_regional_routing_contexts: if location_name_by_endpoint.get(regional_routing_context.get_primary()) in circuit_breaker_exclude_list: - circuit_breaker_excluded_contexts.append(regional_routing_context) + circuit_breaker_excluded_contexts.append(regional_routing_context) else: - final_applicable_contexts.append(regional_routing_context) + final_applicable_contexts.append(regional_routing_context) # Add circuit breaker excluded locations as a last resort final_applicable_contexts.extend(circuit_breaker_excluded_contexts) @@ -498,4 +498,4 @@ def GetLocationalEndpoint(default_endpoint, location_name): ) return locational_endpoint - return None \ No newline at end of file + return None From 4158ab1e0e5dae4139f6a13267264da87abc7965 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Sat, 25 Oct 2025 13:31:13 -0500 Subject: [PATCH 07/11] fix: fixing pylint --- .../azure/cosmos/_location_cache.py | 87 +++++++++++++++---- .../aio/_global_endpoint_manager_async.py | 1 + 2 files changed, 72 insertions(+), 16 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 1f424ec57bae..181adb24d909 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -85,7 +85,15 @@ def _get_health_check_endpoints(regional_routing_contexts) -> Set[str]: # should use the endpoints in the order returned from gateway and only the ones specified in preferred locations preferred_endpoints = {context.get_primary() for context in regional_routing_contexts} return preferred_endpoints - +""" +this method separates the initial list of endpoints into two groups: those the user has explicitly excluded and those they have not. +It then takes the list of non-excluded endpoints and moves any that are currently marked as unavailable by the circuit breaker to the +end of that list. This ensures healthy endpoints are tried before unhealthy ones. +For special metadata requests (which must succeed), it adds the user-excluded locations back to the very end of the list. This +allows the SDK to try every possible endpoint as a last resort for critical operations. +If all available endpoints are filtered out, it adds a default fallback endpoint to the list to ensure there is always at least one endpoint +to attempt a connection to +""" def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[RegionalRoutingContext], location_name_by_endpoint: Mapping[str, str], fall_back_regional_routing_context: RegionalRoutingContext, @@ -236,7 +244,24 @@ def _get_applicable_write_regional_routing_contexts(self, request: RequestObject # Else, return all regional endpoints return self.get_write_regional_routing_contexts() - + """ + this method determines the appropriate service endpoint for a request by following this logic: + Direct Override: If a specific endpoint is provided in request.location_endpoint_to_route, it is used immediately. + Main Logic Branching: It checks if use_preferred_locations is False or if the request is a write operation on a single-write account. + If True (No Preferred Locations or Single-Write Failover): + For single-write accounts: It alternates between the first two available write locations for failover, ignoring any exclusion lists. + For reads or multi-write accounts: It uses the full list of account locations, filters out any user-configured excluded + locations, and moves any circuit-breaker-excluded locations to the end of the list to be used as a last resort. + It then selects an endpoint from this filtered list. + It falls back to the default account endpoint if endpoint discovery is disabled or no locations are available. + If False (Default path for multi-region accounts using preferred locations): + It gets the list of applicable read or write locations, which is already filtered to respect preferred locations, user + exclusions, and circuit-breaker status. + It selects an endpoint from this list based on the request's location index. + In essence, the method intelligently routes requests by prioritizing explicit overrides, then handling specific failover + and non-preferred location scenarios, and finally defaulting to using the ordered list of preferred locations, all while + respecting various exclusion rules. + """ def resolve_service_endpoint(self, request): if request.location_endpoint_to_route: return request.location_endpoint_to_route @@ -246,22 +271,52 @@ def resolve_service_endpoint(self, request): request.use_preferred_locations if request.use_preferred_locations is not None else True ) + is_write = documents._OperationType.IsWriteOperation(request.operation_type) + if not use_preferred_locations or ( - documents._OperationType.IsWriteOperation(request.operation_type) - and not self.can_use_multiple_write_locations_for_request(request) + is_write and not self.can_use_multiple_write_locations_for_request(request) ): - # For non-document resource types in case of client can use multiple write locations - # or when client cannot use multiple write locations, flip-flop between the - # first and the second writable region in DatabaseAccount (for manual failover) - if self.connection_policy.EnableEndpointDiscovery and self.account_write_locations: - location_index = min(location_index % 2, len(self.account_write_locations) - 1) - write_location = self.account_write_locations[location_index] - if (self.account_write_regional_routing_contexts_by_location - and write_location in self.account_write_regional_routing_contexts_by_location): - write_regional_routing_context = ( - self.account_write_regional_routing_contexts_by_location)[write_location] - return write_regional_routing_context.get_primary() - # if endpoint discovery is off for reads it should use passed in endpoint + # When not using preferred locations, we use the full list of account locations, + # respecting their original order, while filtering out excluded locations. + ordered_locations = self.account_write_locations if is_write else self.account_read_locations + all_contexts_by_loc = (self.account_write_regional_routing_contexts_by_location if is_write + else self.account_read_regional_routing_contexts_by_location) + + # Get user-configured and circuit-breaker excluded locations + excluded_locations = self._get_configured_excluded_locations(request) + circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] + + applicable_contexts = [] + circuit_breaker_contexts = [] + + # Safety check: if the location cache isn't populated, we can't proceed. + if self.connection_policy.EnableEndpointDiscovery and ordered_locations: + # For single-write-region accounts, the logic is simpler: try the first two available regions. + if is_write and not self.can_use_multiple_write_locations_for_request(request): + location_index = min(location_index % 2, len(ordered_locations) - 1) + write_location = ordered_locations[location_index] + if write_location in all_contexts_by_loc: + return all_contexts_by_loc[write_location].get_primary() + else: + # For reads (or multi-write) with use_preferred_locations=False, filter the locations. + for loc_name in ordered_locations: + if loc_name in all_contexts_by_loc: + context = all_contexts_by_loc[loc_name] + if loc_name in excluded_locations: + continue # Skip user-excluded locations + if loc_name in circuit_breaker_excluded_locations: + circuit_breaker_contexts.append(context) + else: + applicable_contexts.append(context) + + # Add circuit breaker excluded locations as a last resort + applicable_contexts.extend(circuit_breaker_contexts) + + if applicable_contexts: + effective_index = location_index % len(applicable_contexts) + return applicable_contexts[effective_index].get_primary() + + # Fallback to the default endpoint if no other endpoint is found or discovery is off. return self.default_regional_routing_context.get_primary() regional_routing_contexts = ( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py index db907841bac3..a2ca07b5b9ce 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_global_endpoint_manager_async.py @@ -103,6 +103,7 @@ def update_location_cache(self): def _mark_endpoint_unavailable(self, endpoint: str, context: str): """Marks an endpoint as unavailable for the appropriate operations. :param str endpoint: The endpoint to mark as unavailable. + :param str context: The context or reason for marking the endpoint as unavailable. """ write_endpoints = self.location_cache.get_all_write_endpoints() self.mark_endpoint_unavailable_for_read(endpoint, False, context) From f035c61804ea44a3cd428a77be4efd587e75afe1 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Sat, 25 Oct 2025 14:41:24 -0500 Subject: [PATCH 08/11] fix: fixing pylint --- .../azure/cosmos/_location_cache.py | 148 +++++++++--------- .../azure-cosmos/tests/test_read_items.py | 12 +- 2 files changed, 85 insertions(+), 75 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 181adb24d909..c5020ccfe710 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -85,21 +85,22 @@ def _get_health_check_endpoints(regional_routing_contexts) -> Set[str]: # should use the endpoints in the order returned from gateway and only the ones specified in preferred locations preferred_endpoints = {context.get_primary() for context in regional_routing_contexts} return preferred_endpoints -""" -this method separates the initial list of endpoints into two groups: those the user has explicitly excluded and those they have not. -It then takes the list of non-excluded endpoints and moves any that are currently marked as unavailable by the circuit breaker to the -end of that list. This ensures healthy endpoints are tried before unhealthy ones. -For special metadata requests (which must succeed), it adds the user-excluded locations back to the very end of the list. This -allows the SDK to try every possible endpoint as a last resort for critical operations. -If all available endpoints are filtered out, it adds a default fallback endpoint to the list to ensure there is always at least one endpoint -to attempt a connection to -""" + def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[RegionalRoutingContext], location_name_by_endpoint: Mapping[str, str], fall_back_regional_routing_context: RegionalRoutingContext, exclude_location_list: list[str], circuit_breaker_exclude_list: list[str], resource_type: str) -> list[RegionalRoutingContext]: + """ + this method separates the initial list of endpoints into two groups: those the user has explicitly excluded and those they have not. + It then takes the list of non-excluded endpoints and moves any that are currently marked as unavailable by the circuit breaker to the + end of that list. This ensures healthy endpoints are tried before unhealthy ones. + For special metadata requests (which must succeed), it adds the user-excluded locations back to the very end of the list. This + allows the SDK to try every possible endpoint as a last resort for critical operations. + If all available endpoints are filtered out, it adds a default fallback endpoint to the list to ensure there is always at least one endpoint + to attempt a connection to + """ # filter endpoints by excluded locations applicable_regional_routing_contexts = [] user_excluded_regional_routing_contexts = [] @@ -244,25 +245,71 @@ def _get_applicable_write_regional_routing_contexts(self, request: RequestObject # Else, return all regional endpoints return self.get_write_regional_routing_contexts() - """ - this method determines the appropriate service endpoint for a request by following this logic: - Direct Override: If a specific endpoint is provided in request.location_endpoint_to_route, it is used immediately. - Main Logic Branching: It checks if use_preferred_locations is False or if the request is a write operation on a single-write account. - If True (No Preferred Locations or Single-Write Failover): - For single-write accounts: It alternates between the first two available write locations for failover, ignoring any exclusion lists. - For reads or multi-write accounts: It uses the full list of account locations, filters out any user-configured excluded - locations, and moves any circuit-breaker-excluded locations to the end of the list to be used as a last resort. - It then selects an endpoint from this filtered list. - It falls back to the default account endpoint if endpoint discovery is disabled or no locations are available. - If False (Default path for multi-region accounts using preferred locations): - It gets the list of applicable read or write locations, which is already filtered to respect preferred locations, user - exclusions, and circuit-breaker status. - It selects an endpoint from this list based on the request's location index. - In essence, the method intelligently routes requests by prioritizing explicit overrides, then handling specific failover - and non-preferred location scenarios, and finally defaulting to using the ordered list of preferred locations, all while - respecting various exclusion rules. - """ + + def _resolve_endpoint_without_preferred_locations(self, request, is_write, location_index): + """Resolves an endpoint when not using preferred locations.""" + ordered_locations = self.account_write_locations if is_write else self.account_read_locations + all_contexts_by_loc = (self.account_write_regional_routing_contexts_by_location if is_write + else self.account_read_regional_routing_contexts_by_location) + + # Safety check: if endpoint discovery is off or location cache isn't populated, fallback. + if not self.connection_policy.EnableEndpointDiscovery or not ordered_locations: + return self.default_regional_routing_context.get_primary() + + # For single-write-region accounts, failover between the first two available write regions. + if is_write and not self.can_use_multiple_write_locations_for_request(request): + effective_index = min(location_index % 2, len(ordered_locations) - 1) + write_location = ordered_locations[effective_index] + if write_location in all_contexts_by_loc: + return all_contexts_by_loc[write_location].get_primary() + return self.default_regional_routing_context.get_primary() # Fallback if location not found + + # For reads or multi-write, filter locations by user and circuit-breaker exclusions. + excluded_locations = self._get_configured_excluded_locations(request) + circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] + + applicable_contexts = [] + circuit_breaker_contexts = [] + for loc_name in ordered_locations: + if loc_name in all_contexts_by_loc: + context = all_contexts_by_loc[loc_name] + if loc_name in excluded_locations: + continue # Skip user-excluded locations + if loc_name in circuit_breaker_excluded_locations: + circuit_breaker_contexts.append(context) + else: + applicable_contexts.append(context) + + # Add circuit breaker excluded locations as a last resort + applicable_contexts.extend(circuit_breaker_contexts) + + if applicable_contexts: + effective_index = location_index % len(applicable_contexts) + return applicable_contexts[effective_index].get_primary() + + # Fallback to the default endpoint if no other endpoint is found. + return self.default_regional_routing_context.get_primary() + def resolve_service_endpoint(self, request): + """ + this method determines the appropriate service endpoint for a request by following this logic: + Direct Override: If a specific endpoint is provided in request.location_endpoint_to_route, it is used immediately. + Main Logic Branching: It checks if use_preferred_locations is False or if the request is a write operation on a single-write account. + If True (No Preferred Locations or Single-Write Failover): + For single-write accounts: It alternates between the first two available write locations for failover, ignoring any exclusion lists. + For reads or multi-write accounts: It uses the full list of account locations, filters out any user-configured excluded + locations, and moves any circuit-breaker-excluded locations to the end of the list to be used as a last resort. + It then selects an endpoint from this filtered list. + It falls back to the default account endpoint if endpoint discovery is disabled or no locations are available. + If False (Default path for multi-region accounts using preferred locations): + It gets the list of applicable read or write locations, which is already filtered to respect preferred locations, user + exclusions, and circuit-breaker status. + It selects an endpoint from this list based on the request's location index. + In essence, the method intelligently routes requests by prioritizing explicit overrides, then handling specific failover + and non-preferred location scenarios, and finally defaulting to using the ordered list of preferred locations, all while + respecting various exclusion rules. + """ + if request.location_endpoint_to_route: return request.location_endpoint_to_route @@ -274,54 +321,13 @@ def resolve_service_endpoint(self, request): is_write = documents._OperationType.IsWriteOperation(request.operation_type) if not use_preferred_locations or ( - is_write and not self.can_use_multiple_write_locations_for_request(request) + is_write and not self.can_use_multiple_write_locations_for_request(request) ): - # When not using preferred locations, we use the full list of account locations, - # respecting their original order, while filtering out excluded locations. - ordered_locations = self.account_write_locations if is_write else self.account_read_locations - all_contexts_by_loc = (self.account_write_regional_routing_contexts_by_location if is_write - else self.account_read_regional_routing_contexts_by_location) - - # Get user-configured and circuit-breaker excluded locations - excluded_locations = self._get_configured_excluded_locations(request) - circuit_breaker_excluded_locations = request.excluded_locations_circuit_breaker or [] - - applicable_contexts = [] - circuit_breaker_contexts = [] - - # Safety check: if the location cache isn't populated, we can't proceed. - if self.connection_policy.EnableEndpointDiscovery and ordered_locations: - # For single-write-region accounts, the logic is simpler: try the first two available regions. - if is_write and not self.can_use_multiple_write_locations_for_request(request): - location_index = min(location_index % 2, len(ordered_locations) - 1) - write_location = ordered_locations[location_index] - if write_location in all_contexts_by_loc: - return all_contexts_by_loc[write_location].get_primary() - else: - # For reads (or multi-write) with use_preferred_locations=False, filter the locations. - for loc_name in ordered_locations: - if loc_name in all_contexts_by_loc: - context = all_contexts_by_loc[loc_name] - if loc_name in excluded_locations: - continue # Skip user-excluded locations - if loc_name in circuit_breaker_excluded_locations: - circuit_breaker_contexts.append(context) - else: - applicable_contexts.append(context) - - # Add circuit breaker excluded locations as a last resort - applicable_contexts.extend(circuit_breaker_contexts) - - if applicable_contexts: - effective_index = location_index % len(applicable_contexts) - return applicable_contexts[effective_index].get_primary() - - # Fallback to the default endpoint if no other endpoint is found or discovery is off. - return self.default_regional_routing_context.get_primary() + return self._resolve_endpoint_without_preferred_locations(request, is_write, location_index) regional_routing_contexts = ( self._get_applicable_write_regional_routing_contexts(request) - if documents._OperationType.IsWriteOperation(request.operation_type) + if is_write else self._get_applicable_read_regional_routing_contexts(request) ) regional_routing_context = regional_routing_contexts[location_index % len(regional_routing_contexts)] diff --git a/sdk/cosmos/azure-cosmos/tests/test_read_items.py b/sdk/cosmos/azure-cosmos/tests/test_read_items.py index 7a5bbbcbd951..71196dc9e637 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_read_items.py +++ b/sdk/cosmos/azure-cosmos/tests/test_read_items.py @@ -482,10 +482,14 @@ def test_read_items_concurrency_internals(self): self.assertEqual(mock_query.call_count, 3) call_args = mock_query.call_args_list - - self.assertEqual(len(call_args[0][0][1]['parameters']), 1000) - self.assertEqual(len(call_args[1][0][1]['parameters']), 1000) - self.assertEqual(len(call_args[2][0][1]['parameters']), 500) + # Extract the number of parameters from each call. + chunk_sizes = [len(call[0][1]['parameters']) for call in call_args] + + # Sort the chunk sizes to make the assertion deterministic. + chunk_sizes.sort(reverse=True) + self.assertEqual(chunk_sizes[0], 1000) + self.assertEqual(chunk_sizes[1], 1000) + self.assertEqual(chunk_sizes[2], 500) def test_read_items_multiple_physical_partitions_and_hook(self): """Tests read_items on a container with multiple physical partitions and verifies response_hook.""" From d0245338950d01aac32c9e26c9be4f5f1b793d5c Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Sat, 25 Oct 2025 22:53:06 -0500 Subject: [PATCH 09/11] fix: fixing pylint --- .../azure/cosmos/_location_cache.py | 84 +++++++++++++------ 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index c5020ccfe710..9353b7fb8228 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -92,14 +92,33 @@ def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[Re exclude_location_list: list[str], circuit_breaker_exclude_list: list[str], resource_type: str) -> list[RegionalRoutingContext]: - """ - this method separates the initial list of endpoints into two groups: those the user has explicitly excluded and those they have not. - It then takes the list of non-excluded endpoints and moves any that are currently marked as unavailable by the circuit breaker to the - end of that list. This ensures healthy endpoints are tried before unhealthy ones. - For special metadata requests (which must succeed), it adds the user-excluded locations back to the very end of the list. This - allows the SDK to try every possible endpoint as a last resort for critical operations. - If all available endpoints are filtered out, it adds a default fallback endpoint to the list to ensure there is always at least one endpoint - to attempt a connection to + """Filters and reorders regional endpoints based on exclusion lists and health. + + This method separates the initial list of endpoints into two groups: those the user has explicitly excluded + and those they have not. It then takes the list of non-excluded endpoints and moves any that are currently + marked as unavailable by the circuit breaker to the end of that list. This ensures healthy endpoints are + tried before unhealthy ones. + + For special metadata requests (which must succeed), it adds the user-excluded locations back to the very + end of the list. This allows the SDK to try every possible endpoint as a last resort for critical operations. + + If all available endpoints are filtered out, it adds a default fallback endpoint to the list to ensure + there is always at least one endpoint to attempt a connection to. + + :param regional_routing_contexts: The initial list of regional contexts to filter. + :type regional_routing_contexts: list[RegionalRoutingContext] + :param location_name_by_endpoint: A mapping from endpoint URL to location name. + :type location_name_by_endpoint: Mapping[str, str] + :param fall_back_regional_routing_context: The context to use as a fallback if all others are filtered out. + :type fall_back_regional_routing_context: RegionalRoutingContext + :param exclude_location_list: A list of location names to exclude, based on user configuration. + :type exclude_location_list: list[str] + :param circuit_breaker_exclude_list: A list of location names to temporarily exclude due to circuit breaker logic. + :type circuit_breaker_exclude_list: list[str] + :param resource_type: The type of resource for the request, used to determine if it's a metadata request. + :type resource_type: str + :return: A filtered and reordered list of regional routing contexts. + :rtype: list[RegionalRoutingContext] """ # filter endpoints by excluded locations applicable_regional_routing_contexts = [] @@ -247,7 +266,21 @@ def _get_applicable_write_regional_routing_contexts(self, request: RequestObject return self.get_write_regional_routing_contexts() def _resolve_endpoint_without_preferred_locations(self, request, is_write, location_index): - """Resolves an endpoint when not using preferred locations.""" + """Resolves an endpoint when not using preferred locations or for single-write failover. + + This helper method is called when `use_preferred_locations` is False or for write operations on single-write + accounts. It determines the appropriate endpoint by cycling through available locations while respecting + user-configured and circuit-breaker-based exclusion lists. + + :param request: The request object for the current operation. + :type request: azure.cosmos._request_object.RequestObject + :param is_write: A boolean indicating if the operation is a write. + :type is_write: bool + :param location_index: The index used to select an endpoint from the list of available locations. + :type location_index: int + :return: The resolved endpoint URL as a string. + :rtype: str + """ ordered_locations = self.account_write_locations if is_write else self.account_read_locations all_contexts_by_loc = (self.account_write_regional_routing_contexts_by_location if is_write else self.account_read_regional_routing_contexts_by_location) @@ -291,23 +324,22 @@ def _resolve_endpoint_without_preferred_locations(self, request, is_write, locat return self.default_regional_routing_context.get_primary() def resolve_service_endpoint(self, request): - """ - this method determines the appropriate service endpoint for a request by following this logic: - Direct Override: If a specific endpoint is provided in request.location_endpoint_to_route, it is used immediately. - Main Logic Branching: It checks if use_preferred_locations is False or if the request is a write operation on a single-write account. - If True (No Preferred Locations or Single-Write Failover): - For single-write accounts: It alternates between the first two available write locations for failover, ignoring any exclusion lists. - For reads or multi-write accounts: It uses the full list of account locations, filters out any user-configured excluded - locations, and moves any circuit-breaker-excluded locations to the end of the list to be used as a last resort. - It then selects an endpoint from this filtered list. - It falls back to the default account endpoint if endpoint discovery is disabled or no locations are available. - If False (Default path for multi-region accounts using preferred locations): - It gets the list of applicable read or write locations, which is already filtered to respect preferred locations, user - exclusions, and circuit-breaker status. - It selects an endpoint from this list based on the request's location index. - In essence, the method intelligently routes requests by prioritizing explicit overrides, then handling specific failover - and non-preferred location scenarios, and finally defaulting to using the ordered list of preferred locations, all while - respecting various exclusion rules. + """Determines the appropriate service endpoint for a given request. + + This method intelligently routes requests by following a specific logic: + 1. If `request.location_endpoint_to_route` is set, it is used immediately. + 2. No Preferred Locations or Single-Write Failover: If `use_preferred_locations` is False or it's a + write operation on a single-write account, it calls a helper to resolve the endpoint. + - For single-write accounts, it fails over between the first two write locations. + - For other cases, it filters locations based on user and circuit-breaker exclusions. + 3. Preferred Locations (Default): It uses the pre-filtered list of applicable read or write locations, + respecting preferred locations, user exclusions, and circuit-breaker status, and selects an endpoint + based on the request's location index. + + :param request: The request object for the current operation. + :type request: azure.cosmos._request_object.RequestObject + :return: The resolved endpoint URL as a string. + :rtype: str """ if request.location_endpoint_to_route: From 7b42885805f15987ba94d0cf5420f5c4c51828ca Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Sun, 26 Oct 2025 14:43:21 -0500 Subject: [PATCH 10/11] fix: fixing logic --- .../azure/cosmos/_location_cache.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 9353b7fb8228..6d9c12210df3 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -138,16 +138,15 @@ def _get_applicable_regional_routing_contexts(regional_routing_contexts: list[Re else: final_applicable_contexts.append(regional_routing_context) - # Add circuit breaker excluded locations as a last resort - final_applicable_contexts.extend(circuit_breaker_excluded_contexts) - - # Preserves the excluded locations at the end of the list, because for the metadata API calls, excluded locations - # are not preferred, but all endpoints must be used. + # For metadata requests, add user-excluded locations BEFORE fallback if base.IsMasterResource(resource_type): final_applicable_contexts.extend(user_excluded_regional_routing_contexts) - # If all preferred locations are excluded, use the fallback endpoint. - if not final_applicable_contexts: + # If no healthy regions, try circuit-breaker excluded ones BEFORE global fallback + if not final_applicable_contexts and circuit_breaker_excluded_contexts: + final_applicable_contexts = circuit_breaker_excluded_contexts + elif not final_applicable_contexts: + # Only use global fallback if there are no other options final_applicable_contexts.append(fall_back_regional_routing_context) return final_applicable_contexts @@ -242,7 +241,7 @@ def _get_applicable_read_regional_routing_contexts(self, request: RequestObject) self.account_locations_by_read_endpoints, self.get_write_regional_routing_contexts()[0], excluded_locations, - request.excluded_locations_circuit_breaker, + request.excluded_locations_circuit_breaker or [], request.resource_type) # Else, return all regional endpoints @@ -259,7 +258,7 @@ def _get_applicable_write_regional_routing_contexts(self, request: RequestObject self.account_locations_by_write_endpoints, self.default_regional_routing_context, excluded_locations, - request.excluded_locations_circuit_breaker, + request.excluded_locations_circuit_breaker or [], request.resource_type) # Else, return all regional endpoints @@ -313,8 +312,9 @@ def _resolve_endpoint_without_preferred_locations(self, request, is_write, locat else: applicable_contexts.append(context) - # Add circuit breaker excluded locations as a last resort - applicable_contexts.extend(circuit_breaker_contexts) + # Only add circuit breaker excluded locations if no healthy regions exist + if not applicable_contexts and circuit_breaker_contexts: + applicable_contexts = circuit_breaker_contexts if applicable_contexts: effective_index = location_index % len(applicable_contexts) From e3b582e56861f5ff777381e08e8b346447af8753 Mon Sep 17 00:00:00 2001 From: dibahlfi <106994927+dibahlfi@users.noreply.github.com> Date: Mon, 27 Oct 2025 14:18:25 -0500 Subject: [PATCH 11/11] fix: cleaning up code --- sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py index 6d9c12210df3..cf8239488712 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py @@ -225,9 +225,7 @@ def _get_configured_excluded_locations(self, request: RequestObject) -> list[str excluded_locations = list(self.connection_policy.ExcludedLocations) else: excluded_locations = [] - #for excluded_location in request.excluded_locations_circuit_breaker: - #if excluded_location not in excluded_locations: - #excluded_locations.append(excluded_location) + return excluded_locations def _get_applicable_read_regional_routing_contexts(self, request: RequestObject) -> list[RegionalRoutingContext]: