Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a6c98e8
wip
TalZaccai Sep 16, 2025
4f1f4b5
wip
TalZaccai Sep 16, 2025
62e6624
wip
TalZaccai Sep 16, 2025
2c709f1
Added unified store session
TalZaccai Sep 17, 2025
671e708
Correcting generic typing
TalZaccai Sep 18, 2025
93dfa72
Merge branch 'talzacc/storage-v2' of https://github.com/microsoft/gar…
TalZaccai Sep 18, 2025
38035ab
Added MEMORY USAGE + TYPE to unified ops
TalZaccai Sep 18, 2025
222963e
Added TTL, EXPIRETIME and EXISTS to unified ops
TalZaccai Sep 18, 2025
d677e6a
implemented DEL in unified ops
TalZaccai Sep 19, 2025
a446bac
wip - expire & persist (broken)
TalZaccai Sep 19, 2025
92cab87
wip - adding expire to unified ops
TalZaccai Sep 22, 2025
6dbeecc
wip - expire
TalZaccai Sep 22, 2025
36bf62f
add cref to server-side replication inter-node commands
vazois Sep 23, 2025
5ed6204
fix server-side BeginRecoverReplica
vazois Sep 23, 2025
9a05c22
Add link to new 'Unsafe code best practices' -article (#1386)
PaulusParssinen Sep 23, 2025
61c206f
[Bugfix] AOF Replay Double Replaying Finalize (#1372)
hamdaankhalid Sep 23, 2025
3ac77e8
Refactoring TxnKeyManager to use Key Specifications (#1381)
TalZaccai Sep 23, 2025
8a547bb
wip
TalZaccai Sep 23, 2025
048f66a
Merge branch 'talzacc/storage-v2' of https://github.com/microsoft/gar…
TalZaccai Sep 23, 2025
bf2618a
Ensure Streaming Checkpoint Cancellation on Timeout (#1385)
vazois Sep 24, 2025
d2d6944
Resetting replication recovery when cluster reset is issued (#1319)
Xizt Sep 24, 2025
6d78395
merging from latest main
TalZaccai Sep 24, 2025
3773b26
Fix transaction key locking
TalZaccai Sep 24, 2025
feec549
format
TalZaccai Sep 24, 2025
0db1967
Some test fixes
TalZaccai Sep 24, 2025
360c7e6
Fixing tests
TalZaccai Sep 25, 2025
b68e538
reverting a couple of unnecessary changes
TalZaccai Sep 25, 2025
485dd03
Eliminating more multi-context methods from API
TalZaccai Sep 26, 2025
f3708a5
Removed some unnecessary stuff
TalZaccai Sep 26, 2025
6a3acd0
Some more cleanup to TransactionManager
TalZaccai Sep 29, 2025
c925e3f
Merge branch 'talzacc/storage-v2' of https://github.com/microsoft/gar…
TalZaccai Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Version.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<!-- VersionPrefix property for builds and packages -->
<PropertyGroup>
<VersionPrefix>1.0.83</VersionPrefix>
<VersionPrefix>1.0.84</VersionPrefix>
</PropertyGroup>
</Project>
8 changes: 4 additions & 4 deletions benchmark/BDN.benchmark/Custom/CustomTxnSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public override bool Prepare<TGarnetReadApi>(TGarnetReadApi api, ref CustomProce
valueC = GetNextArg(ref procInput, ref offset);
valueD = GetNextArg(ref procInput, ref offset);

AddKey(setA, LockType.Exclusive, isObject: false);
AddKey(setB, LockType.Exclusive, isObject: false);
AddKey(setC, LockType.Exclusive, isObject: false);
AddKey(setD, LockType.Exclusive, isObject: false);
AddKey(setA, LockType.Exclusive, StoreType.Main);
AddKey(setB, LockType.Exclusive, StoreType.Main);
AddKey(setC, LockType.Exclusive, StoreType.Main);
AddKey(setD, LockType.Exclusive, StoreType.Main);

return true;
}
Expand Down
69 changes: 0 additions & 69 deletions libs/client/ClientSession/GarnetClientSessionClusterExtensions.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public sealed unsafe partial class GarnetClientSession : IServerHook, IMessageCo
/// <param name="aofBeginAddress"></param>
/// <param name="aofTailAddress"></param>
/// <returns></returns>
/// <seealso cref="T:Garnet.cluster.ClusterSession.NetworkClusterInitiateReplicaSync"/>
public Task<string> ExecuteReplicaSync(string nodeId, string primary_replid, byte[] checkpointEntryData, long aofBeginAddress, long aofTailAddress)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -112,6 +113,7 @@ public Task<string> ExecuteReplicaSync(string nodeId, string primary_replid, byt
/// <param name="fileTokenBytes"></param>
/// <param name="fileType"></param>
/// <param name="data"></param>
/// <seealso cref="T:Garnet.cluster.ClusterSession.NetworkClusterSendCheckpointMetadata"/>
public Task<string> ExecuteSendCkptMetadata(Memory<byte> fileTokenBytes, int fileType, Memory<byte> data)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -178,6 +180,7 @@ public Task<string> ExecuteSendCkptMetadata(Memory<byte> fileTokenBytes, int fil
/// <param name="startAddress"></param>
/// <param name="data"></param>
/// <param name="segmentId"></param>
/// <seealso cref="T:Garnet.cluster.ClusterSession.NetworkClusterSendCheckpointFileSegment"/>
public Task<string> ExecuteSendFileSegments(Memory<byte> fileTokenBytes, int fileType, long startAddress, Span<byte> data, int segmentId = -1)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -257,19 +260,19 @@ public Task<string> ExecuteSendFileSegments(Memory<byte> fileTokenBytes, int fil
/// Signal replica to recover
/// </summary>
/// <param name="sendStoreCheckpoint"></param>
/// <param name="sendObjectStoreCheckpoint"></param>
/// <param name="replayAOF"></param>
/// <param name="primary_replid"></param>
/// <param name="checkpointEntryData"></param>
/// <param name="beginAddress"></param>
/// <param name="tailAddress"></param>
/// <returns></returns>
public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool sendObjectStoreCheckpoint, bool replayAOF, string primary_replid, byte[] checkpointEntryData, long beginAddress, long tailAddress)
/// <seealso cref="T:Garnet.cluster.ClusterSession.NetworkClusterBeginReplicaRecover"/>
public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool replayAOF, string primary_replid, byte[] checkpointEntryData, long beginAddress, long tailAddress)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
tcsQueue.Enqueue(tcs);
byte* curr = offset;
int arraySize = 9;
int arraySize = 8;

while (!RespWriteUtils.TryWriteArrayLength(arraySize, ref curr, end))
{
Expand Down Expand Up @@ -303,46 +306,38 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
offset = curr;

//4
while (!RespWriteUtils.TryWriteBulkString(sendObjectStoreCheckpoint ? "1"u8 : "0"u8, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//5
while (!RespWriteUtils.TryWriteBulkString(replayAOF ? "1"u8 : "0"u8, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//6
//5
while (!RespWriteUtils.TryWriteAsciiBulkString(primary_replid, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//7
//6
while (!RespWriteUtils.TryWriteBulkString(checkpointEntryData, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//8
//7
while (!RespWriteUtils.TryWriteArrayItem(beginAddress, ref curr, end))
{
Flush();
curr = offset;
}
offset = curr;

//9
//8
while (!RespWriteUtils.TryWriteArrayItem(tailAddress, ref curr, end))
{
Flush();
Expand All @@ -360,6 +355,7 @@ public Task<string> ExecuteBeginReplicaRecover(bool sendStoreCheckpoint, bool se
/// </summary>
/// <param name="syncMetadata"></param>
/// <returns></returns>
/// <seealso cref="T:Garnet.cluster.ClusterSession.NetworkClusterAttachSync"/>
public Task<string> ExecuteAttachSync(byte[] syncMetadata)
{
var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
Expand Down Expand Up @@ -408,6 +404,7 @@ public Task<string> ExecuteAttachSync(byte[] syncMetadata)
/// </summary>
/// <param name="sourceNodeId"></param>
/// <param name="isMainStore"></param>
/// <seealso cref="T:Garnet.cluster.ClusterSession.NetworkClusterSync"/>
public void SetClusterSyncHeader(string sourceNodeId, bool isMainStore)
{
// Unlike Migration, where we don't know at the time of header initialization if we have a record or not, in Replication
Expand Down
11 changes: 7 additions & 4 deletions libs/cluster/Server/ClusterManagerSlotState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>>>,
/* MainStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>,
BasicContext<ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>,
BasicContext<UnifiedStoreInput, GarnetUnifiedStoreOutput, long, UnifiedSessionFunctions,
/* UnifiedStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>>;

/// <summary>
Expand Down Expand Up @@ -482,7 +485,7 @@ public static unsafe void DeleteKeysInSlotsFromMainStore(BasicGarnetApi BasicGar
var key = iter.Key;
var s = HashSlotUtils.HashSlot(key);
if (slots.Contains(s))
_ = BasicGarnetApi.DELETE(PinnedSpanByte.FromPinnedSpan(key), StoreType.Main);
_ = BasicGarnetApi.DELETE(PinnedSpanByte.FromPinnedSpan(key));
}
}

Expand All @@ -499,7 +502,7 @@ public static unsafe void DeleteKeysInSlotsFromObjectStore(BasicGarnetApi BasicG
var key = iterObject.Key;
var s = HashSlotUtils.HashSlot(key);
if (slots.Contains(s))
_ = BasicGarnetApi.DELETE(PinnedSpanByte.FromPinnedSpan(key), StoreType.Object);
_ = BasicGarnetApi.DELETE(PinnedSpanByte.FromPinnedSpan(key));
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions libs/cluster/Server/ClusterManagerWorkerState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
try
{
SuspendConfigMerge();

// Reset recovery operations before proceeding with reset
clusterProvider.replicationManager.ResetRecovery();

var resp = CmdStrings.RESP_OK;
while (true)
{
Expand All @@ -113,8 +117,9 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
this.clusterConnectionStore.CloseAll();

var newNodeId = soft ? current.LocalNodeId : Generator.CreateHexId();
var address = current.LocalNodeIp;
var port = current.LocalNodePort;
var endpoint = clusterProvider.storeWrapper.GetClusterEndpoint();
var address = endpoint.Address.ToString();
var port = endpoint.Port;

var configEpoch = soft ? current.LocalNodeConfigEpoch : 0;
var expiry = DateTimeOffset.UtcNow.Ticks + TimeSpan.FromSeconds(expirySeconds).Ticks;
Expand Down
25 changes: 7 additions & 18 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
namespace Garnet.cluster
{
using BasicGarnetApi = GarnetApi<BasicContext<RawStringInput, SpanByteAndMemory, long, MainSessionFunctions,
/* MainStoreFunctions */ StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>,
SpanByteAllocator<StoreFunctions<SpanByteComparer, SpanByteRecordDisposer>>>,
/* MainStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>,
BasicContext<ObjectInput, GarnetObjectStoreOutput, long, ObjectSessionFunctions,
/* ObjectStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>,
BasicContext<UnifiedStoreInput, GarnetUnifiedStoreOutput, long, UnifiedSessionFunctions,
/* UnifiedStoreFunctions */ StoreFunctions<SpanByteComparer, DefaultRecordDisposer>,
ObjectAllocator<StoreFunctions<SpanByteComparer, DefaultRecordDisposer>>>>;

/// <summary>
Expand Down Expand Up @@ -157,7 +160,6 @@ public void FlushConfig()
public void FlushDB(bool unsafeTruncateLog = false)
{
storeWrapper.store.Log.ShiftBeginAddress(storeWrapper.store.Log.TailAddress, truncateLog: unsafeTruncateLog);
storeWrapper.objectStore?.Log.ShiftBeginAddress(storeWrapper.objectStore.Log.TailAddress, truncateLog: unsafeTruncateLog);
}

/// <inheritdoc />
Expand All @@ -171,12 +173,6 @@ public void SafeTruncateAOF(bool full, long CheckpointCoveredAofAddress, Guid st
entry.metadata.storeCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.storePrimaryReplId = replicationManager.PrimaryReplId;

entry.metadata.objectStoreVersion = serverOptions.DisableObjects ? -1 : storeWrapper.objectStore.CurrentVersion;
entry.metadata.objectStoreHlogToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectStoreIndexToken = serverOptions.DisableObjects ? default : objectStoreCheckpointToken;
entry.metadata.objectCheckpointCoveredAofAddress = CheckpointCoveredAofAddress;
entry.metadata.objectStorePrimaryReplId = replicationManager.PrimaryReplId;

// Keep track of checkpoints for replica
// Used to delete old checkpoints and cleanup and also cleanup during attachment to new primary
replicationManager.AddCheckpointEntry(entry, full);
Expand Down Expand Up @@ -241,8 +237,6 @@ public MetricsItem[] GetReplicationInfo()
new("second_repl_offset", replication_offset2),
new("store_current_safe_aof_address", clusterEnabled ? replicationManager.StoreCurrentSafeAofAddress.ToString() : "N/A"),
new("store_recovered_safe_aof_address", clusterEnabled ? replicationManager.StoreRecoveredSafeAofTailAddress.ToString() : "N/A"),
new("object_store_current_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreCurrentSafeAofAddress.ToString() : "N/A"),
new("object_store_recovered_safe_aof_address", clusterEnabled && !serverOptions.DisableObjects ? replicationManager.ObjectStoreRecoveredSafeAofTailAddress.ToString() : "N/A"),
new("recover_status", replicationManager.currentRecoveryStatus.ToString()),
new("last_failover_state", !clusterEnabled ? FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER) : failoverManager.GetLastFailoverStatus())
};
Expand Down Expand Up @@ -427,15 +421,10 @@ public void ExtractKeySpecs(RespCommandsInfo commandInfo, RespCommand cmd, ref S
public void ClusterPublish(RespCommand cmd, ref Span<byte> channel, ref Span<byte> message)
=> clusterManager.TryClusterPublish(cmd, ref channel, ref message);

internal GarnetClusterCheckpointManager GetReplicationLogCheckpointManager(StoreType storeType)
internal GarnetClusterCheckpointManager GetReplicationLogCheckpointManager()
{
Debug.Assert(serverOptions.EnableCluster);
return storeType switch
{
StoreType.Main => (GarnetClusterCheckpointManager)storeWrapper.store.CheckpointManager,
StoreType.Object => (GarnetClusterCheckpointManager)storeWrapper.objectStore?.CheckpointManager,
_ => throw new Exception($"GetCkptManager: unexpected state {storeType}")
};
return (GarnetClusterCheckpointManager)storeWrapper.store.CheckpointManager;
}

/// <summary>
Expand Down
7 changes: 2 additions & 5 deletions libs/cluster/Server/Migration/MigrateSessionKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,8 @@ public bool MigrateKeys()
return false;

// Migrate object store keys
if (!clusterProvider.serverOptions.DisableObjects)
{
if (!MigrateKeysFromObjectStore())
return false;
}
if (!MigrateKeysFromObjectStore())
return false;
}
catch (Exception ex)
{
Expand Down
11 changes: 0 additions & 11 deletions libs/cluster/Server/Migration/MigrateSessionSlots.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,6 @@ public async Task<bool> MigrateSlotsDriverInline()
var success = await CreateAndRunMigrateTasks(StoreType.Main, storeBeginAddress, storeTailAddress, mainStorePageSize);
if (!success) return false;

// Send object store
if (!clusterProvider.serverOptions.DisableObjects)
{
var objectStoreBeginAddress = clusterProvider.storeWrapper.objectStore.Log.BeginAddress;
var objectStoreTailAddress = clusterProvider.storeWrapper.objectStore.Log.TailAddress;
var objectStorePageSize = 1 << clusterProvider.serverOptions.ObjectStorePageSizeBits();
logger?.LogWarning("Object Store migrate scan range [{objectStoreBeginAddress}, {objectStoreTailAddress}]", objectStoreBeginAddress, objectStoreTailAddress);
success = await CreateAndRunMigrateTasks(StoreType.Object, objectStoreBeginAddress, objectStoreTailAddress, objectStorePageSize);
if (!success) return false;
}

return true;

async Task<bool> CreateAndRunMigrateTasks(StoreType storeType, long beginAddress, long tailAddress, int pageSize)
Expand Down
Loading
Loading