From f3f62a89e4907ff12b3d62578faeadddb1f68d9d Mon Sep 17 00:00:00 2001 From: Edgett Hilimire Date: Tue, 18 Jun 2024 15:56:22 -0400 Subject: [PATCH] Aspect based locking to prevent parallel model operations. Spelling fixes. --- .../Components/Chat/ChatMessageList.razor | 2 +- .../Client/Services/ChatService.cs | 8 ++-- ...ChatCancelation.cs => ChatCancellation.cs} | 4 +- .../Server/PalmHill.BlazorChat.Server.csproj | 1 + .../Server/SerialExecutionAspect.cs | 44 +++++++++++++++++++ .../Server/SerialExecutionAttribute.cs | 18 ++++++++ .../Server/SignalR/WebSocketChat.cs | 6 ++- .../Server/WebApi/ApiChatController.cs | 9 ++-- .../Server/WebApi/AttachmentController.cs | 1 + 9 files changed, 80 insertions(+), 13 deletions(-) rename PalmHill.BlazorChat/Server/{ChatCancelation.cs => ChatCancellation.cs} (50%) create mode 100644 PalmHill.BlazorChat/Server/SerialExecutionAspect.cs create mode 100644 PalmHill.BlazorChat/Server/SerialExecutionAttribute.cs diff --git a/PalmHill.BlazorChat/Client/Components/Chat/ChatMessageList.razor b/PalmHill.BlazorChat/Client/Components/Chat/ChatMessageList.razor index 1e2643a..028caab 100644 --- a/PalmHill.BlazorChat/Client/Components/Chat/ChatMessageList.razor +++ b/PalmHill.BlazorChat/Client/Components/Chat/ChatMessageList.razor @@ -7,7 +7,7 @@
@* Display each prompt in a card. Display each response in a card. *@ - @foreach (var promptWithResponse in Controller!.WebsocketChatMessages) + @foreach (var promptWithResponse in Controller!.WebSocketChatMessages) { diff --git a/PalmHill.BlazorChat/Client/Services/ChatService.cs b/PalmHill.BlazorChat/Client/Services/ChatService.cs index f8bd750..d87b4f7 100644 --- a/PalmHill.BlazorChat/Client/Services/ChatService.cs +++ b/PalmHill.BlazorChat/Client/Services/ChatService.cs @@ -65,7 +65,7 @@ ILogger logger /// /// The list of chat messages. Containing a prompt and its response. /// - public List WebsocketChatMessages { get; private set; } = new List(); + public List WebSocketChatMessages { get; private set; } = new List(); /// /// The WebSocketChatService that handles the WebSocket connection. @@ -142,7 +142,7 @@ public async Task SendToWebSocketChat() var prompt = new WebSocketChatMessage(); prompt.ConversationId = ConversationId; prompt.Prompt = UserInput; - WebsocketChatMessages.Add(prompt); + WebSocketChatMessages.Add(prompt); UserInput = string.Empty; StateHasChanged(); await SendInferenceRequest(); @@ -160,7 +160,7 @@ public async Task AskDocumentApi() var prompt = new WebSocketChatMessage(); prompt.Prompt = UserInput; prompt.ConversationId = ConversationId; - WebsocketChatMessages.Add(prompt); + WebSocketChatMessages.Add(prompt); UserInput = string.Empty; StateHasChanged(); @@ -258,7 +258,7 @@ private void setupWebSocketChatConnection() WebSocketChatConnection = new WebSocketChatService( ConversationId, _navigationManager.ToAbsoluteUri("/chathub?customUserId=user1"), - WebsocketChatMessages, + WebSocketChatMessages, _localStorageService ); diff --git a/PalmHill.BlazorChat/Server/ChatCancelation.cs b/PalmHill.BlazorChat/Server/ChatCancellation.cs similarity index 50% rename from PalmHill.BlazorChat/Server/ChatCancelation.cs rename to PalmHill.BlazorChat/Server/ChatCancellation.cs index 98ab4fd..6c25f57 100644 --- a/PalmHill.BlazorChat/Server/ChatCancelation.cs +++ b/PalmHill.BlazorChat/Server/ChatCancellation.cs @@ -2,8 +2,8 @@ namespace PalmHill.BlazorChat.Server { - public static class ChatCancelation + public static class ChatCancellation { - public static ConcurrentDictionary CancelationTokens { get; private set; } = new ConcurrentDictionary(); + public static ConcurrentDictionary CancellationTokens { get; private set; } = new ConcurrentDictionary(); } } diff --git a/PalmHill.BlazorChat/Server/PalmHill.BlazorChat.Server.csproj b/PalmHill.BlazorChat/Server/PalmHill.BlazorChat.Server.csproj index 6781839..b719597 100644 --- a/PalmHill.BlazorChat/Server/PalmHill.BlazorChat.Server.csproj +++ b/PalmHill.BlazorChat/Server/PalmHill.BlazorChat.Server.csproj @@ -7,6 +7,7 @@ + diff --git a/PalmHill.BlazorChat/Server/SerialExecutionAspect.cs b/PalmHill.BlazorChat/Server/SerialExecutionAspect.cs new file mode 100644 index 0000000..c48fa5a --- /dev/null +++ b/PalmHill.BlazorChat/Server/SerialExecutionAspect.cs @@ -0,0 +1,44 @@ +namespace PalmHill.BlazorChat.Server +{ + using AspectInjector.Broker; + using System; + using System.Collections.Concurrent; + using System.Reflection; + using System.Threading; + using System.Threading.Tasks; + + [Aspect(Scope.Global)] + public class SerialExecutionAspect + { + private static readonly ConcurrentDictionary _semaphores = new ConcurrentDictionary(); + + [Advice(Kind.Around, Targets = Target.Method)] + public object? Handle( + [Argument(Source.Name)] string methodName, + [Argument(Source.Target)] Func method, + [Argument(Source.Arguments)] object[] args, + [Argument(Source.Metadata)] MethodBase methodBase) + { + var attribute = methodBase.GetCustomAttribute(); + var key = attribute?.Key ?? string.Empty; + var semaphore = _semaphores.GetOrAdd(key, _ => new SemaphoreSlim(1, 1)); + + semaphore.Wait(); + try + { + var result = method(args); + if (result is Task task) + { + task.GetAwaiter().GetResult(); // Ensure the task completes + return task; + } + return result; + } + finally + { + semaphore.Release(); + } + } + } + +} \ No newline at end of file diff --git a/PalmHill.BlazorChat/Server/SerialExecutionAttribute.cs b/PalmHill.BlazorChat/Server/SerialExecutionAttribute.cs new file mode 100644 index 0000000..aa7e63b --- /dev/null +++ b/PalmHill.BlazorChat/Server/SerialExecutionAttribute.cs @@ -0,0 +1,18 @@ +namespace PalmHill.BlazorChat.Server +{ + using AspectInjector.Broker; + + [AttributeUsage(AttributeTargets.Method)] + [Injection(typeof(SerialExecutionAspect))] + public class SerialExecutionAttribute : Attribute + { + public string Key { get; } + + public SerialExecutionAttribute(string key) + { + Key = key; + } + } + + +} diff --git a/PalmHill.BlazorChat/Server/SignalR/WebSocketChat.cs b/PalmHill.BlazorChat/Server/SignalR/WebSocketChat.cs index 17e3611..af726e2 100644 --- a/PalmHill.BlazorChat/Server/SignalR/WebSocketChat.cs +++ b/PalmHill.BlazorChat/Server/SignalR/WebSocketChat.cs @@ -1,5 +1,6 @@ using Azure.AI.OpenAI; using LLama; +using Microsoft.AspNetCore.Components; using Microsoft.AspNetCore.SignalR; using Microsoft.SemanticKernel.ChatCompletion; using PalmHill.BlazorChat.Server.WebApi; @@ -39,7 +40,7 @@ public async Task InferenceRequest(InferenceRequest chatConversation) { var conversationId = chatConversation.Id; var cancellationTokenSource = new CancellationTokenSource(); - ChatCancelation.CancelationTokens[conversationId] = cancellationTokenSource; + ChatCancellation.CancellationTokens[conversationId] = cancellationTokenSource; try { @@ -70,7 +71,7 @@ public async Task InferenceRequest(InferenceRequest chatConversation) finally { //ThreadLock.InferenceLock.Release(); - ChatCancelation.CancelationTokens.TryRemove(conversationId, out _); + ChatCancellation.CancellationTokens.TryRemove(conversationId, out _); } } @@ -83,6 +84,7 @@ public async Task InferenceRequest(InferenceRequest chatConversation) /// The unique identifier for the message. /// The chat conversation to use for inference. /// A Task that represents the asynchronous operation. + [SerialExecution("ModelOperation")] private async Task DoInferenceAndRespondToClient(ISingleClientProxy respondToClient, InferenceRequest chatConversation, CancellationToken cancellationToken) { // Create a context for the model and a chat session for the conversation diff --git a/PalmHill.BlazorChat/Server/WebApi/ApiChatController.cs b/PalmHill.BlazorChat/Server/WebApi/ApiChatController.cs index e4903de..370cb41 100644 --- a/PalmHill.BlazorChat/Server/WebApi/ApiChatController.cs +++ b/PalmHill.BlazorChat/Server/WebApi/ApiChatController.cs @@ -59,7 +59,7 @@ public async Task> Chat([FromBody] InferenceRequest convers var conversationId = conversation.Id; var cancellationTokenSource = new CancellationTokenSource(); - ChatCancelation.CancelationTokens[conversationId] = cancellationTokenSource; + ChatCancellation.CancellationTokens[conversationId] = cancellationTokenSource; try { @@ -80,7 +80,7 @@ public async Task> Chat([FromBody] InferenceRequest convers finally { //ThreadLock.InferenceLock.Release(); - ChatCancelation.CancelationTokens.TryRemove(conversationId, out _); + ChatCancellation.CancellationTokens.TryRemove(conversationId, out _); } _logger.LogError(errorText); @@ -98,7 +98,7 @@ public async Task> Ask(InferenceRequest chatConversati var conversationId = chatConversation.Id; var cancellationTokenSource = new CancellationTokenSource(); - ChatCancelation.CancelationTokens[conversationId] = cancellationTokenSource; + ChatCancellation.CancellationTokens[conversationId] = cancellationTokenSource; var question = chatConversation.ChatMessages.LastOrDefault()?.Message; if (question == null) @@ -137,7 +137,7 @@ public async Task> Ask(InferenceRequest chatConversati [HttpDelete("cancel/{conversationId}", Name = "CancelChat")] public async Task CancelChat(Guid conversationId) { - var cancelToken = ChatCancelation.CancelationTokens[conversationId]; + var cancelToken = ChatCancellation.CancellationTokens[conversationId]; if (cancelToken == null) { return false; @@ -154,6 +154,7 @@ public async Task CancelChat(Guid conversationId) /// /// The chat conversation for which to perform inference. /// Returns the inference result as a string. + [SerialExecution("ModelOperation")] private async Task DoInference(InferenceRequest conversation, CancellationToken cancellationToken) { diff --git a/PalmHill.BlazorChat/Server/WebApi/AttachmentController.cs b/PalmHill.BlazorChat/Server/WebApi/AttachmentController.cs index 09956e3..6970ea2 100644 --- a/PalmHill.BlazorChat/Server/WebApi/AttachmentController.cs +++ b/PalmHill.BlazorChat/Server/WebApi/AttachmentController.cs @@ -98,6 +98,7 @@ public async Task> AddAttachment([FromForm] FileUpl return attachmentInfo; } + [SerialExecution("ModelOperation")] private async Task DoImportAsync(string? userId, AttachmentInfo attachmentInfo) { try