Skip to content

Commit

Permalink
.Net Processes - Update Runtime Exception Handling / Logging (microso…
Browse files Browse the repository at this point in the history
…ft#9457)

### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

Throwing and logging exceptions in the runtime is a little verbose. It
also creates quite a few conditional arcs for test coverage (or lack
there-of). Utility method streamlines this pattern and eliminates all
but one conditional arc.

```c#
throw new KernelException("message").Log(this._logger);
```

Note: Considered this pattern, opted on favoring the _never-null_ object
as the root of the extention method.

```c#
throw this._logger.LogException(new KernelException("message"));
```


### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

- Introduced shared exception extension to streamline logging and
minimize conditional arcs.
- Switched to `LogError` override that accepts the exception object.
- Initialize logger based on `Kernel.LoggingFactory` (was previously
always null)
- Opportunistically added parameter names to usage of `Verify`
- Opportunistically removed `async`/`await` for methods where not
required.

### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
  • Loading branch information
crickman authored Oct 30, 2024
1 parent c371705 commit 82f248f
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,39 @@ public sealed class LocalKernelProcessContext : IDisposable

internal LocalKernelProcessContext(KernelProcess process, Kernel kernel)
{
Verify.NotNull(process);
Verify.NotNull(process, nameof(process));
Verify.NotNull(kernel, nameof(kernel));
Verify.NotNullOrWhiteSpace(process.State?.Name);
Verify.NotNull(kernel);

this._kernel = kernel;
this._localProcess = new LocalProcess(
process,
kernel: kernel,
parentProcessId: null,
loggerFactory: null);
this._localProcess = new LocalProcess(process, kernel);
}

internal async Task StartWithEventAsync(KernelProcessEvent? initialEvent, Kernel? kernel = null)
{
await this._localProcess.RunOnceAsync(initialEvent).ConfigureAwait(false);
}
internal Task StartWithEventAsync(KernelProcessEvent? initialEvent, Kernel? kernel = null) =>
this._localProcess.RunOnceAsync(initialEvent, kernel);

/// <summary>
/// Sends a message to the process.
/// </summary>
/// <param name="processEvent">The event to sent to the process.</param>
/// <returns>A <see cref="Task"/></returns>
public async Task SendEventAsync(KernelProcessEvent processEvent) =>
await this._localProcess.SendMessageAsync(processEvent).ConfigureAwait(false);
public Task SendEventAsync(KernelProcessEvent processEvent) =>
this._localProcess.SendMessageAsync(processEvent);

/// <summary>
/// Stops the process.
/// </summary>
/// <returns>A <see cref="Task"/></returns>
public async Task StopAsync() => await this._localProcess.StopAsync().ConfigureAwait(false);
public Task StopAsync() => this._localProcess.StopAsync();

/// <summary>
/// Gets a snapshot of the current state of the process.
/// </summary>
/// <returns>A <see cref="Task{T}"/> where T is <see cref="KernelProcess"/></returns>
public async Task<KernelProcess> GetStateAsync() => await this._localProcess.GetProcessInfoAsync().ConfigureAwait(false);
public Task<KernelProcess> GetStateAsync() => this._localProcess.GetProcessInfoAsync();

/// <summary>
/// Disposes of the resources used by the process.
/// </summary>
public void Dispose() => this._localProcess?.Dispose();
public void Dispose() => this._localProcess.Dispose();
}
33 changes: 13 additions & 20 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ internal sealed class LocalProcess : LocalStep, IDisposable
/// <param name="process">The <see cref="KernelProcess"/> instance.</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="parentProcessId">Optional. The Id of the parent process if one exists, otherwise null.</param>
/// <param name="loggerFactory">Optional. A <see cref="ILoggerFactory"/>.</param>
internal LocalProcess(KernelProcess process, Kernel kernel, string? parentProcessId = null, ILoggerFactory? loggerFactory = null)
: base(process, kernel, parentProcessId, loggerFactory)
internal LocalProcess(KernelProcess process, Kernel kernel, string? parentProcessId = null)
: base(process, kernel, parentProcessId)
{
Verify.NotNull(process.Steps);

Expand All @@ -47,7 +46,7 @@ internal LocalProcess(KernelProcess process, Kernel kernel, string? parentProces
this._externalEventChannel = Channel.CreateUnbounded<KernelProcessEvent>();
this._joinableTaskContext = new JoinableTaskContext();
this._joinableTaskFactory = new JoinableTaskFactory(this._joinableTaskContext);
this._logger = this.LoggerFactory?.CreateLogger(this.Name) ?? new NullLogger<LocalStep>();
this._logger = this._kernel.LoggerFactory?.CreateLogger(this.Name) ?? new NullLogger<LocalStep>();
}

/// <summary>
Expand Down Expand Up @@ -75,7 +74,8 @@ internal async Task StartAsync(Kernel? kernel = null, bool keepAlive = true)
/// <returns>A <see cref="Task"/></returns>
internal async Task RunOnceAsync(KernelProcessEvent? processEvent, Kernel? kernel = null)
{
Verify.NotNull(processEvent);
Verify.NotNull(processEvent, nameof(processEvent));

await Task.Yield(); // Ensure that the process has an opportunity to run in a different synchronization context.
await this._externalEventChannel.Writer.WriteAsync(processEvent).ConfigureAwait(false);
await this.StartAsync(kernel, keepAlive: false).ConfigureAwait(false);
Expand Down Expand Up @@ -117,20 +117,17 @@ internal async Task StopAsync()
/// <param name="processEvent">Required. The <see cref="KernelProcessEvent"/> to start the process with.</param>
/// <param name="kernel">Optional. A <see cref="Kernel"/> to use when executing the process.</param>
/// <returns>A <see cref="Task"/></returns>
internal async Task SendMessageAsync(KernelProcessEvent processEvent, Kernel? kernel = null)
internal Task SendMessageAsync(KernelProcessEvent processEvent, Kernel? kernel = null)
{
Verify.NotNull(processEvent);
await this._externalEventChannel.Writer.WriteAsync(processEvent).ConfigureAwait(false);
Verify.NotNull(processEvent, nameof(processEvent));
return this._externalEventChannel.Writer.WriteAsync(processEvent).AsTask();
}

/// <summary>
/// Gets the process information.
/// </summary>
/// <returns>An instance of <see cref="KernelProcess"/></returns>
internal async Task<KernelProcess> GetProcessInfoAsync()
{
return await this.ToKernelProcessAsync().ConfigureAwait(false);
}
internal Task<KernelProcess> GetProcessInfoAsync() => this.ToKernelProcessAsync();

/// <summary>
/// Handles a <see cref="ProcessMessage"/> that has been sent to the process. This happens only in the case
Expand All @@ -144,9 +141,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message)
{
if (string.IsNullOrWhiteSpace(message.TargetEventId))
{
string errorMessage = "Internal Process Error: The target event id must be specified when sending a message to a step.";
this._logger.LogError("{ErrorMessage}", errorMessage);
throw new KernelException(errorMessage);
throw new KernelException("Internal Process Error: The target event id must be specified when sending a message to a step.").Log(this._logger);
}

string eventId = message.TargetEventId!;
Expand Down Expand Up @@ -191,8 +186,7 @@ private ValueTask InitializeProcessAsync()
var process = new LocalProcess(
process: kernelStep,
kernel: this._kernel,
parentProcessId: this.Id,
loggerFactory: this.LoggerFactory);
parentProcessId: this.Id);

//await process.StartAsync(kernel: this._kernel, keepAlive: true).ConfigureAwait(false);
localStep = process;
Expand All @@ -205,8 +199,7 @@ private ValueTask InitializeProcessAsync()
localStep = new LocalStep(
stepInfo: step,
kernel: this._kernel,
parentProcessId: this.Id,
loggerFactory: this.LoggerFactory);
parentProcessId: this.Id);
}

