Skip to content

Commit

Permalink
Added small benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
Federico Alterio authored and Federico Alterio committed Dec 18, 2024
1 parent 477b363 commit 0e2fd65
Show file tree
Hide file tree
Showing 11 changed files with 365 additions and 187 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ jobs:
- name: Build
run: dotnet build -c Release ./SignalsDotnet --no-restore
- name: Test
run: dotnet test -c Release ./SignalsDotnet/SignalsDotnet.Tests --no-build --verbosity normal
run: dotnet test -c Release ./SignalsDotnet/SignalsDotnet.Tests --no-build --verbosity normal
- name: Benchmarks
run: |
dotnet build -c Release ./SignalsDotnet/SignalsDotnet.PeformanceTests/
dotnet run -c Release --project ./SignalsDotnet/SignalsDotnet.PeformanceTests/
45 changes: 45 additions & 0 deletions SignalsDotnet/SignalsDotnet.PeformanceTests/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Configs;
using BenchmarkDotNet.Jobs;
using BenchmarkDotNet.Running;
using BenchmarkDotNet.Toolchains.InProcess.NoEmit;
using SignalsDotnet;

BenchmarkRunner.Run<ComputedBenchmarks>();

public class BenchmarkConfig : ManualConfig
{
public BenchmarkConfig()
{
AddJob(Job.MediumRun
.WithToolchain(InProcessNoEmitToolchain.Instance));
}
}

[MemoryDiagnoser]
[Config(typeof(BenchmarkConfig))]
public class ComputedBenchmarks : IDisposable
{
readonly Signal<int> _signal = new(0);
readonly IReadOnlySignal<int> _computed;

public ComputedBenchmarks()
{
_computed = Signal.Computed(() => _signal.Value, x => x with{SubscribeWeakly = false});
_ = _computed.Value;
}

[Benchmark]
public int ComputedRoundTrip()
{
_ = _computed.Value;
_signal.Value = 0;
_signal.Value = 1;
return _computed.Value;
}

public void Dispose()
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="BenchmarkDotNet" Version="0.14.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\SignalsDotnet\SignalsDotnet.csproj" />
</ItemGroup>

</Project>
4 changes: 4 additions & 0 deletions SignalsDotnet/SignalsDotnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalsDotnet", "SignalsDot
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalsDotnet.Tests", "SignalsDotnet.Tests\SignalsDotnet.Tests.csproj", "{D6DA2778-8C20-4D1D-A4E7-956FEAE51859}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SignalsDotnet.PeformanceTests", "SignalsDotnet.PeformanceTests\SignalsDotnet.PeformanceTests.csproj", "{C7366C63-2562-435B-B78D-9D51C0649CBA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -21,6 +23,8 @@ Global
{D6DA2778-8C20-4D1D-A4E7-956FEAE51859}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D6DA2778-8C20-4D1D-A4E7-956FEAE51859}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D6DA2778-8C20-4D1D-A4E7-956FEAE51859}.Release|Any CPU.Build.0 = Release|Any CPU
{C7366C63-2562-435B-B78D-9D51C0649CBA}.Debug|Any CPU.ActiveCfg = Debug|AnyCPU
{C7366C63-2562-435B-B78D-9D51C0649CBA}.Release|Any CPU.ActiveCfg = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
165 changes: 165 additions & 0 deletions SignalsDotnet/SignalsDotnet/Internals/ComputedObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
using SignalsDotnet.Helpers;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive;
using ObservableEx = SignalsDotnet.Internals.Helpers.ObservableEx;

namespace SignalsDotnet.Internals;

