Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.A2A/A2AAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Shared.Diagnostics;
using Microsoft.Agents.AI;

namespace Microsoft.Agents.AI.A2A;

Expand Down Expand Up @@ -219,6 +220,13 @@ private A2AAgentThread GetA2AThread(AgentThread? thread, AgentRunOptions? option
throw new InvalidOperationException("A thread must be provided when AllowBackgroundResponses is enabled.");
}

// New logic: Check for ContextId
string? contextId = options?.ContextId ?? AgentContext.ContextId;
if (thread is null && contextId is not null)
{
thread = this.GetNewThread(contextId);
}

thread ??= this.GetNewThread();

if (thread is not A2AAgentThread typedThread)
Expand Down
73 changes: 73 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Abstractions/AgentContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Threading;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI;

/// <summary>
/// Provides access to the current agent execution context.
/// </summary>
public static class AgentContext
{
private class ContextState
{
public AgentThread? Thread { get; init; }
public string? ContextId { get; init; }
public AdditionalPropertiesDictionary? AdditionalProperties { get; init; }
}

private static readonly AsyncLocal<ContextState?> _current = new();

/// <summary>
/// Gets the current agent thread for the executing operation.
/// </summary>
public static AgentThread? CurrentThread => _current.Value?.Thread;

/// <summary>
/// Gets the current context ID (e.g., session ID, correlation ID).
/// </summary>
public static string? ContextId => _current.Value?.ContextId;

/// <summary>
/// Gets the additional properties associated with the current execution context.
/// </summary>
public static AdditionalPropertiesDictionary? AdditionalProperties => _current.Value?.AdditionalProperties;

/// <summary>
/// Sets the current agent thread for the scope of the returned disposable.
/// </summary>
/// <param name="thread">The thread to set as current.</param>
/// <returns>A disposable that restores the previous thread when disposed.</returns>
public static IDisposable SetCurrentThread(AgentThread? thread)
{
return BeginScope(thread: thread);
}

/// <summary>
/// Begins a new execution scope with the specified context data.
/// </summary>
/// <param name="contextId">The context ID.</param>
/// <param name="thread">The agent thread.</param>
/// <param name="additionalProperties">The additional properties.</param>
/// <returns>A disposable object that restores the previous context when disposed.</returns>
public static IDisposable BeginScope(string? contextId = null, AgentThread? thread = null, AdditionalPropertiesDictionary? additionalProperties = null)
{
var parent = _current.Value;
_current.Value = new ContextState
{
Thread = thread ?? parent?.Thread,
ContextId = contextId ?? parent?.ContextId,
AdditionalProperties = additionalProperties ?? parent?.AdditionalProperties?.Clone()
};
return new DisposableAction(() => _current.Value = parent);
}

private sealed class DisposableAction : IDisposable
{
private readonly Action _action;
public DisposableAction(Action action) => _action = action;
public void Dispose() => _action();
}
}
14 changes: 14 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Abstractions/AgentRunOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ public AgentRunOptions(AgentRunOptions options)
_ = Throw.IfNull(options);
this.ContinuationToken = options.ContinuationToken;
this.AllowBackgroundResponses = options.AllowBackgroundResponses;
this.ContextId = options.ContextId;
this.AdditionalProperties = options.AdditionalProperties?.Clone();
}

/// <summary>
/// Gets or sets the context identifier (e.g., session ID, correlation ID) for the agent run.
/// </summary>
public string? ContextId { get; set; }

