Skip to content

Commit 6392986

Browse files
authored
Improves shutdown and cleanup logic for producers and consumers (#440)
This PR improves shutdown and cleanup logic for producers and consumers, focusing on graceful termination and better error handling during close operations. - Adds cancellation token checks to prevent processing loops from continuing after shutdown - Ensures channel writers are properly completed to unblock waiting consumers - Demotes timeout error logs to debug level to reduce log noise during normal shutdown scenarios - Consolidates ignore logic for cleaner handling of already-closed connections --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent b2ab24a commit 6392986

File tree

4 files changed

+35
-12
lines changed

4 files changed

+35
-12
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,10 @@ protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool igno
9393
}
9494

9595
UpdateStatusToClosed();
96-
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
97-
98-
if (_client is { IsClosed: true })
99-
{
100-
return result;
101-
}
96+
// we can ignore if the ignoreIfAlreadyDeleted or the socket is already closed
97+
// DeleteEntityFromTheServer must be called anyway because it cleans the internal lists
98+
var ignore = ignoreIfAlreadyDeleted || _client.IsClosed;
99+
var result = await DeleteEntityFromTheServer(ignore).ConfigureAwait(false);
102100

103101
var closed = await _client.MaybeClose($"closing: {EntityId}", config.Pool)
104102
.ConfigureAwait(false);

RabbitMQ.Stream.Client/Client.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,7 @@ private async Task<CloseResponse> Close(string reason, string closedStatus)
772772
}
773773
catch (TimeoutException)
774774
{
775-
_logger.LogError(
775+
_logger.LogDebug(
776776
"Timeout while closing the connection. The connection will be closed anyway");
777777
}
778778
catch (Exception e)

RabbitMQ.Stream.Client/RawConsumer.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ private void ProcessChunks()
457457
{
458458
// need to wait the subscription is completed
459459
// else the _subscriberId could be incorrect
460-
_completeSubscription.Task.Wait();
460+
await _completeSubscription.Task.ConfigureAwait(false);
461461

462462
try
463463
{
@@ -555,6 +555,18 @@ await _client.Credit(EntityId, 1)
555555
"Error while process chunks the stream: {EntityInfo} The ProcessChunks task will be closed",
556556
DumpEntityConfiguration());
557557
}
558+
finally
559+
{
560+
// best-effort: mark the writer complete so no producers stay blocked
561+
try
562+
{
563+
_chunksBuffer.Writer.TryComplete();
564+
}
565+
catch
566+
{
567+
// ignored
568+
}
569+
}
558570
}, Token);
559571
}
560572

@@ -764,7 +776,7 @@ protected override async Task<ResponseCode> DeleteEntityFromTheServer(bool ignor
764776

765777
catch (TimeoutException)
766778
{
767-
Logger.LogError(
779+
Logger.LogDebug(
768780
"Timeout removing the consumer id: {SubscriberId}, {EntityInfo} from the server. " +
769781
"The consumer will be closed anyway",
770782
EntityId, DumpEntityConfiguration());
@@ -785,7 +797,7 @@ public override async Task<ResponseCode> Close()
785797
// when the consumer is closed we must be sure that the
786798
// subscription is completed to avoid problems with the connection
787799
// It could happen when the closing is called just after the creation
788-
_completeSubscription.Task.Wait();
800+
await _completeSubscription.Task.ConfigureAwait(false);
789801
return await Shutdown(_config).ConfigureAwait(false);
790802
}
791803

RabbitMQ.Stream.Client/RawProducer.cs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ private async Task ProcessBuffer()
346346
try
347347
{
348348
var messages = new List<(ulong, Message)>(_config.MessagesBufferSize);
349-
while (await _messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false))
349+
while (await _messageBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false))
350350
{
351351
while (_messageBuffer.Reader.TryRead(out var msg))
352352
{
@@ -371,6 +371,16 @@ private async Task ProcessBuffer()
371371

372372
public override async Task<ResponseCode> Close()
373373
{
374+
// Complete the channel writer to signal ProcessBuffer to finish
375+
try
376+
{
377+
_messageBuffer.Writer.TryComplete();
378+
}
379+
catch
380+
{
381+
// ignored
382+
}
383+
374384
return await Shutdown(_config).ConfigureAwait(false);
375385
}
376386

@@ -382,8 +392,11 @@ protected override async Task<ResponseCode> DeleteEntityFromTheServer(bool ignor
382392
// in this case we reduce the waiting time
383393
// the producer could be removed because of stream deleted
384394
// so it is not necessary to wait.
395+
// we can ignore if the ignoreIfAlreadyDeleted or the socket is already closed
396+
// DeletePublisher must be called anyway because it cleans the internal lists
397+
var ignore = ignoreIfAlreadyDeleted || _client.IsClosed;
385398
var closeResponse =
386-
await _client.DeletePublisher(EntityId, ignoreIfAlreadyDeleted).ConfigureAwait(false);
399+
await _client.DeletePublisher(EntityId, ignore).ConfigureAwait(false);
387400
return closeResponse.ResponseCode;
388401
}
389402
catch (Exception e)

0 commit comments

Comments
 (0)