diff --git a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py index 6fef311f1d58..ba66d083e57b 100644 --- a/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py +++ b/sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py @@ -3,6 +3,9 @@ from azure.eventhub._pyamqp.link import Link from azure.eventhub._pyamqp.receiver import ReceiverLink from azure.eventhub._pyamqp.constants import LinkState +from azure.eventhub._pyamqp.link import Source, Target +from unittest.mock import Mock, patch +from azure.eventhub._pyamqp.constants import LINK_MAX_MESSAGE_SIZE import pytest @@ -84,6 +87,73 @@ def test_receive_transfer_frame_multiple(): link._incoming_transfer(transfer_frame_two) assert link.current_link_credit == 1 +def test_max_message_size_negotiation_with_client_unlimited(): + """ + Test AMQP attach frame negotiation where client sends max_message_size=0 (unlimited) + and server responds with its limit (20MB), resulting in final size of 20MB. + + """ + mock_session = Mock() + mock_connection = Mock() + mock_session._connection = mock_connection + + SERVER_MAX_MESSAGE_SIZE = 20 * 1024 * 1024 + + link = Link( + mock_session, + 3, + name="test_link", + role=False, # Sender role + source_address="test_source", + target_address="test_target", + network_trace=False, + network_trace_params={}, + max_message_size=LINK_MAX_MESSAGE_SIZE + ) + + # Verifying that client sends 0 (unlimited) in attach frame + assert link.max_message_size == 0, f"Expected client max_message_size=0, got {link._max_message_size}" + # Simulating server's attach response with 20MB limit, Mock incoming attach frame from server + mock_attach_frame = [ + "test_link", + 3, + False, + 0, + 1, + Source(address="test_source"), + Target(address="test_target"), + None, + False, + None, + 20 * 1024 * 1024, + None, + None, + None, ] + + # Testing _outgoing_attach() + with patch.object(link, '_outgoing_attach') as mock_outgoing_attach: + # Trigger outgoing attach + link.attach() + + # Verifying _outgoing_attach was called + assert mock_outgoing_attach.called, "_outgoing_attach should have been called during attach()" + + # Get the attach frame that would be sent to server + call_args = mock_outgoing_attach.call_args + if call_args and call_args[0]: + attach_frame = call_args[0][0] + assert attach_frame.max_message_size == 0, f"Expected client to send max_message_size=0, got {attach_frame.max_message_size}" + + # Testing _incoming_attach() + with patch.object(link, '_outgoing_attach') as mock_outgoing_response: + + # Calling _incoming_attach to process server's response + link._incoming_attach(mock_attach_frame) + + expected_final_size = SERVER_MAX_MESSAGE_SIZE + # Verifying remote_max_message_size is set correctly + assert link.remote_max_message_size == expected_final_size, \ + f"Expected remote_max_message_size={SERVER_MAX_MESSAGE_SIZE}, got {link.remote_max_message_size}" def test_receive_transfer_continuation_frame(): session = None