/// <summary>
/// Gets or sets the continuation token for resuming and getting the result of the agent response identified by this token.
/// </summary>
Expand Down Expand Up @@ -88,6 +94,14 @@ public AgentRunOptions(AgentRunOptions options)
/// Additional properties provide a way to include custom metadata or provider-specific
/// information that doesn't fit into the standard options schema. This is useful for
/// preserving implementation-specific details or extending the options with custom data.
/// <para>
/// <b>Note on A2A Protocol Usage:</b>
/// When communicating via the A2A protocol, this dictionary maps to the <c>Metadata</c> field of A2A messages.
/// Use this for "out-of-band" information like Trace IDs, Tenant IDs, or routing hints.
/// Do <b>not</b> use this for core message content or data payloads; those should be passed as
/// <see cref="ChatMessage"/> parts (e.g., TextPart, DataPart) in the message body.
/// Use <see cref="ContextId"/> for session or correlation grouping.
/// </para>
/// </remarks>
public AdditionalPropertiesDictionary? AdditionalProperties { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ async Task<A2AResponse> OnMessageReceivedAsync(MessageSendParams messageSendPara
var contextId = messageSendParams.Message.ContextId ?? Guid.NewGuid().ToString("N");
var thread = await hostAgent.GetOrCreateThreadAsync(contextId, cancellationToken).ConfigureAwait(false);

var runOptions = new AgentRunOptions { ContextId = contextId };

var response = await hostAgent.RunAsync(
messageSendParams.ToChatMessages(),
thread: thread,
options: runOptions,
cancellationToken: cancellationToken).ConfigureAwait(false);

await hostAgent.SaveThreadAsync(contextId, thread, cancellationToken).ConfigureAwait(false);
Expand Down
19 changes: 18 additions & 1 deletion dotnet/src/Microsoft.Agents.AI/AgentExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,24 @@ async Task<string> InvokeAgentAsync(
[Description("Input query to invoke the agent.")] string query,
CancellationToken cancellationToken)
{
var response = await agent.RunAsync(query, thread: thread, cancellationToken: cancellationToken).ConfigureAwait(false);
// If no thread was provided at creation time, try to use the current ambient thread context.
var effectiveThread = thread ?? AgentContext.CurrentThread;

// Propagate ambient context (ContextId, AdditionalProperties)
var contextId = AgentContext.ContextId;
var additionalProperties = AgentContext.AdditionalProperties;

AgentRunOptions? runOptions = null;
if (contextId is not null || additionalProperties is not null)
{
runOptions = new AgentRunOptions
{
ContextId = contextId,
AdditionalProperties = additionalProperties?.Clone()
};
}

var response = await agent.RunAsync(query, thread: effectiveThread, options: runOptions, cancellationToken: cancellationToken).ConfigureAwait(false);
return response.Text;
}

Expand Down
72 changes: 39 additions & 33 deletions dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,51 +218,54 @@ public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync

IAsyncEnumerator<ChatResponseUpdate> responseUpdatesEnumerator;

try
{
// Using the enumerator to ensure we consider the case where no updates are returned for notification.
responseUpdatesEnumerator = chatClient.GetStreamingResponseAsync(inputMessagesForChatClient, chatOptions, cancellationToken).GetAsyncEnumerator(cancellationToken);
}
catch (Exception ex)
{
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}

this._logger.LogAgentChatClientInvokedStreamingAgent(nameof(RunStreamingAsync), this.Id, loggingAgentName, this._chatClientType);

bool hasUpdates;
try
{
// Ensure we start the streaming request
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}

while (hasUpdates)
using (AgentContext.BeginScope(options?.ContextId, safeThread, options?.AdditionalProperties))
{
var update = responseUpdatesEnumerator.Current;
if (update is not null)
try
{
update.AuthorName ??= this.Name;

responseUpdates.Add(update);
yield return new(update) { AgentId = this.Id };
// Using the enumerator to ensure we consider the case where no updates are returned for notification.
responseUpdatesEnumerator = chatClient.GetStreamingResponseAsync(inputMessagesForChatClient, chatOptions, cancellationToken).GetAsyncEnumerator(cancellationToken);
}
catch (Exception ex)
{
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}

this._logger.LogAgentChatClientInvokedStreamingAgent(nameof(RunStreamingAsync), this.Id, loggingAgentName, this._chatClientType);

bool hasUpdates;
try
{
// Ensure we start the streaming request
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}

while (hasUpdates)
{
var update = responseUpdatesEnumerator.Current;
if (update is not null)
{
update.AuthorName ??= this.Name;

responseUpdates.Add(update);
yield return new(update) { AgentId = this.Id };
}

try
{
hasUpdates = await responseUpdatesEnumerator.MoveNextAsync().ConfigureAwait(false);
}
catch (Exception ex)
{
await NotifyAIContextProviderOfFailureAsync(safeThread, ex, inputMessages, aiContextProviderMessages, cancellationToken).ConfigureAwait(false);
throw;
}
}
}

var chatResponse = responseUpdates.ToChatResponse();
Expand Down Expand Up @@ -394,7 +397,10 @@ private async Task<TAgentRunResponse> RunCoreAsync<TAgentRunResponse, TChatClien
TChatClientResponse chatResponse;
try
{
chatResponse = await chatClientRunFunc.Invoke(chatClient, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false);
using (AgentContext.BeginScope(options?.ContextId, safeThread, options?.AdditionalProperties))
{
chatResponse = await chatClientRunFunc.Invoke(chatClient, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Expand Down