this._steps.Add(localStep);
Expand Down Expand Up @@ -283,7 +276,7 @@ private async Task Internal_ExecuteAsync(Kernel? kernel = null, int maxSuperstep
}
catch (Exception ex)
{
this._logger?.LogError("An error occurred while running the process: {ErrorMessage}.", ex.Message);
this._logger?.LogError(ex, "An error occurred while running the process.");
throw;
}
finally
Expand Down
43 changes: 20 additions & 23 deletions dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ internal class LocalStep : IKernelProcessMessageChannel
private readonly ILogger _logger;

protected readonly Kernel _kernel;
protected readonly Dictionary<string, KernelFunction> _functions = [];

protected KernelProcessStepState _stepState;
protected Dictionary<string, Dictionary<string, object?>?>? _inputs = [];
protected Dictionary<string, Dictionary<string, object?>?>? _initialInputs = [];
protected readonly Dictionary<string, KernelFunction> _functions = [];
protected readonly string? ParentProcessId;
protected readonly ILoggerFactory? LoggerFactory;
protected Dictionary<string, List<KernelProcessEdge>> _outputEdges;

/// <summary>
Expand All @@ -38,30 +37,34 @@ internal class LocalStep : IKernelProcessMessageChannel
/// <param name="stepInfo">An instance of <see cref="KernelProcessStepInfo"/></param>
/// <param name="kernel">Required. An instance of <see cref="Kernel"/>.</param>
/// <param name="parentProcessId">Optional. The Id of the parent process if one exists.</param>
/// <param name="loggerFactory">An instance of <see cref="LoggerFactory"/> used to create loggers.</param>
public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentProcessId = null, ILoggerFactory? loggerFactory = null)
public LocalStep(KernelProcessStepInfo stepInfo, Kernel kernel, string? parentProcessId = null)
{
Verify.NotNull(kernel, nameof(kernel));
Verify.NotNull(stepInfo, nameof(stepInfo));

// This special handling will be removed with the refactoring of KernelProcessState
if (string.IsNullOrEmpty(stepInfo.State.Id) && stepInfo is KernelProcess)
{
stepInfo = stepInfo with { State = stepInfo.State with { Id = Guid.NewGuid().ToString() } };
}

Verify.NotNull(stepInfo);
Verify.NotNull(kernel);
Verify.NotNull(stepInfo.State.Id);

this.ParentProcessId = parentProcessId;
this.LoggerFactory = loggerFactory;
this._kernel = kernel;
this._stepInfo = stepInfo;
this._stepState = stepInfo.State;
this._initializeTask = new Lazy<ValueTask>(this.InitializeStepAsync);
this._logger = this.LoggerFactory?.CreateLogger(this._stepInfo.InnerStepType) ?? new NullLogger<LocalStep>();
this._logger = this._kernel.LoggerFactory?.CreateLogger(this._stepInfo.InnerStepType) ?? new NullLogger<LocalStep>();
this._outputEdges = this._stepInfo.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList());
this._eventNamespace = $"{this._stepInfo.State.Name}_{this._stepInfo.State.Id}";
}

