Skip to content

Commit bf2618a

Browse files
authored
Ensure Streaming Checkpoint Cancellation on Timeout (#1385)
* cancel streaming checkpoint if timeout occurred * propagate cancellation token to stateMachineDriver when initiating a checkpoint
1 parent 3ac77e8 commit bf2618a

File tree

3 files changed

+48
-30
lines changed

3 files changed

+48
-30
lines changed

libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSyncManager.cs

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -280,47 +280,60 @@ async Task TakeStreamingCheckpoint()
280280

281281
// Iterate through main store
282282
var mainStoreCheckpointTask = ClusterProvider.storeWrapper.store.
283-
TakeFullCheckpointAsync(CheckpointType.StreamingSnapshot, streamingSnapshotIteratorFunctions: manager.mainStoreSnapshotIterator);
283+
TakeFullCheckpointAsync(CheckpointType.StreamingSnapshot, cancellationToken: cts.Token, streamingSnapshotIteratorFunctions: manager.mainStoreSnapshotIterator);
284284

285285
var result = await WaitOrDie(checkpointTask: mainStoreCheckpointTask, iteratorManager: manager);
286286
if (!result.success)
287-
throw new InvalidOperationException("Main store checkpoint stream failed!");
287+
throw new GarnetException("Main store checkpoint stream failed!");
288288

289289
if (!ClusterProvider.serverOptions.DisableObjects)
290290
{
291291
// Iterate through object store
292292
var objectStoreCheckpointTask = ClusterProvider.storeWrapper.objectStore.
293-
TakeFullCheckpointAsync(CheckpointType.StreamingSnapshot, streamingSnapshotIteratorFunctions: manager.objectStoreSnapshotIterator);
293+
TakeFullCheckpointAsync(CheckpointType.StreamingSnapshot, cancellationToken: cts.Token, streamingSnapshotIteratorFunctions: manager.objectStoreSnapshotIterator);
294294
result = await WaitOrDie(checkpointTask: objectStoreCheckpointTask, iteratorManager: manager);
295295
if (!result.success)
296-
throw new InvalidOperationException("Object store checkpoint stream failed!");
296+
throw new GarnetException("Object store checkpoint stream failed!");
297297
}
298298

299299
// Note: We do not truncate the AOF here as this was just a "virtual" checkpoint
300-
300+
// WaitOrDie is needed here to check if streaming checkpoint is making progress.
301+
// We cannot use a timeout on the cancellationToken because we don't know in total how long the streaming checkpoint will take
301302
async ValueTask<(bool success, Guid token)> WaitOrDie(ValueTask<(bool success, Guid token)> checkpointTask, SnapshotIteratorManager iteratorManager)
302303
{
303-
var timeout = replicaSyncTimeout;
304-
var delay = TimeSpan.FromSeconds(1);
305-
while (true)
304+
try
306305
{
307-
// Check if cancellation requested
308-
cts.Token.ThrowIfCancellationRequested();
306+
var timeout = replicaSyncTimeout;
307+
var delay = TimeSpan.FromSeconds(1);
308+
while (true)
309+
{
310+
// Check if cancellation requested
311+
cts.Token.ThrowIfCancellationRequested();
309312

310-
// Wait for stream sync to make some progress
311-
await Task.Delay(delay);
313+
// Wait for stream sync to make some progress
314+
await Task.Delay(delay);
312315

313-
// Check if checkpoint has completed
314-
if (checkpointTask.IsCompleted)
315-
return await checkpointTask;
316+
// Check if checkpoint has completed
317+
if (checkpointTask.IsCompleted)
318+
return await checkpointTask;
316319

317-
// Check if we made some progress
318-
timeout = !manager.IsProgressing() ? timeout.Subtract(delay) : replicaSyncTimeout;
320+
// Check if we made some progress
321+
timeout = !manager.IsProgressing() ? timeout.Subtract(delay) : replicaSyncTimeout;
319322

320-
// Throw timeout equals to zero
321-
if (timeout.TotalSeconds <= 0)
322-
throw new TimeoutException("Streaming snapshot checkpoint timed out");
323+
// Throw timeout equals to zero
324+
if (timeout.TotalSeconds <= 0)
325+
{
326+
cts.Cancel();
327+
throw new TimeoutException("Streaming snapshot checkpoint timed out");
328+
}
329+
}
330+
}
331+
catch (Exception ex)
332+
{
333+
logger?.LogError(ex, "{method} faulted", nameof(WaitOrDie));
334+
cts.Cancel();
323335
}
336+
return (false, default);
324337
}
325338
}
326339
}

