From a54c8f6640b9bf2017769637c01ca27e406283d1 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Wed, 31 Dec 2025 11:28:44 +0100 Subject: [PATCH 1/9] fix(file): optim prefered node Signed-off-by: Guillaume Chervet --- demo/deployment-slimfaas.yml | 2 +- .../ClusterFiles/ClusterFileAnnounceQueue.cs | 2 +- .../ClusterFiles/ClusterFileAnnounceWorker.cs | 2 +- src/SlimData/ClusterFiles/ClusterFileSync.cs | 20 ++++++++++++++++--- .../ClusterFiles/ClusterFileSyncChannel.cs | 14 ++++++++++++- src/SlimData/ClusterFiles/IClusterFileSync.cs | 1 + 6 files changed, 34 insertions(+), 7 deletions(-) diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index a19d6ad11..0aee8b222 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -79,7 +79,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:latest + image: axaguildev/slimfaas:0.58.5 livenessProbe: httpGet: path: /health diff --git a/src/SlimData/ClusterFiles/ClusterFileAnnounceQueue.cs b/src/SlimData/ClusterFiles/ClusterFileAnnounceQueue.cs index cb08f3a69..d42c72924 100644 --- a/src/SlimData/ClusterFiles/ClusterFileAnnounceQueue.cs +++ b/src/SlimData/ClusterFiles/ClusterFileAnnounceQueue.cs @@ -3,7 +3,7 @@ namespace SlimData.ClusterFiles; -public sealed record AnnouncedFile(string Id, string Sha256Hex); +public sealed record AnnouncedFile(string Id, string Sha256Hex, string? PreferredNode); public sealed class ClusterFileAnnounceQueue { diff --git a/src/SlimData/ClusterFiles/ClusterFileAnnounceWorker.cs b/src/SlimData/ClusterFiles/ClusterFileAnnounceWorker.cs index 155bff0f1..15fcc8bb1 100644 --- a/src/SlimData/ClusterFiles/ClusterFileAnnounceWorker.cs +++ b/src/SlimData/ClusterFiles/ClusterFileAnnounceWorker.cs @@ -34,7 +34,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) continue; // Pull => récupère depuis un des nœuds - var res = await _sync.PullFileIfMissingAsync(a.Id, a.Sha256Hex, stoppingToken).ConfigureAwait(false); + var res = await _sync.PullFileIfMissingAsync(a.Id, a.Sha256Hex, a.PreferredNode, stoppingToken).ConfigureAwait(false); if (res.Stream is null) { _logger.LogWarning("Auto-pull failed. Id={Id} Sha={Sha}", a.Id, a.Sha256Hex); diff --git a/src/SlimData/ClusterFiles/ClusterFileSync.cs b/src/SlimData/ClusterFiles/ClusterFileSync.cs index ba80773dc..a0df767fa 100644 --- a/src/SlimData/ClusterFiles/ClusterFileSync.cs +++ b/src/SlimData/ClusterFiles/ClusterFileSync.cs @@ -79,16 +79,30 @@ public async Task BroadcastFilePutAsync( private const int RangeChunkSizeBytes = 10 * 1024 * 1024; // 10 MiB private static readonly TimeSpan PerChunkTimeout = TimeSpan.FromMinutes(2); - public async Task PullFileIfMissingAsync(string id, string sha256Hex, CancellationToken ct) + public async Task PullFileIfMissingAsync(string id, string sha256Hex, string? preferredNode, CancellationToken ct) { // Déjà présent localement if (await _repo.ExistsAsync(id, sha256Hex, ct).ConfigureAwait(false)) return new FilePullResult(await _repo.OpenReadAsync(id, ct).ConfigureAwait(false)); // Itérer sur tous les nœuds (remotes) pour trouver celui qui a le fichier - var candidates = _bus.Members.Where(m => m.IsRemote).ToArray(); - if (candidates.Length == 0) + var candidates = _bus.Members.Where(m => m.IsRemote).ToList(); + if (candidates.Count == 0) return new FilePullResult(null); + + // Reorder: preferred first (best-effort) + if (!string.IsNullOrWhiteSpace(preferredNode)) + { + var idx = candidates.FindIndex(m => + string.Equals(SafeNode(m), preferredNode, StringComparison.OrdinalIgnoreCase)); + + if (idx > 0) + { + var preferred = candidates[idx]; + candidates.RemoveAt(idx); + candidates.Insert(0, preferred); + } + } var http = _httpFactory.CreateClient("ClusterFilesTransfer"); diff --git a/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs b/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs index 8914aba17..def791677 100644 --- a/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs +++ b/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs @@ -1,4 +1,5 @@ using System.Net.Mime; +using DotNext.Net.Cluster; using DotNext.Net.Cluster.Messaging; namespace SlimData.ClusterFiles; @@ -11,7 +12,18 @@ public bool IsSupported(string messageName, bool oneWay) public Task ReceiveSignal(ISubscriber sender, IMessage signal, object? context, CancellationToken token) { if (FileSyncProtocol.TryParseAnnounceName(signal.Name, out var idEnc, out var sha, out _, out _, out _)) - announceQueue.TryEnqueue(new AnnouncedFile(Base64UrlCodec.Decode(idEnc), sha)); + { + var id = Base64UrlCodec.Decode(idEnc); + + string? preferredNode = null; + if (sender is IClusterMember cm) + preferredNode = cm.EndPoint.ToString(); + + Console.WriteLine("Announced file: Id={0} Sha={1} PreferredNode={2}", id, sha, preferredNode); + + announceQueue.TryEnqueue(new AnnouncedFile(id, sha, preferredNode)); + } + return Task.CompletedTask; } diff --git a/src/SlimData/ClusterFiles/IClusterFileSync.cs b/src/SlimData/ClusterFiles/IClusterFileSync.cs index f728d801e..3de2c31b7 100644 --- a/src/SlimData/ClusterFiles/IClusterFileSync.cs +++ b/src/SlimData/ClusterFiles/IClusterFileSync.cs @@ -18,5 +18,6 @@ Task BroadcastFilePutAsync( Task PullFileIfMissingAsync( string id, string sha256Hex, + string? preferredNode, CancellationToken ct); } \ No newline at end of file From da7e08cb675c855cef7b0616207d936f37086755 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Wed, 31 Dec 2025 12:08:56 +0100 Subject: [PATCH 2/9] fix test Signed-off-by: Guillaume Chervet --- src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs | 4 +++- src/SlimFaas/Data/DataFileRoutes.cs | 4 ++-- .../ClusterFiles/ClusterFileAnnounceWorkerTests.cs | 6 +++--- tests/SlimData.Tests/ClusterFiles/ClusterFileSyncTests.cs | 6 +++--- .../ClusterFiles/ClusterFileSync_AnnounceTests.cs | 6 +++--- tests/SlimFaas.Tests/Data/DataFileRoutesTests.cs | 2 +- 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs b/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs index def791677..8350f7130 100644 --- a/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs +++ b/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs @@ -17,7 +17,9 @@ public Task ReceiveSignal(ISubscriber sender, IMessage signal, object? context, string? preferredNode = null; if (sender is IClusterMember cm) - preferredNode = cm.EndPoint.ToString(); + { + preferredNode = cm.EndPoint?.ToString() ?? null; + } Console.WriteLine("Announced file: Id={0} Sha={1} PreferredNode={2}", id, sha, preferredNode); diff --git a/src/SlimFaas/Data/DataFileRoutes.cs b/src/SlimFaas/Data/DataFileRoutes.cs index e83b06f93..b47af263b 100644 --- a/src/SlimFaas/Data/DataFileRoutes.cs +++ b/src/SlimFaas/Data/DataFileRoutes.cs @@ -120,7 +120,7 @@ public static async Task PostAsync( string? actualContentType = null; string? actualFileName = null; - var finalContentType = actualContentType ?? contentType ?? "application/octet-stream"; + var finalContentType = actualContentType ?? contentType; var finalFileName = actualFileName ?? fileName ?? elementId; // Persiste localement + calcule sha/len + announce-only cluster @@ -175,7 +175,7 @@ public static async Task GetAsync( return Results.Problem("Corrupted metadata", statusCode: 500); } - var pulled = await fileSync.PullFileIfMissingAsync(elementId, meta?.Sha256Hex ?? "", ct); + var pulled = await fileSync.PullFileIfMissingAsync(elementId, meta?.Sha256Hex ?? "", null, ct); if (pulled.Stream is null) return Results.NotFound(); diff --git a/tests/SlimData.Tests/ClusterFiles/ClusterFileAnnounceWorkerTests.cs b/tests/SlimData.Tests/ClusterFiles/ClusterFileAnnounceWorkerTests.cs index 799ffa34c..0e01ead58 100644 --- a/tests/SlimData.Tests/ClusterFiles/ClusterFileAnnounceWorkerTests.cs +++ b/tests/SlimData.Tests/ClusterFiles/ClusterFileAnnounceWorkerTests.cs @@ -25,14 +25,14 @@ public async Task Worker_pulls_when_missing_and_disposes_stream() var disposedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var stream = new TrackingStream(() => disposedTcs.TrySetResult()); - sync.Setup(s => s.PullFileIfMissingAsync(id, sha, It.IsAny())) + sync.Setup(s => s.PullFileIfMissingAsync(id, sha, null, It.IsAny())) .ReturnsAsync(new FilePullResult(stream)); var worker = new ClusterFileAnnounceWorker(queue, sync.Object, repo.Object, logger.Object); await worker.StartAsync(CancellationToken.None); - Assert.True(queue.TryEnqueue(new AnnouncedFile(id, sha))); + Assert.True(queue.TryEnqueue(new AnnouncedFile(id, sha, null))); // wait until stream disposed => implies pull happened and worker disposed it await disposedTcs.Task.WaitAsync(TimeSpan.FromSeconds(2)); @@ -63,7 +63,7 @@ public async Task Worker_does_not_pull_when_already_present() await worker.StartAsync(CancellationToken.None); - Assert.True(queue.TryEnqueue(new AnnouncedFile(id, sha))); + Assert.True(queue.TryEnqueue(new AnnouncedFile(id, sha, null))); // small delay to allow worker to process await Task.Delay(150); diff --git a/tests/SlimData.Tests/ClusterFiles/ClusterFileSyncTests.cs b/tests/SlimData.Tests/ClusterFiles/ClusterFileSyncTests.cs index 308a77d1e..7d3fa12fc 100644 --- a/tests/SlimData.Tests/ClusterFiles/ClusterFileSyncTests.cs +++ b/tests/SlimData.Tests/ClusterFiles/ClusterFileSyncTests.cs @@ -189,7 +189,7 @@ public async Task PullFileIfMissingAsync_returns_local_stream_when_already_prese var sut = new ClusterFileSync(bus.Object, repo.Object, queue, loggerMock.Object, httpFactory); - var pulled = await sut.PullFileIfMissingAsync("id1", "sha", CancellationToken.None); + var pulled = await sut.PullFileIfMissingAsync("id1", "sha", null, CancellationToken.None); Assert.NotNull(pulled.Stream); @@ -273,7 +273,7 @@ public async Task PullFileIfMissingAsync_downloads_from_first_remote_with_file_a var sut = new ClusterFileSync(bus.Object, repo.Object, queue, loggerMock.Object, httpFactory); - var pulled = await sut.PullFileIfMissingAsync("id1", sha, CancellationToken.None); + var pulled = await sut.PullFileIfMissingAsync("id1", sha, null, CancellationToken.None); Assert.NotNull(pulled.Stream); @@ -314,7 +314,7 @@ public async Task PullFileIfMissingAsync_returns_null_stream_if_no_member_has_fi var sut = new ClusterFileSync(bus.Object, repo.Object, queue, loggerMock.Object, httpFactory); - var pulled = await sut.PullFileIfMissingAsync("id1", "sha", CancellationToken.None); + var pulled = await sut.PullFileIfMissingAsync("id1", "sha", null, CancellationToken.None); Assert.Null(pulled.Stream); diff --git a/tests/SlimData.Tests/ClusterFiles/ClusterFileSync_AnnounceTests.cs b/tests/SlimData.Tests/ClusterFiles/ClusterFileSync_AnnounceTests.cs index 4e7af81dd..064433f9e 100644 --- a/tests/SlimData.Tests/ClusterFiles/ClusterFileSync_AnnounceTests.cs +++ b/tests/SlimData.Tests/ClusterFiles/ClusterFileSync_AnnounceTests.cs @@ -11,8 +11,8 @@ public sealed class ClusterFileSync_AnnounceTests [Fact] public async Task ReceiveSignal_enqueue_announce_for_background_pull() { - var repo = new Mock(MockBehavior.Strict); - var bus = new Mock(MockBehavior.Strict); + var repo = new Mock(); + var bus = new Mock(); var logger = new Mock>(); var queue = new ClusterFileAnnounceQueue(); @@ -42,7 +42,7 @@ public async Task ReceiveSignal_enqueue_announce_for_background_pull() var announceName = FileSyncProtocol.BuildAnnounceName(idEnc, sha, len, contentType, overwrite); var signal = new TextMessage("", announceName); - var sender = new Mock(MockBehavior.Strict); + var sender = new Mock(); await captured!.ReceiveSignal(sender.Object, signal, context: null, token: CancellationToken.None); diff --git a/tests/SlimFaas.Tests/Data/DataFileRoutesTests.cs b/tests/SlimFaas.Tests/Data/DataFileRoutesTests.cs index 916116ac1..c37b6dd92 100644 --- a/tests/SlimFaas.Tests/Data/DataFileRoutesTests.cs +++ b/tests/SlimFaas.Tests/Data/DataFileRoutesTests.cs @@ -233,7 +233,7 @@ public async Task Get_PullsIfMissing_AndReturnsFile() .ReturnsAsync(metaBytes); var payload = Encoding.UTF8.GetBytes("abcde"); - fileSync.Setup(s => s.PullFileIfMissingAsync("id1", "sha", It.IsAny())) + fileSync.Setup(s => s.PullFileIfMissingAsync("id1", "sha", null, It.IsAny())) .ReturnsAsync(new FilePullResult(new MemoryStream(payload))); var result = await DataFileRoutes.DataFileHandlers.GetAsync("id1", fileSync.Object, db.Object, CancellationToken.None); From 799df9bf669c523b2e7ecd22d9da8a56f09b814c Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Wed, 31 Dec 2025 12:43:06 +0100 Subject: [PATCH 3/9] test Signed-off-by: Guillaume Chervet --- demo/deployment-slimfaas.yml | 6 +++--- src/SlimFaas/Data/DataFileRoutes.cs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index 0aee8b222..83c160047 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -79,7 +79,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:0.58.5 + image: axaguildev/slimfaas:0.58.6-pr.215393 livenessProbe: httpGet: path: /health @@ -129,10 +129,10 @@ spec: mountPath: /database resources: limits: - memory: "840Mi" + memory: "120Mi" cpu: "400m" requests: - memory: "820Mi" + memory: "120Mi" cpu: "400m" ports: - containerPort: 5000 diff --git a/src/SlimFaas/Data/DataFileRoutes.cs b/src/SlimFaas/Data/DataFileRoutes.cs index b47af263b..8e5807990 100644 --- a/src/SlimFaas/Data/DataFileRoutes.cs +++ b/src/SlimFaas/Data/DataFileRoutes.cs @@ -120,7 +120,7 @@ public static async Task PostAsync( string? actualContentType = null; string? actualFileName = null; - var finalContentType = actualContentType ?? contentType; + var finalContentType = actualContentType ?? contentType ?? "application/octet-stream" ; var finalFileName = actualFileName ?? fileName ?? elementId; // Persiste localement + calcule sha/len + announce-only cluster From 25388e9770ca91a49d5240cdade688d1d1aed802 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Wed, 31 Dec 2025 14:10:56 +0100 Subject: [PATCH 4/9] fix Signed-off-by: Guillaume Chervet --- demo/deployment-slimfaas.yml | 6 +++--- src/SlimData/ClusterFiles/ClusterFileSync.cs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index 83c160047..5b791e417 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -79,7 +79,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:0.58.6-pr.215393 + image: axaguildev/slimfaas:0.58.6-pr.215394 livenessProbe: httpGet: path: /health @@ -129,10 +129,10 @@ spec: mountPath: /database resources: limits: - memory: "120Mi" + memory: "180Mi" cpu: "400m" requests: - memory: "120Mi" + memory: "180Mi" cpu: "400m" ports: - containerPort: 5000 diff --git a/src/SlimData/ClusterFiles/ClusterFileSync.cs b/src/SlimData/ClusterFiles/ClusterFileSync.cs index a0df767fa..791325aa2 100644 --- a/src/SlimData/ClusterFiles/ClusterFileSync.cs +++ b/src/SlimData/ClusterFiles/ClusterFileSync.cs @@ -108,7 +108,7 @@ public async Task PullFileIfMissingAsync(string id, string sha25 foreach (var member in candidates) { - + Console.WriteLine("Trying node " + SafeNode(member)); var baseUri = RemoveLastPathSegment(SafeNode(member)); // /cluster/files/{id}?sha=... var fileUri = new Uri($"{baseUri}/cluster/files/{Uri.EscapeDataString(id)}?sha={Uri.EscapeDataString(sha256Hex)}"); From 612d9402078928cd0084b3ec1a88659bddaefe90 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Wed, 31 Dec 2025 17:12:23 +0100 Subject: [PATCH 5/9] fix Signed-off-by: Guillaume Chervet --- demo/deployment-slimfaas.yml | 2 +- src/SlimData/ClusterFiles/ClusterFileSync.cs | 1 - src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs | 4 +--- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index 5b791e417..770197011 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -79,7 +79,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:0.58.6-pr.215394 + image: axaguildev/slimfaas:latest livenessProbe: httpGet: path: /health diff --git a/src/SlimData/ClusterFiles/ClusterFileSync.cs b/src/SlimData/ClusterFiles/ClusterFileSync.cs index 791325aa2..98fc3d279 100644 --- a/src/SlimData/ClusterFiles/ClusterFileSync.cs +++ b/src/SlimData/ClusterFiles/ClusterFileSync.cs @@ -108,7 +108,6 @@ public async Task PullFileIfMissingAsync(string id, string sha25 foreach (var member in candidates) { - Console.WriteLine("Trying node " + SafeNode(member)); var baseUri = RemoveLastPathSegment(SafeNode(member)); // /cluster/files/{id}?sha=... var fileUri = new Uri($"{baseUri}/cluster/files/{Uri.EscapeDataString(id)}?sha={Uri.EscapeDataString(sha256Hex)}"); diff --git a/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs b/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs index 8350f7130..561109d77 100644 --- a/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs +++ b/src/SlimData/ClusterFiles/ClusterFileSyncChannel.cs @@ -20,9 +20,7 @@ public Task ReceiveSignal(ISubscriber sender, IMessage signal, object? context, { preferredNode = cm.EndPoint?.ToString() ?? null; } - - Console.WriteLine("Announced file: Id={0} Sha={1} PreferredNode={2}", id, sha, preferredNode); - + announceQueue.TryEnqueue(new AnnouncedFile(id, sha, preferredNode)); } From bdeb326b4c2c5c479ca5fc318f7149207f546e39 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Thu, 1 Jan 2026 14:26:47 +0100 Subject: [PATCH 6/9] feat(slimfaas): data set Signed-off-by: Guillaume Chervet --- src/SlimFaas/Program.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SlimFaas/Program.cs b/src/SlimFaas/Program.cs index b10ce9999..a0c7042f7 100644 --- a/src/SlimFaas/Program.cs +++ b/src/SlimFaas/Program.cs @@ -488,7 +488,7 @@ }); //app.MapDataHashsetRoutes(); -//app.MapDataSetRoutes(); +app.MapDataSetRoutes(); app.MapDataFileRoutes(); app.MapDebugRoutes(); From 456e12e4fa5a85d07a2d01375100d3005f4bfacf Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Mon, 5 Jan 2026 15:58:58 +0100 Subject: [PATCH 7/9] test Signed-off-by: Guillaume Chervet --- demo/deployment-functions.yml | 4 ++-- demo/deployment-slimfaas.yml | 2 +- src/SlimFaas/Data/DataSetRoutes.cs | 14 ++++++++------ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/demo/deployment-functions.yml b/demo/deployment-functions.yml index ef4b9d743..a78bab6f3 100644 --- a/demo/deployment-functions.yml +++ b/demo/deployment-functions.yml @@ -42,13 +42,13 @@ spec: "ScaleUp": { "StabilizationWindowSeconds": 0, "Policies": [ - { "Type": "Pods", "Value": 1, "PeriodSeconds": 30 } + { "Type": "Pods", "Value": 1, "PeriodSeconds": 10 } ] }, "ScaleDown": { "StabilizationWindowSeconds": 20, "Policies": [ - { "Type": "Pods", "Value": 1, "PeriodSeconds": 30 } + { "Type": "Pods", "Value": 1, "PeriodSeconds": 10 } ] } } diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index 770197011..86019efbb 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -79,7 +79,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:latest + image: axaguildev/slimfaas:0.58.6-pr.215397 livenessProbe: httpGet: path: /health diff --git a/src/SlimFaas/Data/DataSetRoutes.cs b/src/SlimFaas/Data/DataSetRoutes.cs index ded7be3ba..12a8000c6 100644 --- a/src/SlimFaas/Data/DataSetRoutes.cs +++ b/src/SlimFaas/Data/DataSetRoutes.cs @@ -17,13 +17,15 @@ public static class DataSetRoutes private static string DataKey(string id) => $"{SetPrefix}{id}"; private static string TtlKey(string key) => key + SlimDataInterpreter.TimeToLivePostfix; - public static IEndpointRouteBuilder MapDataSetRoutes(this IEndpointRouteBuilder app) + public static IEndpointRouteBuilder MapDataSetRoutes(this IEndpointRouteBuilder endpoints) { - app.MapPost("/data/sets", Handlers.PostAsync); - app.MapGet("/data/sets/{id}", Handlers.GetAsync); - app.MapGet("/data/sets", Handlers.ListAsync); - app.MapDelete("/data/sets/{id}", Handlers.DeleteAsync); - return app; + var group = endpoints.MapGroup("/data/sets") + .AddEndpointFilter(); + group.MapPost("", Handlers.PostAsync); + group.MapGet("/{id}", Handlers.GetAsync); + group.MapGet("", Handlers.ListAsync); + group.MapDelete("/{id}", Handlers.DeleteAsync); + return endpoints; } From 44450a7a93c47a6faca195fda372431fbe38f297 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Sun, 11 Jan 2026 15:33:32 +0100 Subject: [PATCH 8/9] test Signed-off-by: Guillaume Chervet --- README.md | 37 ++++-- demo/deployment-slimfaas.yml | 2 +- documentation/data-sets.md | 156 +++++++++++++++++++++++ documentation/home.md | 15 +-- src/SlimFaas/Database/SlimDataService.cs | 11 -- src/SlimFaas/appsettings.json | 2 +- 6 files changed, 191 insertions(+), 32 deletions(-) create mode 100644 documentation/data-sets.md diff --git a/README.md b/README.md index 0047471c6..12107e261 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,8 @@ It’s designed to be **fast**, **simple**, and **extremely slim** — with a ve - `0 → N` wake-up from **Kafka lag** via the companion **SlimFaas Kafka** service, - `N → M` scaling powered by PromQL, - internal metrics store, debug endpoints, and scale-to-zero out of the box. -- temporary **Data Binary** endpoints to ingest and stage files (from tiny to very large) with TTL-friendly storage — perfect for caching & agentic workflows. +- temporary **Data Files** endpoints to ingest and stage binaries (from tiny to very large) with TTL-friendly storage — perfect for caching & agentic workflows. +- temporary **Data Sets** endpoints (`/data/sets`) to store small, Redis-like KV payloads (cache, JSON state, flags) with optional TTL — replicated through the cluster via a robust consensus layer. > **Looking for MCP integration?** > Check out **[SlimFaas MCP](https://slimfaas.dev/mcp)** — the companion runtime that converts *any* OpenAPI definition into MCP-ready tools on the fly. @@ -102,9 +103,13 @@ It’s designed to be **fast**, **simple**, and **extremely slim** — with a ve - Synchronously send events to **every replica** of selected functions. - No additional event bus required — ideal for cluster-local fan-out, cache invalidation, configuration refresh, etc. -### 📁 Data & Files (real-time ingestion + ephemeral caching) +### 🧰 Data Files & Data Sets (real-time ingestion + ephemeral caching + robust KV) -SlimFaas includes **Data Files** endpoints to **stream, store, and serve temporary files** — from tiny payloads to *very large* binaries. +SlimFaas includes two complementary “data” APIs: + +#### 📁 Data Files (temporary binary artifacts) + +**Data Files** endpoints are designed to **stream, store, and serve temporary files** — from tiny payloads to *very large* binaries. Ideal for **agentic workflows** and **real-time ingestion**: upload once, get an `id`, then let tools/functions consume it when they’re ready. - Stream-first uploads (without buffering in memory or disk) @@ -112,6 +117,15 @@ Ideal for **agentic workflows** and **real-time ingestion**: upload once, get an - **Ephemeral caching** for intermediate artifacts - **TTL-based** lifecycle (auto-expiration) +#### 🧠 Data Sets (Redis-like KV for small state) + +**Data Sets** endpoints provide a **small, Redis-like KV store** (raw bytes) replicated across the SlimFaas cluster. + +- Stream-first uploads +- Store **anything small**: JSON, strings, flags, lightweight cache entries +- Optional `ttl` in **milliseconds** (auto-expiration) +- Hard limit: **1 MiB per value** + ### 🧠 “Mind Changer” (Status & Wake-up API) - Built-in REST APIs to: @@ -144,16 +158,17 @@ Check out: - [Get Started](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/get-started.md) – Learn how to deploy SlimFaas on Kubernetes or Docker Compose. - Scaling - - [Autoscaling](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/autoscaling.md) – Deep-dive into `0 → N` / `N → M` autoscaling, PromQL triggers, metrics scraping, and debug endpoints. - - [Kafka Connector](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/kafka.md) – Use Kafka topic lag to wake functions from `0 → N` and keep workers alive while messages are still flowing. - - [Planet Saver](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/planet-saver.md) – See how to start and monitor replicas from a JavaScript frontend. + - [Autoscaling](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/autoscaling.md) – Deep-dive into `0 → N` / `N → M` autoscaling, PromQL triggers, metrics scraping, and debug endpoints. + - [Kafka Connector](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/kafka.md) – Use Kafka topic lag to wake functions from `0 → N` and keep workers alive while messages are still flowing. + - [Planet Saver](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/planet-saver.md) – See how to start and monitor replicas from a JavaScript frontend. - Functions & Workloads - - [Functions](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/functions.md) – See how to call functions synchronously or asynchronously. - - [Events](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/events.md) – Explore how to use internal synchronous publish/subscribe events. - - [Jobs](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/jobs.md) – Learn how to define and run one-off jobs. - - [OpenTelemetry](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/opentelemetry.md) – Enable distributed tracing, metrics, and logs with OpenTelemetry integration. + - [Functions](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/functions.md) – See how to call functions synchronously or asynchronously. + - [Events](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/events.md) – Explore how to use internal synchronous publish/subscribe events. + - [Jobs](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/jobs.md) – Learn how to define and run one-off jobs. + - [OpenTelemetry](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/opentelemetry.md) – Enable distributed tracing, metrics, and logs with OpenTelemetry integration. - Data & Files - - [Data Files](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/data-files.md) - Understand how to ingest, store, and serve temporary binary artifacts. + - [Data Files](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/data-files.md) - Understand how to ingest, store, and serve temporary binary artifacts. + - [Data Sets](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/data-sets.md) - Store small, Redis-like KV payloads (cache, JSON state, flags) replicated across the cluster, with optional TTL. - [How It Works](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/how-it-works.md) – Dive into SlimFaas’s architecture and design. - [MCP](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/mcp.md) – Discover how to convert *any* OpenAPI definition into MCP-ready tools on the fly. diff --git a/demo/deployment-slimfaas.yml b/demo/deployment-slimfaas.yml index 86019efbb..770197011 100644 --- a/demo/deployment-slimfaas.yml +++ b/demo/deployment-slimfaas.yml @@ -79,7 +79,7 @@ spec: serviceAccountName: slimfaas containers: - name: slimfaas - image: axaguildev/slimfaas:0.58.6-pr.215397 + image: axaguildev/slimfaas:latest livenessProbe: httpGet: path: /health diff --git a/documentation/data-sets.md b/documentation/data-sets.md new file mode 100644 index 000000000..52f3df268 --- /dev/null +++ b/documentation/data-sets.md @@ -0,0 +1,156 @@ +# Data Sets API + +SlimFaas provides a **Redis-like, cluster-consistent key/value store** through the `/data/sets` endpoints. + +Use it to store **small values** (cache entries, JSON state, flags, checkpoints) with a robust replication protocol (SlimData + Raft). + +- **Max payload size:** 1 MiB per entry +- **TTL unit:** milliseconds (`ttl` query parameter) + +> Not for large file storage. For large binaries (PDF, ZIP, audio, PPTX, ...), use **Data Files**: https://slimfaas.dev/data-files + +--- + +## How it works (high-level) + +```mermaid +flowchart LR + C[Client] -->|HTTP| API[SlimFaas /data/sets] + API -->|SetAsync / GetAsync / DeleteAsync| DB[SlimData] + DB -->|Raft replication| N1[(Node A)] + DB -->|Raft replication| N2[(Node B)] + DB -->|Raft replication| N3[(Node C)] + DB --> TTL[TTL metadata] + TTL --> EXP[Auto-expire] +``` + +--- + +## Endpoints + +Base path: `/data/sets` + +| Method | Path | Purpose | +|---|---|---| +| `POST` | `/data/sets?id={id?}&ttl={ttl_ms?}` | Create or overwrite a value | +| `GET` | `/data/sets/{id}` | Read a value | +| `GET` | `/data/sets` | List entries (IDs + expiration) | +| `DELETE` | `/data/sets/{id}` | Delete a value | + +### IDs + +- IDs are validated server-side (`IdValidator.IsSafeId`). +- If `id` is omitted (or empty), SlimFaas generates one (`Guid.NewGuid().ToString("N")`) and returns it. + +### TTL (milliseconds) + +- `ttl` is optional and expressed in **milliseconds**. +- When provided, the entry auto-expires after the TTL. + +Examples: +- `ttl=60000` → 1 minute +- `ttl=600000` → 10 minutes + +--- + +## Create / overwrite + +`POST /data/sets?id={id?}&ttl={ttl_ms?}` + +- Body is stored as raw bytes. +- Payload larger than 1 MiB returns **413 Payload Too Large**. + +Examples: + +Store JSON with a fixed id: +```bash +curl -X POST "http:///data/sets?id=my-usecase/session-123/state" \ + -H "Content-Type: application/json" \ + --data-binary '{"step":"route","chosen":"kb_rag","confidence":0.92}' +``` + +Store a string: +```bash +curl -X POST "http:///data/sets?id=my-usecase/session-123/flag" \ + -H "Content-Type: text/plain" \ + --data-binary "ready" +``` + +Let SlimFaas generate the id: +```bash +ID=$(curl -s -X POST "http:///data/sets" --data-binary "hello") +echo "created id=$ID" +``` + +Store with TTL (10 minutes = 600000 ms): +```bash +curl -X POST "http:///data/sets?id=my-usecase/session-123/state&ttl=600000" \ + --data-binary "temporary" +``` + +--- + +## Read + +`GET /data/sets/{id}` + +- Returns raw bytes as `application/octet-stream`. +- `404 Not Found` when missing or expired. + +Examples: +```bash +curl -L "http:///data/sets/my-usecase/session-123/state" -o value.bin +``` + +If you stored JSON: +```bash +curl -s "http:///data/sets/my-usecase/session-123/state" | jq . +``` + +--- + +## List + +`GET /data/sets` + +Returns a JSON array: +```json +[ + { "id": "abc", "expireAtUtcTicks": -1 }, + { "id": "xyz", "expireAtUtcTicks": 638720123456789012 } +] +``` + +- `expireAtUtcTicks` is UTC DateTime ticks (100 ns units). +- `-1` means “no expiration”. + +--- + +## Delete + +`DELETE /data/sets/{id}` + +- Returns **204 No Content**. + +Example: +```bash +curl -X DELETE "http:///data/sets/my-usecase/session-123/state" +``` + +--- + +## Visibility & security + +`/data/sets` is protected by the same data visibility policy used by other data endpoints (`DataVisibilityEndpointFilter`). + +`appsettings.json`: +```json +{ + "Data": { + "DefaultVisibility": "Private" + } +} +``` + +Env override: +- `Data:DefaultVisibility` → `Data__DefaultVisibility` diff --git a/documentation/home.md b/documentation/home.md index d42297895..dd83563ad 100644 --- a/documentation/home.md +++ b/documentation/home.md @@ -19,7 +19,8 @@ It’s designed to be **fast**, **simple**, and **extremely slim** — with a ve - `0 → N` wake-up from **Kafka lag** via the companion **SlimFaas Kafka** service, - `N → M` scaling powered by PromQL, - internal metrics store, debug endpoints, and scale-to-zero out of the box. -- temporary **Data Binary** endpoints to ingest and stage files (from tiny to very large) with TTL-friendly storage — perfect for caching & agentic workflows. +- temporary **Data Sets** endpoints (Redis-like KV) to store small blobs/JSON with **TTL (milliseconds)** — perfect for cache & agentic state. +- temporary **Data Files** endpoints to ingest and stage files (from tiny to very large) with TTL-friendly storage — perfect for caching & agentic workflows. > **Looking for MCP integration?** > Check out **[SlimFaas MCP](https://slimfaas.dev/mcp)** — the companion runtime that converts *any* OpenAPI definition into MCP-ready tools on the fly. @@ -93,15 +94,12 @@ SlimFaas puts autoscaling at the center of the design: - Synchronously broadcast events to **every replica** of selected functions. - No external event bus required — perfect for simple fan-out, cache invalidation, or configuration refresh scenarios. -### 📁 Data & Files (real-time ingestion + ephemeral caching) +### 🧺 Data Sets & 📁 Data Files (ephemeral state + real-time ingestion) -SlimFaas includes **Data Files** endpoints to **stream, store, and serve temporary files** — from tiny payloads to *very large* binaries. -Ideal for **agentic workflows** and **real-time ingestion**: upload once, get an `id`, then let tools/functions consume it when they’re ready. +SlimFaas includes two complementary data APIs: -- Stream-first uploads (without buffering in memory or disk) -- **Agentic-ready** attachments & multi-step flows -- **Ephemeral caching** for intermediate artifacts -- **TTL-based** lifecycle (auto-expiration) +- **Data Sets** (`/data/sets`): a Redis-like, cluster-consistent **KV store** for small payloads (cache, JSON state, flags, checkpoints) with optional **TTL in milliseconds** and a **1 MiB** payload limit. +- **Data Files** (`/data/files`): stream-first endpoints to ingest, store, and serve temporary files — from tiny payloads to *very large* binaries — ideal for **agentic workflows** and **real-time ingestion** (upload once, get an `id`, then let tools/functions consume it when they’re ready). ### 🧠 “Mind Changer” (Status & Wake-up API) @@ -143,6 +141,7 @@ Dive into the documentation: - [Jobs](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/jobs.md) – Learn how to define and run one-off jobs. - [OpenTelemetry](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/opentelemetry.md) – Enable distributed tracing, metrics, and logs with OpenTelemetry integration. - Data & Files + - [Data Sets](https://slimfaas.dev/data-sets) - Store small blobs/JSON in a Redis-like KV store with TTL (milliseconds). - [Data Files](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/data-files.md) - Understand how to ingest, store, and serve temporary binary artifacts. - [How It Works](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/how-it-works.md) – Dive into SlimFaas’s architecture and design. - [MCP](https://github.com/SlimPlanet/SlimFaas/blob/main/documentation/mcp.md) – Discover how to convert *any* OpenAPI definition into MCP-ready tools on the fly. diff --git a/src/SlimFaas/Database/SlimDataService.cs b/src/SlimFaas/Database/SlimDataService.cs index f06e10185..e2e646cc2 100644 --- a/src/SlimFaas/Database/SlimDataService.cs +++ b/src/SlimFaas/Database/SlimDataService.cs @@ -42,17 +42,6 @@ public SlimDataService( maxWaitPerTick: TimeSpan.FromSeconds(5) ); - var tiersLlp = new[] - { - new RateTier(20, TimeSpan.FromMilliseconds(250)), - new RateTier(300, TimeSpan.FromMilliseconds(500)), - }; - var tiersLcb = new[] - { - new RateTier(50, TimeSpan.FromMilliseconds(150)), - new RateTier(500, TimeSpan.FromMilliseconds(400)), - }; - _batcher.RegisterKind( kind: "llp", batchHandler: BatchHandlerAsync, diff --git a/src/SlimFaas/appsettings.json b/src/SlimFaas/appsettings.json index fbd44a815..d0d7f7eb9 100644 --- a/src/SlimFaas/appsettings.json +++ b/src/SlimFaas/appsettings.json @@ -28,7 +28,7 @@ "Endpoint": "", "ServiceName": "", "EnableConsoleExporter": false, - "ExcludedUrls": ["/health", "/metrics", "/ready", "/SlimData", "/status-functions", "/cluster-consensus"] + "ExcludedUrls": ["/health", "/metrics", "/ready", "/SlimData", "/status-functions", "/cluster-consensus", "/wake-function"] }, "Data": { "DefaultVisibility": "Private" From 6a90e307ca058afbfa000703d4daca0900a4e153 Mon Sep 17 00:00:00 2001 From: Guillaume Chervet Date: Sun, 11 Jan 2026 18:29:56 +0100 Subject: [PATCH 9/9] update doc Signed-off-by: Guillaume Chervet --- src/SlimFaasSite/src/components/NavBar.tsx | 65 ++++++++++++++++++---- src/SlimFaasSite/src/pages/data-sets.tsx | 33 +++++++++++ 2 files changed, 87 insertions(+), 11 deletions(-) create mode 100644 src/SlimFaasSite/src/pages/data-sets.tsx diff --git a/src/SlimFaasSite/src/components/NavBar.tsx b/src/SlimFaasSite/src/components/NavBar.tsx index 018fe3232..8c2f6ae3d 100644 --- a/src/SlimFaasSite/src/components/NavBar.tsx +++ b/src/SlimFaasSite/src/components/NavBar.tsx @@ -6,23 +6,33 @@ const Navbar: React.FC = () => { const [isMenuOpen, setIsMenuOpen] = useState(false); const [isScaleOpen, setIsScaleOpen] = useState(false); const [isFunctionsOpen, setIsFunctionsOpen] = useState(false); + const [isDataOpen, setIsDataOpen] = useState(false); const toggleMenu = () => setIsMenuOpen((prev) => !prev); const toggleScale = () => { setIsScaleOpen((prev) => !prev); setIsFunctionsOpen(false); + setIsDataOpen(false); }; const toggleFunctions = () => { setIsFunctionsOpen((prev) => !prev); setIsScaleOpen(false); + setIsDataOpen(false); + }; + + const toggleData = () => { + setIsDataOpen((prev) => !prev); + setIsScaleOpen(false); + setIsFunctionsOpen(false); }; const closeAll = () => { setIsMenuOpen(false); setIsScaleOpen(false); setIsFunctionsOpen(false); + setIsDataOpen(false); }; return ( @@ -190,21 +200,54 @@ const Navbar: React.FC = () => { -
  • - + + +
  • +
  • - + How it works
  • diff --git a/src/SlimFaasSite/src/pages/data-sets.tsx b/src/SlimFaasSite/src/pages/data-sets.tsx new file mode 100644 index 000000000..b6beedef0 --- /dev/null +++ b/src/SlimFaasSite/src/pages/data-sets.tsx @@ -0,0 +1,33 @@ +import React from 'react'; +import Layout from '../components/Layout'; +import { GetStaticProps } from 'next'; +import { fetchMarkdownFile, MarkdownData, MarkdownMetadata } from '../lib/github'; +import { renderMarkdownWithHighlight } from '@/lib/markdown'; + +export interface DocPageProps { + contentHtml: string; + metadata: MarkdownMetadata; +} + + +export const getStaticProps: GetStaticProps = async () => { + const markdownFile: MarkdownData = await fetchMarkdownFile(`documentation/data-sets.md`); + const contentHtml = await renderMarkdownWithHighlight(markdownFile.contentHtml); + + return { + props: { + contentHtml: contentHtml, + metadata: markdownFile.metadata || {}, + }, + }; +}; + +const PlanetSaver = ({ contentHtml }: DocPageProps) => ( + +
    +
    +
    + +); + +export default PlanetSaver;