Skip to content

Commit

Permalink
Merge pull request #11 from Kritner/FEATURE/observer
Browse files Browse the repository at this point in the history
Orleans Observer
  • Loading branch information
Kritner committed Jan 15, 2019
2 parents a0f5d0e + 1766681 commit 64026df
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kritner.OrleansGettingStarted.Client.Helpers;
using Kritner.OrleansGettingStarted.GrainInterfaces;
using Orleans;

namespace Kritner.OrleansGettingStarted.Client.OrleansFunctionExamples
{
public class GrainObserverEventSender : IOrleansFunction
{
public string Description => "This function can be used to send a message to subscribed observers.";

public async Task PerformFunction(IClusterClient clusterClient)
{
var grain = clusterClient.GetGrain<IObservableManager>(0);

Console.WriteLine("Enter a message to send to subscribed observers.");
var message = Console.ReadLine();

await grain.SendMessageToObservers(message);

ConsoleHelpers.ReturnToMenu();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;
using System.Threading.Tasks;
using Kritner.OrleansGettingStarted.Client.Helpers;
using Kritner.OrleansGettingStarted.GrainInterfaces;
using Orleans;

namespace Kritner.OrleansGettingStarted.Client.OrleansFunctionExamples
{
public class GrainObserverReceiver : IOrleansFunction, IObserverSample
{
private bool _shouldBreakLoop;

public string Description => "Acts as a receiver of observed messages. When the observer manager notifies subscribed observers like this class, they take action on the notification.";

public async Task PerformFunction(IClusterClient clusterClient)
{
Console.WriteLine("Observing for behavior, stops once behavior observed.");

var observerManager = clusterClient.GetGrain<IObservableManager>(0);
var observerRef = await clusterClient
.CreateObjectReference<IObserverSample>(this);

while (!_shouldBreakLoop)
{
await observerManager.Subscribe(observerRef);
await Task.Delay(5000);
}

await observerManager.Unsubscribe(observerRef);

ConsoleHelpers.ReturnToMenu();
}

public void ReceiveMessage(string message)
{
ConsoleHelpers.LineSeparator();
Console.WriteLine("Observed Behavior:");
Console.WriteLine(message);

_shouldBreakLoop = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ public IList<IOrleansFunction> GetOrleansFunctions()
new StatefulWork(),
new ShowoffDashboard(),
new DependencyInjectionEmailService(),
new EverythingIsOkReminder()
new EverythingIsOkReminder(),
new GrainObserverReceiver(),
new GrainObserverEventSender(),
};
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Orleans;
using System.Threading.Tasks;

namespace Kritner.OrleansGettingStarted.GrainInterfaces
{
public interface IObservableManager : IGrainWithIntegerKey, IGrainInterfaceMarker
{
Task Subscribe(IObserverSample observer);
Task Unsubscribe(IObserverSample observer);
Task SendMessageToObservers(string message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Orleans;
using System;

namespace Kritner.OrleansGettingStarted.GrainInterfaces
{
public interface IObserverSample : IGrainObserver
{
void ReceiveMessage(string message);
}
}
38 changes: 38 additions & 0 deletions src/Kritner.OrleansGettingStarted.Grains/ObservableManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using Kritner.OrleansGettingStarted.GrainInterfaces;
using Orleans;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Kritner.OrleansGettingStarted.Grains
{
public class ObservableManager : Grain, IObservableManager, IGrainMarker
{
private GrainObserverManager<IObserverSample> _subsManager;

public override async Task OnActivateAsync()
{
_subsManager = new GrainObserverManager<IObserverSample>();
await base.OnActivateAsync();
}

public Task SendMessageToObservers(string message)
{
_subsManager.Notify(n => n.ReceiveMessage(message));
return Task.CompletedTask;
}

public Task Subscribe(IObserverSample observer)
{
_subsManager.Subscribe(observer);
return Task.CompletedTask;
}

public Task Unsubscribe(IObserverSample observer)
{
_subsManager.Unsubscribe(observer);
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
using Orleans.Runtime;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Kritner.OrleansGettingStarted.Grains
{
/// <summary>
/// Maintains a collection of grain observers.
/// </summary>
/// <typeparam name="T">
/// The grain observer type.
/// </typeparam>
public class GrainObserverManager<T> : IEnumerable<T> where T : IAddressable
{
/// <summary>
/// The observers.
/// </summary>
private readonly Dictionary<T, DateTime> observers = new Dictionary<T, DateTime>();

/// <summary>
/// Initializes a new instance of the <see cref="GrainObserverManager{T}"/> class.
/// </summary>
public GrainObserverManager()
{
this.GetDateTime = () => DateTime.UtcNow;
}

/// <summary>
/// Gets or sets the delegate used to get the date and time, for expiry.
/// </summary>
public Func<DateTime> GetDateTime { get; set; }

/// <summary>
/// Gets or sets the expiration time span, after which observers are lazily removed.
/// </summary>
public TimeSpan ExpirationDuration { get; set; } = TimeSpan.FromMinutes(1);

/// <summary>
/// Gets the number of observers.
/// </summary>
public int Count => this.observers.Count;

/// <summary>
/// Removes all observers.
/// </summary>
public void Clear()
{
this.observers.Clear();
}

/// <summary>
/// Ensures that the provided <paramref name="observer"/> is subscribed, renewing its subscription.
/// </summary>
/// <param name="observer">The observer.</param>
public void Subscribe(T observer)
{
// Add or update the subscription.
this.observers[observer] = this.GetDateTime();
}

/// <summary>
/// Ensures that the provided <paramref name="observer"/> is unsubscribed.
/// </summary>
/// <param name="observer">The observer.</param>
public void Unsubscribe(T observer)
{
this.observers.Remove(observer);
}

/// <summary>
/// Notifies all observers.
/// </summary>
/// <param name="notification">
/// The notification delegate to call on each observer.
/// </param>
/// <param name="predicate">The predicate used to select observers to notify.</param>
/// <returns>
/// A <see cref="Task"/> representing the work performed.
/// </returns>
public async Task Notify(Func<T, Task> notification, Func<T, bool> predicate = null)
{
var now = this.GetDateTime();
var defunct = default(List<T>);
foreach (var observer in this.observers)
{
if (observer.Value + this.ExpirationDuration < now)
{
// Expired observers will be removed.
defunct = defunct ?? new List<T>();
defunct.Add(observer.Key);
continue;
}

// Skip observers which don't match the provided predicate.
if (predicate != null && !predicate(observer.Key))
{
continue;
}

try
{
await notification(observer.Key);
}
catch (Exception)
{
// Failing observers are considered defunct and will be removed..
defunct = defunct ?? new List<T>();
defunct.Add(observer.Key);
}
}

// Remove defunct observers.
if (defunct != default(List<T>))
{
foreach (var observer in defunct)
{
this.observers.Remove(observer);
}
}
}

/// <summary>
/// Notifies all observers which match the provided <paramref name="predicate"/>.
/// </summary>
/// <param name="notification">
/// The notification delegate to call on each observer.
/// </param>
/// <param name="predicate">The predicate used to select observers to notify.</param>
public void Notify(Action<T> notification, Func<T, bool> predicate = null)
{
var now = this.GetDateTime();
var defunct = default(List<T>);
foreach (var observer in this.observers)
{
if (observer.Value + this.ExpirationDuration < now)
{
// Expired observers will be removed.
defunct = defunct ?? new List<T>();
defunct.Add(observer.Key);
continue;
}

// Skip observers which don't match the provided predicate.
if (predicate != null && !predicate(observer.Key))
{
continue;
}

try
{
notification(observer.Key);
}
catch (Exception)
{
// Failing observers are considered defunct and will be removed..
defunct = defunct ?? new List<T>();
defunct.Add(observer.Key);
}
}

// Remove defunct observers.
if (defunct != default(List<T>))
{
foreach (var observer in defunct)
{
this.observers.Remove(observer);
}
}
}

/// <summary>
/// Removed all expired observers.
/// </summary>
public void ClearExpired()
{
var now = this.GetDateTime();
var defunct = default(List<T>);
foreach (var observer in this.observers)
{
if (observer.Value + this.ExpirationDuration < now)
{
// Expired observers will be removed.
defunct = defunct ?? new List<T>();
defunct.Add(observer.Key);
}
}

// Remove defunct observers.
if (defunct != default(List<T>))
{
foreach (var observer in defunct)
{
this.observers.Remove(observer);
}
}
}

/// <summary>
/// Returns the enumerator for all observers.
/// </summary>
/// <returns>The enumerator for all observers.</returns>
public IEnumerator<T> GetEnumerator()
{
return this.observers.Keys.GetEnumerator();
}

/// <summary>
/// Returns the enumerator for all observers.
/// </summary>
/// <returns>The enumerator for all observers.</returns>
IEnumerator IEnumerable.GetEnumerator()
{
return this.observers.Keys.GetEnumerator();
}
}
}

0 comments on commit 64026df

Please sign in to comment.