From df61e3675ea60c221cd25e153937e5336d4d6962 Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Tue, 9 Dec 2025 11:17:36 -0500 Subject: [PATCH 1/3] fix http concurrency Add tests --- .../Transfer/Internal/FilePartDataHandler.cs | 7 +- .../Internal/MultipartDownloadManager.cs | 350 ++++++++++++------ .../Custom/MultipartDownloadManagerTests.cs | 250 +++++++++---- 3 files changed, 406 insertions(+), 201 deletions(-) diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs index da9210465fde..26410c04bed7 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs @@ -90,7 +90,7 @@ public async Task ProcessPartAsync( partNumber, offset); // Write part data to file at the calculated offset - await WritePartToFileAsync(offset, response, cancellationToken) + await WritePartToFileAsync(partNumber, offset, response, cancellationToken) .ConfigureAwait(false); _logger.DebugFormat("FilePartDataHandler: [Part {0}] File write completed successfully", @@ -192,6 +192,7 @@ private long GetPartOffset(GetObjectResponse response, int partNumber) /// Writes part data from GetObjectResponse ResponseStream to the file at the specified offset. /// private async Task WritePartToFileAsync( + int partNumber, long offset, GetObjectResponse response, CancellationToken cancellationToken) @@ -213,7 +214,7 @@ private async Task WritePartToFileAsync( // Seek to the correct offset for this part fileStream.Seek(offset, SeekOrigin.Begin); - _logger.DebugFormat("FilePartDataHandler: Writing {0} bytes to file at offset {1}", + _logger.DebugFormat("FilePartDataHandler: [Part {0} Writing {1} bytes to file at offset {2}", partNumber, response.ContentLength, offset); // Use GetObjectResponse's stream copy logic which includes: @@ -232,7 +233,7 @@ await response.WriteResponseStreamAsync( await fileStream.FlushAsync(cancellationToken) .ConfigureAwait(false); - _logger.DebugFormat("FilePartDataHandler: Successfully wrote {0} bytes at offset {1}", + _logger.DebugFormat("FilePartDataHandler: [Part {0}] Successfully wrote {1} bytes at offset {2}", partNumber, response.ContentLength, offset); } } diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs index 59d14e889b28..ed5be5a08f1c 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs @@ -173,6 +173,41 @@ public Exception DownloadException } } + /// + /// Discovers the download strategy (single-part vs multipart) by making an initial GetObject request. + /// + /// Cancellation token to cancel the discovery operation. + /// + /// A containing information about the object size, part count, + /// and the initial GetObject response. + /// + /// + /// IMPORTANT - HTTP Semaphore Lifecycle: + /// + /// This method acquires an HTTP concurrency slot from the configured semaphore and downloads Part 1. + /// The semaphore slot is HELD until completes processing Part 1. + /// Callers MUST call after this method to release the semaphore. + /// Failure to call will cause the semaphore slot to remain held indefinitely, + /// potentially blocking other downloads and causing deadlocks. + /// + /// Concurrency Implications: + /// + /// With limited HTTP concurrency (e.g., ConcurrentServiceRequests=1 for shared throttlers in directory downloads), + /// concurrent calls to this method will block until previous downloads complete their full lifecycle + /// (discover → start). This is by design to ensure the entire I/O operation (network + disk) is + /// within the concurrency limit. For single-slot throttlers, downloads must be processed sequentially: + /// complete one download's full lifecycle before starting the next. + /// + /// Typical Usage Pattern: + /// + /// var discovery = await manager.DiscoverDownloadStrategyAsync(cancellationToken); + /// await manager.StartDownloadsAsync(discovery, progressCallback, cancellationToken); + /// await manager.DownloadCompletionTask; // Wait for multipart downloads to finish + /// + /// + /// Thrown if the manager has been disposed. + /// Thrown if discovery has already been performed. + /// Thrown if the operation is cancelled. /// public async Task DiscoverDownloadStrategyAsync(CancellationToken cancellationToken) { @@ -209,6 +244,50 @@ public async Task DiscoverDownloadStrategyAsync(Cancell } } + /// + /// Processes Part 1 and starts downloading remaining parts for multipart downloads. + /// Returns immediately after processing Part 1 to allow the consumer to begin reading. + /// + /// + /// The discovery result from containing object metadata + /// and the initial GetObject response. + /// + /// + /// Optional progress callback that will be invoked as parts are downloaded. For multipart downloads, + /// progress is aggregated across all concurrent part downloads. + /// + /// Cancellation token to cancel the download operation. + /// + /// A task that completes after Part 1 is processed. For multipart downloads, remaining parts + /// continue downloading in the background (monitor via ). + /// + /// + /// HTTP Semaphore Release: + /// + /// This method processes Part 1 (downloaded during ) + /// and releases the HTTP semaphore slot that was acquired during discovery. + /// The semaphore is released after both the network download and disk write + /// operations complete for Part 1. This ensures the ConcurrentServiceRequests limit + /// controls the entire I/O operation (network + disk), not just the network download. + /// + /// Background Processing (Multipart Only): + /// + /// For multipart downloads (when TotalParts > 1), this method starts a background task + /// to download and process remaining parts (Part 2+) and returns immediately. This allows the + /// consumer to start reading from the buffer without waiting for all downloads to complete, + /// which prevents deadlocks when the buffer fills up before the consumer begins reading. + /// Monitor to detect when all background downloads have finished. + /// + /// Single-Part Downloads: + /// + /// For single-part downloads (when TotalParts = 1), this method processes Part 1 synchronously + /// and returns immediately. No background task is created, and + /// will already be completed when this method returns. + /// + /// + /// Thrown if the manager has been disposed. + /// Thrown if is null. + /// Thrown if the operation is cancelled. /// public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, EventHandler progressCallback, CancellationToken cancellationToken) { @@ -246,17 +325,26 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E } // Process Part 1 from InitialResponse (applies to both single-part and multipart) - _logger.DebugFormat("MultipartDownloadManager: Buffering Part 1 from discovery response"); + // NOTE: Semaphore is still held from discovery phase and will be released in finally block + _logger.DebugFormat("MultipartDownloadManager: Processing Part 1 from discovery response"); await _dataHandler.ProcessPartAsync(1, discoveryResult.InitialResponse, cancellationToken).ConfigureAwait(false); + + _logger.DebugFormat("MultipartDownloadManager: Part 1 processing completed"); } finally { // Always detach the event handler to prevent memory leak - // This runs whether ProcessPartAsync succeeds or throws if (wrappedCallback != null) { discoveryResult.InitialResponse.WriteObjectProgressEvent -= wrappedCallback; } + + // Release semaphore after BOTH network download AND disk write complete for Part 1 + // This ensures ConcurrentServiceRequests controls the entire I/O operation, + // consistent with Parts 2+ (see CreateDownloadTaskAsync) + _httpConcurrencySlots.Release(); + _logger.DebugFormat("MultipartDownloadManager: [Part 1] HTTP concurrency slot released (Available: {0}/{1})", + _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests); } if (discoveryResult.IsSinglePart) @@ -374,7 +462,9 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Waiting for HTTP concurrency slot (Available: {1}/{2})", partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests); - // Limit HTTP concurrency + // Limit HTTP concurrency for both network download AND disk write + // The semaphore is held until AFTER ProcessPartAsync completes to ensure + // ConcurrentServiceRequests controls the entire I/O operation await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false); _logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot acquired", partNumber); @@ -438,25 +528,27 @@ private async Task CreateDownloadTaskAsync(int partNumber, long objectSize, Even } _logger.DebugFormat("MultipartDownloadManager: [Part {0}] ETag validation passed", partNumber); + + _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing part (handler will decide: stream or buffer)", partNumber); + + // Delegate data handling to the handler + // IMPORTANT: Handler takes ownership of response and is responsible for disposing it in ALL cases: + // - If streaming: StreamingDataSource takes ownership and disposes when consumer finishes reading + // - If buffering: Handler disposes immediately after copying data to buffer + // - On error: Handler disposes in its catch block before rethrowing + await _dataHandler.ProcessPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false); + ownsResponse = false; // Ownership transferred to handler + + _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing completed successfully", partNumber); } finally { + // Release semaphore after BOTH network download AND disk write complete + // This ensures ConcurrentServiceRequests limits the entire I/O operation _httpConcurrencySlots.Release(); _logger.DebugFormat("MultipartDownloadManager: [Part {0}] HTTP concurrency slot released (Available: {1}/{2})", partNumber, _httpConcurrencySlots.CurrentCount, _config.ConcurrentServiceRequests); } - - _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing part (handler will decide: stream or buffer)", partNumber); - - // Delegate data handling to the handler - // IMPORTANT: Handler takes ownership of response and is responsible for disposing it in ALL cases: - // - If streaming: StreamingDataSource takes ownership and disposes when consumer finishes reading - // - If buffering: Handler disposes immediately after copying data to buffer - // - On error: Handler disposes in its catch block before rethrowing - await _dataHandler.ProcessPartAsync(partNumber, response, cancellationToken).ConfigureAwait(false); - ownsResponse = false; // Ownership transferred to handler - - _logger.DebugFormat("MultipartDownloadManager: [Part {0}] Processing completed successfully", partNumber); } catch (Exception ex) { @@ -491,59 +583,66 @@ private async Task DiscoverUsingPartStrategyAsync(Cance await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false); GetObjectResponse firstPartResponse = null; + + // NOTE: Semaphore is NOT released here - it will be released in StartDownloadsAsync + // after Part 1 is processed. This ensures the semaphore controls both network download + // AND disk write for Part 1, consistent with Parts 2+ (see CreateDownloadTaskAsync) + try { // SEP Part GET Step 2: "send the request and wait for the response in a non-blocking fashion" firstPartResponse = await _s3Client.GetObjectAsync(firstPartRequest, cancellationToken).ConfigureAwait(false); - } - finally - { - _httpConcurrencySlots.Release(); - _logger.DebugFormat("MultipartDownloadManager: [Part 1 Discovery] HTTP concurrency slot released"); - } - - if (firstPartResponse == null) - throw new InvalidOperationException("Failed to retrieve object from S3"); - - // SEP Part GET Step 3: Save ETag for later IfMatch validation in subsequent requests - _savedETag = firstPartResponse.ETag; - - // SEP Part GET Step 3: "check the response. First parse total content length from ContentRange - // of the GetObject response and save the value in a variable. The length is the numeric value - // after / delimiter. For example, given ContentRange=bytes 0-1/5, 5 is the total content length. - // Then check PartsCount." - if (firstPartResponse.PartsCount.HasValue && firstPartResponse.PartsCount.Value > 1) - { - // SEP Part GET Step 3: "If PartsCount in the response is larger than 1, it indicates there - // are more parts available to download. The S3 Transfer Manager MUST save etag from the - // response to a variable." - _discoveredPartCount = firstPartResponse.PartsCount.Value; - // Parse total content length from ContentRange header - // For example, "bytes 0-5242879/52428800" -> extract 52428800 - var totalObjectSize = ExtractTotalSizeFromContentRange(firstPartResponse.ContentRange); + if (firstPartResponse == null) + throw new InvalidOperationException("Failed to retrieve object from S3"); - // SEP Part GET Step 7 will use this response for creating DownloadResponse - // Keep the response with its stream (will be buffered in StartDownloadsAsync) - return new DownloadDiscoveryResult + // SEP Part GET Step 3: Save ETag for later IfMatch validation in subsequent requests + _savedETag = firstPartResponse.ETag; + + // SEP Part GET Step 3: "check the response. First parse total content length from ContentRange + // of the GetObject response and save the value in a variable. The length is the numeric value + // after / delimiter. For example, given ContentRange=bytes 0-1/5, 5 is the total content length. + // Then check PartsCount." + if (firstPartResponse.PartsCount.HasValue && firstPartResponse.PartsCount.Value > 1) { - TotalParts = firstPartResponse.PartsCount.Value, - ObjectSize = totalObjectSize, - InitialResponse = firstPartResponse // Keep response with stream - }; + // SEP Part GET Step 3: "If PartsCount in the response is larger than 1, it indicates there + // are more parts available to download. The S3 Transfer Manager MUST save etag from the + // response to a variable." + _discoveredPartCount = firstPartResponse.PartsCount.Value; + + // Parse total content length from ContentRange header + // For example, "bytes 0-5242879/52428800" -> extract 52428800 + var totalObjectSize = ExtractTotalSizeFromContentRange(firstPartResponse.ContentRange); + + // SEP Part GET Step 7 will use this response for creating DownloadResponse + // Keep the response with its stream (will be buffered in StartDownloadsAsync) + return new DownloadDiscoveryResult + { + TotalParts = firstPartResponse.PartsCount.Value, + ObjectSize = totalObjectSize, + InitialResponse = firstPartResponse // Keep response with stream + }; + } + else + { + // SEP Part GET Step 3: "If PartsCount is 1, go to Step 7." + _discoveredPartCount = 1; + + // Single part upload - return the response for immediate use (SEP Step 7) + return new DownloadDiscoveryResult + { + TotalParts = 1, + ObjectSize = firstPartResponse.ContentLength, + InitialResponse = firstPartResponse // Keep response with stream + }; + } } - else + catch { - // SEP Part GET Step 3: "If PartsCount is 1, go to Step 7." - _discoveredPartCount = 1; - - // Single part upload - return the response for immediate use (SEP Step 7) - return new DownloadDiscoveryResult - { - TotalParts = 1, - ObjectSize = firstPartResponse.ContentLength, - InitialResponse = firstPartResponse // Keep response with stream - }; + // On error, release semaphore and dispose response before rethrowing + _httpConcurrencySlots.Release(); + firstPartResponse?.Dispose(); + throw; } } @@ -568,84 +667,91 @@ private async Task DiscoverUsingRangeStrategyAsync(Canc await _httpConcurrencySlots.WaitAsync(cancellationToken).ConfigureAwait(false); GetObjectResponse firstRangeResponse = null; + + // NOTE: Semaphore is NOT released here - it will be released in StartDownloadsAsync + // after Part 1 is processed. This ensures the semaphore controls both network download + // AND disk write for Part 1, consistent with Parts 2+ (see CreateDownloadTaskAsync) + try { // SEP Ranged GET Step 2: "send the request and wait for the response in a non-blocking fashion" firstRangeResponse = await _s3Client.GetObjectAsync(firstRangeRequest, cancellationToken).ConfigureAwait(false); - } - finally - { - _httpConcurrencySlots.Release(); - _logger.DebugFormat("MultipartDownloadManager: [Part 1 Discovery] HTTP concurrency slot released"); - } - - // Defensive null check - if (firstRangeResponse == null) - throw new InvalidOperationException("Failed to retrieve object from S3"); - - // SEP Ranged GET Step 5: "save Etag from the response to a variable" - // (for IfMatch validation in subsequent requests) - _savedETag = firstRangeResponse.ETag; - - // SEP Ranged GET Step 3: "parse total content length from ContentRange of the GetObject response - // and save the value in a variable. The length is the numeric value after / delimiter. - // For example, given ContentRange=bytes0-1/5, 5 is the total content length." - // Check if ContentRange is null (object smaller than requested range) - if (firstRangeResponse.ContentRange == null) - { - // No ContentRange means we got the entire small object - _discoveredPartCount = 1; - return new DownloadDiscoveryResult + // Defensive null check + if (firstRangeResponse == null) + throw new InvalidOperationException("Failed to retrieve object from S3"); + + // SEP Ranged GET Step 5: "save Etag from the response to a variable" + // (for IfMatch validation in subsequent requests) + _savedETag = firstRangeResponse.ETag; + + // SEP Ranged GET Step 3: "parse total content length from ContentRange of the GetObject response + // and save the value in a variable. The length is the numeric value after / delimiter. + // For example, given ContentRange=bytes0-1/5, 5 is the total content length." + // Check if ContentRange is null (object smaller than requested range) + if (firstRangeResponse.ContentRange == null) { - TotalParts = 1, - ObjectSize = firstRangeResponse.ContentLength, - InitialResponse = firstRangeResponse // Keep response with stream - }; - } - - - // Parse total object size from ContentRange (e.g., "bytes 0-5242879/52428800" -> 52428800) - var totalContentLength = ExtractTotalSizeFromContentRange(firstRangeResponse.ContentRange); - - // SEP Ranged GET Step 4: "compare the parsed total content length from Step 3 with ContentLength - // of the response. If the parsed total content length equals to the value from ContentLength, - // it indicates this request contains all of the data. The request is finished, return the response." - if (totalContentLength == firstRangeResponse.ContentLength) - { - // Single part: total size equals returned ContentLength - // This request contains all of the data - _discoveredPartCount = 1; + // No ContentRange means we got the entire small object + _discoveredPartCount = 1; + + return new DownloadDiscoveryResult + { + TotalParts = 1, + ObjectSize = firstRangeResponse.ContentLength, + InitialResponse = firstRangeResponse // Keep response with stream + }; + } + + + // Parse total object size from ContentRange (e.g., "bytes 0-5242879/52428800" -> 52428800) + var totalContentLength = ExtractTotalSizeFromContentRange(firstRangeResponse.ContentRange); + + // SEP Ranged GET Step 4: "compare the parsed total content length from Step 3 with ContentLength + // of the response. If the parsed total content length equals to the value from ContentLength, + // it indicates this request contains all of the data. The request is finished, return the response." + if (totalContentLength == firstRangeResponse.ContentLength) + { + // Single part: total size equals returned ContentLength + // This request contains all of the data + _discoveredPartCount = 1; + + return new DownloadDiscoveryResult + { + TotalParts = 1, + ObjectSize = totalContentLength, + InitialResponse = firstRangeResponse // Keep response with stream + }; + } + // SEP Ranged GET Step 4: "If they do not match, it indicates there are more parts available + // to download. Add a validation to verify that ContentLength equals to the targetPartSizeBytes." + if (firstRangeResponse.ContentLength != targetPartSize) + { + throw new InvalidOperationException( + $"Expected first part size {targetPartSize} bytes, but received {firstRangeResponse.ContentLength} bytes. " + + $"Total object size is {totalContentLength} bytes."); + } + + // SEP Ranged GET Step 5: "calculate number of requests required by performing integer division + // of total contentLength/targetPartSizeBytes. Save the number of ranged GET requests in a variable." + _discoveredPartCount = (int)Math.Ceiling((double)totalContentLength / targetPartSize); + + // SEP Ranged GET Step 9 will use this response for creating DownloadResponse + // Keep the response with its stream (will be buffered in StartDownloadsAsync) return new DownloadDiscoveryResult { - TotalParts = 1, + TotalParts = _discoveredPartCount, ObjectSize = totalContentLength, InitialResponse = firstRangeResponse // Keep response with stream }; } - - // SEP Ranged GET Step 4: "If they do not match, it indicates there are more parts available - // to download. Add a validation to verify that ContentLength equals to the targetPartSizeBytes." - if (firstRangeResponse.ContentLength != targetPartSize) + catch { - throw new InvalidOperationException( - $"Expected first part size {targetPartSize} bytes, but received {firstRangeResponse.ContentLength} bytes. " + - $"Total object size is {totalContentLength} bytes."); + // On error, release semaphore and dispose response before rethrowing + _httpConcurrencySlots.Release(); + firstRangeResponse?.Dispose(); + throw; } - - // SEP Ranged GET Step 5: "calculate number of requests required by performing integer division - // of total contentLength/targetPartSizeBytes. Save the number of ranged GET requests in a variable." - _discoveredPartCount = (int)Math.Ceiling((double)totalContentLength / targetPartSize); - - // SEP Ranged GET Step 9 will use this response for creating DownloadResponse - // Keep the response with its stream (will be buffered in StartDownloadsAsync) - return new DownloadDiscoveryResult - { - TotalParts = _discoveredPartCount, - ObjectSize = totalContentLength, - InitialResponse = firstRangeResponse // Keep response with stream - }; } private GetObjectRequest CreateGetObjectRequest() diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index 7ea1c89af832..b24eaeb31fe9 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -1951,74 +1951,6 @@ public async Task DiscoverUsingRangeStrategy_CallsWaitForCapacityAsync() mockDataHandler.Verify(x => x.WaitForCapacityAsync(It.IsAny()), Times.Once); } - [TestMethod] - public async Task DiscoverUsingPartStrategy_AcquiresAndReleasesHttpSlot() - { - // Arrange - Use real SemaphoreSlim to track HTTP concurrency usage - var httpThrottler = new SemaphoreSlim(2, 2); // 2 concurrent requests max - var initialCount = httpThrottler.CurrentCount; - - var mockDataHandler = CreateMockDataHandler(); - var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( - 8 * 1024 * 1024, 3, 24 * 1024 * 1024, "test-etag"); - - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( - (req, ct) => Task.FromResult(mockResponse)); - - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( - downloadType: MultipartDownloadType.PART); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - - // Use shared HTTP throttler to track usage - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - - // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - // Assert - Assert.IsNotNull(result); - Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP concurrency slot should be released after discovery completes"); - - // Cleanup - httpThrottler.Dispose(); - } - - [TestMethod] - public async Task DiscoverUsingRangeStrategy_AcquiresAndReleasesHttpSlot() - { - // Arrange - Use real SemaphoreSlim to track HTTP concurrency usage - var httpThrottler = new SemaphoreSlim(2, 2); // 2 concurrent requests max - var initialCount = httpThrottler.CurrentCount; - - var mockDataHandler = CreateMockDataHandler(); - var totalObjectSize = 52428800; // 50MB - var partSize = 8388608; // 8MB - var mockResponse = MultipartDownloadTestHelpers.CreateRangeResponse( - 0, partSize - 1, totalObjectSize, "test-etag"); - - var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( - (req, ct) => Task.FromResult(mockResponse)); - - var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( - partSize: partSize, - downloadType: MultipartDownloadType.RANGE); - var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); - - // Use shared HTTP throttler to track usage - var coordinator = new MultipartDownloadManager(mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); - - // Act - var result = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); - - // Assert - Assert.IsNotNull(result); - Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP concurrency slot should be released after discovery completes"); - - // Cleanup - httpThrottler.Dispose(); - } [TestMethod] public async Task MultipleDownloads_WithSharedHttpThrottler_RespectsLimits() @@ -2044,16 +1976,22 @@ public async Task MultipleDownloads_WithSharedHttpThrottler_RespectsLimits() var coordinator1 = new MultipartDownloadManager(mockClient1.Object, request1, config, mockDataHandler1.Object, null, sharedThrottler); var coordinator2 = new MultipartDownloadManager(mockClient2.Object, request2, config, mockDataHandler2.Object, null, sharedThrottler); - // Act - Start both discoveries concurrently - var task1 = coordinator1.DiscoverDownloadStrategyAsync(CancellationToken.None); - var task2 = coordinator2.DiscoverDownloadStrategyAsync(CancellationToken.None); + var discovery1 = await coordinator1.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator1.StartDownloadsAsync(discovery1, null, CancellationToken.None); + + var discovery2 = await coordinator2.DiscoverDownloadStrategyAsync(CancellationToken.None); + await coordinator2.StartDownloadsAsync(discovery2, null, CancellationToken.None); - await Task.WhenAll(task1, task2); + // Wait for all background work to complete + await Task.WhenAll( + coordinator1.DownloadCompletionTask, + coordinator2.DownloadCompletionTask + ); - // Assert - Both should complete successfully despite shared throttler limits - Assert.IsNotNull(task1.Result); - Assert.IsNotNull(task2.Result); - Assert.AreEqual(1, sharedThrottler.CurrentCount, "HTTP throttler should be fully released"); + // Assert - Both should complete successfully and semaphore should be fully released + Assert.IsNotNull(discovery1); + Assert.IsNotNull(discovery2); + Assert.AreEqual(1, sharedThrottler.CurrentCount, "HTTP throttler should be fully released after complete download lifecycle"); // Cleanup coordinator1.Dispose(); @@ -2230,6 +2168,166 @@ public async Task Discovery_SinglePart_StillCallsCapacityCheck() #endregion + #region Concurrency Control Tests + + [TestMethod] + public async Task HttpSemaphore_HeldThroughProcessPartAsync() + { + // Arrange - Test that HTTP semaphore is NOT released until ProcessPartAsync completes + // This is the core bug that commit daf985e fixed + var totalParts = 2; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + // Use our own semaphore to monitor its state + var concurrentRequests = 1; + var httpSemaphore = new SemaphoreSlim(concurrentRequests, concurrentRequests); + + var part1EnteredProcessPart = new TaskCompletionSource(); + var part1CanExitProcessPart = new TaskCompletionSource(); + var semaphoreWasReleasedDuringPart1 = false; + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (partNum, response, ct) => + { + if (partNum == 1) + { + // Part 1 enters ProcessPartAsync + part1EnteredProcessPart.SetResult(true); + + // Check if semaphore has been released (it shouldn't be with the fix!) + if (httpSemaphore.CurrentCount > 0) + { + semaphoreWasReleasedDuringPart1 = true; + } + + // Block Part 1 here so we can observe semaphore state + await part1CanExitProcessPart.Task; + } + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration( + concurrentRequests: concurrentRequests); + + // Pass in our instrumented semaphore + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpSemaphore); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + var startTask = coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for Part 1 to enter ProcessPartAsync + await part1EnteredProcessPart.Task; + + // Check semaphore state while Part 1 is in ProcessPartAsync + var semaphoreAvailableDuringProcessing = httpSemaphore.CurrentCount > 0; + + // Release Part 1 to continue + part1CanExitProcessPart.SetResult(true); + + await startTask; + await coordinator.DownloadCompletionTask; + + // Assert - This is the deterministic test of the fix + // Before fix (commit daf985e): semaphore was released after HTTP download but BEFORE ProcessPartAsync + // After fix: semaphore is held through the ENTIRE operation including ProcessPartAsync + Assert.IsFalse(semaphoreAvailableDuringProcessing, + "HTTP semaphore should NOT be released while ProcessPartAsync is executing. " + + "Before fix (daf985e): semaphore.CurrentCount would be > 0 (released early). " + + "After fix: semaphore.CurrentCount should be 0 (held through ProcessPartAsync)."); + + Assert.IsFalse(semaphoreWasReleasedDuringPart1, + "Semaphore should not have been released at any point during Part 1 ProcessPartAsync execution"); + + // Cleanup + httpSemaphore.Dispose(); + } + + [TestMethod] + public async Task HttpSemaphore_RangeStrategy_HeldThroughProcessPartAsync() + { + // Arrange - Test that RANGE strategy also holds semaphore through ProcessPartAsync + var totalObjectSize = 17 * 1024 * 1024; // 17MB -> 3 parts @ 8MB + var partSize = 8 * 1024 * 1024; + + var concurrentRequests = 1; + var httpSemaphore = new SemaphoreSlim(concurrentRequests, concurrentRequests); + + var part1EnteredProcessPart = new TaskCompletionSource(); + var part1CanExitProcessPart = new TaskCompletionSource(); + + var mockDataHandler = new Mock(); + + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns(async (partNum, response, ct) => + { + if (partNum == 1) + { + part1EnteredProcessPart.SetResult(true); + await part1CanExitProcessPart.Task; + } + }); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + 3, partSize, totalObjectSize, "test-etag", usePartStrategy: false); // RANGE strategy + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + partSize: partSize, + downloadType: MultipartDownloadType.RANGE); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration( + concurrentRequests: concurrentRequests); + + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpSemaphore); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // Act + var startTask = coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + await part1EnteredProcessPart.Task; + + // Check semaphore state while Part 1 is in ProcessPartAsync + var semaphoreAvailableDuringProcessing = httpSemaphore.CurrentCount > 0; + + part1CanExitProcessPart.SetResult(true); + await startTask; + await coordinator.DownloadCompletionTask; + + // Assert + Assert.IsFalse(semaphoreAvailableDuringProcessing, + "RANGE strategy should also hold HTTP semaphore through ProcessPartAsync"); + + // Cleanup + httpSemaphore.Dispose(); + } + + #endregion + #region ContentRange and Part Range Calculation Tests [TestMethod] From dcad1dc7addc1ca5638e03074f957e1a2c8cd910 Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Tue, 9 Dec 2025 14:21:22 -0500 Subject: [PATCH 2/3] handle edge cases and add unit tets --- .../Transfer/Internal/FilePartDataHandler.cs | 2 +- .../Internal/MultipartDownloadManager.cs | 6 +- .../Custom/MultipartDownloadManagerTests.cs | 383 ++++++++++++++++++ 3 files changed, 387 insertions(+), 4 deletions(-) diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs index 26410c04bed7..4d7415a4a8f5 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/FilePartDataHandler.cs @@ -214,7 +214,7 @@ private async Task WritePartToFileAsync( // Seek to the correct offset for this part fileStream.Seek(offset, SeekOrigin.Begin); - _logger.DebugFormat("FilePartDataHandler: [Part {0} Writing {1} bytes to file at offset {2}", partNumber, + _logger.DebugFormat("FilePartDataHandler: [Part {0}] Writing {1} bytes to file at offset {2}", partNumber, response.ContentLength, offset); // Use GetObjectResponse's stream copy logic which includes: diff --git a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs index ed5be5a08f1c..010243c8c7bd 100644 --- a/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs +++ b/sdk/src/Services/S3/Custom/Transfer/Internal/MultipartDownloadManager.cs @@ -308,9 +308,6 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E try { - // Prepare the data handler (e.g., create temp files for file-based downloads) - await _dataHandler.PrepareAsync(discoveryResult, cancellationToken).ConfigureAwait(false); - // Create delegate once and reuse for all parts var wrappedCallback = progressCallback != null ? new EventHandler(DownloadPartProgressEventCallback) @@ -318,6 +315,9 @@ public async Task StartDownloadsAsync(DownloadDiscoveryResult discoveryResult, E try { + // Prepare the data handler (e.g., create temp files for file-based downloads) + await _dataHandler.PrepareAsync(discoveryResult, cancellationToken).ConfigureAwait(false); + // Attach progress callback to Part 1's response if provided if (wrappedCallback != null) { diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index b24eaeb31fe9..511119182915 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -2328,6 +2328,389 @@ public async Task HttpSemaphore_RangeStrategy_HeldThroughProcessPartAsync() #endregion + #region Semaphore Release Error Path Tests + + [TestMethod] + public async Task StartDownloadsAsync_PrepareAsyncFails_ReleasesHttpSemaphore() + { + // Arrange - Test critical bug: PrepareAsync fails but semaphore was acquired during discovery + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // WaitForCapacityAsync succeeds (buffer space available) + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // PrepareAsync fails BEFORE Part 1 processing + mockDataHandler + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated prepare failure")); + + var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( + 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag"); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // After discovery, semaphore should have 1 slot held (2 total - 1 used = 1 available) + Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, + "After discovery, semaphore should have 1 slot held"); + + // Act & Assert + try + { + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); + } + catch (InvalidOperationException ex) + { + Assert.AreEqual("Simulated prepare failure", ex.Message); + } + + // Assert - CRITICAL BUG: Semaphore should be released but currently ISN'T + // This test will FAIL until the bug is fixed + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released when PrepareAsync fails (BUG: currently leaks!)"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task StartDownloadsAsync_Part1ProcessingFails_ReleasesHttpSemaphore() + { + // Arrange - Test that finally block correctly releases semaphore when Part 1 processing fails + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // WaitForCapacityAsync succeeds + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // PrepareAsync succeeds + mockDataHandler + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // ProcessPartAsync fails for Part 1 + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated Part 1 processing failure")); + + var mockResponse = MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( + 8 * 1024 * 1024, 2, 16 * 1024 * 1024, "test-etag"); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client( + (req, ct) => Task.FromResult(mockResponse)); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // After discovery, semaphore should have 1 slot held + Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, + "After discovery, semaphore should have 1 slot held"); + + // Act & Assert + try + { + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); + } + catch (InvalidOperationException ex) + { + Assert.AreEqual("Simulated Part 1 processing failure", ex.Message); + } + + // Assert - Finally block should release semaphore + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released by finally block when Part 1 processing fails"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task Discovery_WaitForCapacityFails_DoesNotReleaseHttpSemaphore() + { + // Arrange - Test that semaphore is NOT released when it was never acquired + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // WaitForCapacityAsync fails BEFORE HTTP semaphore is acquired + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated capacity wait failure")); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3Client(); + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act & Assert + try + { + await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); + } + catch (InvalidOperationException ex) + { + Assert.AreEqual("Simulated capacity wait failure", ex.Message); + } + + // Assert - Semaphore should NOT be released (it was never acquired) + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should NOT be released when it was never acquired (failed before WaitAsync)"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task StartDownloadsAsync_BackgroundPartHttpFails_ReleasesHttpSemaphore() + { + // Arrange - Test that background part download failures properly release semaphore + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // WaitForCapacityAsync succeeds for all parts + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // PrepareAsync succeeds + mockDataHandler + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // ProcessPartAsync succeeds for Part 1, but not called for Part 2 (HTTP fails first) + mockDataHandler + .Setup(x => x.ProcessPartAsync(1, It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // ReleaseCapacity is called on failure + mockDataHandler + .Setup(x => x.ReleaseCapacity()); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var callCount = 0; + var mockClient = new Mock(); + mockClient.Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) + .Returns(() => + { + callCount++; + if (callCount == 1) + { + // Discovery call succeeds + return Task.FromResult(MultipartDownloadTestHelpers.CreateMultipartFirstPartResponse( + partSize, totalParts, totalObjectSize, "test-etag")); + } + else + { + // Background part HTTP request fails + throw new InvalidOperationException("Simulated HTTP failure for background part"); + } + }); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // After discovery, semaphore should have 1 slot held (for Part 1) + Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, + "After discovery, semaphore should have 1 slot held"); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for background task to fail + try + { + await coordinator.DownloadCompletionTask; + } + catch (InvalidOperationException) + { + // Expected failure from background task + } + + // Assert - Semaphore should be fully released (Part 1 released in StartDownloadsAsync, + // Parts 2 and 3 released in CreateDownloadTaskAsync catch blocks) + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be fully released after background part HTTP failure"); + + // Verify ReleaseCapacity was called twice (once for Part 2 that failed, once for Part 3 that got cancelled) + // With sequential capacity acquisition, Part 3 acquired capacity before Part 2's HTTP call failed + mockDataHandler.Verify(x => x.ReleaseCapacity(), Times.Exactly(2), + "ReleaseCapacity should be called for both Part 2 (failed) and Part 3 (cancelled after acquiring capacity)"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task StartDownloadsAsync_BackgroundPartProcessingFails_ReleasesHttpSemaphore() + { + // Arrange - Test that background part ProcessPartAsync failures properly release semaphore + var totalParts = 3; + var partSize = 8 * 1024 * 1024; + var totalObjectSize = totalParts * partSize; + + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // WaitForCapacityAsync succeeds for all parts + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // PrepareAsync succeeds + mockDataHandler + .Setup(x => x.PrepareAsync(It.IsAny(), It.IsAny())) + .Returns(Task.CompletedTask); + + // ProcessPartAsync succeeds for Part 1, fails for Part 2 + var processCallCount = 0; + mockDataHandler + .Setup(x => x.ProcessPartAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((partNum, response, ct) => + { + processCallCount++; + if (partNum == 1) + { + return Task.CompletedTask; // Part 1 succeeds + } + throw new InvalidOperationException($"Simulated processing failure for Part {partNum}"); + }); + + // ReleaseCapacity is called on failure + mockDataHandler + .Setup(x => x.ReleaseCapacity()); + + mockDataHandler + .Setup(x => x.OnDownloadComplete(It.IsAny())); + + var mockClient = MultipartDownloadTestHelpers.CreateMockS3ClientForMultipart( + totalParts, partSize, totalObjectSize, "test-etag", usePartStrategy: true); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest( + downloadType: MultipartDownloadType.PART); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(concurrentRequests: 1); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + var discoveryResult = await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + + // After discovery, semaphore should have 1 slot held + Assert.AreEqual(initialCount - 1, httpThrottler.CurrentCount, + "After discovery, semaphore should have 1 slot held"); + + // Act + await coordinator.StartDownloadsAsync(discoveryResult, null, CancellationToken.None); + + // Wait for background task to fail + try + { + await coordinator.DownloadCompletionTask; + } + catch (InvalidOperationException) + { + // Expected failure from background task + } + + // Assert - Semaphore should be fully released + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be fully released after background part processing failure"); + + // Verify ReleaseCapacity was called twice (once for Part 2 that failed, once for Part 3 that may have continued) + // With sequential capacity acquisition, Part 3 acquired capacity before Part 2's processing failed + mockDataHandler.Verify(x => x.ReleaseCapacity(), Times.Exactly(2), + "ReleaseCapacity should be called for both Part 2 (failed) and Part 3 (cancelled/failed after acquiring capacity)"); + + // Cleanup + httpThrottler.Dispose(); + } + + [TestMethod] + public async Task Discovery_HttpRequestAfterCapacityFails_ReleasesHttpSemaphore() + { + // Arrange - Test semaphore release when HTTP request fails after capacity is acquired + var httpThrottler = new SemaphoreSlim(2, 2); + var initialCount = httpThrottler.CurrentCount; + + var mockDataHandler = new Mock(); + + // WaitForCapacityAsync succeeds (capacity acquired) + mockDataHandler + .Setup(x => x.WaitForCapacityAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // HTTP request fails AFTER both capacity types are acquired + var mockClient = new Mock(); + mockClient + .Setup(x => x.GetObjectAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException("Simulated S3 failure after capacity acquired")); + + var request = MultipartDownloadTestHelpers.CreateOpenStreamRequest(); + var config = MultipartDownloadTestHelpers.CreateBufferedDownloadConfiguration(); + var coordinator = new MultipartDownloadManager( + mockClient.Object, request, config, mockDataHandler.Object, null, httpThrottler); + + // Act & Assert + try + { + await coordinator.DiscoverDownloadStrategyAsync(CancellationToken.None); + Assert.Fail("Expected InvalidOperationException to be thrown"); + } + catch (InvalidOperationException ex) + { + Assert.AreEqual("Simulated S3 failure after capacity acquired", ex.Message); + } + + // Assert - HTTP semaphore should be released by catch block in discovery + Assert.AreEqual(initialCount, httpThrottler.CurrentCount, + "HTTP semaphore should be released when HTTP request fails in discovery"); + + // Cleanup + httpThrottler.Dispose(); + } + + #endregion + #region ContentRange and Part Range Calculation Tests [TestMethod] From 0baca42e5a7dcc6639f7abb1d0de25acc25c76ad Mon Sep 17 00:00:00 2001 From: Garrett Beatty Date: Tue, 9 Dec 2025 14:37:46 -0500 Subject: [PATCH 3/3] update tests info --- .../UnitTests/Custom/MultipartDownloadManagerTests.cs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs index 511119182915..fbbd1a410975 100644 --- a/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs +++ b/sdk/test/Services/S3/UnitTests/Custom/MultipartDownloadManagerTests.cs @@ -2174,7 +2174,6 @@ public async Task Discovery_SinglePart_StillCallsCapacityCheck() public async Task HttpSemaphore_HeldThroughProcessPartAsync() { // Arrange - Test that HTTP semaphore is NOT released until ProcessPartAsync completes - // This is the core bug that commit daf985e fixed var totalParts = 2; var partSize = 8 * 1024 * 1024; var totalObjectSize = totalParts * partSize; @@ -2246,11 +2245,9 @@ public async Task HttpSemaphore_HeldThroughProcessPartAsync() await coordinator.DownloadCompletionTask; // Assert - This is the deterministic test of the fix - // Before fix (commit daf985e): semaphore was released after HTTP download but BEFORE ProcessPartAsync - // After fix: semaphore is held through the ENTIRE operation including ProcessPartAsync Assert.IsFalse(semaphoreAvailableDuringProcessing, "HTTP semaphore should NOT be released while ProcessPartAsync is executing. " + - "Before fix (daf985e): semaphore.CurrentCount would be > 0 (released early). " + + "Before fix semaphore.CurrentCount would be > 0 (released early). " + "After fix: semaphore.CurrentCount should be 0 (held through ProcessPartAsync)."); Assert.IsFalse(semaphoreWasReleasedDuringPart1, @@ -2333,7 +2330,7 @@ public async Task HttpSemaphore_RangeStrategy_HeldThroughProcessPartAsync() [TestMethod] public async Task StartDownloadsAsync_PrepareAsyncFails_ReleasesHttpSemaphore() { - // Arrange - Test critical bug: PrepareAsync fails but semaphore was acquired during discovery + // Arrange - PrepareAsync fails but semaphore was acquired during discovery var httpThrottler = new SemaphoreSlim(2, 2); var initialCount = httpThrottler.CurrentCount; @@ -2378,10 +2375,8 @@ public async Task StartDownloadsAsync_PrepareAsyncFails_ReleasesHttpSemaphore() Assert.AreEqual("Simulated prepare failure", ex.Message); } - // Assert - CRITICAL BUG: Semaphore should be released but currently ISN'T - // This test will FAIL until the bug is fixed Assert.AreEqual(initialCount, httpThrottler.CurrentCount, - "HTTP semaphore should be released when PrepareAsync fails (BUG: currently leaks!)"); + "HTTP semaphore should be released when PrepareAsync fails"); // Cleanup httpThrottler.Dispose();