/// <summary>
/// The Id of the parent process if one exists.
/// </summary>
protected string? ParentProcessId { get; }

/// <summary>
/// The name of the step.
/// </summary>
Expand Down Expand Up @@ -122,14 +125,14 @@ public ValueTask EmitEventAsync(KernelProcessEvent processEvent)
/// <exception cref="KernelException"></exception>
internal virtual async Task HandleMessageAsync(ProcessMessage message)
{
Verify.NotNull(message);
Verify.NotNull(message, nameof(message));

// Lazy one-time initialization of the step before processing a message
await this._initializeTask.Value.ConfigureAwait(false);

if (this._functions is null || this._inputs is null || this._initialInputs is null)
{
throw new KernelException("The step has not been initialized.");
throw new KernelException("The step has not been initialized.").Log(this._logger);
}

string messageLogParameters = string.Join(", ", message.Values.Select(kvp => $"{kvp.Key}: {kvp.Value}"));
Expand Down Expand Up @@ -232,28 +235,22 @@ protected virtual async ValueTask InitializeStepAsync()

if (stateObject is null)
{
var errorMessage = "The state object for the KernelProcessStep could not be created.";
this._logger.LogError("{ErrorMessage}", errorMessage);
throw new KernelException(errorMessage);
throw new KernelException("The state object for the KernelProcessStep could not be created.").Log(this._logger);
}

MethodInfo? methodInfo = this._stepInfo.InnerStepType.GetMethod(nameof(KernelProcessStep.ActivateAsync), [stateType]);

if (methodInfo is null)
{
var errorMessage = "The ActivateAsync method for the KernelProcessStep could not be found.";
this._logger.LogError("{ErrorMessage}", errorMessage);
throw new KernelException(errorMessage);
throw new KernelException("The ActivateAsync method for the KernelProcessStep could not be found.").Log(this._logger);
}

this._stepState = stateObject;

ValueTask? activateTask = (ValueTask?)methodInfo.Invoke(stepInstance, [stateObject]);
if (activateTask == null)
{
var errorMessage = "The ActivateAsync method failed to complete.";
this._logger.LogError("{ErrorMessage}", errorMessage);
throw new KernelException(errorMessage);
throw new KernelException("The ActivateAsync method failed to complete.").Log(this._logger);
}

await stepInstance.ActivateAsync(stateObject).ConfigureAwait(false);
Expand Down Expand Up @@ -282,7 +279,7 @@ internal virtual async Task<KernelProcessStepInfo> ToKernelProcessStepInfoAsync(
// This allows state information to be extracted even if the step has not been activated.
await this._initializeTask.Value.ConfigureAwait(false);

var stepInfo = new KernelProcessStepInfo(this._stepInfo.InnerStepType, this._stepState!, this._outputEdges);
KernelProcessStepInfo stepInfo = new(this._stepInfo.InnerStepType, this._stepState!, this._outputEdges);
return stepInfo;
}

Expand All @@ -303,7 +300,7 @@ protected void EmitEvent(ProcessEvent localEvent)
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
protected ProcessEvent ScopedEvent(ProcessEvent localEvent)
{
Verify.NotNull(localEvent);
Verify.NotNull(localEvent, nameof(localEvent));
return localEvent with { Namespace = $"{this.Name}_{this.Id}" };
}

Expand All @@ -314,7 +311,7 @@ protected ProcessEvent ScopedEvent(ProcessEvent localEvent)
/// <returns>A <see cref="ProcessEvent"/> with the correctly scoped namespace.</returns>
protected ProcessEvent ScopedEvent(KernelProcessEvent processEvent)
{
Verify.NotNull(processEvent);
Verify.NotNull(processEvent, nameof(processEvent));
return ProcessEvent.FromKernelProcessEvent(processEvent, $"{this.Name}_{this.Id}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ internal sealed class ProcessActor : StepActor, IProcess, IDisposable
/// </summary>
/// <param name="host">The Dapr host actor</param>
/// <param name="kernel">An instance of <see cref="Kernel"/></param>
/// <param name="loggerFactory">Optional. A <see cref="ILoggerFactory"/>.</param>
public ProcessActor(ActorHost host, Kernel kernel, ILoggerFactory? loggerFactory)
: base(host, kernel, loggerFactory)
public ProcessActor(ActorHost host, Kernel kernel)
: base(host, kernel)
{
this._externalEventChannel = Channel.CreateUnbounded<KernelProcessEvent>();
this._joinableTaskContext = new JoinableTaskContext();
Expand Down Expand Up @@ -78,7 +77,7 @@ public Task StartAsync(bool keepAlive)
{
if (!this._isInitialized)
{
throw new InvalidOperationException("The process cannot be started before it has been initialized.");
throw new InvalidOperationException("The process cannot be started before it has been initialized.").Log(this._logger);
}

this._processCancelSource = new CancellationTokenSource();
Expand All @@ -96,7 +95,7 @@ public Task StartAsync(bool keepAlive)
/// <returns>A <see cref="Task"/></returns>
public async Task RunOnceAsync(KernelProcessEvent processEvent)
{
Verify.NotNull(processEvent);
Verify.NotNull(processEvent, nameof(processEvent));
var externalEventQueue = this.ProxyFactory.CreateActorProxy<IExternalEventBuffer>(new ActorId(this.Id.GetId()), nameof(ExternalEventBufferActor));
await externalEventQueue.EnqueueAsync(processEvent).ConfigureAwait(false);
await this.StartAsync(keepAlive: false).ConfigureAwait(false);
Expand Down Expand Up @@ -139,7 +138,7 @@ public async Task StopAsync()
/// <returns>A <see cref="Task"/></returns>
public async Task SendMessageAsync(KernelProcessEvent processEvent)
{
Verify.NotNull(processEvent);
Verify.NotNull(processEvent, nameof(processEvent));
await this._externalEventChannel.Writer.WriteAsync(processEvent).ConfigureAwait(false);
}

Expand Down Expand Up @@ -175,7 +174,7 @@ protected override async Task OnActivateAsync()
/// <summary>
/// The name of the step.
/// </summary>
protected override string Name => this._process?.State.Name ?? throw new KernelException("The Process must be initialized before accessing the Name property.");
protected override string Name => this._process?.State.Name ?? throw new KernelException("The Process must be initialized before accessing the Name property.").Log(this._logger);

#endregion

Expand All @@ -189,9 +188,7 @@ internal override async Task HandleMessageAsync(ProcessMessage message)
{
if (string.IsNullOrWhiteSpace(message.TargetEventId))
{
string errorMessage = "Internal Process Error: The target event id must be specified when sending a message to a step.";
this._logger?.LogError("{ErrorMessage}", errorMessage);
throw new KernelException(errorMessage);
throw new KernelException("Internal Process Error: The target event id must be specified when sending a message to a step.").Log(this._logger);
}

string eventId = message.TargetEventId!;
Expand Down Expand Up @@ -223,13 +220,13 @@ protected override ValueTask ActivateStepAsync()

private async Task InitializeProcessActorAsync(DaprProcessInfo processInfo, string? parentProcessId)
{
Verify.NotNull(processInfo);
Verify.NotNull(processInfo, nameof(processInfo));
Verify.NotNull(processInfo.Steps);

this.ParentProcessId = parentProcessId;
this._process = processInfo;
this._stepsInfos = new List<DaprStepInfo>(this._process.Steps);
this._logger = this.LoggerFactory?.CreateLogger(this._process.State.Name) ?? new NullLogger<ProcessActor>();
this._logger = this._kernel.LoggerFactory?.CreateLogger(this._process.State.Name) ?? new NullLogger<ProcessActor>();

// Initialize the input and output edges for the process
this._outputEdges = this._process.Edges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToList());
Expand Down
Loading

0 comments on commit 82f248f

Please sign in to comment.