From 0047da907f13ac9c730c1cd9cb0787967daa9b14 Mon Sep 17 00:00:00 2001 From: Zitong Yang Date: Tue, 24 Dec 2024 13:34:09 +0800 Subject: [PATCH] Add MessagePack Hub Protocol --- eng/Packages.Data.props | 9 +- .../CHANGELOG.md | 6 +- ...e.WebJobs.Extensions.SignalRService.csproj | 8 +- .../BinaryMessageFormatter.cs | 7 +- .../DefaultMessagePackHubProtocolWorker.cs | 9 +- .../MemoryBufferWriter.cs | 14 +- .../MessagePackHubProtocol.cs | 50 +- .../MessagePackHubProtocolWorker.cs | 589 ++---------------- .../TriggerBindings/Utils/JTokenWrapper.cs | 29 +- .../Config/IConfigurationExtensionsFacts.cs | 7 +- 10 files changed, 113 insertions(+), 615 deletions(-) diff --git a/eng/Packages.Data.props b/eng/Packages.Data.props index d5623cb1eb3c7..27c58f124ba51 100644 --- a/eng/Packages.Data.props +++ b/eng/Packages.Data.props @@ -209,11 +209,10 @@ - - - - - + + + + diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/CHANGELOG.md b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/CHANGELOG.md index 8ac9d34f17e03..04ccf9143e1e6 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/CHANGELOG.md +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/CHANGELOG.md @@ -1,14 +1,18 @@ # Release History -## 1.16.0-beta.1 (Unreleased) +## 2.0.0-beta.1 (Unreleased) ### Features Added ### Breaking Changes +* Remove .NET 6.0 support. ### Bugs Fixed +* Correctly support returning result for SignalR invocation in MessagePack protocol from isolated-worker process. ### Other Changes +* Update `MessagePack` to 2.5.192 +* Update `Microsoft.Azure.SignalR`, `Microsoft.Azure.SignalR.Management`, `Microsoft.Azure.SignalR.Protocols` to 1.29.0 ## 1.15.0 (2024-10-14) diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/Microsoft.Azure.WebJobs.Extensions.SignalRService.csproj b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/Microsoft.Azure.WebJobs.Extensions.SignalRService.csproj index 74a6ad0d7660d..c7aebb34648e2 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/Microsoft.Azure.WebJobs.Extensions.SignalRService.csproj +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/Microsoft.Azure.WebJobs.Extensions.SignalRService.csproj @@ -1,10 +1,9 @@ - + - $(RequiredTargetFrameworks);net6.0;net8.0 + $(RequiredTargetFrameworks) Microsoft.Azure.WebJobs.Extensions.SignalRService - 1.16.0-beta.1 - 1.15.0 + 2.0.0-beta.1 true true @@ -16,7 +15,6 @@ - diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/BinaryMessageFormatter.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/BinaryMessageFormatter.cs index d30853007e2f4..d7c16c0ac6306 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/BinaryMessageFormatter.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/BinaryMessageFormatter.cs @@ -1,11 +1,14 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. using System; using System.Buffers; namespace Microsoft.AspNetCore.Internal; +/// +/// Copied from https://github.com/dotnet/aspnetcore/blob/0825def633c99d9fdd74e47e69bcde3935a5fe74/ +/// internal static class BinaryMessageFormatter { public static void WriteLengthPrefix(long length, IBufferWriter output) diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/DefaultMessagePackHubProtocolWorker.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/DefaultMessagePackHubProtocolWorker.cs index 7e9c5c1dd7204..476a8d94c8748 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/DefaultMessagePackHubProtocolWorker.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/DefaultMessagePackHubProtocolWorker.cs @@ -1,5 +1,5 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. using System; using System.IO; @@ -8,6 +8,11 @@ namespace Microsoft.AspNetCore.SignalR.Protocol; +#nullable enable + +/// +/// Copied from https://github.com/dotnet/aspnetcore/blob/0825def633c99d9fdd74e47e69bcde3935a5fe74/ +/// internal sealed class DefaultMessagePackHubProtocolWorker : MessagePackHubProtocolWorker { private readonly MessagePackSerializerOptions _messagePackSerializerOptions; diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MemoryBufferWriter.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MemoryBufferWriter.cs index 7861509636683..73bee7bbafa8d 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MemoryBufferWriter.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MemoryBufferWriter.cs @@ -1,23 +1,23 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -#nullable enable - using System; using System.Buffers; using System.Collections.Generic; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading; using System.Threading.Tasks; namespace Microsoft.AspNetCore.Internal; +/// +/// Copied from https://github.com/dotnet/aspnetcore/blob/0825def633c99d9fdd74e47e69bcde3935a5fe74/ +/// internal sealed class MemoryBufferWriter : Stream, IBufferWriter { [ThreadStatic] - private static MemoryBufferWriter? _cachedInstance; + private static MemoryBufferWriter _cachedInstance; #if DEBUG private bool _inUse; @@ -26,8 +26,8 @@ internal sealed class MemoryBufferWriter : Stream, IBufferWriter private readonly int _minimumSegmentSize; private int _bytesWritten; - private List? _completedSegments; - private byte[]? _currentSegment; + private List _completedSegments; + private byte[] _currentSegment; private int _position; public MemoryBufferWriter(int minimumSegmentSize = 4096) @@ -146,7 +146,6 @@ public override Task CopyToAsync(Stream destination, int bufferSize, Cancellatio return CopyToSlowAsync(destination, cancellationToken); } - [MemberNotNull(nameof(_currentSegment))] private void EnsureCapacity(int sizeHint) { // This does the Right Thing. It only subtracts _position from the current segment length if it's non-null. @@ -166,7 +165,6 @@ private void EnsureCapacity(int sizeHint) AddSegment(sizeHint); } - [MemberNotNull(nameof(_currentSegment))] private void AddSegment(int sizeHint = 0) { if (_currentSegment != null) diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocol.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocol.cs index 66fe3b701d5e0..54ff38e5759c4 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocol.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocol.cs @@ -1,5 +1,5 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. using System; using System.Buffers; @@ -11,70 +11,54 @@ using MessagePack.Resolvers; using Microsoft.AspNetCore.Connections; -using Microsoft.Extensions.Options; namespace Microsoft.AspNetCore.SignalR.Protocol; +#nullable enable + /// -/// Implements the SignalR Hub Protocol using MessagePack. +/// Implements the SignalR Hub Protocol using MessagePack. Copied from https://github.com/dotnet/aspnetcore/blob/0825def633c99d9fdd74e47e69bcde3935a5fe74/ /// -public class MessagePackHubProtocol : IHubProtocol +internal class MessagePackHubProtocol : IHubProtocol { private const string ProtocolName = "messagepack"; private const int ProtocolVersion = 2; private readonly DefaultMessagePackHubProtocolWorker _worker; + private readonly MessagePackSerializerOptions _messagePackSerializerOptions = + MessagePackSerializerOptions + .Standard + .WithResolver(SignalRResolver.Instance) + .WithSecurity(MessagePackSecurity.UntrustedData); - /// public string Name => ProtocolName; - /// public int Version => ProtocolVersion; - /// public TransferFormat TransferFormat => TransferFormat.Binary; - /// - /// Initializes a new instance of the class. - /// public MessagePackHubProtocol() - : this(Options.Create(new MessagePackHubProtocolOptions())) - { } - - /// - /// Initializes a new instance of the class. - /// - /// The options used to initialize the protocol. - public MessagePackHubProtocol(IOptions options) { - ArgumentNullThrowHelper.ThrowIfNull(options); - - _worker = new DefaultMessagePackHubProtocolWorker(options.Value.SerializerOptions); + _worker = new DefaultMessagePackHubProtocolWorker(_messagePackSerializerOptions); } - /// public bool IsVersionSupported(int version) { return version <= Version; } - /// +#if NETSTANDARD2_0 + public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, out HubMessage? message) +#else public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) - => _worker.TryParseMessage(ref input, binder, out message); +#endif + => throw new NotImplementedException(); - /// public void WriteMessage(HubMessage message, IBufferWriter output) => _worker.WriteMessage(message, output); - /// public ReadOnlyMemory GetMessageBytes(HubMessage message) => _worker.GetMessageBytes(message); - internal static MessagePackSerializerOptions CreateDefaultMessagePackSerializerOptions() => - MessagePackSerializerOptions - .Standard - .WithResolver(SignalRResolver.Instance) - .WithSecurity(MessagePackSecurity.UntrustedData); - internal sealed class SignalRResolver : IFormatterResolver { public static readonly IFormatterResolver Instance = new SignalRResolver(); diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocolWorker.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocolWorker.cs index 7c1f23f8bd215..81b01619c40e1 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocolWorker.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/MessagePackHubProtocol/MessagePackHubProtocolWorker.cs @@ -1,27 +1,21 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -#pragma warning disable IDE0005 // This file is shared across multiple projects making it ugly to ignore unused usings +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. using System; using System.Buffers; using System.Collections.Generic; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.IO; -using System.IO.Pipelines; -using System.Runtime.ExceptionServices; -using System.Text; using MessagePack; using Microsoft.AspNetCore.Internal; -using Microsoft.Azure.SignalR.Protocol; namespace Microsoft.AspNetCore.SignalR.Protocol; +#nullable enable + /// -/// Implements support for MessagePackHubProtocol. This code is shared between SignalR and Blazor. +/// Copied from https://github.com/dotnet/aspnetcore/blob/0825def633c99d9fdd74e47e69bcde3935a5fe74/ /// internal abstract class MessagePackHubProtocolWorker { @@ -29,313 +23,8 @@ internal abstract class MessagePackHubProtocolWorker private const int VoidResult = 2; private const int NonVoidResult = 3; - public bool TryParseMessage(ref ReadOnlySequence input, IInvocationBinder binder, [NotNullWhen(true)] out HubMessage? message) - { - if (!BinaryMessageParser.TryParseMessage(ref input, out var payload)) - { - message = null; - return false; - } - - var reader = new MessagePackReader(payload); - message = ParseMessage(ref reader, binder); - return message != null; - } - - private HubMessage? ParseMessage(ref MessagePackReader reader, IInvocationBinder binder) - { - var itemCount = reader.ReadArrayHeader(); - - var messageType = ReadInt32(ref reader, "messageType"); - - switch (messageType) - { - case HubProtocolConstants.InvocationMessageType: - return CreateInvocationMessage(ref reader, binder, itemCount); - case HubProtocolConstants.StreamInvocationMessageType: - return CreateStreamInvocationMessage(ref reader, binder, itemCount); - case HubProtocolConstants.StreamItemMessageType: - return CreateStreamItemMessage(ref reader, binder); - case HubProtocolConstants.CompletionMessageType: - return CreateCompletionMessage(ref reader, binder); - case HubProtocolConstants.CancelInvocationMessageType: - return CreateCancelInvocationMessage(ref reader); - case HubProtocolConstants.PingMessageType: - return PingMessage.Instance; - case HubProtocolConstants.CloseMessageType: - return CreateCloseMessage(ref reader, itemCount); - case HubProtocolConstants.AckMessageType: - return CreateAckMessage(ref reader); - case HubProtocolConstants.SequenceMessageType: - return CreateSequenceMessage(ref reader); - default: - // Future protocol changes can add message types, old clients can ignore them - return null; - } - } - - private HubMessage CreateInvocationMessage(ref MessagePackReader reader, IInvocationBinder binder, int itemCount) - { - var headers = ReadHeaders(ref reader); - var invocationId = ReadInvocationId(ref reader); - - // For MsgPack, we represent an empty invocation ID as an empty string, - // so we need to normalize that to "null", which is what indicates a non-blocking invocation. - if (string.IsNullOrEmpty(invocationId)) - { - invocationId = null; - } - - var target = ReadString(ref reader, binder, "target"); - ThrowIfNullOrEmpty(target, "target for Invocation message"); - - object?[]? arguments; - try - { - var parameterTypes = binder.GetParameterTypes(target); - arguments = BindArguments(ref reader, parameterTypes); - } - catch (Exception ex) - { - return new InvocationBindingFailureMessage(invocationId, target, ExceptionDispatchInfo.Capture(ex)); - } - - string[]? streams = null; - // Previous clients will send 5 items, so we check if they sent a stream array or not - if (itemCount > 5) - { - streams = ReadStreamIds(ref reader); - } - - return ApplyHeaders(headers, new InvocationMessage(invocationId, target, arguments, streams)); - } - - private HubMessage CreateStreamInvocationMessage(ref MessagePackReader reader, IInvocationBinder binder, int itemCount) - { - var headers = ReadHeaders(ref reader); - var invocationId = ReadInvocationId(ref reader); - ThrowIfNullOrEmpty(invocationId, "invocation ID for StreamInvocation message"); - - var target = ReadString(ref reader, "target"); - ThrowIfNullOrEmpty(target, "target for StreamInvocation message"); - - object?[] arguments; - try - { - var parameterTypes = binder.GetParameterTypes(target); - arguments = BindArguments(ref reader, parameterTypes); - } - catch (Exception ex) - { - return new InvocationBindingFailureMessage(invocationId, target, ExceptionDispatchInfo.Capture(ex)); - } - - string[]? streams = null; - // Previous clients will send 5 items, so we check if they sent a stream array or not - if (itemCount > 5) - { - streams = ReadStreamIds(ref reader); - } - - return ApplyHeaders(headers, new StreamInvocationMessage(invocationId, target, arguments, streams)); - } - - private HubMessage CreateStreamItemMessage(ref MessagePackReader reader, IInvocationBinder binder) - { - var headers = ReadHeaders(ref reader); - var invocationId = ReadInvocationId(ref reader); - ThrowIfNullOrEmpty(invocationId, "invocation ID for StreamItem message"); - - object? value; - try - { - var itemType = binder.GetStreamItemType(invocationId); - value = DeserializeObject(ref reader, itemType, "item"); - } - catch (Exception ex) - { - return new StreamBindingFailureMessage(invocationId, ExceptionDispatchInfo.Capture(ex)); - } - - return ApplyHeaders(headers, new StreamItemMessage(invocationId, value)); - } - - private CompletionMessage CreateCompletionMessage(ref MessagePackReader reader, IInvocationBinder binder) - { - var headers = ReadHeaders(ref reader); - var invocationId = ReadInvocationId(ref reader); - ThrowIfNullOrEmpty(invocationId, "invocation ID for Completion message"); - - var resultKind = ReadInt32(ref reader, "resultKind"); - - string? error = null; - object? result = null; - var hasResult = false; - - switch (resultKind) - { - case ErrorResult: - error = ReadString(ref reader, "error"); - break; - case NonVoidResult: - hasResult = true; - var itemType = ProtocolHelper.TryGetReturnType(binder, invocationId); - if (itemType is null) - { - reader.Skip(); - } - else - { - if (itemType == typeof(RawResult)) - { - result = new RawResult(reader.ReadRaw()); - } - else - { - try - { - result = DeserializeObject(ref reader, itemType, "argument"); - } - catch (Exception ex) - { - error = $"Error trying to deserialize result to {itemType.Name}. {ex.Message}"; - hasResult = false; - } - } - } - break; - case VoidResult: - hasResult = false; - break; - default: - throw new InvalidDataException("Invalid invocation result kind."); - } - - return ApplyHeaders(headers, new CompletionMessage(invocationId, error, result, hasResult)); - } - - private static CancelInvocationMessage CreateCancelInvocationMessage(ref MessagePackReader reader) - { - var headers = ReadHeaders(ref reader); - var invocationId = ReadInvocationId(ref reader); - ThrowIfNullOrEmpty(invocationId, "invocation ID for CancelInvocation message"); - - return ApplyHeaders(headers, new CancelInvocationMessage(invocationId)); - } - - private static CloseMessage CreateCloseMessage(ref MessagePackReader reader, int itemCount) - { - var error = ReadString(ref reader, "error"); - var allowReconnect = false; - - if (itemCount > 2) - { - allowReconnect = ReadBoolean(ref reader, "allowReconnect"); - } - - // An empty string is still an error - if (error == null && !allowReconnect) - { - return CloseMessage.Empty; - } - - return new CloseMessage(error, allowReconnect); - } - - private static Dictionary? ReadHeaders(ref MessagePackReader reader) - { - var headerCount = ReadMapLength(ref reader, "headers"); - if (headerCount > 0) - { - var headers = new Dictionary(StringComparer.Ordinal); - - for (var i = 0; i < headerCount; i++) - { - var key = ReadString(ref reader, $"headers[{i}].Key"); - ThrowIfNullOrEmpty(key, "key in header"); - - var value = ReadString(ref reader, $"headers[{i}].Value"); - ThrowIfNullOrEmpty(value, "value in header"); - - headers.Add(key, value); - } - return headers; - } - else - { - return null; - } - } - - private static string[]? ReadStreamIds(ref MessagePackReader reader) - { - var streamIdCount = ReadArrayLength(ref reader, "streamIds"); - List? streams = null; - - if (streamIdCount > 0) - { - streams = new List(); - for (var i = 0; i < streamIdCount; i++) - { - var id = reader.ReadString(); - ThrowIfNullOrEmpty(id, "value in streamIds received"); - - streams.Add(id); - } - } - - return streams?.ToArray(); - } - - private static AckMessage CreateAckMessage(ref MessagePackReader reader) - { - return new AckMessage(ReadInt64(ref reader, "sequenceId")); - } - - private static SequenceMessage CreateSequenceMessage(ref MessagePackReader reader) - { - return new SequenceMessage(ReadInt64(ref reader, "sequenceId")); - } - - private object?[] BindArguments(ref MessagePackReader reader, IReadOnlyList parameterTypes) - { - var argumentCount = ReadArrayLength(ref reader, "arguments"); - - if (parameterTypes.Count != argumentCount) - { - throw new InvalidDataException( - $"Invocation provides {argumentCount} argument(s) but target expects {parameterTypes.Count}."); - } - - try - { - var arguments = new object?[argumentCount]; - for (var i = 0; i < argumentCount; i++) - { - arguments[i] = DeserializeObject(ref reader, parameterTypes[i], "argument"); - } - - return arguments; - } - catch (Exception ex) - { - throw new InvalidDataException("Error binding arguments. Make sure that the types of the provided values match the types of the hub method being invoked.", ex); - } - } - protected abstract object? DeserializeObject(ref MessagePackReader reader, Type type, string field); - private static T ApplyHeaders(IDictionary? source, T destination) where T : HubInvocationMessage - { - if (source != null && source.Count > 0) - { - destination.Headers = source; - } - - return destination; - } - - /// public void WriteMessage(HubMessage message, IBufferWriter output) { var memoryBufferWriter = MemoryBufferWriter.Get(); @@ -357,7 +46,6 @@ public void WriteMessage(HubMessage message, IBufferWriter output) } } - /// public ReadOnlyMemory GetMessageBytes(HubMessage message) { var memoryBufferWriter = MemoryBufferWriter.Get(); @@ -395,76 +83,66 @@ private void WriteMessageCore(HubMessage message, ref MessagePackWriter writer) case InvocationMessage invocationMessage: WriteInvocationMessage(invocationMessage, ref writer); break; - case StreamInvocationMessage streamInvocationMessage: - WriteStreamInvocationMessage(streamInvocationMessage, ref writer); - break; - case StreamItemMessage streamItemMessage: - WriteStreamingItemMessage(streamItemMessage, ref writer); - break; case CompletionMessage completionMessage: WriteCompletionMessage(completionMessage, ref writer); break; - case CancelInvocationMessage cancelInvocationMessage: - WriteCancelInvocationMessage(cancelInvocationMessage, ref writer); - break; - case PingMessage: - WritePingMessage(ref writer); - break; - case CloseMessage closeMessage: - WriteCloseMessage(closeMessage, ref writer); - break; - case AckMessage ackMessage: - WriteAckMessage(ackMessage, ref writer); - break; - case SequenceMessage sequenceMessage: - WriteSequenceMessage(sequenceMessage, ref writer); - break; default: - throw new InvalidDataException($"Unexpected message type: {message.GetType().Name}"); + throw new NotSupportedException($"Not supported message type: {message.GetType().Name}"); } writer.Flush(); } - private void WriteInvocationMessage(InvocationMessage message, ref MessagePackWriter writer) + private void WriteArgument(object? argument, ref MessagePackWriter writer) { - writer.WriteArrayHeader(6); - - writer.Write(HubProtocolConstants.InvocationMessageType); - PackHeaders(message.Headers, ref writer); - if (string.IsNullOrEmpty(message.InvocationId)) + if (argument == null) { writer.WriteNil(); } +#if NET8_0_OR_GREATER + else if (argument is RawResult result) + { + writer.WriteRaw(result.RawSerializedData); + } +#endif else { - writer.Write(message.InvocationId); + Serialize(ref writer, argument.GetType(), argument); } - writer.Write(message.Target); + } - if (message.Arguments is null) + protected abstract void Serialize(ref MessagePackWriter writer, Type type, object value); + + private static void WriteStreamIds(string[]? streamIds, ref MessagePackWriter writer) + { + if (streamIds != null) { - writer.WriteArrayHeader(0); + writer.WriteArrayHeader(streamIds.Length); + foreach (var streamId in streamIds) + { + writer.Write(streamId); + } } else { - writer.WriteArrayHeader(message.Arguments.Length); - foreach (var arg in message.Arguments) - { - WriteArgument(arg, ref writer); - } + writer.WriteArrayHeader(0); } - - WriteStreamIds(message.StreamIds, ref writer); } - private void WriteStreamInvocationMessage(StreamInvocationMessage message, ref MessagePackWriter writer) + private void WriteInvocationMessage(InvocationMessage message, ref MessagePackWriter writer) { writer.WriteArrayHeader(6); - writer.Write(HubProtocolConstants.StreamInvocationMessageType); + writer.Write(HubProtocolConstants.InvocationMessageType); PackHeaders(message.Headers, ref writer); - writer.Write(message.InvocationId); + if (string.IsNullOrEmpty(message.InvocationId)) + { + writer.WriteNil(); + } + else + { + writer.Write(message.InvocationId); + } writer.Write(message.Target); if (message.Arguments is null) @@ -479,51 +157,9 @@ private void WriteStreamInvocationMessage(StreamInvocationMessage message, ref M WriteArgument(arg, ref writer); } } - +#if NET6_0_OR_GREATER WriteStreamIds(message.StreamIds, ref writer); - } - - private void WriteStreamingItemMessage(StreamItemMessage message, ref MessagePackWriter writer) - { - writer.WriteArrayHeader(4); - writer.Write(HubProtocolConstants.StreamItemMessageType); - PackHeaders(message.Headers, ref writer); - writer.Write(message.InvocationId); - WriteArgument(message.Item, ref writer); - } - - private void WriteArgument(object? argument, ref MessagePackWriter writer) - { - if (argument == null) - { - writer.WriteNil(); - } - else if (argument is RawResult result) - { - writer.WriteRaw(result.RawSerializedData); - } - else - { - Serialize(ref writer, argument.GetType(), argument); - } - } - - protected abstract void Serialize(ref MessagePackWriter writer, Type type, object value); - - private static void WriteStreamIds(string[]? streamIds, ref MessagePackWriter writer) - { - if (streamIds != null) - { - writer.WriteArrayHeader(streamIds.Length); - foreach (var streamId in streamIds) - { - writer.Write(streamId); - } - } - else - { - writer.WriteArrayHeader(0); - } +#endif } private void WriteCompletionMessage(CompletionMessage message, ref MessagePackWriter writer) @@ -549,50 +185,6 @@ private void WriteCompletionMessage(CompletionMessage message, ref MessagePackWr } } - private static void WriteCancelInvocationMessage(CancelInvocationMessage message, ref MessagePackWriter writer) - { - writer.WriteArrayHeader(3); - writer.Write(HubProtocolConstants.CancelInvocationMessageType); - PackHeaders(message.Headers, ref writer); - writer.Write(message.InvocationId); - } - - private static void WriteCloseMessage(CloseMessage message, ref MessagePackWriter writer) - { - writer.WriteArrayHeader(3); - writer.Write(HubProtocolConstants.CloseMessageType); - if (string.IsNullOrEmpty(message.Error)) - { - writer.WriteNil(); - } - else - { - writer.Write(message.Error); - } - - writer.Write(message.AllowReconnect); - } - - private static void WritePingMessage(ref MessagePackWriter writer) - { - writer.WriteArrayHeader(1); - writer.Write(HubProtocolConstants.PingMessageType); - } - - private static void WriteAckMessage(AckMessage message, ref MessagePackWriter writer) - { - writer.WriteArrayHeader(2); - writer.Write(HubProtocolConstants.AckMessageType); - writer.Write(message.SequenceId); - } - - private static void WriteSequenceMessage(SequenceMessage message, ref MessagePackWriter writer) - { - writer.WriteArrayHeader(2); - writer.Write(HubProtocolConstants.SequenceMessageType); - writer.Write(message.SequenceId); - } - private static void PackHeaders(IDictionary? headers, ref MessagePackWriter writer) { if (headers != null) @@ -612,107 +204,4 @@ private static void PackHeaders(IDictionary? headers, ref Messag writer.WriteMapHeader(0); } } - - private static string? ReadInvocationId(ref MessagePackReader reader) => - ReadString(ref reader, "invocationId"); - - private static bool ReadBoolean(ref MessagePackReader reader, string field) - { - try - { - return reader.ReadBoolean(); - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading '{field}' as Boolean failed.", ex); - } - } - - private static int ReadInt32(ref MessagePackReader reader, string field) - { - try - { - return reader.ReadInt32(); - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading '{field}' as Int32 failed.", ex); - } - } - - private static long ReadInt64(ref MessagePackReader reader, string field) - { - try - { - return reader.ReadInt64(); - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading '{field}' as Int64 failed.", ex); - } - } - - protected static string? ReadString(ref MessagePackReader reader, IInvocationBinder binder, string field) - { - try - { -#if NETCOREAPP - if (reader.TryReadStringSpan(out var span)) - { - return binder.GetTarget(span) ?? Encoding.UTF8.GetString(span); - } - return reader.ReadString(); -#else - return reader.ReadString(); -#endif - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading '{field}' as String failed.", ex); - } - } - - protected static string? ReadString(ref MessagePackReader reader, string field) - { - try - { - return reader.ReadString(); - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading '{field}' as String failed.", ex); - } - } - - private static long ReadMapLength(ref MessagePackReader reader, string field) - { - try - { - return reader.ReadMapHeader(); - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading map length for '{field}' failed.", ex); - } - } - - private static long ReadArrayLength(ref MessagePackReader reader, string field) - { - try - { - return reader.ReadArrayHeader(); - } - catch (Exception ex) - { - throw new InvalidDataException($"Reading array length for '{field}' failed.", ex); - } - } - - private static void ThrowIfNullOrEmpty([NotNull] string? target, string message) - { - if (string.IsNullOrEmpty(target)) - { - throw new InvalidDataException($"Null or empty {message}."); - } - } } \ No newline at end of file diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/Utils/JTokenWrapper.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/Utils/JTokenWrapper.cs index 7f84b762a3c70..8753d3ec8884a 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/Utils/JTokenWrapper.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/src/TriggerBindings/Utils/JTokenWrapper.cs @@ -3,7 +3,12 @@ using System; using System.Text.Json; + +using MessagePack; +using MessagePack.Formatters; + using Microsoft.AspNetCore.SignalR.Protocol; + using Newtonsoft.Json; using Newtonsoft.Json.Linq; @@ -15,14 +20,10 @@ namespace Microsoft.Azure.WebJobs.Extensions.SignalRService; /// [System.Text.Json.Serialization.JsonConverter(typeof(JTokenWrapperJsonConverter))] -internal class JTokenWrapper +[MessagePackFormatter(typeof(JTokenWrapperMessagePackFormatter))] +internal class JTokenWrapper(JToken value) { - public JTokenWrapper(JToken value) - { - Value = value; - } - - public JToken Value { get; } + public JToken Value { get; } = value; } internal class JTokenWrapperJsonConverter : System.Text.Json.Serialization.JsonConverter @@ -45,3 +46,17 @@ public override void Write(Utf8JsonWriter writer, JTokenWrapper value, JsonSeria #endif } } + +internal class JTokenWrapperMessagePackFormatter : IMessagePackFormatter +{ + public JTokenWrapper Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options) + { + throw new NotImplementedException(); + } + + public void Serialize(ref MessagePackWriter writer, JTokenWrapper value, MessagePackSerializerOptions options) + { + var jsonString = value.Value.ToString(); + MessagePackSerializer.ConvertFromJson(jsonString, ref writer, options); + } +} \ No newline at end of file diff --git a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/tests/Config/IConfigurationExtensionsFacts.cs b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/tests/Config/IConfigurationExtensionsFacts.cs index 7a4ff6190a771..47852cab56e6a 100644 --- a/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/tests/Config/IConfigurationExtensionsFacts.cs +++ b/sdk/signalr/Microsoft.Azure.WebJobs.Extensions.SignalRService/tests/Config/IConfigurationExtensionsFacts.cs @@ -3,14 +3,17 @@ using System; using System.Linq; + using Azure.Core.Serialization; using Azure.Identity; + using Microsoft.Azure.SignalR; using Microsoft.Azure.SignalR.Tests.Common; using Microsoft.Azure.WebJobs.Extensions.SignalRService; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; + using Xunit; namespace SignalRServiceExtension.Tests.Config @@ -41,7 +44,7 @@ public void TestGetNamedEndpointFromIdentityWithOnlyUri() Assert.True(config.GetSection("eastus").TryGetEndpointFromIdentity(factory, out var endpoint)); Assert.Equal("eastus", endpoint.Name); Assert.Equal(uri, endpoint.Endpoint); - Assert.IsType((endpoint.AccessKey as AadAccessKey).TokenCredential); + Assert.IsType((endpoint.AccessKey as MicrosoftEntraAccessKey).TokenCredential); Assert.Equal(EndpointType.Primary, endpoint.EndpointType); } @@ -65,7 +68,7 @@ public void TestGetNamedEndpointFromIdentityWithAllEndpointField() Assert.Equal(uri, endpoint.Endpoint); Assert.Equal(new Uri("https://serverEndpoint.com"), endpoint.ServerEndpoint); Assert.Equal(new Uri("https://clientEndpoint.com"), endpoint.ClientEndpoint); - Assert.IsType((endpoint.AccessKey as AadAccessKey).TokenCredential); + Assert.IsType((endpoint.AccessKey as MicrosoftEntraAccessKey).TokenCredential); Assert.Equal(EndpointType.Secondary, endpoint.EndpointType); }