internal class ComputedObservable<T> : IObservable<T>
{
readonly Func<CancellationToken, ValueTask<T>> _func;
readonly Func<Optional<T>> _fallbackValue;
readonly Func<Unit, IObservable<Unit>>? _scheduler;
readonly ConcurrentChangeStrategy _concurrentChangeStrategy;

public ComputedObservable(Func<CancellationToken, ValueTask<T>> func,
Func<Optional<T>> fallbackValue,
Func<Unit, IObservable<Unit>>? scheduler = null,
ConcurrentChangeStrategy concurrentChangeStrategy = default)
{
_func = func;
_fallbackValue = fallbackValue;
_scheduler = scheduler;
_concurrentChangeStrategy = concurrentChangeStrategy;
}

public IDisposable Subscribe(IObserver<T> observer) => new Subscription(this, observer);

class Subscription : IDisposable
{
readonly ComputedObservable<T> _observable;
readonly IObserver<T> _observer;
readonly MultipleAssignmentDisposable _disposable = new();
public Subscription(ComputedObservable<T> observable, IObserver<T> observer)
{
_observable = observable;
_observer = observer;
_disposable.Disposable = ObservableEx.FromAsyncUsingAsyncContext(ComputeResult)
.Take(1)
.Subscribe(OnNewResult);
}

void OnNewResult(ComputationResult result)
{
var valueNotified = false;

_disposable.Disposable = result.ShouldComputeNextResult
.Take(1)
.SelectMany(_ =>
{
NotifyValueIfNotAlready();
return ObservableEx.FromAsyncUsingAsyncContext(ComputeResult);
})
.Subscribe(OnNewResult);
NotifyValueIfNotAlready();

// We notify a new value only if the func() evaluation succeeds.
void NotifyValueIfNotAlready()
{
if (valueNotified)
return;

valueNotified = true;
if (result.ResultOptional.TryGetValue(out var propertyValue))
{
_observer.OnNext(propertyValue);
}
}
}

async ValueTask<ComputationResult> ComputeResult(CancellationToken cancellationToken)
{
var referenceEquality = ReferenceEqualityComparer.Instance;
HashSet<IReadOnlySignal> signalRequested = new(referenceEquality);
Optional<T> result;
BehaviorSubject<bool> stopListeningForSignals = new(false);

var signalChangedObservable = Signal.SignalsRequested()
.TakeUntil(stopListeningForSignals.Where(x => x))
.Where(x => signalRequested.Add(x))
.Select(x => x.Changed.Skip(1))
.Merge()
.Take(1);

if (_observable._concurrentChangeStrategy == ConcurrentChangeStrategy.CancelCurrent)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cancellationToken = cts.Token;
signalChangedObservable = signalChangedObservable.Do(_ => cts.Cancel())
.Finally(cts.Dispose);
}

var scheduler = _observable._scheduler;
if (scheduler is not null)
{
signalChangedObservable = signalChangedObservable.Select(scheduler)
.Switch();
}

var shouldComputeNextResult = signalChangedObservable.Replay(1);

var disconnect = shouldComputeNextResult.Connect();

try
{
try
{
result = new(await _observable._func(cancellationToken));
}
finally
{
stopListeningForSignals.OnNext(true);
stopListeningForSignals.OnCompleted();
}
}
catch (OperationCanceledException)
{
result = Optional<T>.Empty;
}
catch
{
// If something fails, the property will have the previous result,
// We still have to observe for the properties to change (maybe next time the exception will not be thrown)
try
{
result = _observable._fallbackValue();
}
catch
{
result = Optional<T>.Empty;
}
}

var resultObservable = new DisconnectOnDisposeObservable<Unit>(shouldComputeNextResult, Disposable.Create(() =>
{
disconnect.Dispose();
}));

return new(resultObservable, result);
}


public void Dispose() => _disposable.Dispose();
}

record struct ComputationResult(IObservable<Unit> ShouldComputeNextResult, Optional<T> ResultOptional);
class DisconnectOnDisposeObservable<TV> : IObservable<TV>
{
readonly IObservable<TV> _observable;
readonly IDisposable _disconnect;

public DisconnectOnDisposeObservable(IObservable<TV> observable, IDisposable disconnect)
{
_observable = observable;
_disconnect = disconnect;
}

public IDisposable Subscribe(IObserver<TV> observer)
{
_observable.Subscribe(observer);
return Disposable.Create(() => _disconnect.Dispose());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void NotifySomeoneAskedAValue()
_someoneAskedValueSubject.Dispose();
}

public IDisposable Subscribe(IObserver<T?> observer) => this.OnPropertyChanged(nameof(Value), () => Value)
public IDisposable Subscribe(IObserver<T?> observer) => this.OnPropertyChanged()
.Subscribe(observer);

public bool Equals(FromObservableSignal<T?>? other)
Expand Down
45 changes: 20 additions & 25 deletions SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableEx.cs
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Disposables;

namespace SignalsDotnet.Internals.Helpers;

internal static class ObservableEx
{
public static IObservable<T> FromAsyncUsingAsyncContext<T>(Func<CancellationToken, Task<T>> asyncAction)
public static IObservable<T> FromAsyncUsingAsyncContext<T>(Func<CancellationToken, ValueTask<T>> asyncAction)
{
if (asyncAction is null)
throw new ArgumentNullException(nameof(asyncAction));
return new FromAsyncContextObservable<T>(asyncAction);
}

public readonly struct FromAsyncContextObservable<T> : IObservable<T>
{
readonly Func<CancellationToken, ValueTask<T>> _asyncAction;

return Observable.Create<T>(observer =>
public FromAsyncContextObservable(Func<CancellationToken, ValueTask<T>> asyncAction)
{
_asyncAction = asyncAction;
}

public IDisposable Subscribe(IObserver<T> observer)
{
var disposable = new CancellationDisposable();
var token = disposable.Token;

try
{
var task = asyncAction(token);
if (task.IsFaulted)
var task = _asyncAction(token);
if (task.IsCompleted)
{
var exception = task.GetSingleException();
observer.OnError(exception);
observer.OnNext(task.GetAwaiter().GetResult());
observer.OnCompleted();
return disposable;
}

Expand All @@ -34,9 +41,9 @@ public static IObservable<T> FromAsyncUsingAsyncContext<T>(Func<CancellationToke
}

return disposable;
});
}

async void BindObserverToTask(Task<T> task, IObserver<T> observer)
static async void BindObserverToTask(ValueTask<T> task, IObserver<T> observer)
{
try
{
Expand All @@ -57,16 +64,4 @@ async void BindObserverToTask(Task<T> task, IObserver<T> observer)
}
}
}

static Exception GetSingleException(this Task t)
{
Debug.Assert(t is { IsFaulted: true, Exception: not null });

if (t.Exception!.InnerException != null)
{
return t.Exception.InnerException;
}

return t.Exception;
}
}
Loading

0 comments on commit 0e2fd65

Please sign in to comment.