diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml deleted file mode 100644 index 7792134..0000000 --- a/.github/workflows/dotnet.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: .NET - -on: - push: - branches: [ "*" ] - -jobs: - build: - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v4 - - name: Setup .NET - uses: actions/setup-dotnet@v4 - with: - dotnet-version: 6.0.x - - name: Restore dependencies - run: dotnet restore ./SignalsDotnet - - name: Build - run: dotnet build ./SignalsDotnet --no-restore - - name: Test - run: dotnet test ./SignalsDotnet/SignalsDotnet.Tests --no-build --verbosity normal diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml new file mode 100644 index 0000000..bf67f4c --- /dev/null +++ b/.github/workflows/push.yml @@ -0,0 +1,27 @@ +name: .NET + +on: + push: + branches: [ "*" ] + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v4 + with: + dotnet-version: 6.0.x + - name: Restore dependencies + run: dotnet restore ./SignalsDotnet + - name: Build + run: dotnet build -c Release ./SignalsDotnet --no-restore + - name: Test + run: dotnet test -c Release ./SignalsDotnet/SignalsDotnet.Tests --no-build --verbosity normal + - name: Benchmarks + run: | + dotnet build -c Release ./SignalsDotnet/SignalsDotnet.PeformanceTests/ + dotnet run -c Release --project ./SignalsDotnet/SignalsDotnet.PeformanceTests/ \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..ca54f90 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,27 @@ +name: Release to NuGet + +on: + release: + types: [published] + +jobs: + build: + runs-on: ubuntu-latest + timeout-minutes: 5 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Setup .NET + uses: actions/setup-dotnet@v4 + with: + dotnet-version: 6.0.x + - name: Restore dependencies + run: dotnet restore ./SignalsDotnet + - name: Build + run: dotnet build -c Release ./SignalsDotnet --no-restore + - name: Test + run: dotnet test -c Release ./SignalsDotnet/SignalsDotnet.Tests --no-build --verbosity normal + - name: Pack nugets + run: dotnet pack SignalsDotnet/SignalsDotnet -c Release --no-build --output . + - name: Push to NuGet + run: dotnet nuget push "*.nupkg" --api-key ${{secrets.nuget_api_key}} --source https://api.nuget.org/v3/index.json \ No newline at end of file diff --git a/README.md b/README.md index 050f3b7..2ae48a2 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,10 @@ + + + # Nuget https://www.nuget.org/packages/SignalsDotnet # Angular Signals for .Net -This library is a porting of the Angular Signals in the .Net World, adapted to the .Net MVVM UI Frameworks and based on ReactiveX. +This library is a porting of the Angular Signals in the .Net World, adapted to the .Net MVVM UI Frameworks and based on [R3](https://github.com/Cysharp/R3) (variant of ReactiveX). If you need an introduction to what a signal is, try to see: https://angular.io/guide/signals. # Get Started @@ -40,6 +43,86 @@ public static class DelegateCommandExtensions ## Example 2 ```c# +public class LoginViewModel : IActivatableViewModel +{ + // Value set from outside. + public Signal IsDeactivated { get; } = new(false); + public LoginViewModel() + { + var computedFactory = ComputedSignalFactory.Default + .DisconnectEverythingWhen(isDeactivated.Values) + .OnException(exception => + { + /* log or do something with it */ + }); + + // Will be cancelled on deactivation, of if the username signal changes during the await + IsUsernameValid = computedFactory.AsyncComputed(async cancellationToken => await IsUsernameValidAsync(Username.Value, cancellationToken), + false, + ConcurrentChangeStrategy.CancelCurrent); + + + // async computed signals have a (sync) signal that notifies us when the async computation is running + CanLogin = computedFactory.Computed(() => !IsUsernameValid.IsComputing.Value + && IsUsernameValid.Value + && !string.IsNullOrWhiteSpace(Password.Value)); + + computedFactory.Effect(UpdateApiCalls); + + // This signal will be recomputed both when the collection changes, and when endDate of the last element changes automatically! + TotalApiCallsText = computedFactory.Computed(() => + { + var lastCall = ApiCalls.Value.LastOrDefault(); + return $"Total api calls: {ApiCalls.Value.Count}. Last started at {lastCall?.StartedAt}, and ended at {lastCall?.EndedAt.Value}"; + })!; + + // Signals are observable, so they can easily integrated with reactiveUI + LoginCommand = ReactiveCommand.Create(() => { /* login.. */ }, CanLogin); + } + + public ViewModelActivator Activator { get; } = new(); + public ReactiveCommand LoginCommand { get; } + public Signal Username { get; } = new(""); + public Signal Password { get; } = new(""); + public IAsyncReadOnlySignal IsUsernameValid { get; } + public IReadOnlySignal CanLogin { get; } + public IReadOnlySignal TotalApiCallsText { get; } + public IReadOnlySignal> ApiCalls { get; } = new ObservableCollection().ToCollectionSignal(); + + async Task IsUsernameValidAsync(string? username, CancellationToken cancellationToken) + { + await Task.Delay(3000, cancellationToken); + return username?.Length > 2; + } + void UpdateApiCalls() + { + var isComputingUsername = IsUsernameValid.IsComputing.Value; + using var _ = Signal.UntrackedScope(); + + if (isComputingUsername) + { + ApiCalls.Value.Add(new ApiCall(startedAt: DateTime.Now)); + return; + } + + var call = ApiCalls.Value.LastOrDefault(); + if (call is { EndedAt.Value: null }) + { + call.EndedAt.Value = DateTime.Now; + } + } +} + +public class ApiCall(DateTime startedAt) +{ + public DateTime StartedAt => startedAt; + public Signal EndedAt { get; } = new(); +} +``` + + +## Example 3 +```c# public class YoungestPersonViewModel {     public YoungestPersonViewModel() @@ -84,21 +167,21 @@ public class City public record PersonCoordinates(Person Person, Room Room, House House, City City); ``` -Every signal implements the IObservable interface, so we can apply against them all ReactiveX operators we want. -## `Singal` +Every signal has a property `Values` that is an Observable and notifies us whenever the signal changes. +## `Signal` ```c# public Signal Person { get; } = new(); public Signal Person2 { get; } = new(config => config with { Comparer = new CustomPersonEqualityComparer() }); ``` -A `Singal` is a wrapper around a `T`. It has a property `Value` that can be set, and that when changed raises the INotifyPropertyChanged event. +A `Signal` is a wrapper around a `T`. It has a property `Value` that can be set, and that when changed raises the INotifyPropertyChanged event. It is possible to specify a custom `EqualityComparer` that will be used to check if raise the `PropertyChanged` event. It is also possible to force it to raise the event everytime someone sets the property -## `CollectionSingal` +## `CollectionSignal` -A `CollectionSingal` is a wrapper around an `ObservableCollection` (or in general something that implements the `INotifyCollectionChanged` interface). It listens to both changes of its Value Property, and modifications of the `ObservableCollection` it is wrapping +A `CollectionSignal` is a wrapper around an `ObservableCollection` (or in general something that implements the `INotifyCollectionChanged` interface). It listens to both changes of its Value Property, and modifications of the `ObservableCollection` it is wrapping It is possible to specify a custom `EqualityComparer` that will be used to check if raise the `PropertyChanged` event. It is also possible to force it to raise the event everytime someone sets the property @@ -117,32 +200,56 @@ public CollectionSignal> People { get; } = new(coll ## Computed Signals ```c# -public class LoginViewModel -{ - public LoginViewModel() - { - CanLogin = Signal.Computed(() => !string.IsNullOrWhiteSpace(Username.Value) && !string.IsNullOrWhiteSpace(Password.Value)); - } - - public Signal Username { get; } = new(); - public Signal Password { get; } = new(); - public IReadOnlySignal CanLogin { get; } -} + public LoginViewModel() + { + IObservable isDeactivated = this.IsDeactivated(); + + var computedFactory = ComputedSignalFactory.Default + .DisconnectEverythingWhen(isDeactivated) + .OnException(exception => + { + /* log or do something with it */ + }); + + IsUsernameValid = computedFactory.AsyncComputed(async cancellationToken => await IsUsernameValidAsync(Username.Value, cancellationToken), + false, + ConcurrentChangeStrategy.CancelCurrent); + + + CanLogin = computedFactory.Computed(() => !IsUsernameValid.IsComputing.Value + && IsUsernameValid.Value + && !string.IsNullOrWhiteSpace(Password.Value)); + } ``` A computed signal, is a signal that depends by other signals. -Basically to create it you need to pass a function that computes the value. +Basically to create it you need to pass a function that computes the value. That function can be synchronous or asynchronous. It automatically recognize which are the signals it depends by, and listen for them to change. Whenever a signal changes, the function is executed again, and a new value is produced (the `INotifyPropertyChanged` is raised). -It is possible to specify whether or not to subscribe weakly (default option), or strongly. It is possible also here to specify a custom `EqualityComparer` +It is possible to specify whether or not to subscribe weakly, or strongly (default option). It is possible also here to specify a custom `EqualityComparer`. + +Usually you want to stop all asynchronous computation according to some boolean condition. +This can be easily done via `ComputedSignalFactory.DisconnectEverythingWhen(isDeactivated)`. Whenever the isDeactivated observable notfies `true`, every pending async computation will be cancelled. Later on, when it notifies a `false`, all the computed signals will be recomputed again. + +You can find useful also `CancellationSignal.Create(booleanObservable)`, that converts a boolean observable into a `IReadOnlySignal`, that automatically creates, cancels and disposes new cancellation tokens according to a boolean observable. + +## ConcurrentChangeStrategy +In an async computed signal, the signals it depends by can be changed while the computation function is running. You can use the enum `ConcurrentChangeStrategy` to specify what you want to do in that cases. For now there are 2 options: + +- `ConcurrentChangeStrategy.CancelCurrent`: The current cancellationToken will be cancelled, and a new computation will start immediately + +- `ConcurrentChangeStrategy.ScheduleNext`: The current cancellationToken will NOT be cancelled, and a new computation will be queued up immediately after the current. Note that only 1 computation can be queued up at most. So using this option, multiple concurrent changes are equivalent to a single concurrent change. + +Note also that what already said about `DisconnectEverythingWhen` method is independent from that `ConcurrentChangeStrategy` enum. So in both cases, when the disconnection notification arrive, the async computation will be cancelled. + ### How it works? Basically the getter (not the setter!) of the Signals property Value raises a static event that notifies someone just requested that signal. This is used by the Computed signal before executing the computation function. -The computed signals register to that event (filtering out notifications of other threads), and in that way they know, when the function returns, what are the signals that have been just accessed. +The computed signals register to that event (filtering out notifications of other signals, using some async locals state), and in that way they know, when the function returns, what are the signals that have been just accessed. At this point it subscribes to the changes of all those signals in order to know when it should recompute again the value. diff --git a/SignalsDotnet/SignalsDotnet.PeformanceTests/MainThreadAwaitableExtensions.cs b/SignalsDotnet/SignalsDotnet.PeformanceTests/MainThreadAwaitableExtensions.cs new file mode 100644 index 0000000..018a401 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.PeformanceTests/MainThreadAwaitableExtensions.cs @@ -0,0 +1,23 @@ +using System.Runtime.CompilerServices; + +namespace SignalsDotnet.Tests.Helpers; + +public static class MainThreadAwaitableExtensions +{ + public static MainThreadAwaitable SwitchToMainThread(this object _) => new(); +} + +/// +/// If awaited, force the continuation to run on a Single-threaded synchronization context. +/// That's the exact behavior of Wpf Synchronization Context (DispatcherSynchronizationContext) +/// So basically: +/// 1) after the await we switch thread. +/// 2) Every other continuation will run on the same thread as it happens in Wpf. +/// +public readonly struct MainThreadAwaitable : INotifyCompletion +{ + public MainThreadAwaitable GetAwaiter() => this; + public bool IsCompleted => SynchronizationContext.Current == TestSingleThreadSynchronizationContext.Instance; + public void OnCompleted(Action action) => TestSingleThreadSynchronizationContext.Instance.Post(_ => action(), null); + public void GetResult() { } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet.PeformanceTests/Program.cs b/SignalsDotnet/SignalsDotnet.PeformanceTests/Program.cs new file mode 100644 index 0000000..b09f8c0 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.PeformanceTests/Program.cs @@ -0,0 +1,61 @@ +using BenchmarkDotNet.Attributes; +using BenchmarkDotNet.Configs; +using BenchmarkDotNet.Jobs; +using BenchmarkDotNet.Running; +using BenchmarkDotNet.Toolchains.InProcess.NoEmit; +using R3; +using SignalsDotnet; +using SignalsDotnet.Tests.Helpers; + +BenchmarkRunner.Run(); + +public class BenchmarkConfig : ManualConfig +{ + public BenchmarkConfig() + { + AddJob(Job.MediumRun + .WithToolchain(InProcessNoEmitToolchain.Instance)); + } +} + +[MemoryDiagnoser] +[Config(typeof(BenchmarkConfig))] +public class ComputedBenchmarks +{ + readonly Signal _signal = new(0); + readonly IAsyncReadOnlySignal _asyncComputed; + readonly IReadOnlySignal _computed; + + public ComputedBenchmarks() + { + _computed = Signal.Computed(() => _signal.Value, x => x with{SubscribeWeakly = false}); + _asyncComputed = Signal.AsyncComputed(async _ => + { + var x = _signal.Value; + await Task.Yield(); + return x; + }, -1); + } + + [Benchmark] + public int ComputedRoundTrip() + { + _ = _computed.Value; + _signal.Value = 0; + _signal.Value = 1; + return _computed.Value; + } + + [Benchmark] + public async ValueTask AsyncComputedRoundTrip() + { + await this.SwitchToMainThread(); + + _ = _asyncComputed.Value; + _signal.Value = 0; + _signal.Value = 1; + return await _asyncComputed.Values + .FirstAsync(x => x == 1) + .ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + } +} diff --git a/SignalsDotnet/SignalsDotnet.PeformanceTests/SignalsDotnet.PeformanceTests.csproj b/SignalsDotnet/SignalsDotnet.PeformanceTests/SignalsDotnet.PeformanceTests.csproj new file mode 100644 index 0000000..15fa9ec --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.PeformanceTests/SignalsDotnet.PeformanceTests.csproj @@ -0,0 +1,18 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + + + diff --git a/SignalsDotnet/SignalsDotnet.PeformanceTests/TestSingleThreadSynchronizationContext.cs b/SignalsDotnet/SignalsDotnet.PeformanceTests/TestSingleThreadSynchronizationContext.cs new file mode 100644 index 0000000..813e95d --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.PeformanceTests/TestSingleThreadSynchronizationContext.cs @@ -0,0 +1,45 @@ +using System.Collections.Concurrent; + +namespace SignalsDotnet.Tests.Helpers; + +internal sealed class TestSingleThreadSynchronizationContext : SynchronizationContext +{ + public Thread MainThread { get; } + readonly BlockingCollection<(SendOrPostCallback callback, object? state)> _callbacksWithState = []; + + public TestSingleThreadSynchronizationContext() + { + MainThread = new Thread(MainThreadLoop) + { + IsBackground = true + }; + + MainThread.Start(); + } + + public static TestSingleThreadSynchronizationContext Instance { get; } = new(); + + void MainThreadLoop() + { + SetSynchronizationContext(this); + + foreach (var (callback, state) in _callbacksWithState.GetConsumingEnumerable()) + callback.Invoke(state); + } + + public override void Post(SendOrPostCallback callback, object? state) + { + _callbacksWithState.Add((callback, state)); + } + + public override void Send(SendOrPostCallback callback, object? state) + { + if (Current == this) + { + callback(state); + return; + } + + _callbacksWithState.Add((callback, state)); + } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet.Tests/AsyncComputedSignalTests.cs b/SignalsDotnet/SignalsDotnet.Tests/AsyncComputedSignalTests.cs new file mode 100644 index 0000000..7d58c0f --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.Tests/AsyncComputedSignalTests.cs @@ -0,0 +1,229 @@ +using FluentAssertions; +using SignalsDotnet.Helpers; +using SignalsDotnet.Tests.Helpers; +using R3; + +namespace SignalsDotnet.Tests; +public class AsyncComputedSignalTests +{ + [Fact] + public async Task ShouldNotifyWhenAnyChanged() + { + await this.SwitchToMainThread(); + + var prop1 = new Signal(); + var prop2 = new Signal(); + + async ValueTask Sum(CancellationToken token = default) + { + await Task.Yield(); + return prop1.Value + prop2.Value; + } + + var computed = Signal.AsyncComputed(Sum, 0, () => Optional.Empty); + int notifiedValue = 0; + computed.Values.Subscribe(_ => notifiedValue++); + _ = computed.Value; + await TestHelpers.WaitUntil(() => notifiedValue == 1); + + notifiedValue = 0; + prop1.Value = 2; + await TestHelpers.WaitUntil(() => notifiedValue == 1); + computed.Value.Should().Be(await Sum()); + + notifiedValue = 0; + prop2.Value = 1; + await TestHelpers.WaitUntil(() => notifiedValue == 1); + computed.Value.Should().Be(await Sum()); + } + + [Fact] + public async Task SignalChangedWhileComputing_ShouldBeConsidered() + { + await this.SwitchToMainThread(); + + var prop1 = new Signal(); + var prop2 = new Signal(); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var computed = Signal.AsyncComputed(Sum, 0, () => Optional.Empty); + + + async ValueTask Sum(CancellationToken token = default) + { + await Task.Yield(); + var sum = prop1.Value + prop2.Value; + if (prop1.Value <= 3) + { + prop1.Value++; + prop2.Value++; + } + else + { + tcs.TrySetResult(); + } + + return sum; + } + + _ = computed.Value; + await tcs.Task; + await TestHelpers.WaitUntil(() => computed.Value == prop1.Value + prop2.Value); + } + + + [Fact] + public async Task ConcurrentUpdate_ShouldCancelCurrentIfRequested() + { + await this.SwitchToMainThread(); + + var prop1 = new Signal(); + var prop2 = new Signal(); + + CancellationToken computeToken = default; + var computed = Signal.AsyncComputed(async token => + { + var sum = prop1.Value + prop2.Value; + if (prop1.Value == 1) + return sum; + + prop1.Value++; + computeToken = token; + await Task.Delay(1, token); + return sum; + }, 0, ConcurrentChangeStrategy.CancelCurrent); + + _ = computed.Value; + await TestHelpers.WaitUntil(() => computeToken.IsCancellationRequested); + } + + [Fact] + public async Task OtherSignalChanges_ShouldNotBeConsidered() + { + await this.SwitchToMainThread(); + + var signal1 = new Signal(); + var signal2 = new Signal(); + + var signal3 = new Signal(); + + var middleComputationTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var signal3ChangedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var stepNumber = 0; + var sum = Signal.AsyncComputed(async _ => + { + middleComputationTcs.TrySetResult(); + var ret = stepNumber + signal1.Value + signal2.Value; + stepNumber++; + await signal3ChangedTcs.Task; + return ret; + }, 0); + + var notifiedCount = 0; + _ = sum.Value; + await sum.Values.Where(x => x == 0) + .Take(1) + .WaitAsync() + .ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + + sum.Values.Skip(1).Subscribe(_ => notifiedCount++); + await middleComputationTcs.Task; + + _ = signal3.Value; + signal3.Value = 1; + signal3ChangedTcs.SetResult(); + + await Task.Yield(); + notifiedCount.Should().Be(0); + signal1.Value = 1; + await TestHelpers.WaitUntil(() => sum.Value == 2); + } + + + [Fact] + public async Task CancellationSignal_ShouldCancel_AllComputedSignals() + { + await this.SwitchToMainThread(); + var cancellationRequested = new Signal(); + + var waitForCancellationSignal = new Signal(false); + var cancellationToken = new Signal(); + var computedSignal = ComputedSignalFactory.Default + .DisconnectEverythingWhen(cancellationRequested.Values) + .AsyncComputed(async token => + { + await waitForCancellationSignal.Values.FirstAsync(x => x); + cancellationToken.Value = token; + return 1; + }, 0); + + _ = computedSignal.Value; + cancellationRequested.Value = true; + waitForCancellationSignal.Value = true; + + await cancellationToken.Values.FirstAsync(x => x is not null); + cancellationToken.Value!.Value.IsCancellationRequested.Should().Be(true); + + cancellationToken.Value = null; + cancellationRequested.Value = false; + + await cancellationToken.Values.FirstAsync(x => x is not null); + cancellationToken.Value!.Value.IsCancellationRequested.Should().BeFalse(); + } + + + [Fact] + public async Task ConcurrentUpdate_ShouldScheduleNext_IfRequested() + { + await this.SwitchToMainThread(); + + var prop1 = new Signal(1); + var prop2 = new Signal(); + + + var computed = Signal.AsyncComputed(async token => + { + var sum = prop1.Value + prop2.Value; + prop1.Value++; + await Task.Delay(0, token); + await Task.Yield(); + return sum; + }, -1); + + var task = computed.Values.FirstAsync(x => x == 20); + await task; + } + + [Fact] + public async Task SimpleTest() + { + await this.SwitchToMainThread(); + + Signal signal = new(0); + var asyncComputed = Signal.AsyncComputed(async _ => + { + var x = signal.Value; + await Task.Yield(); + await Task.Yield(); + await Task.Yield(); + return x; + }, -1, configuration: x => x with + { + SubscribeWeakly = false + }); + + _ = asyncComputed.Value; + signal.Value = 0; + signal.Value = 1; + signal.Value = 2; + signal.Value = 3; + signal.Value = 4; + signal.Value = 5; + + await asyncComputed.Values + .Timeout(TimeSpan.FromSeconds(1)) + .FirstAsync(x => x == 5) + .ConfigureAwait(ConfigureAwaitOptions.ForceYielding); + } +} diff --git a/SignalsDotnet/SignalsDotnet.Tests/AsyncEffectTests.cs b/SignalsDotnet/SignalsDotnet.Tests/AsyncEffectTests.cs new file mode 100644 index 0000000..7cad5be --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.Tests/AsyncEffectTests.cs @@ -0,0 +1,128 @@ +using FluentAssertions; +using SignalsDotnet.Tests.Helpers; + +namespace SignalsDotnet.Tests; +public class AsyncEffectTests +{ + [Fact] + public async Task ShouldRunWhenAnySignalChanges() + { + await this.SwitchToMainThread(); + + var number1 = new Signal(); + var number2 = new Signal(); + + int sum = -1; + var effect = new Effect(async _ => + { + await Task.Yield(); + sum = number1.Value + number2.Value; + await Task.Yield(); + }); + + await TestHelpers.WaitUntil(() => sum == 0); + + number1.Value = 1; + await TestHelpers.WaitUntil(() => sum == 1); + + number1.Value = 2; + await TestHelpers.WaitUntil(() => sum == 2); + + number2.Value = 2; + await TestHelpers.WaitUntil(() => sum == 4); + + effect.Dispose(); + + number2.Value = 3; + await TestHelpers.WaitUntil(() => sum == 4); + } + + + [Fact] + public async Task ShouldRunOnSpecifiedScheduler() + { + await this.SwitchToMainThread(); + var scheduler = new TestScheduler(); + var number1 = new Signal(); + var number2 = new Signal(); + + int sum = -1; + var effect = new Effect(async _ => + { + await Task.Yield(); + sum = number1.Value + number2.Value; + await Task.Yield(); + }, scheduler: scheduler); + + await TestHelpers.WaitUntil(() => sum == 0); + + number1.Value = 1; + await TestHelpers.WaitUntil(() => sum == 0); + + number1.Value = 2; + await TestHelpers.WaitUntil(() => sum == 0); + + scheduler.ExecuteAllPendingActions(); + await TestHelpers.WaitUntil(() => sum == 2); + + effect.Dispose(); + + number2.Value = 3; + scheduler.ExecuteAllPendingActions(); + await TestHelpers.WaitUntil(() => sum == 2); + } + + + [Fact] + public async Task EffectsShouldRunAtTheEndOfAtomicOperations() + { + await this.SwitchToMainThread(); + + var number1 = new Signal(); + var number2 = new Signal(); + + int sum = -1; + _ = new Effect(async _ => + { + await Task.Yield(); + sum = number1.Value + number2.Value; + }); + + await TestHelpers.WaitUntil(() => sum == 0); + + await Task.Delay(10); + await Effect.AtomicOperationAsync(async () => + { + await Task.Yield(); + number1.Value = 1; + await Task.Yield(); + sum.Should().Be(0); + + await Task.Yield(); + number1.Value = 2; + await Task.Yield(); + sum.Should().Be(0); + }); + await TestHelpers.WaitUntil(() => sum == 2); + + await Effect.AtomicOperationAsync(async () => + { + await Task.Yield(); + number2.Value = 2; + sum.Should().Be(2); + + await Effect.AtomicOperationAsync(async () => + { + await Task.Yield(); + number2.Value = 3; + await Task.Yield(); + sum.Should().Be(2); + await Task.Yield(); + }); + + sum.Should().Be(2); + }); + + await TestHelpers.WaitUntil(() => sum == 5); + } +} diff --git a/SignalsDotnet/SignalsDotnet.Tests/CollectionSignalTests.cs b/SignalsDotnet/SignalsDotnet.Tests/CollectionSignalTests.cs index d56b415..1ecde8e 100644 --- a/SignalsDotnet/SignalsDotnet.Tests/CollectionSignalTests.cs +++ b/SignalsDotnet/SignalsDotnet.Tests/CollectionSignalTests.cs @@ -1,5 +1,6 @@ using System.Collections.ObjectModel; using FluentAssertions; +using R3; namespace SignalsDotnet.Tests; @@ -10,7 +11,7 @@ public void ReactiveObservableCollection_ShouldObserve_NestedChanges() { var city = new City(); bool anyNiceChair = false; - city.AnyNiceChair.Subscribe(x => anyNiceChair = x); + city.AnyNiceChair.Values.Subscribe(x => anyNiceChair = x); var house = new House(); city.Houses.Value = [house]; diff --git a/SignalsDotnet/SignalsDotnet.Tests/ComputedSignalTests.cs b/SignalsDotnet/SignalsDotnet.Tests/ComputedSignalTests.cs index 7dcd9d7..f0780ce 100644 --- a/SignalsDotnet/SignalsDotnet.Tests/ComputedSignalTests.cs +++ b/SignalsDotnet/SignalsDotnet.Tests/ComputedSignalTests.cs @@ -1,5 +1,5 @@ -using System.Reactive.Linq; -using FluentAssertions; +using FluentAssertions; +using R3; namespace SignalsDotnet.Tests; @@ -14,7 +14,7 @@ public void ShouldNotifyWhenAnyChanged() int Sum() => prop1.Value + prop2.Value; var computed = Signal.Computed(Sum); int notifiedValue = 0; - computed.Subscribe(_ => notifiedValue++); + computed.Values.Subscribe(_ => notifiedValue++); _ = computed.Value; notifiedValue = 0; @@ -47,7 +47,7 @@ int Op() var computed = Signal.Computed(Op); computed.Value.Should().Be(Op()); - var computedChanged = computed.Skip(1); + var computedChanged = computed.Values.Skip(1); var notified = false; computedChanged.Subscribe(_ => notified = true); @@ -81,7 +81,7 @@ public void Untracked_ShouldNotTrack_SignalChanges() var value = 0; var computed = Signal.Computed(() => a.Value + Signal.Untracked(() => b.Value)); - computed.Subscribe(x => value = x); + computed.Values.Subscribe(x => value = x); a.Value = 1; value.Should().Be(1); a.Value = 2; @@ -101,7 +101,7 @@ public void UntrackedValue_ShouldNotTrack_SignalChanges() var value = 0; var computed = Signal.Computed(() => a.Value + b.UntrackedValue); - computed.Subscribe(x => value = x); + computed.Values.Subscribe(x => value = x); a.Value = 1; value.Should().Be(1); a.Value = 2; diff --git a/SignalsDotnet/SignalsDotnet.Tests/EffectTests.cs b/SignalsDotnet/SignalsDotnet.Tests/EffectTests.cs index 6dab9a8..ff9e0e0 100644 --- a/SignalsDotnet/SignalsDotnet.Tests/EffectTests.cs +++ b/SignalsDotnet/SignalsDotnet.Tests/EffectTests.cs @@ -1,12 +1,12 @@ -using System.Reactive.Concurrency; -using System.Reactive.Disposables; -using FluentAssertions; +using FluentAssertions; +using R3; namespace SignalsDotnet.Tests; + public class EffectTests { [Fact] - void ShouldRunWhenAnySignalChanges() + public void ShouldRunWhenAnySignalChanges() { var number1 = new Signal(); var number2 = new Signal(); @@ -31,7 +31,7 @@ void ShouldRunWhenAnySignalChanges() } [Fact] - void ShouldRunOnSpecifiedScheduler() + public void ShouldRunOnSpecifiedScheduler() { var scheduler = new TestScheduler(); var number1 = new Signal(); @@ -58,7 +58,7 @@ void ShouldRunOnSpecifiedScheduler() } [Fact] - void EffectShouldNotRunMultipleTimesInASingleSchedule() + public void EffectShouldNotRunMultipleTimesInASingleSchedule() { var scheduler = new TestScheduler(); var number1 = new Signal(); @@ -71,10 +71,10 @@ void EffectShouldNotRunMultipleTimesInASingleSchedule() executionsCount++; }, scheduler); executionsCount.Should().Be(1); - + number2.Value = 4; number2.Value = 3; - + number1.Value = 4; number1.Value = 3; executionsCount.Should().Be(1); @@ -91,51 +91,60 @@ void EffectShouldNotRunMultipleTimesInASingleSchedule() } [Fact] - void EffectsShouldRunAtTheEndOfAtomicOperations() + public async Task EffectsShouldRunAtTheEndOfAtomicOperations() { - var number1 = new Signal(); - var number2 = new Signal(); - - int sum = -1; - var effect = new Effect(() => sum = number1.Value + number2.Value); - sum.Should().Be(0); - - Effect.AtomicOperation(() => - { - number1.Value = 1; - sum.Should().Be(0); - - number1.Value = 2; - sum.Should().Be(0); - }); - sum.Should().Be(2); - - Effect.AtomicOperation(() => - { - number2.Value = 2; - sum.Should().Be(2); - - Effect.AtomicOperation(() => - { - number2.Value = 3; - sum.Should().Be(2); - }); - - sum.Should().Be(2); - }); - - sum.Should().Be(5); + await Enumerable.Range(1, 33) + .Select(__ => + { + return Observable.FromAsync(async token => await Task.Run(() => + { + var number1 = new Signal(); + var number2 = new Signal(); + + int sum = -1; + _ = new Effect(() => sum = number1.Value + number2.Value); + //sum.Should().Be(0); + + Effect.AtomicOperation(() => + { + number1.Value = 1; + sum.Should().Be(0); + + number1.Value = 2; + sum.Should().Be(0); + }); + sum.Should().Be(2); + + Effect.AtomicOperation(() => + { + number2.Value = 2; + sum.Should().Be(2); + + Effect.AtomicOperation(() => + { + number2.Value = 3; + sum.Should().Be(2); + }); + + sum.Should().Be(2); + }); + + sum.Should().Be(5); + }, token)); + }) + .Merge() + .WaitAsync(); } [Fact] - void EffectsShouldRunAtTheEndOfAtomicOperationsWithScheduler() + public void EffectsShouldRunAtTheEndOfAtomicOperationsWithScheduler() { var scheduler = new TestScheduler(); var number1 = new Signal(); var number2 = new Signal(); int sum = -1; - var effect = new Effect(() => sum = number1.Value + number2.Value, scheduler); + _ = new Effect(() => sum = number1.Value + number2.Value, scheduler); sum.Should().Be(0); Effect.AtomicOperation(() => @@ -149,30 +158,35 @@ void EffectsShouldRunAtTheEndOfAtomicOperationsWithScheduler() sum.Should().Be(0); }); sum.Should().Be(0); - + scheduler.ExecuteAllPendingActions(); sum.Should().Be(2); } +} +public class TestScheduler : TimeProvider +{ + Action? _actions; + public void ExecuteAllPendingActions() + { + _actions?.Invoke(); + } - class TestScheduler : IScheduler + public override ITimer CreateTimer(TimerCallback callback, object? state, TimeSpan dueTime, TimeSpan period) { - Action? _actions; - public void ExecuteAllPendingActions() - { - var actions = _actions; - _actions = null; - actions?.Invoke(); - } + _actions += () => callback(state); + return new FakeTimer(); + } - public IDisposable Schedule(TState state, Func action) + class FakeTimer : ITimer + { + public void Dispose() { - _actions += () => action(this, state); - return Disposable.Empty; + } - public IDisposable Schedule(TState state, TimeSpan dueTime, Func action) => Schedule(state, action); - public IDisposable Schedule(TState state, DateTimeOffset dueTime, Func action) => Schedule(state, action); - public DateTimeOffset Now => DateTimeOffset.UnixEpoch; + public ValueTask DisposeAsync() => ValueTask.CompletedTask; + + public bool Change(TimeSpan dueTime, TimeSpan period) => throw new NotImplementedException(); } -} +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet.Tests/Helpers/MainThreadAwaitableExtensions.cs b/SignalsDotnet/SignalsDotnet.Tests/Helpers/MainThreadAwaitableExtensions.cs new file mode 100644 index 0000000..018a401 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.Tests/Helpers/MainThreadAwaitableExtensions.cs @@ -0,0 +1,23 @@ +using System.Runtime.CompilerServices; + +namespace SignalsDotnet.Tests.Helpers; + +public static class MainThreadAwaitableExtensions +{ + public static MainThreadAwaitable SwitchToMainThread(this object _) => new(); +} + +/// +/// If awaited, force the continuation to run on a Single-threaded synchronization context. +/// That's the exact behavior of Wpf Synchronization Context (DispatcherSynchronizationContext) +/// So basically: +/// 1) after the await we switch thread. +/// 2) Every other continuation will run on the same thread as it happens in Wpf. +/// +public readonly struct MainThreadAwaitable : INotifyCompletion +{ + public MainThreadAwaitable GetAwaiter() => this; + public bool IsCompleted => SynchronizationContext.Current == TestSingleThreadSynchronizationContext.Instance; + public void OnCompleted(Action action) => TestSingleThreadSynchronizationContext.Instance.Post(_ => action(), null); + public void GetResult() { } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet.Tests/Helpers/TestHelpers.cs b/SignalsDotnet/SignalsDotnet.Tests/Helpers/TestHelpers.cs new file mode 100644 index 0000000..021b1b5 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.Tests/Helpers/TestHelpers.cs @@ -0,0 +1,12 @@ +namespace SignalsDotnet.Tests.Helpers; + +internal static class TestHelpers +{ + public static async Task WaitUntil(Func predicate) + { + while (!predicate()) + { + await Task.Yield(); + } + } +} diff --git a/SignalsDotnet/SignalsDotnet.Tests/Helpers/TestSingleThreadSynchronizationContext.cs b/SignalsDotnet/SignalsDotnet.Tests/Helpers/TestSingleThreadSynchronizationContext.cs new file mode 100644 index 0000000..813e95d --- /dev/null +++ b/SignalsDotnet/SignalsDotnet.Tests/Helpers/TestSingleThreadSynchronizationContext.cs @@ -0,0 +1,45 @@ +using System.Collections.Concurrent; + +namespace SignalsDotnet.Tests.Helpers; + +internal sealed class TestSingleThreadSynchronizationContext : SynchronizationContext +{ + public Thread MainThread { get; } + readonly BlockingCollection<(SendOrPostCallback callback, object? state)> _callbacksWithState = []; + + public TestSingleThreadSynchronizationContext() + { + MainThread = new Thread(MainThreadLoop) + { + IsBackground = true + }; + + MainThread.Start(); + } + + public static TestSingleThreadSynchronizationContext Instance { get; } = new(); + + void MainThreadLoop() + { + SetSynchronizationContext(this); + + foreach (var (callback, state) in _callbacksWithState.GetConsumingEnumerable()) + callback.Invoke(state); + } + + public override void Post(SendOrPostCallback callback, object? state) + { + _callbacksWithState.Add((callback, state)); + } + + public override void Send(SendOrPostCallback callback, object? state) + { + if (Current == this) + { + callback(state); + return; + } + + _callbacksWithState.Add((callback, state)); + } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet.Tests/SignalsDotnet.Tests.csproj b/SignalsDotnet/SignalsDotnet.Tests/SignalsDotnet.Tests.csproj index 5c83343..f93e979 100644 --- a/SignalsDotnet/SignalsDotnet.Tests/SignalsDotnet.Tests.csproj +++ b/SignalsDotnet/SignalsDotnet.Tests/SignalsDotnet.Tests.csproj @@ -10,14 +10,14 @@ - - - - + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all - + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/SignalsDotnet/SignalsDotnet.Tests/ThrottleOneCycleTests.cs b/SignalsDotnet/SignalsDotnet.Tests/ThrottleOneCycleTests.cs deleted file mode 100644 index 69f301a..0000000 --- a/SignalsDotnet/SignalsDotnet.Tests/ThrottleOneCycleTests.cs +++ /dev/null @@ -1,94 +0,0 @@ -using System.Collections.Concurrent; -using System.Reactive.Concurrency; -using System.Reactive.Subjects; -using System.Runtime.CompilerServices; -using FluentAssertions; - -namespace SignalsDotnet.Tests; - -public class ThrottleOneCycleTests -{ - [Fact] - public async Task ThrottleOneCycleShouldNotNotifyInlineCalls() - { - await this.SwitchToMainThread(); - var subject = new Subject(); - int totalNotifications = 0; - subject.ThrottleOneCycle(new SynchronizationContextScheduler(SynchronizationContext.Current!)) - .Subscribe(_ => Interlocked.Increment(ref totalNotifications)); - - for (var i = 0; i < 1000; i++) - { - subject.OnNext(i); - } - - totalNotifications.Should().Be(0); - await Task.Yield(); - totalNotifications.Should().Be(1); - await Task.Yield(); - totalNotifications.Should().Be(1); - } -} - - -internal sealed class TestSingleThreadSynchronizationContext : SynchronizationContext -{ - public Thread MainThread { get; } - readonly BlockingCollection<(SendOrPostCallback callback, object? state)> _callbacksWithState = []; - - public TestSingleThreadSynchronizationContext() - { - MainThread = new Thread(MainThreadLoop) - { - IsBackground = true - }; - - MainThread.Start(); - } - - public static TestSingleThreadSynchronizationContext Instance { get; } = new(); - - void MainThreadLoop() - { - SetSynchronizationContext(this); - - foreach (var (callback, state) in _callbacksWithState.GetConsumingEnumerable()) - callback.Invoke(state); - } - - public override void Post(SendOrPostCallback callback, object? state) - { - _callbacksWithState.Add((callback, state)); - } - - public override void Send(SendOrPostCallback callback, object? state) - { - if (Current == this) - { - callback(state); - return; - } - - _callbacksWithState.Add((callback, state)); - } -} - -public static class MainThreadAwaitableExtensions -{ - public static MainThreadAwaitable SwitchToMainThread(this object _) => new (); -} - -/// -/// If awaited, force the continuation to run on a Single-threaded synchronization context. -/// That's the exact behavior of Wpf Synchronization Context (DispatcherSynchronizationContext) -/// So basically: -/// 1) after the await we switch thread. -/// 2) Every other continuation will run on the same thread as it happens in Wpf. -/// -public readonly struct MainThreadAwaitable : INotifyCompletion -{ - public MainThreadAwaitable GetAwaiter() => this; - public bool IsCompleted => SynchronizationContext.Current == TestSingleThreadSynchronizationContext.Instance; - public void OnCompleted(Action action) => TestSingleThreadSynchronizationContext.Instance.Send(_ => action(), null); - public void GetResult(){} -} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet.sln b/SignalsDotnet/SignalsDotnet.sln index f89a5e3..b794721 100644 --- a/SignalsDotnet/SignalsDotnet.sln +++ b/SignalsDotnet/SignalsDotnet.sln @@ -7,6 +7,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalsDotnet", "SignalsDot EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SignalsDotnet.Tests", "SignalsDotnet.Tests\SignalsDotnet.Tests.csproj", "{D6DA2778-8C20-4D1D-A4E7-956FEAE51859}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SignalsDotnet.PeformanceTests", "SignalsDotnet.PeformanceTests\SignalsDotnet.PeformanceTests.csproj", "{C7366C63-2562-435B-B78D-9D51C0649CBA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -21,6 +23,8 @@ Global {D6DA2778-8C20-4D1D-A4E7-956FEAE51859}.Debug|Any CPU.Build.0 = Debug|Any CPU {D6DA2778-8C20-4D1D-A4E7-956FEAE51859}.Release|Any CPU.ActiveCfg = Release|Any CPU {D6DA2778-8C20-4D1D-A4E7-956FEAE51859}.Release|Any CPU.Build.0 = Release|Any CPU + {C7366C63-2562-435B-B78D-9D51C0649CBA}.Debug|Any CPU.ActiveCfg = Debug|AnyCPU + {C7366C63-2562-435B-B78D-9D51C0649CBA}.Release|Any CPU.ActiveCfg = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/SignalsDotnet/SignalsDotnet/CancellationSignal.cs b/SignalsDotnet/SignalsDotnet/CancellationSignal.cs new file mode 100644 index 0000000..b16d31b --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/CancellationSignal.cs @@ -0,0 +1,28 @@ +using R3; + +namespace SignalsDotnet; + +public static class CancellationSignal +{ + public static IReadOnlySignal Create(Observable isCancelledObservable) + { + return isCancelledObservable.DistinctUntilChanged() + .Scan((cancelationTokenSource: (CancellationTokenSource?)null, cancellationToken: default(CancellationToken)), (x, isCancelled) => + { + if (isCancelled) + { + var cancellationTokenSource = x.cancelationTokenSource ?? new CancellationTokenSource(); + var token = cancellationTokenSource.Token; + cancellationTokenSource.Cancel(); + cancellationTokenSource.Dispose(); + + return (cancellationTokenSource, token); + } + + var newCancellationToken = new CancellationTokenSource(); + return (newCancellationToken, newCancellationToken.Token); + }) + .Select(x => x.cancellationToken) + .ToSignal(x => x with { RaiseOnlyWhenChanged = false }); + } +} diff --git a/SignalsDotnet/SignalsDotnet/CollectionSignal.cs b/SignalsDotnet/SignalsDotnet/CollectionSignal.cs index 4aed6dd..0de2539 100644 --- a/SignalsDotnet/SignalsDotnet/CollectionSignal.cs +++ b/SignalsDotnet/SignalsDotnet/CollectionSignal.cs @@ -1,7 +1,6 @@ using System.Collections.Specialized; using System.ComponentModel; -using System.Reactive; -using System.Reactive.Linq; +using R3; using SignalsDotnet.Configuration; namespace SignalsDotnet; @@ -9,10 +8,10 @@ namespace SignalsDotnet; public class CollectionSignal : IReadOnlySignal where T : class, INotifyCollectionChanged { readonly CollectionChangedSignalConfigurationDelegate? _collectionChangedConfiguration; - readonly Signal> _signal; + readonly Signal?> _signal; public CollectionSignal(CollectionChangedSignalConfigurationDelegate? collectionChangedConfiguration = null, - SignalConfigurationDelegate>? propertyChangedConfiguration = null) + SignalConfigurationDelegate?>? propertyChangedConfiguration = null) { _collectionChangedConfiguration = collectionChangedConfiguration; _signal = new(propertyChangedConfiguration); @@ -20,27 +19,28 @@ public CollectionSignal(CollectionChangedSignalConfigurationDelegate? collection _signal.PropertyChanged += (_, args) => { PropertyChanged?.Invoke(this, args); - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(UntrackedValue))); }; } public T? Value { get => _signal.Value?.Value; - set => _signal.Value = value?.ToCollectionSignal(_collectionChangedConfiguration); + set => _signal.Value = value?.ToCollectionSignal(_collectionChangedConfiguration)!; } - public IObservable Changed => this.Select(static _ => Unit.Default); - - public IDisposable Subscribe(IObserver observer) - { - return _signal.Select(static x => x ?? Observable.Empty()) - .Switch() - .Subscribe(observer); - } + public Observable Values => _signal.Values + .Select(static x => x?.Values ?? Observable.Return(null)!) + .Switch()!; + + public Observable FutureValues => Values.Skip(1); + public event PropertyChangedEventHandler? PropertyChanged; object? IReadOnlySignal.UntrackedValue => UntrackedValue; public T? UntrackedValue => _signal.UntrackedValue?.UntrackedValue; public T? UntrackedCollectionChangedValue => _signal.Value?.UntrackedValue; + + Observable IReadOnlySignal.Values => _signal.Values + .Select(static x => ((IReadOnlySignal?)x)?.Values ?? Observable.Return(Unit.Default)) + .Switch(); } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/ComputedSignalFactory.cs b/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/ComputedSignalFactory.cs new file mode 100644 index 0000000..8a0d277 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/ComputedSignalFactory.cs @@ -0,0 +1,8 @@ +using SignalsDotnet.Internals.ComputedSignalrFactory; + +namespace SignalsDotnet; + +public static class ComputedSignalFactory +{ + public static IComputedSignalFactory Default => DefaultComputedSignalFactory.Instance; +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/ComputedSignalFactoryEx.cs b/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/ComputedSignalFactoryEx.cs new file mode 100644 index 0000000..a701a28 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/ComputedSignalFactoryEx.cs @@ -0,0 +1,48 @@ +using R3; +using SignalsDotnet.Configuration; +using SignalsDotnet.Helpers; +using SignalsDotnet.Internals.ComputedSignalrFactory; + +namespace SignalsDotnet; + +public static class ComputedSignalFactoryEx +{ + public static IComputedSignalFactory DisconnectEverythingWhen(this IComputedSignalFactory @this, Observable shouldBeCancelled) + { + return new CancelComputedSignalFactoryDecorator(@this, CancellationSignal.Create(shouldBeCancelled)); + } + + public static IComputedSignalFactory OnException(this IComputedSignalFactory @this, Action onException, bool ignoreOperationCancelled = true) + { + return new OnErrorComputedSignalFactoryDecorator(@this, ignoreOperationCancelled, onException); + } + + public static IReadOnlySignal Computed(this IComputedSignalFactory @this, Func func, Func fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null) + { + return @this.Computed(func, () => new Optional(fallbackValue()), configuration); + } + + public static IReadOnlySignal Computed(this IComputedSignalFactory @this, Func func, ReadonlySignalConfigurationDelegate? configuration = null) + { + return @this.Computed(func, static () => default, configuration); + } + + public static IAsyncReadOnlySignal AsyncComputed(this IComputedSignalFactory @this, + Func> func, + T startValue, + Func fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null) + { + return @this.AsyncComputed(func, startValue, () => new Optional(fallbackValue()), concurrentChangeStrategy, configuration); + } + + public static IAsyncReadOnlySignal AsyncComputed(this IComputedSignalFactory @this, + Func> func, + T startValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null) + { + return @this.AsyncComputed(func, startValue, static () => default, concurrentChangeStrategy, configuration); + } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/IComputedSignalFactory.cs b/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/IComputedSignalFactory.cs new file mode 100644 index 0000000..ea98550 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/ComputedSignalFactory/IComputedSignalFactory.cs @@ -0,0 +1,25 @@ +using R3; +using SignalsDotnet.Configuration; +using SignalsDotnet.Helpers; + +namespace SignalsDotnet; + +public interface IComputedSignalFactory +{ + IReadOnlySignal Computed(Func func, Func> fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null); + Observable ComputedObservable(Func func, Func> fallbackValue); + + IAsyncReadOnlySignal AsyncComputed(Func> func, + T startValue, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null); + + Observable AsyncComputedObservable(Func> func, + T startValue, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default); + + Effect Effect(Action onChange, TimeProvider? scheduler = null); + Effect AsyncEffect(Func onChange, ConcurrentChangeStrategy concurrentChangeStrategy = default, TimeProvider? scheduler = null); +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/ConcurrentChangeStrategy.cs b/SignalsDotnet/SignalsDotnet/ConcurrentChangeStrategy.cs new file mode 100644 index 0000000..09e2f36 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/ConcurrentChangeStrategy.cs @@ -0,0 +1,7 @@ +namespace SignalsDotnet; + +public enum ConcurrentChangeStrategy +{ + ScheduleNext, + CancelCurrent +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Configuration/CollectionChangedSignalConfiguration.cs b/SignalsDotnet/SignalsDotnet/Configuration/CollectionChangedSignalConfiguration.cs index 4fde891..2334f3c 100644 --- a/SignalsDotnet/SignalsDotnet/Configuration/CollectionChangedSignalConfiguration.cs +++ b/SignalsDotnet/SignalsDotnet/Configuration/CollectionChangedSignalConfiguration.cs @@ -1,19 +1,10 @@ -using System.Reactive; -using System.Reactive.Concurrency; +using R3; namespace SignalsDotnet.Configuration; public delegate CollectionChangedSignalConfiguration CollectionChangedSignalConfigurationDelegate(CollectionChangedSignalConfiguration startConfiguration); -public record CollectionChangedSignalConfiguration(bool SubscribeWeakly, Func, IObservable> CollectionChangedObservableMapper) +public record CollectionChangedSignalConfiguration(bool SubscribeWeakly, Func, Observable> CollectionChangedObservableMapper) { public static CollectionChangedSignalConfiguration Default => new(true, static x => x); -} - -public static class CollectionChangedSignalConfigurationExtensions -{ - public static CollectionChangedSignalConfiguration ThrottleOneCycle(this CollectionChangedSignalConfiguration @this, IScheduler scheduler) - { - return @this with { CollectionChangedObservableMapper = x => x.ThrottleOneCycle(scheduler) }; - } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Configuration/ReadonlySignalConfiguration.cs b/SignalsDotnet/SignalsDotnet/Configuration/ReadonlySignalConfiguration.cs index c42e241..b25e63c 100644 --- a/SignalsDotnet/SignalsDotnet/Configuration/ReadonlySignalConfiguration.cs +++ b/SignalsDotnet/SignalsDotnet/Configuration/ReadonlySignalConfiguration.cs @@ -5,5 +5,5 @@ public record ReadonlySignalConfiguration(IEqualityComparer Comparer, bool RaiseOnlyWhenChanged, bool SubscribeWeakly) { - public static ReadonlySignalConfiguration Default { get; } = new(EqualityComparer.Default, true, true); + public static ReadonlySignalConfiguration Default { get; } = new(EqualityComparer.Default, true, false); } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Effect.cs b/SignalsDotnet/SignalsDotnet/Effect.cs index 7c078bf..b075462 100644 --- a/SignalsDotnet/SignalsDotnet/Effect.cs +++ b/SignalsDotnet/SignalsDotnet/Effect.cs @@ -1,35 +1,48 @@ -using System.Collections.Concurrent; -using System.Diagnostics.CodeAnalysis; -using System.Reactive; -using System.Reactive.Concurrency; -using System.Reactive.Linq; -using System.Reactive.Subjects; -using System.Threading; -using SignalsDotnet.Internals.Helpers; +using R3; +using SignalsDotnet.Helpers; namespace SignalsDotnet; - public class Effect : IDisposable { - static readonly ConcurrentDictionary> _atomicOperationsNestingByThread = new(); + static readonly object _atomicOperationsLocker = new (); + static readonly AsyncLocal> _atomicOperationsCounter = new(); readonly IDisposable _subscription; - public Effect(Action onChange, IScheduler? scheduler = null) + public Effect(Action onChange, TimeProvider? scheduler = null) { var computationDelayer = ComputationDelayer(scheduler ?? DefaultScheduler); - _subscription = Signal.ComputedObservable(() => + _subscription = Signal.ComputedObservable(_ => { onChange(); - return Unit.Default; + return ValueTask.FromResult(Unit.Default); }, static () => Optional.Empty, computationDelayer) .Subscribe(); } - static Func> ComputationDelayer(IScheduler? scheduler) + public Effect(Func onChange, ConcurrentChangeStrategy concurrentChangeStrategy = default, TimeProvider? scheduler = null) { - var atomicOperations = _atomicOperationsNestingByThread.GetOrAdd(Thread.CurrentThread, static _ => new BehaviorSubject(0)); - var noAtomicOperations = atomicOperations.Where(counter => counter == 0) + var computationDelayer = ComputationDelayer(scheduler ?? DefaultScheduler); + _subscription = Signal.ComputedObservable(async token => + { + await onChange(token); + return Unit.Default; + }, static () => Optional.Empty, computationDelayer, concurrentChangeStrategy) + .Subscribe(); + } + + static Func> ComputationDelayer(TimeProvider? scheduler) + { + var atomicOperations = Observable.Defer(() => + { + lock (_atomicOperationsLocker) + { + return _atomicOperationsCounter.Value ??= new(0); + } + }); + + var noAtomicOperations = atomicOperations.Synchronize(_atomicOperationsCounter) + .Where(counter => counter == 0) .Select(static _ => Unit.Default); return scheduler is null @@ -39,11 +52,11 @@ static Func> ComputationDelayer(IScheduler? scheduler) public static void AtomicOperation(Action action) { - var atomicOperations = _atomicOperationsNestingByThread.AddOrUpdate(Thread.CurrentThread, static _ => new BehaviorSubject(0), (_, subject) => + lock (_atomicOperationsLocker) { - subject.OnNext(subject.Value + 1); - return subject; - }); + _atomicOperationsCounter.Value ??= new(0); + _atomicOperationsCounter.Value.OnNext(_atomicOperationsCounter.Value.Value + 1); + } try { @@ -51,11 +64,34 @@ public static void AtomicOperation(Action action) } finally { - atomicOperations.OnNext(atomicOperations.Value - 1); + lock (_atomicOperationsLocker) + { + _atomicOperationsCounter.Value.OnNext(_atomicOperationsCounter.Value.Value - 1); + } } } - public static IScheduler? DefaultScheduler { get; set; } + public static async ValueTask AtomicOperationAsync(Func action) + { + lock (_atomicOperationsLocker) + { + _atomicOperationsCounter.Value ??= new(0); + _atomicOperationsCounter.Value.OnNext(_atomicOperationsCounter.Value.Value + 1); + } + + try + { + await action(); + } + finally + { + lock (_atomicOperationsLocker) + { + _atomicOperationsCounter.Value.OnNext(_atomicOperationsCounter.Value.Value - 1); + } + } + } + public static TimeProvider? DefaultScheduler { get; set; } public void Dispose() => _subscription.Dispose(); } diff --git a/SignalsDotnet/SignalsDotnet/Helpers/ObservableEx.cs b/SignalsDotnet/SignalsDotnet/Helpers/ObservableEx.cs new file mode 100644 index 0000000..00cf345 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Helpers/ObservableEx.cs @@ -0,0 +1,18 @@ +using R3; + +namespace SignalsDotnet.Helpers; + +public static class ObservableEx +{ + public static Observable DisconnectWhen(this Observable @this, Observable isDisconnected) + { + return isDisconnected.Select(x => x switch + { + false => @this, + true => Observable.Empty() + }) + .Switch() + .Publish() + .RefCount(); + } +} diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/Optional.cs b/SignalsDotnet/SignalsDotnet/Helpers/Optional.cs similarity index 85% rename from SignalsDotnet/SignalsDotnet/Internals/Helpers/Optional.cs rename to SignalsDotnet/SignalsDotnet/Helpers/Optional.cs index 7e0557b..e5c8bf4 100644 --- a/SignalsDotnet/SignalsDotnet/Internals/Helpers/Optional.cs +++ b/SignalsDotnet/SignalsDotnet/Helpers/Optional.cs @@ -1,19 +1,18 @@ using System.Diagnostics.CodeAnalysis; -namespace SignalsDotnet.Internals.Helpers; +namespace SignalsDotnet.Helpers; -readonly struct Optional +public readonly struct Optional { readonly T? _value; public Optional() => (_value, HasValue) = (default, false); public Optional(T value) => (_value, HasValue) = (value, true); public static Optional Empty => new(); - public bool HasValue { get; } public T? Value => HasValue ? _value : throw new InvalidOperationException("Impossible retrieve a value for an empty optional"); } -static class OptionalExtensions +public static class OptionalExtensions { public static bool TryGetValue(this Optional @this, [NotNullWhen(true)] out T? value) { diff --git a/SignalsDotnet/SignalsDotnet/IReadOnlySignal.cs b/SignalsDotnet/SignalsDotnet/IReadOnlySignal.cs index 2cd9f4a..83514b4 100644 --- a/SignalsDotnet/SignalsDotnet/IReadOnlySignal.cs +++ b/SignalsDotnet/SignalsDotnet/IReadOnlySignal.cs @@ -1,18 +1,26 @@ using System.ComponentModel; -using System.Reactive; +using R3; namespace SignalsDotnet; public interface IReadOnlySignal : INotifyPropertyChanged { - IObservable Changed { get; } + Observable Values { get; } + Observable FutureValues => Values.Skip(1); object? Value { get; } object? UntrackedValue { get; } } -public interface IReadOnlySignal : IObservable, IReadOnlySignal +public interface IReadOnlySignal : IReadOnlySignal { + new Observable Values { get; } + new Observable FutureValues { get; } new T Value { get; } new T UntrackedValue { get; } object? IReadOnlySignal.Value => Value; +} + +public interface IAsyncReadOnlySignal : IReadOnlySignal +{ + IReadOnlySignal IsComputing { get; } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/ComputedObservable.cs b/SignalsDotnet/SignalsDotnet/Internals/ComputedObservable.cs new file mode 100644 index 0000000..7a3bb42 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Internals/ComputedObservable.cs @@ -0,0 +1,204 @@ +using SignalsDotnet.Helpers; +using R3; + +namespace SignalsDotnet.Internals; + +internal class ComputedObservable : Observable +{ + readonly Func> _func; + readonly Func> _fallbackValue; + readonly Func>? _scheduler; + readonly ConcurrentChangeStrategy _concurrentChangeStrategy; + + public ComputedObservable(Func> func, + Func> fallbackValue, + Func>? scheduler = null, + ConcurrentChangeStrategy concurrentChangeStrategy = default) + { + _func = func; + _fallbackValue = fallbackValue; + _scheduler = scheduler; + _concurrentChangeStrategy = concurrentChangeStrategy; + } + + protected override IDisposable SubscribeCore(Observer observer) => new Subscription(this, observer); + + class Subscription : IDisposable + { + readonly ComputedObservable _observable; + readonly Observer _observer; + readonly BehaviorSubject _disposed = new(false); + + public Subscription(ComputedObservable observable, Observer observer) + { + _observable = observable; + _observer = observer; + Observable.FromAsync(ComputeResult) + .Take(1) + .TakeUntil(_disposed.Where(x => x)) + .Subscribe(OnNewResult); + } + + void OnNewResult(ComputationResult result) + { + var valueNotified = false; + + result.ShouldComputeNextResult + .Take(1) + .SelectMany(_ => + { + NotifyValueIfNotAlready(); + return Observable.FromAsync(ComputeResult); + }) + .TakeUntil(_disposed.Where(x => x)) + .Subscribe(OnNewResult); + + NotifyValueIfNotAlready(); + + // We notify a new value only if the func() evaluation succeeds. + void NotifyValueIfNotAlready() + { + if (valueNotified) + return; + + valueNotified = true; + if (result.ResultOptional.TryGetValue(out var propertyValue)) + { + _observer.OnNext(propertyValue); + } + } + } + + async ValueTask ComputeResult(CancellationToken cancellationToken) + { + var referenceEquality = ReferenceEqualityComparer.Instance; + HashSet signalRequested = new(referenceEquality); + Optional result; + SingleNotificationObservable stopListeningForSignals = new(); + + var signalChangedObservable = Signal.SignalsRequested() + .TakeUntil(stopListeningForSignals) + .Where(signalRequested.Add) + .Select(static x => x.FutureValues) + .Merge() + .Take(1); + + if (_observable._concurrentChangeStrategy == ConcurrentChangeStrategy.CancelCurrent) + { + var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cancellationToken = cts.Token; + signalChangedObservable = signalChangedObservable.Do(_ => cts.Cancel()) + .DoCancelOnCompleted(cts); + } + + var scheduler = _observable._scheduler; + if (scheduler is not null) + { + signalChangedObservable = signalChangedObservable.Select(scheduler) + .Switch(); + } + + var shouldComputeNextResult = signalChangedObservable.Replay(1); + + var disconnect = shouldComputeNextResult.Connect(); + + try + { + try + { + result = new(await _observable._func(cancellationToken)); + } + finally + { + stopListeningForSignals.SetResult(true); + } + } + catch (OperationCanceledException) + { + result = Optional.Empty; + } + catch + { + // If something fails, the property will have the previous result, + // We still have to observe for the properties to change (maybe next time the exception will not be thrown) + try + { + result = _observable._fallbackValue(); + } + catch + { + result = Optional.Empty; + } + } + + var resultObservable = new DisconnectOnDisposeObservable(shouldComputeNextResult, disconnect); + + return new(resultObservable, result); + } + + + public void Dispose() => _disposed.OnNext(true); + } + + record struct ComputationResult(Observable ShouldComputeNextResult, Optional ResultOptional); + class DisconnectOnDisposeObservable : Observable + { + readonly Observable _observable; + readonly IDisposable _disconnect; + + public DisconnectOnDisposeObservable(Observable observable, IDisposable disconnect) + { + _observable = observable; + _disconnect = disconnect; + } + + protected override IDisposable SubscribeCore(Observer observer) + { + _observable.Subscribe(observer.OnNext, observer.OnErrorResume, observer.OnCompleted); + return _disconnect; + } + } + + class SingleNotificationObservable : Observable, IDisposable + { + Observer? _observer; + readonly object _locker = new(); + Optional _value; + + protected override IDisposable SubscribeCore(Observer observer) + { + lock (_locker) + { + if (_value.TryGetValue(out var value)) + { + observer.OnNext(value); + observer.OnCompleted(); + } + else + { + _observer = observer; + } + + return this; + } + } + + public void SetResult(TNotification value) + { + lock (this) + { + var observer = _observer; + if (observer is not null) + { + observer.OnNext(value); + observer.OnCompleted(); + return; + } + + _value = new(value); + } + } + + public void Dispose() => Interlocked.Exchange(ref _observer, null); + } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/CancelComputedSignalFactoryDecorator.cs b/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/CancelComputedSignalFactoryDecorator.cs new file mode 100644 index 0000000..049e61e --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/CancelComputedSignalFactoryDecorator.cs @@ -0,0 +1,91 @@ +using R3; +using SignalsDotnet.Configuration; +using SignalsDotnet.Helpers; +using SignalsDotnet.Internals.Helpers; + +namespace SignalsDotnet.Internals.ComputedSignalrFactory; + +internal class CancelComputedSignalFactoryDecorator : IComputedSignalFactory +{ + readonly IComputedSignalFactory _parent; + readonly IReadOnlySignal _cancellationSignal; + + public CancelComputedSignalFactoryDecorator(IComputedSignalFactory parent, IReadOnlySignal cancellationSignal) + { + _parent = parent; + _cancellationSignal = cancellationSignal; + } + + public IReadOnlySignal Computed(Func func, Func> fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null) + { + return ComputedObservable(func, fallbackValue).ToSignal(configuration!); + } + + public IAsyncReadOnlySignal AsyncComputed(Func> func, + T startValue, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, ReadonlySignalConfigurationDelegate? configuration = null) + { + func = func.TraceWhenExecuting(out var isExecuting); + return AsyncComputedObservable(func, startValue, fallbackValue, concurrentChangeStrategy).ToAsyncSignal(isExecuting, configuration!); + } + + public Observable ComputedObservable(Func func, Func> fallbackValue) + { + return _parent.ComputedObservable(() => + { + if (_cancellationSignal.Value.IsCancellationRequested) + { + return Optional.Empty; + } + + return new Optional(func()); + }, () => new Optional>(fallbackValue())) + .Where(x => x.HasValue) + .Select(x => x.Value!); + } + + public Observable AsyncComputedObservable(Func> func, T startValue, Func> fallbackValue, ConcurrentChangeStrategy concurrentChangeStrategy = default) + { + return _parent.AsyncComputedObservable(async token => + { + if (_cancellationSignal.Value.IsCancellationRequested || token.IsCancellationRequested) + { + return Optional.Empty; + } + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(token, _cancellationSignal.UntrackedValue); + var result = await func(cts.Token); + return new Optional(result); + }, new Optional(startValue), () => new Optional>(fallbackValue()), concurrentChangeStrategy) + .Where(static x => x.HasValue) + .Select(static x => x.Value)!; + } + + public Effect Effect(Action onChange, TimeProvider? scheduler) + { + return new Effect(() => + { + if (_cancellationSignal.Value.IsCancellationRequested) + { + return; + } + + onChange(); + }, scheduler); + } + + public Effect AsyncEffect(Func onChange, ConcurrentChangeStrategy concurrentChangeStrategy, TimeProvider? scheduler) + { + return new Effect(async token => + { + if (_cancellationSignal.Value.IsCancellationRequested || token.IsCancellationRequested) + { + return; + } + + using var cts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationSignal.UntrackedValue, token); + await onChange(cts.Token); + }, concurrentChangeStrategy, scheduler); + } +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/DefaultComputedSignalFactory.cs b/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/DefaultComputedSignalFactory.cs new file mode 100644 index 0000000..45d4cb3 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/DefaultComputedSignalFactory.cs @@ -0,0 +1,47 @@ +using R3; +using SignalsDotnet.Configuration; +using SignalsDotnet.Helpers; + +namespace SignalsDotnet.Internals.ComputedSignalrFactory; + +internal class DefaultComputedSignalFactory : IComputedSignalFactory +{ + public IReadOnlySignal Computed(Func func, Func> fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null) + { + return Signal.Computed(func, fallbackValue, configuration); + } + + public IAsyncReadOnlySignal AsyncComputed(Func> func, + T startValue, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null) + { + return Signal.AsyncComputed(func, startValue, fallbackValue, concurrentChangeStrategy, configuration); + } + + + public Observable ComputedObservable(Func func, Func> fallbackValue) + { + return Signal.ComputedObservable(func, fallbackValue); + } + + public Observable AsyncComputedObservable(Func> func, T startValue, Func> fallbackValue, ConcurrentChangeStrategy concurrentChangeStrategy = default) + { + return Signal.AsyncComputedObservable(func, startValue, fallbackValue, concurrentChangeStrategy); + } + + + public Effect Effect(Action onChange, TimeProvider? scheduler) + { + return new Effect(onChange, scheduler); + } + + public Effect AsyncEffect(Func onChange, ConcurrentChangeStrategy concurrentChangeStrategy, TimeProvider? scheduler) + { + return new Effect(onChange, concurrentChangeStrategy, scheduler); + } + + + public static DefaultComputedSignalFactory Instance { get; } = new(); +} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/OnErrorComputedSignalFactoryDecorator.cs b/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/OnErrorComputedSignalFactoryDecorator.cs new file mode 100644 index 0000000..7bbb0cd --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Internals/ComputedSignalrFactory/OnErrorComputedSignalFactoryDecorator.cs @@ -0,0 +1,106 @@ +using R3; +using SignalsDotnet.Configuration; +using SignalsDotnet.Helpers; +using SignalsDotnet.Internals.Helpers; + +namespace SignalsDotnet.Internals.ComputedSignalrFactory; + +internal class OnErrorComputedSignalFactoryDecorator : IComputedSignalFactory +{ + readonly IComputedSignalFactory _parent; + readonly bool _ignoreOperationCancelled; + readonly Action _onException; + + public OnErrorComputedSignalFactoryDecorator(IComputedSignalFactory parent, bool ignoreOperationCancelled, Action onException) + { + _parent = parent; + _ignoreOperationCancelled = ignoreOperationCancelled; + _onException = onException; + } + + + public IReadOnlySignal Computed(Func func, Func> fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null) + { + return ComputedObservable(func, fallbackValue).ToSignal(configuration!); + } + + public Observable ComputedObservable(Func func, Func> fallbackValue) + { + return _parent.ComputedObservable(() => + { + try + { + return func(); + } + catch (Exception e) + { + NotifyException(e); + throw; + } + }, fallbackValue); + } + + public IAsyncReadOnlySignal AsyncComputed(Func> func, T startValue, Func> fallbackValue, ConcurrentChangeStrategy concurrentChangeStrategy = default, ReadonlySignalConfigurationDelegate? configuration = null) + { + func = func.TraceWhenExecuting(out var isExecuting); + return AsyncComputedObservable(func, startValue, fallbackValue, concurrentChangeStrategy).ToAsyncSignal(isExecuting, configuration!); + } + + public Observable AsyncComputedObservable(Func> func, T startValue, Func> fallbackValue, ConcurrentChangeStrategy concurrentChangeStrategy = default) + { + return _parent.AsyncComputedObservable(async token => + { + try + { + return await func(token); + } + catch (Exception e) + { + NotifyException(e); + throw; + } + }, startValue, fallbackValue, concurrentChangeStrategy); + } + + public Effect Effect(Action onChange, TimeProvider? scheduler = null) + { + return _parent.Effect(() => + { + try + { + onChange(); + } + catch (Exception e) + { + NotifyException(e); + throw; + } + }, scheduler); + } + + public Effect AsyncEffect(Func onChange, ConcurrentChangeStrategy concurrentChangeStrategy = default, TimeProvider? scheduler = null) + { + return _parent.AsyncEffect(async token => + { + try + { + await onChange(token); + } + catch (Exception e) + { + NotifyException(e); + throw; + } + }, concurrentChangeStrategy); + } + + void NotifyException(Exception e) + { + if (_ignoreOperationCancelled && e is OperationCanceledException) + { + return; + } + + Signal.Untracked(() => _onException(e)); + } +} diff --git a/SignalsDotnet/SignalsDotnet/Internals/FromObservableCollectionSignal.cs b/SignalsDotnet/SignalsDotnet/Internals/FromObservableCollectionSignal.cs index faf4595..3d0aef8 100644 --- a/SignalsDotnet/SignalsDotnet/Internals/FromObservableCollectionSignal.cs +++ b/SignalsDotnet/SignalsDotnet/Internals/FromObservableCollectionSignal.cs @@ -1,13 +1,12 @@ using System.Collections.Specialized; -using System.Reactive; -using System.Reactive.Linq; -using System.Reactive.Subjects; +using System.ComponentModel; +using R3; using SignalsDotnet.Configuration; using SignalsDotnet.Internals.Helpers; namespace SignalsDotnet.Internals; -class FromObservableCollectionSignal : Signal, IReadOnlySignal where T : INotifyCollectionChanged +internal class FromObservableCollectionSignal : IReadOnlySignal where T : INotifyCollectionChanged { readonly Subject _collectionChanged = new(); @@ -29,20 +28,22 @@ public FromObservableCollectionSignal(T collection, CollectionChangedSignalConfi { observable.Subscribe(OnCollectionChanged); } - - IObservable collectionChanged = _collectionChanged; - collectionChanged = configuration.CollectionChangedObservableMapper.Invoke(collectionChanged); - Changed = collectionChanged.StartWith(Unit.Default); } - void OnCollectionChanged(EventPattern _) => _collectionChanged.OnNext(default); + void OnCollectionChanged((object? sender, NotifyCollectionChangedEventArgs e) _) => _collectionChanged.OnNext(default); - public IDisposable Subscribe(IObserver observer) => Changed.Select(_ => Value) - .Subscribe(observer); - public IObservable Changed { get; } + public Observable Values => _collectionChanged.Select(_ => Value) + .Prepend(() => Value); + + public Observable FutureValues => _collectionChanged.Select(_ => Value); readonly T _value; - public T Value => GetValue(this, in _value); + public T Value => Signal.GetValue(this, in _value); + object IReadOnlySignal.UntrackedValue => UntrackedValue; public T UntrackedValue => _value; + public event PropertyChangedEventHandler? PropertyChanged; + + Observable IReadOnlySignal.Values => _collectionChanged.Prepend(Unit.Default); + Observable IReadOnlySignal.FutureValues => _collectionChanged; } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/FromObservableSignal.cs b/SignalsDotnet/SignalsDotnet/Internals/FromObservableSignal.cs index a7fe633..e370225 100644 --- a/SignalsDotnet/SignalsDotnet/Internals/FromObservableSignal.cs +++ b/SignalsDotnet/SignalsDotnet/Internals/FromObservableSignal.cs @@ -1,18 +1,19 @@ -using System.Reactive; -using System.Reactive.Linq; -using System.Reactive.Subjects; +using System.ComponentModel; +using R3; using SignalsDotnet.Configuration; using SignalsDotnet.Internals.Helpers; namespace SignalsDotnet.Internals; -class FromObservableSignal : Signal, IReadOnlySignal, IEquatable> +internal class FromObservableSignal : IReadOnlySignal, IEquatable> { readonly ReadonlySignalConfiguration _configuration; - readonly Subject _someoneAskedValueSubject = new(); + readonly Subject _someoneAskedValueSubject = new(); // lock int _someoneAskedValue; // 1 means true, 0 means false - public FromObservableSignal(IObservable observable, +#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. + public FromObservableSignal(Observable observable, +#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable. ReadonlySignalConfigurationDelegate? configuration = null) { if (observable is null) @@ -38,20 +39,40 @@ public FromObservableSignal(IObservable observable, /// /// Dont inline this function with a lambda /// - void SetValue(T value) => Value = value; - + void SetValue(T value) + { + Value = value; + } - T? _value; - public T? Value + T _value; + public T Value { get { NotifySomeoneAskedAValue(); - return GetValue(this, in _value); + return Signal.GetValue(this, in _value); + } + private set + { + if (EqualityComparer.Default.Equals(_value, value)) + return; + + _value = value; + + var propertyChanged = PropertyChanged; + if (propertyChanged is null) + { + return; + } + + using (Signal.UntrackedScope()) + { + propertyChanged(this, Signal.PropertyChangedArgs); + } } - set => SetValue(ref _value, value, _configuration.Comparer, _configuration.RaiseOnlyWhenChanged); } - public T? UntrackedValue => _value; + + public T UntrackedValue => _value; object? IReadOnlySignal.UntrackedValue => UntrackedValue; void NotifySomeoneAskedAValue() @@ -65,8 +86,8 @@ void NotifySomeoneAskedAValue() _someoneAskedValueSubject.Dispose(); } - public IDisposable Subscribe(IObserver observer) => this.OnPropertyChanged(nameof(Value), () => Value) - .Subscribe(observer); + public Observable Values => this.OnPropertyChanged(false); + public Observable FutureValues => this.OnPropertyChanged(true); public bool Equals(FromObservableSignal? other) { @@ -94,5 +115,20 @@ public override bool Equals(object? obj) public static bool operator !=(FromObservableSignal a, FromObservableSignal b) => !(a == b); public override int GetHashCode() => _value is null ? 0 : _configuration.Comparer.GetHashCode(_value); - public IObservable Changed => this.Select(static _ => Unit.Default); + public event PropertyChangedEventHandler? PropertyChanged; + + Observable IReadOnlySignal.Values => this.OnPropertyChangedAsUnit(false); + Observable IReadOnlySignal.FutureValues => this.OnPropertyChangedAsUnit(true); +} + +internal class FromObservableAsyncSignal : FromObservableSignal, IAsyncReadOnlySignal +{ + public FromObservableAsyncSignal(Observable observable, + IReadOnlySignal isExecuting, + ReadonlySignalConfigurationDelegate? configuration = null) : base(observable, configuration) + { + IsComputing = isExecuting; + } + + public IReadOnlySignal IsComputing { get; } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/GenericHelpers.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/GenericHelpers.cs new file mode 100644 index 0000000..11b0519 --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Internals/Helpers/GenericHelpers.cs @@ -0,0 +1,35 @@ +namespace SignalsDotnet.Internals.Helpers; + +internal static class GenericHelpers +{ + public static Func> ToAsyncValueTask(this Func func) + { + if (func is null) + throw new ArgumentNullException(nameof(func)); + + return token => + { + token.ThrowIfCancellationRequested(); + return ValueTask.FromResult(func()); + }; + } + + public static Func> TraceWhenExecuting(this Func> func, out IReadOnlySignal isExecuting) + { + var isExecutingSignal = new Signal(); + isExecuting = isExecutingSignal; + + return async token => + { + try + { + isExecutingSignal.Value = true; + return await func(token); + } + finally + { + isExecutingSignal.Value = false; + } + }; + } +} diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/KeyEqualityComparer.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/KeyEqualityComparer.cs index 4e87d0e..a430f69 100644 --- a/SignalsDotnet/SignalsDotnet/Internals/Helpers/KeyEqualityComparer.cs +++ b/SignalsDotnet/SignalsDotnet/Internals/Helpers/KeyEqualityComparer.cs @@ -1,6 +1,6 @@ namespace SignalsDotnet.Internals.Helpers; -class KeyEqualityComparer : IEqualityComparer where TDestination : notnull +internal class KeyEqualityComparer : IEqualityComparer where TDestination : notnull { readonly Func _keyExtractor; readonly EqualityComparer _equalityComparer = EqualityComparer.Default; diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableEx.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableEx.cs new file mode 100644 index 0000000..6b9f6bc --- /dev/null +++ b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableEx.cs @@ -0,0 +1,67 @@ +using R3; + +namespace SignalsDotnet.Internals.Helpers; + +internal static class ObservableEx +{ + public static FromAsyncContextObservable FromAsyncUsingAsyncContext(Func> asyncAction) + { + return new FromAsyncContextObservable(asyncAction); + } + + public class FromAsyncContextObservable : Observable + { + readonly Func> _asyncAction; + + public FromAsyncContextObservable(Func> asyncAction) + { + _asyncAction = asyncAction; + } + + protected override IDisposable SubscribeCore(Observer observer) + { + var disposable = new CancellationDisposable(); + var token = disposable.Token; + + try + { + var task = _asyncAction(token); + if (task.IsCompleted) + { + observer.OnNext(task.GetAwaiter().GetResult()); + observer.OnCompleted(); + return disposable; + } + + BindObserverToTask(task, observer); + } + catch (Exception e) + { + observer.OnCompleted(e); + } + + return disposable; + } + + static async void BindObserverToTask(ValueTask task, Observer observer) + { + try + { + var result = await task; + observer.OnNext(result); + observer.OnCompleted(); + } + catch (Exception e) + { + try + { + observer.OnCompleted(e); + } + catch + { + // Ignored + } + } + } + } +} diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromINotifyCollectionChanged.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromINotifyCollectionChanged.cs index 77092fb..20f5396 100644 --- a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromINotifyCollectionChanged.cs +++ b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromINotifyCollectionChanged.cs @@ -1,13 +1,51 @@ using System.Collections.Specialized; -using System.Reactive; -using System.Reactive.Linq; +using R3; namespace SignalsDotnet.Internals.Helpers; -static class ObservableFromINotifyCollectionChanged +internal static class ObservableFromINotifyCollectionChanged { - public static IObservable> OnCollectionChanged(this INotifyCollectionChanged collection) + public static Observable<(object? sender, NotifyCollectionChangedEventArgs e)> OnCollectionChanged(this INotifyCollectionChanged collection) { - return Observable.FromEventPattern(x => collection.CollectionChanged += x, x => collection.CollectionChanged -= x); + return new CollectionChangedObservable(collection); + } + + class CollectionChangedObservable : Observable<(object? sender, NotifyCollectionChangedEventArgs e)> + { + readonly INotifyCollectionChanged _notifyCollectionChanged; + + public CollectionChangedObservable(INotifyCollectionChanged notifyCollectionChanged) + { + _notifyCollectionChanged = notifyCollectionChanged; + } + + protected override IDisposable SubscribeCore(Observer<(object? sender, NotifyCollectionChangedEventArgs e)> observer) + { + return new Subscription(observer, this); + } + + + struct Subscription : IDisposable + { + readonly Observer<(object? sender, NotifyCollectionChangedEventArgs e)> _observer; + readonly CollectionChangedObservable _observable; + + public Subscription(Observer<(object? sender, NotifyCollectionChangedEventArgs e)> observer, CollectionChangedObservable observable) + { + _observer = observer; + _observable = observable; + _observable._notifyCollectionChanged.CollectionChanged += OnCollectionChanged; + } + + void OnCollectionChanged(object? sender, NotifyCollectionChangedEventArgs e) + { + _observer.OnNext((sender, e)); + } + + public void Dispose() + { + _observable._notifyCollectionChanged.CollectionChanged -= OnCollectionChanged; + } + } } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromPropertyChanged.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromPropertyChanged.cs index c25c03e..fff6add 100644 --- a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromPropertyChanged.cs +++ b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservableFromPropertyChanged.cs @@ -1,18 +1,156 @@ using System.ComponentModel; -using System.Reactive.Linq; +using R3; namespace SignalsDotnet.Internals.Helpers; -static class ObservableFromPropertyChanged +internal static class ObservableFromPropertyChanged { - public static IObservable OnPropertyChanged(this INotifyPropertyChanged @this, - string propertyName, - Func getter) + public static FromPropertyChangedObservable OnPropertyChanged(this IReadOnlySignal @this, bool futureChangesOnly) { - return Observable.FromEventPattern(x => @this.PropertyChanged += x, - x => @this.PropertyChanged -= x) - .Where(x => x.EventArgs.PropertyName == propertyName) - .Select(_ => getter()) - .StartWith(getter()); + return new FromPropertyChangedObservable(@this, futureChangesOnly); + } + + public static FromPropertyChangedObservableUnit OnPropertyChangedAsUnit(this IReadOnlySignal @this, bool futureChangesOnly) + { + return new FromPropertyChangedObservableUnit(@this, futureChangesOnly); + } + + public class FromPropertyChangedObservableUnit : Observable + { + readonly IReadOnlySignal _signal; + readonly bool _futureChangesOnly; + + public FromPropertyChangedObservableUnit(IReadOnlySignal signal, bool futureChangesOnly) + { + _signal = signal; + _futureChangesOnly = futureChangesOnly; + } + + protected override IDisposable SubscribeCore(Observer observer) + { + return new FromPropertyChangedSubscriptionUnit(observer, this); + } + + + class FromPropertyChangedSubscriptionUnit : IDisposable + { + readonly Observer _observer; + readonly FromPropertyChangedObservableUnit _observable; + readonly object _locker = new(); + bool _isDisposed; + + public FromPropertyChangedSubscriptionUnit(Observer observer, FromPropertyChangedObservableUnit observable) + { + _observer = observer; + _observable = observable; + if (_observable._futureChangesOnly) + { + observable._signal.PropertyChanged += OnPropertyChanged; + return; + } + + _observer.OnNext(Unit.Default); + lock (_locker) + { + if (_isDisposed) + { + return; + } + + observable._signal.PropertyChanged += OnPropertyChanged; + } + } + + void OnPropertyChanged(object? sender, PropertyChangedEventArgs e) + { + _observer.OnNext(Unit.Default); + } + + public void Dispose() + { + if (!_observable._futureChangesOnly) + { + lock (_locker) + { + if (_isDisposed) + return; + + _isDisposed = true; + } + } + + _observable._signal.PropertyChanged -= OnPropertyChanged; + } + } + } + + + public class FromPropertyChangedObservable : Observable + { + readonly IReadOnlySignal _signal; + readonly bool _futureChangesOnly; + + public FromPropertyChangedObservable(IReadOnlySignal signal, bool futureChangesOnly) + { + _signal = signal; + _futureChangesOnly = futureChangesOnly; + } + + protected override IDisposable SubscribeCore(Observer observer) + { + return new FromPropertyChangedSubscription(observer, this); + } + + + class FromPropertyChangedSubscription : IDisposable + { + readonly Observer _observer; + readonly FromPropertyChangedObservable _observable; + readonly object _locker = new(); + bool _isDisposed; + + public FromPropertyChangedSubscription(Observer observer, FromPropertyChangedObservable observable) + { + _observer = observer; + _observable = observable; + if (_observable._futureChangesOnly) + { + observable._signal.PropertyChanged += OnPropertyChanged; + return; + } + + _observer.OnNext(_observable._signal.Value); + lock (_locker) + { + if (_isDisposed) + { + return; + } + + observable._signal.PropertyChanged += OnPropertyChanged; + } + } + + void OnPropertyChanged(object? sender, PropertyChangedEventArgs e) + { + _observer.OnNext(_observable._signal.Value); + } + + public void Dispose() + { + if (!_observable._futureChangesOnly) + { + lock (_locker) + { + if (_isDisposed) + return; + + _isDisposed = true; + } + } + + _observable._signal.PropertyChanged -= OnPropertyChanged; + } + } } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservablesByKey.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservablesByKey.cs deleted file mode 100644 index 7d0173e..0000000 --- a/SignalsDotnet/SignalsDotnet/Internals/Helpers/ObservablesByKey.cs +++ /dev/null @@ -1,39 +0,0 @@ -using System.Collections.Concurrent; -using System.Reactive.Linq; -using System.Reactive.Subjects; - -namespace SignalsDotnet.Internals.Helpers; - -class ObservablesByKey where TKey : notnull -{ - readonly ConcurrentDictionary> _eventHandlersByKey = new(); - readonly ConcurrentQueue, IObservable>> _mappers= new(); - - public void Invoke(TKey key, TValue value) - { - if (_eventHandlersByKey.TryGetValue(key, out var handler)) - handler.OnNext(value); - } - - public ObservablesByKey ForEach(Func, IObservable> observableMapper) - { - _mappers.Enqueue(observableMapper); - return this; - } - - public IObservable WhenAny(params TKey[] keys) - { - return keys.Select(When).Merge(); - } - - public IObservable When(TKey key) - { - var currentHandler = _eventHandlersByKey.GetOrAdd(key, static _ => Subject.Synchronize(new Subject())); - - IObservable finalObservable = currentHandler; - foreach (var mapper in _mappers) - finalObservable = mapper(currentHandler); - - return finalObservable; - } -} \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/WeakObservable.cs b/SignalsDotnet/SignalsDotnet/Internals/Helpers/WeakObservable.cs similarity index 79% rename from SignalsDotnet/SignalsDotnet/WeakObservable.cs rename to SignalsDotnet/SignalsDotnet/Internals/Helpers/WeakObservable.cs index e75ab1f..8f3a3ef 100644 --- a/SignalsDotnet/SignalsDotnet/WeakObservable.cs +++ b/SignalsDotnet/SignalsDotnet/Internals/Helpers/WeakObservable.cs @@ -1,17 +1,12 @@ using System.Reflection; +using R3; -namespace SignalsDotnet; +namespace SignalsDotnet.Internals.Helpers; -static class WeakObservable +internal static class WeakObservable { - public static IDisposable SubscribeWeakly(this IObservable source, Action onNext) + public static IDisposable SubscribeWeakly(this Observable source, Action onNext) { - if (source is null) - throw new ArgumentNullException(nameof(source)); - - if (onNext is null) - throw new ArgumentNullException(nameof(onNext)); - var weakObserver = new WeakAction(onNext); var subscription = source.Subscribe(weakObserver.OnNext); weakObserver.GarbageCollected += subscription.Dispose; diff --git a/SignalsDotnet/SignalsDotnet/Signal.cs b/SignalsDotnet/SignalsDotnet/Signal.cs index 114ab0c..14042ef 100644 --- a/SignalsDotnet/SignalsDotnet/Signal.cs +++ b/SignalsDotnet/SignalsDotnet/Signal.cs @@ -1,78 +1,152 @@ using System.Collections.Concurrent; using System.ComponentModel; -using System.Reactive; -using System.Runtime.CompilerServices; -using SignalsDotnet.Internals.Helpers; +using R3; namespace SignalsDotnet; -public abstract partial class Signal +public static partial class Signal { - static readonly ConcurrentDictionary _untrackedThreads = new(); - static readonly ObservablesByKey _propertyRequested = new(); - public static IObservable PropertiesRequested(Thread thread) => _propertyRequested.When(thread); + static uint _nextComputedSignalAffinityValue; + static readonly AsyncLocal _computedSignalAffinityValue = new(); + static readonly ConcurrentDictionary> _signalRequestedByComputedAffinity = new(); + internal static readonly PropertyChangedEventArgs PropertyChangedArgs = new("Value"); - public static T Untracked(Func action) + internal static Observable SignalsRequested() + { + return new SignalsRequestedObservable(); + } + + public static UntrackedReleaserDisposable UntrackedScope() + { + uint oldAffinity; + lock (_computedSignalAffinityValue) + { + oldAffinity = _computedSignalAffinityValue.Value; + _computedSignalAffinityValue.Value = _nextComputedSignalAffinityValue; + unchecked + { + _nextComputedSignalAffinityValue++; + } + } + + return new UntrackedReleaserDisposable(oldAffinity); + } + + public static async Task Untracked(Func> action) { if (action is null) throw new ArgumentNullException(nameof(action)); - var currentThread = Thread.CurrentThread; - var nestingCount = _untrackedThreads.AddOrUpdate(currentThread, static _ => 0, static (_, nestingCount) => nestingCount + 1); + using (UntrackedScope()) + { + return await action(); + } + } - try + public static async Task Untracked(Func action) + { + if (action is null) + throw new ArgumentNullException(nameof(action)); + + using (UntrackedScope()) { - return action(); + await action(); } - finally + } + + public static T Untracked(Func action) + { + if (action is null) + throw new ArgumentNullException(nameof(action)); + + using (UntrackedScope()) { - if (nestingCount == 0) - { - _untrackedThreads.TryRemove(currentThread, out _); - } - else - { - _untrackedThreads[currentThread] = nestingCount - 1; - } + return action(); } } public static void Untracked(Action action) { - Untracked(() => + if (action is null) + throw new ArgumentNullException(nameof(action)); + + using (UntrackedScope()) { action(); - return Unit.Default; - }); + } } - public event PropertyChangedEventHandler? PropertyChanged; - - protected void OnPropertyChanged([CallerMemberName] string? propertyName = null) + internal static T GetValue(IReadOnlySignal property, in T value) { - PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(propertyName)); + uint affinityValue; + lock (_computedSignalAffinityValue) + { + affinityValue = _computedSignalAffinityValue.Value; + } + + if (_signalRequestedByComputedAffinity.TryGetValue(affinityValue, out var subject)) + { + subject.OnNext(property); + } + + return value; } - protected void SetValue(ref T field, - T value, - IEqualityComparer equalityComparer, - bool raiseOnlyWhenChanged, - [CallerMemberName] string? propertyName = null) + public class SignalsRequestedObservable : Observable { - if (raiseOnlyWhenChanged && equalityComparer.Equals(field, value)) - return; + protected override IDisposable SubscribeCore(Observer observer) + { + lock (_computedSignalAffinityValue) + { + var affinityValue = _nextComputedSignalAffinityValue; + + unchecked + { + _nextComputedSignalAffinityValue++; + } - field = value; - OnPropertyChanged(propertyName); + _computedSignalAffinityValue.Value = affinityValue; + var subject = new Subject(); + _signalRequestedByComputedAffinity.TryAdd(affinityValue, subject); + + subject.Subscribe(observer.OnNext, observer.OnErrorResume, observer.OnCompleted); + return new SignalsRequestedDisposable(affinityValue, subject); + } + } } + readonly struct SignalsRequestedDisposable : IDisposable + { + readonly uint _affinityValue; + readonly Subject _subject; + + public SignalsRequestedDisposable(uint affinityValue, Subject subject) + { + _affinityValue = affinityValue; + _subject = subject; + } + public void Dispose() + { + _signalRequestedByComputedAffinity.TryRemove(_affinityValue, out _); + _subject.Dispose(); + } + } - protected static T GetValue(IReadOnlySignal property, in T value) + public readonly struct UntrackedReleaserDisposable : IDisposable { - var currentThread = Thread.CurrentThread; - if (!_untrackedThreads.ContainsKey(currentThread)) - _propertyRequested.Invoke(currentThread, property); + readonly uint _oldValue; - return value; + public UntrackedReleaserDisposable(uint oldValue) + { + _oldValue = oldValue; + } + public void Dispose() + { + lock (_computedSignalAffinityValue) + { + _computedSignalAffinityValue.Value = _oldValue; + } + } } -} \ No newline at end of file +} + diff --git a/SignalsDotnet/SignalsDotnet/Signal_Computed.cs b/SignalsDotnet/SignalsDotnet/Signal_Computed.cs index 2c77e0b..879c6e5 100644 --- a/SignalsDotnet/SignalsDotnet/Signal_Computed.cs +++ b/SignalsDotnet/SignalsDotnet/Signal_Computed.cs @@ -1,9 +1,6 @@ -using System; -using System.Reactive; -using System.Reactive.Concurrency; -using System.Reactive.Disposables; -using System.Reactive.Linq; +using R3; using SignalsDotnet.Configuration; +using SignalsDotnet.Helpers; using SignalsDotnet.Internals; using SignalsDotnet.Internals.Helpers; @@ -13,87 +10,101 @@ public partial class Signal { public static IReadOnlySignal Computed(Func func, Func fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null) { - if (func is null) - throw new ArgumentNullException(nameof(func)); - - if (fallbackValue is null) - throw new ArgumentNullException(nameof(fallbackValue)); + return Computed(func.ToAsyncValueTask(), default, () => new Optional(fallbackValue()), default, configuration); + } - return Computed(func, () => new Optional(fallbackValue()), configuration); + public static IReadOnlySignal Computed(Func func, Func> fallbackValue, ReadonlySignalConfigurationDelegate? configuration = null) + { + return Computed(func.ToAsyncValueTask(), default, fallbackValue, default, configuration); } - public static IReadOnlySignal Computed(Func func, ReadonlySignalConfigurationDelegate? configuration = null) + public static IReadOnlySignal Computed(Func func, ReadonlySignalConfigurationDelegate? configuration = null) { - if (func is null) - throw new ArgumentNullException(nameof(func)); + return Computed(func.ToAsyncValueTask(), default, static () => Optional.Empty, default, configuration); + } + - return Computed(func, static () => Optional.Empty, configuration); + public static IAsyncReadOnlySignal AsyncComputed(Func> func, + T startValue, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null) + { + func = func.TraceWhenExecuting(out var isExecuting); + return AsyncComputed(func, new Optional(startValue), fallbackValue, isExecuting, concurrentChangeStrategy, configuration!); } + public static IAsyncReadOnlySignal AsyncComputed(Func> func, + T startValue, + Func fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null) + { + return AsyncComputed(func, startValue, () => new Optional(fallbackValue()), concurrentChangeStrategy, configuration); + } - internal static IReadOnlySignal Computed(Func func, Func> fallbackValue, ReadonlySignalConfigurationDelegate? configuration) + public static IAsyncReadOnlySignal AsyncComputed(Func> func, + T startValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default, + ReadonlySignalConfigurationDelegate? configuration = null) { - var valueObservable = ComputedObservable(func, fallbackValue, null); + return AsyncComputed(func, startValue, static () => Optional.Empty, concurrentChangeStrategy, configuration); + } + - return new FromObservableSignal(valueObservable, configuration)!; + + public static Observable ComputedObservable(Func func, + Func> fallbackValue) + { + return ComputedObservable(func.ToAsyncValueTask(), fallbackValue); } - internal static IObservable ComputedObservable(Func func, Func> fallbackValue, Func>? scheduler = null) + + public static Observable AsyncComputedObservable(Func> func, + T startValue, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy = default) { - return Observable.Create(observer => - { - var disposable = new SerialDisposable(); - OnNewResult(ComputeResult(func, fallbackValue, scheduler)); - void OnNewResult(ComputationResult result) - { - disposable.Disposable = result.NextResult - .Take(1) - .Subscribe(OnNewResult); - - // We notify a new value only if the func() evaluation succeeds. - if (result.ResultOptional.TryGetValue(out var propertyValue)) - observer.OnNext(propertyValue); - } - - return disposable; - }); + return ComputedObservable(func, fallbackValue, concurrentChangeStrategy: concurrentChangeStrategy).Prepend(startValue); } - static ComputationResult ComputeResult(Func resultFunc, - Func> fallbackValue, - Func>? scheduler) + internal static IReadOnlySignal Computed(Func> func, + Optional startValueOptional, + Func> fallbackValue, + ConcurrentChangeStrategy concurrentChangeStrategy, + ReadonlySignalConfigurationDelegate? configuration) { - var referenceEquality = ReferenceEqualityComparer.Instance; - HashSet propertiesRequested = new(referenceEquality); - Optional result; - using (PropertiesRequested(Thread.CurrentThread).Subscribe(x => propertiesRequested.Add(x))) + var valueObservable = ComputedObservable(func, fallbackValue, null, concurrentChangeStrategy); + if (startValueOptional.TryGetValue(out var startValue)) { - try - { - result = new(resultFunc()); - } - catch - { - // If something fails, the property will have the previous result, - // We still have to observe for the properties to change (maybe next time the exception will not be thrown) - result = fallbackValue(); - } + valueObservable = valueObservable.Prepend(startValue); } + return new FromObservableSignal(valueObservable, configuration); + } - var observable = WhenAnyChanged(propertiesRequested).Skip(1); // We want to observe only future changes - - if (scheduler is not null) + internal static IAsyncReadOnlySignal AsyncComputed(Func> func, + Optional startValueOptional, + Func> fallbackValue, + IReadOnlySignal isExecuting, + ConcurrentChangeStrategy concurrentChangeStrategy, + ReadonlySignalConfigurationDelegate? configuration) + { + var valueObservable = ComputedObservable(func, fallbackValue, null, concurrentChangeStrategy); + if (startValueOptional.TryGetValue(out var startValue)) { - observable = observable.Select(scheduler) - .Switch(); + valueObservable = valueObservable.Prepend(startValue); } - var restulObsrvable = observable.Select(_ => ComputeResult(resultFunc, fallbackValue, scheduler)); - - return new(restulObsrvable, result); + return new FromObservableAsyncSignal(valueObservable, isExecuting, configuration); } - record struct ComputationResult(IObservable> NextResult, Optional ResultOptional); + internal static Observable ComputedObservable(Func> func, + Func> fallbackValue, + Func>? scheduler = null, + ConcurrentChangeStrategy concurrentChangeStrategy = default) + { + return new ComputedObservable(func, fallbackValue, scheduler, concurrentChangeStrategy); + } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Signal_Factory.cs b/SignalsDotnet/SignalsDotnet/Signal_Factory.cs index e061d3e..5c51551 100644 --- a/SignalsDotnet/SignalsDotnet/Signal_Factory.cs +++ b/SignalsDotnet/SignalsDotnet/Signal_Factory.cs @@ -1,4 +1,5 @@ using System.Collections.Specialized; +using R3; using SignalsDotnet.Configuration; using SignalsDotnet.Internals; @@ -8,10 +9,10 @@ public partial class Signal { public static Signal Create(SignalConfigurationDelegate? configurator = null) { - return new Signal(configurator); + return new Signal(configurator!); } - public static Signal Create(T? startValue, SignalConfigurationDelegate? configurator = null) + public static Signal Create(T startValue, SignalConfigurationDelegate? configurator = null) { return new Signal(startValue, configurator); } @@ -22,12 +23,12 @@ public static IReadOnlySignal FromObservableCollection } public static CollectionSignal CreateCollectionSignal(CollectionChangedSignalConfigurationDelegate? collectionChangedConfiguration = null, - SignalConfigurationDelegate>? propertyChangedConfiguration = null) where TCollection : class, INotifyCollectionChanged + SignalConfigurationDelegate?>? propertyChangedConfiguration = null) where TCollection : class, INotifyCollectionChanged { return new CollectionSignal(collectionChangedConfiguration, propertyChangedConfiguration); } - public static IReadOnlySignal FromObservable(IObservable observable, ReadonlySignalConfigurationDelegate? configuration = null) + public static IReadOnlySignal FromObservable(Observable observable, ReadonlySignalConfigurationDelegate? configuration = null) { return new FromObservableSignal(observable, configuration); } @@ -35,12 +36,20 @@ public static CollectionSignal CreateCollectionSignal( public static class SignalFactoryExtensions { - public static IReadOnlySignal ToSignal(this IObservable @this, + public static IReadOnlySignal ToSignal(this Observable @this, ReadonlySignalConfigurationDelegate? configurator = null) { return new FromObservableSignal(@this, configurator); } + internal static IAsyncReadOnlySignal ToAsyncSignal(this Observable @this, + IReadOnlySignal isExecuting, + ReadonlySignalConfigurationDelegate? configurator = null) + { + return new FromObservableAsyncSignal(@this, isExecuting, configurator); + } + + public static IReadOnlySignal ToCollectionSignal(this TCollection collection, CollectionChangedSignalConfigurationDelegate? configurator = null) where TCollection : INotifyCollectionChanged { diff --git a/SignalsDotnet/SignalsDotnet/Signal_T.cs b/SignalsDotnet/SignalsDotnet/Signal_T.cs index ac223bb..b0744ab 100644 --- a/SignalsDotnet/SignalsDotnet/Signal_T.cs +++ b/SignalsDotnet/SignalsDotnet/Signal_T.cs @@ -1,18 +1,18 @@ -using System.Reactive; -using System.Reactive.Linq; +using System.ComponentModel; +using R3; using SignalsDotnet.Configuration; using SignalsDotnet.Internals.Helpers; namespace SignalsDotnet; -public class Signal : Signal, IReadOnlySignal, IEquatable> +public class Signal : IReadOnlySignal, IEquatable> { readonly SignalConfiguration _configuration; - public Signal(SignalConfigurationDelegate? configurator = null) : this(default, configurator) + public Signal(SignalConfigurationDelegate? configurator = null) : this(default!, configurator!) { } - public Signal(T? startValue, SignalConfigurationDelegate? configurator = null) + public Signal(T startValue, SignalConfigurationDelegate? configurator = null) { var configuration = SignalConfiguration.Default; if (configurator != null) @@ -20,26 +20,44 @@ public Signal(T? startValue, SignalConfigurationDelegate? configurator = null _configuration = configuration; _value = startValue; - } - T? _value; - public T? Value + T _value; + public T Value { - get => GetValue(this, in _value); - set => SetValue(ref _value, value, _configuration.Comparer, _configuration.RaiseOnlyWhenChanged); + get => Signal.GetValue(this, in _value); + set + { + if (EqualityComparer.Default.Equals(_value, value)) + return; + + _value = value; + + var propertyChanged = PropertyChanged; + if (propertyChanged is null) + { + return; + } + + using (Signal.UntrackedScope()) + { + propertyChanged(this, Signal.PropertyChangedArgs); + } + } } - public T? UntrackedValue => _value; + + public T UntrackedValue => _value; object? IReadOnlySignal.UntrackedValue => UntrackedValue; - public IDisposable Subscribe(IObserver observer) => this.OnPropertyChanged(nameof(Value), () => Value) - .Subscribe(observer); + public Observable FutureValues => this.OnPropertyChanged(true); + public Observable Values => this.OnPropertyChanged(false); public bool Equals(Signal? other) { if (other is null) return false; + if (ReferenceEquals(this, other)) return true; @@ -50,8 +68,10 @@ public override bool Equals(object? obj) { if (obj is null) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != GetType()) return false; @@ -62,6 +82,8 @@ public override bool Equals(object? obj) public static bool operator !=(Signal a, Signal b) => !(a == b); public override int GetHashCode() => _value is null ? 0 : _configuration.Comparer.GetHashCode(_value!); - public IObservable Changed => this.Select(static _ => Unit.Default); + public event PropertyChangedEventHandler? PropertyChanged; + Observable IReadOnlySignal.Values => this.OnPropertyChangedAsUnit(false); + Observable IReadOnlySignal.FutureValues => this.OnPropertyChangedAsUnit(true); } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/Signal_WhenAnyChanged.cs b/SignalsDotnet/SignalsDotnet/Signal_WhenAnyChanged.cs index 471d555..c46e56a 100644 --- a/SignalsDotnet/SignalsDotnet/Signal_WhenAnyChanged.cs +++ b/SignalsDotnet/SignalsDotnet/Signal_WhenAnyChanged.cs @@ -1,16 +1,15 @@ -using System.Reactive; -using System.Reactive.Linq; +using R3; namespace SignalsDotnet; -public abstract partial class Signal +public static partial class Signal { - public static IObservable WhenAnyChanged(params IReadOnlySignal[] signals) + public static Observable WhenAnyChanged(params IReadOnlySignal[] signals) { return WhenAnyChanged((IReadOnlyCollection)signals); } - public static IObservable WhenAnyChanged(IReadOnlyCollection signals) + public static Observable WhenAnyChanged(IReadOnlyCollection signals) { if (signals is null) throw new ArgumentNullException(nameof(signals)); @@ -18,8 +17,8 @@ public static IObservable WhenAnyChanged(IReadOnlyCollection(); - return signals.Select(x => x.Changed) + return signals.Select(x => x.FutureValues) .Merge() - .Skip(signals.Count - 1); // All observable start with a value, so we skip them but one. + .Prepend(Unit.Default); } } \ No newline at end of file diff --git a/SignalsDotnet/SignalsDotnet/SignalsDotnet.csproj b/SignalsDotnet/SignalsDotnet/SignalsDotnet.csproj index 80e605a..4431f8c 100644 --- a/SignalsDotnet/SignalsDotnet/SignalsDotnet.csproj +++ b/SignalsDotnet/SignalsDotnet/SignalsDotnet.csproj @@ -1,13 +1,13 @@  - net6.0 + net8.0 enable enable Porting of Angular signals to dotnet README.md https://github.com/fedeAlterio/SignalsDotnet - 1.0.1 + 2.0.0 @@ -18,7 +18,7 @@ - + diff --git a/SignalsDotnet/SignalsDotnet/ThrottleOneCycleObservable.cs b/SignalsDotnet/SignalsDotnet/ThrottleOneCycleObservable.cs deleted file mode 100644 index 9dc848a..0000000 --- a/SignalsDotnet/SignalsDotnet/ThrottleOneCycleObservable.cs +++ /dev/null @@ -1,16 +0,0 @@ -using System.Reactive.Concurrency; -using System.Reactive.Linq; - -namespace SignalsDotnet; - -public static class ThrottleOneCycleObservable -{ - public static IObservable ThrottleOneCycle(this IObservable observable, IScheduler scheduler) - { - if (observable is null) - throw new ArgumentNullException(nameof(observable)); - - return observable.Select(x => Observable.Return(x, scheduler)) - .Switch(); - } -} \ No newline at end of file diff --git a/assets/demo.gif b/assets/demo.gif new file mode 100644 index 0000000..7423a43 Binary files /dev/null and b/assets/demo.gif differ