libs/storage/Tsavorite/cs/src/core/Index/Checkpointing/StateMachineDriver.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,11 @@ async Task RunStateMachine(CancellationToken token = default)
299299
_ = Interlocked.Exchange(ref stateMachine, null);
300300
if (ex != null)
301301
{
302-
_ = _stateMachineCompleted.TrySetException(ex);
302+
// If the state machine stopped due to cancellation, propagate cancellation to the completion TCS
303+
if (ex is OperationCanceledException || ex is TaskCanceledException)
304+
_ = _stateMachineCompleted.TrySetCanceled();
305+
else
306+
_ = _stateMachineCompleted.TrySetException(ex);
303307
}
304308
else
305309
{

libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Tsavorite.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,12 +188,13 @@ public TsavoriteKV(KVSettings<TKey, TValue> kvSettings, TStoreFunctions storeFun
188188
/// <param name="token">Checkpoint token</param>
189189
/// <param name="checkpointType">Checkpoint type</param>
190190
/// <param name="streamingSnapshotIteratorFunctions">Iterator for streaming snapshot records</param>
191+
/// <param name="cancellationToken">Caller's cancellation token</param>
191192
/// <returns>
192193
/// Whether we successfully initiated the checkpoint (initiation may
193194
/// fail if we are already taking a checkpoint or performing some other
194195
/// operation such as growing the index). Use CompleteCheckpointAsync to wait completion.
195196
/// </returns>
196-
public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, IStreamingSnapshotIteratorFunctions<TKey, TValue> streamingSnapshotIteratorFunctions = null)
197+
public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointType, IStreamingSnapshotIteratorFunctions<TKey, TValue> streamingSnapshotIteratorFunctions = null, CancellationToken cancellationToken = default)
197198
{
198199
IStateMachine stateMachine;
199200

@@ -208,7 +209,7 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
208209
{
209210
stateMachine = Checkpoint.Full(this, checkpointType, out token);
210211
}
211-
return stateMachineDriver.Register(stateMachine);
212+
return stateMachineDriver.Register(stateMachine, cancellationToken);
212213
}
213214

214215
/// <summary>
@@ -228,7 +229,7 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
228229
public async ValueTask<(bool success, Guid token)> TakeFullCheckpointAsync(CheckpointType checkpointType,
229230
CancellationToken cancellationToken = default, IStreamingSnapshotIteratorFunctions<TKey, TValue> streamingSnapshotIteratorFunctions = null)
230231
{
231-
var success = TryInitiateFullCheckpoint(out Guid token, checkpointType, streamingSnapshotIteratorFunctions);
232+
var success = TryInitiateFullCheckpoint(out Guid token, checkpointType, streamingSnapshotIteratorFunctions, cancellationToken);
232233

233234
if (success)
234235
await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false);
@@ -241,10 +242,10 @@ public bool TryInitiateFullCheckpoint(out Guid token, CheckpointType checkpointT
241242
/// </summary>
242243
/// <param name="token">Checkpoint token</param>
243244
/// <returns>Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.</returns>
244-
public bool TryInitiateIndexCheckpoint(out Guid token)
245+
public bool TryInitiateIndexCheckpoint(out Guid token, CancellationToken cancellationToken = default)
245246
{
246247
var stateMachine = Checkpoint.IndexOnly(this, out token);
247-
return stateMachineDriver.Register(stateMachine);
248+
return stateMachineDriver.Register(stateMachine, cancellationToken);
248249
}
249250

250251
/// <summary>
@@ -261,7 +262,7 @@ public bool TryInitiateIndexCheckpoint(out Guid token)
261262
/// </returns>
262263
public async ValueTask<(bool success, Guid token)> TakeIndexCheckpointAsync(CancellationToken cancellationToken = default)
263264
{
264-
var success = TryInitiateIndexCheckpoint(out Guid token);
265+
var success = TryInitiateIndexCheckpoint(out Guid token, cancellationToken);
265266

266267
if (success)
267268
await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false);
@@ -277,7 +278,7 @@ public bool TryInitiateIndexCheckpoint(out Guid token)
277278
/// <param name="tryIncremental">For snapshot, try to store as incremental delta over last snapshot</param>
278279
/// <returns>Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.</returns>
279280
public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, bool tryIncremental = false,
280-
IStreamingSnapshotIteratorFunctions<TKey, TValue> streamingSnapshotIteratorFunctions = null)
281+
IStreamingSnapshotIteratorFunctions<TKey, TValue> streamingSnapshotIteratorFunctions = null, CancellationToken cancellationToken = default)
281282
{
282283
IStateMachine stateMachine;
283284

@@ -305,7 +306,7 @@ public bool TryInitiateHybridLogCheckpoint(out Guid token, CheckpointType checkp
305306
stateMachine = Checkpoint.HybridLogOnly(this, checkpointType, out token);
306307
}
307308
}
308-
return stateMachineDriver.Register(stateMachine);
309+
return stateMachineDriver.Register(stateMachine, cancellationToken);
309310
}
310311

311312
/// <summary>
@@ -340,7 +341,7 @@ public bool CanTakeIncrementalCheckpoint(CheckpointType checkpointType, out Guid
340341
public async ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType,
341342
bool tryIncremental = false, CancellationToken cancellationToken = default)
342343
{
343-
var success = TryInitiateHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental);
344+
var success = TryInitiateHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental, cancellationToken: cancellationToken);
344345

345346
if (success)
346347
await CompleteCheckpointAsync(cancellationToken).ConfigureAwait(false);

0 commit comments

Comments
 (0)