Skip to content

Commit 4baf558

Browse files
author
davidwei
committed
Handle Redis timeout in RedisSyncQueueService and improve SyncQueueProcessor logging
1 parent afb7640 commit 4baf558

File tree

2 files changed

+71
-49
lines changed

2 files changed

+71
-49
lines changed

src/HappyNotes.Services/SyncQueue/Services/RedisSyncQueueService.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,11 @@ private async Task ProcessDelayedTasks(string service)
346346
new RedisValue[] { now, batchLimit }
347347
);
348348
}
349+
catch (RedisTimeoutException ex)
350+
{
351+
_logger.LogWarning(ex, "Redis timeout during delayed tasks processing for service {Service} - skipping this cycle", service);
352+
return; // Skip this cycle, will retry on next dequeue attempt
353+
}
349354
catch (StackExchange.Redis.RedisServerException ex) when (ex.Message.Contains("WRONGTYPE"))
350355
{
351356
_logger.LogWarning("Delayed queue {DelayedKey} has wrong type, recreating as sorted set", delayedKey);

src/HappyNotes.Services/SyncQueue/Services/SyncQueueProcessor.cs

Lines changed: 66 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -77,71 +77,88 @@ private async Task ProcessServiceQueue(string serviceName, CancellationToken can
7777
_logger.LogInformation("Started processing queue for service: {ServiceName}", serviceName);
7878

7979
using var semaphore = new SemaphoreSlim(_options.Processing.MaxConcurrentTasks);
80+
var exitReason = "normal";
8081

81-
while (!cancellationToken.IsCancellationRequested)
82+
try
8283
{
83-
try
84+
while (!cancellationToken.IsCancellationRequested)
8485
{
85-
await semaphore.WaitAsync(cancellationToken);
86-
87-
var task = await _queueService.DequeueAsync<object>(serviceName, cancellationToken);
88-
89-
if (task == null)
86+
try
9087
{
91-
semaphore.Release();
92-
await Task.Delay(_options.Processing.PollingInterval, cancellationToken);
93-
continue;
94-
}
88+
await semaphore.WaitAsync(cancellationToken);
9589

96-
// Process task in background with fresh scope per task
97-
_ = Task.Run(async () =>
98-
{
99-
using var taskScope = _serviceProvider.CreateScope();
100-
try
90+
var task = await _queueService.DequeueAsync<object>(serviceName, cancellationToken);
91+
92+
if (task == null)
10193
{
102-
// Resolve handler fresh for each task (with clean scoped dependencies)
103-
var handler = taskScope.ServiceProvider.GetServices<ISyncHandler>()
104-
.FirstOrDefault(h => h.ServiceName == serviceName);
94+
semaphore.Release();
95+
await Task.Delay(_options.Processing.PollingInterval, cancellationToken);
96+
continue;
97+
}
10598

106-
if (handler == null)
99+
// Process task in background with fresh scope per task
100+
_ = Task.Run(async () =>
101+
{
102+
using var taskScope = _serviceProvider.CreateScope();
103+
try
107104
{
108-
_logger.LogError("No handler found for service {ServiceName}", serviceName);
109-
return;
105+
// Resolve handler fresh for each task (with clean scoped dependencies)
106+
var handler = taskScope.ServiceProvider.GetServices<ISyncHandler>()
107+
.FirstOrDefault(h => h.ServiceName == serviceName);
108+
109+
if (handler == null)
110+
{
111+
_logger.LogError("No handler found for service {ServiceName}", serviceName);
112+
return;
113+
}
114+
115+
var syncTask = new SyncTask
116+
{
117+
Id = task.Id,
118+
Service = task.Service,
119+
Action = task.Action,
120+
EntityId = task.EntityId,
121+
UserId = task.UserId,
122+
Payload = task.Payload,
123+
AttemptCount = task.AttemptCount,
124+
CreatedAt = task.CreatedAt,
125+
ScheduledFor = task.ScheduledFor,
126+
Metadata = task.Metadata
127+
};
128+
await ProcessTask(handler, syncTask, cancellationToken);
110129
}
111-
112-
var syncTask = new SyncTask
130+
finally
113131
{
114-
Id = task.Id,
115-
Service = task.Service,
116-
Action = task.Action,
117-
EntityId = task.EntityId,
118-
UserId = task.UserId,
119-
Payload = task.Payload,
120-
AttemptCount = task.AttemptCount,
121-
CreatedAt = task.CreatedAt,
122-
ScheduledFor = task.ScheduledFor,
123-
Metadata = task.Metadata
124-
};
125-
await ProcessTask(handler, syncTask, cancellationToken);
126-
}
127-
finally
128-
{
129-
semaphore.Release();
130-
}
131-
}, cancellationToken);
132+
semaphore.Release();
133+
}
134+
}, cancellationToken);
135+
}
136+
catch (OperationCanceledException)
137+
{
138+
exitReason = "cancelled";
139+
break;
140+
}
141+
catch (Exception ex)
142+
{
143+
semaphore.Release(); // Release semaphore to prevent deadlock
144+
_logger.LogError(ex, "Error in queue processing loop for service {ServiceName}", serviceName);
145+
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
146+
}
132147
}
133-
catch (OperationCanceledException)
148+
}
149+
finally
150+
{
151+
if (exitReason == "cancelled")
134152
{
135-
break;
153+
_logger.LogWarning("Processing queue for service {ServiceName} was cancelled at {StopTime}",
154+
serviceName, DateTime.UtcNow);
136155
}
137-
catch (Exception ex)
156+
else
138157
{
139-
_logger.LogError(ex, "Error in queue processing loop for service {ServiceName}", serviceName);
140-
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
158+
_logger.LogInformation("Processing queue for service {ServiceName} stopped normally at {StopTime}",
159+
serviceName, DateTime.UtcNow);
141160
}
142161
}
143-
144-
_logger.LogInformation("Stopped processing queue for service: {ServiceName}", serviceName);
145162
}
146163

147164
private async Task ProcessTask(ISyncHandler handler, SyncTask task, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)