Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: [dotnet] open control channel for grpc save/load #5489

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions dotnet/src/Microsoft.AutoGen/Core.Grpc/GrpcAgentRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ public async ValueTask OnMessageAsync(Message message, CancellationToken cancell
var cloudEvent = message.CloudEvent ?? throw new InvalidOperationException("CloudEvent is null.");
await HandlePublish(cloudEvent);
break;
throw new InvalidOperationException("ControlMessage is not supported.");
default:
throw new InvalidOperationException($"Unexpected message '{message}'.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,19 @@
/// </summary>
private readonly IMessageRegistryGrain _messageRegistry;
private readonly IGateway _reference;
private readonly ConcurrentDictionary<string, List<GrpcWorkerConnection>> _supportedAgentTypes = [];
public readonly ConcurrentDictionary<string, GrpcWorkerConnection> _workers = new();
private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection> _agentDirectory = new();
private readonly ConcurrentDictionary<(GrpcWorkerConnection, string), TaskCompletionSource<RpcResponse>> _pendingRequests = new();
private readonly ConcurrentDictionary<string, List<GrpcWorkerConnection<Message>>> _supportedAgentTypes = [];

Check warning on line 34 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L34

Added line #L34 was not covered by tests
/// <summary>
/// a map of the clientids form the grpc connection headers to the worker connections that service that id
/// for the Message Channel
/// </summary>
public readonly ConcurrentDictionary<string, GrpcWorkerConnection<Message>> _workers = new();

Check warning on line 39 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L39

Added line #L39 was not covered by tests
/// <summary>
/// a map of the clientids form the grpc connection headers to the worker connections for the Control Channel
/// </summary>
public readonly ConcurrentDictionary<string, GrpcWorkerConnection<ControlMessage>> _controlWorkers = new();
private readonly ConcurrentDictionary<(string Type, string Key), GrpcWorkerConnection<Message>> _agentDirectory = new();
private readonly ConcurrentDictionary<(GrpcWorkerConnection<Message>, string), TaskCompletionSource<RpcResponse>> _pendingRequests = new();
private readonly ConcurrentDictionary<(GrpcWorkerConnection<ControlMessage>, string), TaskCompletionSource<RpcResponse>> _pendingControlRequests = new();

Check warning on line 46 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L43-L46

Added lines #L43 - L46 were not covered by tests

/// <summary>
/// Initializes a new instance of the <see cref="GrpcGateway"/> class.
Expand Down Expand Up @@ -255,16 +264,25 @@
/// <param name="responseStream">The response stream.</param>
/// <param name="context">The server call context.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
internal async Task ConnectToWorkerProcess(IAsyncStreamReader<Message> requestStream, IServerStreamWriter<Message> responseStream, ServerCallContext context)
internal async Task ConnectToWorkerProcess<TMessage>(IAsyncStreamReader<TMessage> requestStream, IServerStreamWriter<TMessage> responseStream, ServerCallContext context)
where TMessage : class
{
_logger.LogInformation("Received new connection from {Peer}.", context.Peer);
var clientId = (context.RequestHeaders.Get("client-id")?.Value) ??
throw new RpcException(new Status(StatusCode.InvalidArgument, "Client ID is required."));
var workerProcess = new GrpcWorkerConnection(this, requestStream, responseStream, context);
_workers.GetOrAdd(clientId, workerProcess);

await this.AttachDanglingRegistrations(clientId).ConfigureAwait(false);

var clientId = context.RequestHeaders.Get("client-id")?.Value
?? throw new RpcException(new Status(StatusCode.InvalidArgument, "Client ID is required."));
var workerProcess = new GrpcWorkerConnection<TMessage>(this, requestStream, responseStream, context);

Check warning on line 273 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L272-L273

Added lines #L272 - L273 were not covered by tests
if (typeof(TMessage) == typeof(Message))
{
_workers.GetOrAdd(clientId, _ => (GrpcWorkerConnection<Message>)(object)workerProcess);

Check warning on line 276 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L276

Added line #L276 was not covered by tests
}
else if (typeof(TMessage) == typeof(ControlMessage))
{
_controlWorkers.GetOrAdd(clientId, _ => (GrpcWorkerConnection<ControlMessage>)(object)workerProcess);

Check warning on line 280 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L280

Added line #L280 was not covered by tests
}
else
{
throw new InvalidOperationException($"Unsupported message type: {typeof(TMessage).Name}");

Check warning on line 284 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L284

Added line #L284 was not covered by tests
}
await workerProcess.Connect().ConfigureAwait(false);
}

Expand Down Expand Up @@ -301,20 +319,35 @@
/// <param name="message">The received message.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
internal async Task OnReceivedMessageAsync(GrpcWorkerConnection connection, Message message, CancellationToken cancellationToken = default)
internal async Task OnReceivedMessageAsync<TMessage>(GrpcWorkerConnection<TMessage> connection, TMessage message, CancellationToken cancellationToken = default)
where TMessage : class
{
_logger.LogInformation("Received message {Message} from connection {Connection}.", message, connection);
switch (message.MessageCase)

switch (message)
{
case Message.MessageOneofCase.Request:
await DispatchRequestAsync(connection, message.Request);
break;
case Message.MessageOneofCase.Response:
DispatchResponse(connection, message.Response);
break;
case Message.MessageOneofCase.CloudEvent:
await DispatchEventAsync(message.CloudEvent, cancellationToken);
case Message msg:
switch (msg.MessageCase)
{
case Message.MessageOneofCase.Request:
await DispatchRequestAsync(connection, msg.Request);
break;

Check warning on line 334 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L333-L334

Added lines #L333 - L334 were not covered by tests
case Message.MessageOneofCase.Response:
DispatchResponse(connection, msg.Response);
break;

Check warning on line 337 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L336-L337

Added lines #L336 - L337 were not covered by tests
case Message.MessageOneofCase.CloudEvent:
await DispatchEventAsync(msg.CloudEvent, cancellationToken);
break;

Check warning on line 340 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L339-L340

Added lines #L339 - L340 were not covered by tests
default:
await RespondBadRequestAsync(connection, $"Unknown message type for message '{msg}'.");
break;

Check warning on line 343 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L342-L343

Added lines #L342 - L343 were not covered by tests
}
break;

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want to remove this

//case ControlMessage controlMsg:
// //await HandleControlMessageAsync(connection, controlMsg, cancellationToken);
// break;

default:
await RespondBadRequestAsync(connection, $"Unknown message type for message '{message}'.");
break;
Expand All @@ -326,14 +359,25 @@
/// </summary>
/// <param name="connection">The worker connection.</param>
/// <param name="response">The RPC response.</param>
private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse response)
private void DispatchResponse<TMessage>(GrpcWorkerConnection<TMessage> connection, RpcResponse response)
where TMessage : class
{
if (!_pendingRequests.TryRemove((connection, response.RequestId), out var completion))
if (connection is GrpcWorkerConnection<Message> messageConnection)
{
_logger.LogWarning("Received response for unknown request id: {RequestId}.", response.RequestId);
return;
if (_pendingRequests.TryRemove((messageConnection, response.RequestId), out var completion))
{
completion.SetResult(response);
return;

Check warning on line 370 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L369-L370

Added lines #L369 - L370 were not covered by tests
}
}
else if (connection is GrpcWorkerConnection<ControlMessage> controlConnection)
{
if (_pendingControlRequests.TryRemove((controlConnection, response.RequestId), out var completion))
{
completion.SetResult(response);
return;

Check warning on line 378 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L377-L378

Added lines #L377 - L378 were not covered by tests
}
}
completion.SetResult(response);
}

/// <summary>
Expand Down Expand Up @@ -392,7 +436,8 @@
/// <param name="connection">The worker connection.</param>
/// <param name="request">The RPC request.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request)
private async ValueTask DispatchRequestAsync<TMessage>(GrpcWorkerConnection<TMessage> connection, RpcRequest request)
where TMessage : class
{
var requestId = request.RequestId;
if (request.Target is null)
Expand Down Expand Up @@ -422,42 +467,60 @@
/// <param name="request">The RPC request.</param>
/// <param name="func">The function to invoke.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
private static async Task InvokeRequestDelegate(GrpcWorkerConnection connection, RpcRequest request, Func<RpcRequest, Task<RpcResponse>> func)
private static async Task InvokeRequestDelegate<TMessage>(GrpcWorkerConnection<TMessage> connection, RpcRequest request, Func<RpcRequest, Task<RpcResponse>> func)
where TMessage : class
{
try
{
var response = await func(request);
response.RequestId = request.RequestId;
await connection.ResponseStream.WriteAsync(new Message { Response = response }).ConfigureAwait(false);

if (connection is GrpcWorkerConnection<Message> messageConnection)
{
await messageConnection.ResponseStream.WriteAsync(new Message { Response = response }).ConfigureAwait(false);

Check warning on line 480 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L480

Added line #L480 was not covered by tests
}
}
catch (Exception ex)
{
await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }).ConfigureAwait(false);
if (connection is GrpcWorkerConnection<Message> messageConnection)
{
await messageConnection.ResponseStream.WriteAsync(
new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }
).ConfigureAwait(false);

Check warning on line 489 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L487-L489

Added lines #L487 - L489 were not covered by tests
}
}
}

/// <summary>
/// Handles the removal of a worker process.
/// </summary>
/// <param name="workerProcess">The worker process.</param>
internal void OnRemoveWorkerProcess(GrpcWorkerConnection workerProcess)
internal void OnRemoveWorkerProcess<TMessage>(GrpcWorkerConnection<TMessage> workerProcess)
where TMessage : class
{
var clientId = workerProcess.ServerCallContext.RequestHeaders.Get("client-id")?.Value ??
throw new RpcException(new Status(StatusCode.InvalidArgument, "Grpc Client ID is required."));
var clientId = workerProcess.ServerCallContext.RequestHeaders.Get("client-id")?.Value
?? throw new RpcException(new Status(StatusCode.InvalidArgument, "Grpc Client ID is required."));

Check warning on line 502 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L502

Added line #L502 was not covered by tests

_workers.TryRemove(clientId, out _);
_controlWorkers.TryRemove(clientId, out _);

Check warning on line 505 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L505

Added line #L505 was not covered by tests

var types = workerProcess.GetSupportedTypes();
foreach (var type in types)
{
if (_supportedAgentTypes.TryGetValue(type, out var supported))
if (_supportedAgentTypes.TryGetValue(type, out var supported) && workerProcess is GrpcWorkerConnection<Message> messageWorker)
{
supported.Remove(workerProcess);
supported.Remove(messageWorker);

Check warning on line 512 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L512

Added line #L512 was not covered by tests
}
}
foreach (var pair in _agentDirectory)

if (workerProcess is GrpcWorkerConnection<Message> messageWorkerInstance)
{
if (pair.Value == workerProcess)
foreach (var pair in _agentDirectory.ToList())
{
((IDictionary<(string Type, string Key), GrpcWorkerConnection>)_agentDirectory).Remove(pair);
if (ReferenceEquals(pair.Value, messageWorkerInstance)) // Ensures exact instance match
{
_agentDirectory.TryRemove(pair.Key, out _);

Check warning on line 522 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGateway.cs#L522

Added line #L522 was not covered by tests
}
}
}
}
Expand All @@ -468,7 +531,8 @@
/// <param name="connection">The worker connection.</param>
/// <param name="error">The error message.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
private static async ValueTask RespondBadRequestAsync(GrpcWorkerConnection connection, string error)
private static async ValueTask RespondBadRequestAsync<TMessage>(GrpcWorkerConnection<TMessage> connection, string error)
where TMessage : class
{
throw new RpcException(new Status(StatusCode.InvalidArgument, error));
}
Expand All @@ -480,7 +544,7 @@
/// <param name="cloudEvent">The cloud event.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
private async Task WriteResponseAsync(GrpcWorkerConnection connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
private async Task WriteResponseAsync(GrpcWorkerConnection<Message> connection, CloudEvent cloudEvent, CancellationToken cancellationToken = default)
{
await connection.ResponseStream.WriteAsync(new Message { CloudEvent = cloudEvent }, cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,31 @@
}
}

/// <summary>
/// Open channel for the Control Channel (defined in the proto file).
/// </summary>
/// <param name="requestStream">The request stream.</param>
/// <param name="responseStream">The response stream.</param>
/// <param name="context">The server call context.</param>
/// <returns>A task that represents the asynchronous operation.</returns>
/// <remarks>Control channel is used for control messages between the agent worker and the cluster.</remarks>
/// public virtual global::System.Threading.Tasks.Task OpenControlChannel(grpc::IAsyncStreamReader<global::Microsoft.AutoGen.Protobuf.ControlMessage> requestStream, grpc::IServerStreamWriter<global::Microsoft.AutoGen.Protobuf.ControlMessage> responseStream, grpc::ServerCallContext context)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may want to remove the commented out part

public override async Task OpenControlChannel(IAsyncStreamReader<ControlMessage> requestStream, IServerStreamWriter<ControlMessage> responseStream, ServerCallContext context)
{
try
{
await Gateway.ConnectToWorkerProcess(requestStream, responseStream, context).ConfigureAwait(true);
}
catch

Check warning on line 53 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs#L51-L53

Added lines #L51 - L53 were not covered by tests
{
if (context.CancellationToken.IsCancellationRequested)
{
return;

Check warning on line 57 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs#L57

Added line #L57 was not covered by tests
}
throw;

Check warning on line 59 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs#L59

Added line #L59 was not covered by tests
}
}

Check warning on line 61 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcGatewayService.cs#L61

Added line #L61 was not covered by tests

/// <summary>
/// Adds a subscription.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
// GrpcWorkerConnection.cs
using System.Threading.Channels;
using Grpc.Core;
using Microsoft.AutoGen.Protobuf;

namespace Microsoft.AutoGen.RuntimeGateway.Grpc;

public sealed class GrpcWorkerConnection : IAsyncDisposable
public sealed class GrpcWorkerConnection<TMessage> : IAsyncDisposable
where TMessage : class
{
private static long s_nextConnectionId;
private Task _readTask = Task.CompletedTask;
Expand All @@ -17,13 +17,13 @@
private readonly GrpcGateway _gateway;
private readonly CancellationTokenSource _shutdownCancellationToken = new();
public Task Completion { get; private set; } = Task.CompletedTask;
public GrpcWorkerConnection(GrpcGateway agentWorker, IAsyncStreamReader<Message> requestStream, IServerStreamWriter<Message> responseStream, ServerCallContext context)
public GrpcWorkerConnection(GrpcGateway agentWorker, IAsyncStreamReader<TMessage> requestStream, IServerStreamWriter<TMessage> responseStream, ServerCallContext context)

Check warning on line 20 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs#L20

Added line #L20 was not covered by tests
{
_gateway = agentWorker;
RequestStream = requestStream;
ResponseStream = responseStream;
ServerCallContext = context;
_outboundMessages = Channel.CreateUnbounded<Message>(new UnboundedChannelOptions { AllowSynchronousContinuations = true, SingleReader = true, SingleWriter = false });
_outboundMessages = Channel.CreateUnbounded<TMessage>(new UnboundedChannelOptions { AllowSynchronousContinuations = true, SingleReader = true, SingleWriter = false });

Check warning on line 26 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs#L26

Added line #L26 was not covered by tests
}
public Task Connect()
{
Expand All @@ -50,11 +50,11 @@
return Completion = Task.WhenAll(_readTask, _writeTask);
}

public IAsyncStreamReader<Message> RequestStream { get; }
public IServerStreamWriter<Message> ResponseStream { get; }
public IAsyncStreamReader<TMessage> RequestStream { get; }
public IServerStreamWriter<TMessage> ResponseStream { get; }

Check warning on line 54 in dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs

View check run for this annotation

Codecov / codecov/patch

dotnet/src/Microsoft.AutoGen/RuntimeGateway.Grpc/Services/Grpc/GrpcWorkerConnection.cs#L53-L54

Added lines #L53 - L54 were not covered by tests
public ServerCallContext ServerCallContext { get; }

private readonly Channel<Message> _outboundMessages;
private readonly Channel<TMessage> _outboundMessages;

/// <inheritdoc/>
public void AddSupportedType(string type)
Expand All @@ -75,7 +75,7 @@
}

/// <inheritdoc/>
public async Task SendMessage(Message message)
public async Task SendMessage(TMessage message)
{
await _outboundMessages.Writer.WriteAsync(message).ConfigureAwait(false);
}
Expand Down
Loading
Loading