Skip to content

Commit

Permalink
feat: implement basic type of meters (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Aug 25, 2024
1 parent 54f8b8c commit e373dd3
Show file tree
Hide file tree
Showing 19 changed files with 451 additions and 20 deletions.
1 change: 1 addition & 0 deletions Streamistry.Core/Mapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public Mapper(IChainablePipe<TInput> upstream, Func<TInput?, TOutput?> function)
Function = function;
}

[Meter]
public void Emit(TInput? obj)
=> PushDownstream(Invoke(obj));

Expand Down
29 changes: 29 additions & 0 deletions Streamistry.Core/Observability/BaseTracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability;
public abstract class BaseTracer : ITracer
{
public abstract IDisposable? StartActiveSpan(string spanName);

protected class TracerScope : IDisposable
{
private string SpanName { get; }
public Stopwatch StopWatch { get; }

public TracerScope(string spanName)
{
SpanName = spanName;
StopWatch = Stopwatch.StartNew();
}

public void Dispose()
{
Console.WriteLine($"Ending span '{SpanName}' in {StopWatch.ElapsedTicks} ticks");
}
}
}
16 changes: 16 additions & 0 deletions Streamistry.Core/Observability/ConsolePublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability.Measurements;

namespace Streamistry.Observability;
public class ConsolePublisher
{
public void Publish(IMeasurement measurement)
{
Console.WriteLine($"[{DateTime.UtcNow}] '{measurement}'");
}
}
22 changes: 3 additions & 19 deletions Streamistry.Core/Observability/ConsoleTracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,13 @@
using System.Threading.Tasks;

namespace Streamistry.Observability;
public class ConsoleTracer : ITracer

public class ConsoleTracer : BaseTracer
{
public IDisposable? StartActiveSpan(string spanName)
public override IDisposable? StartActiveSpan(string spanName)
{
spanName = $"{spanName} [{Guid.NewGuid()}]";
Console.WriteLine($"Starting span '{spanName}'");
return new TracerScope(spanName);
}

private class TracerScope : IDisposable
{
private string SpanName { get; }
public Stopwatch StopWatch { get; }

public TracerScope(string spanName)
{
SpanName = spanName;
StopWatch = Stopwatch.StartNew();
}

public void Dispose()
{
Console.WriteLine($"Ending span '{SpanName}' in {StopWatch.ElapsedTicks} ticks");
}
}
}
12 changes: 12 additions & 0 deletions Streamistry.Core/Observability/Measurements/CountMeasurement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability.Measurements;
public readonly record struct CountMeasurement(int Value) : IMeasurement
{
public override string ToString()
=> $"{Value} batches";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability.Measurements;
public readonly record struct HistogramMeasurement(KeyValuePair<string, int>[] Buckets) : IMeasurement
{
public override string ToString()
=> $"[{string.Join(", ", BucketsToString(Buckets))}]";

private static IEnumerable<string> BucketsToString(KeyValuePair<string, int>[] buckets)
{
foreach (var bucket in buckets)
yield return $"{{{bucket.Key} => {bucket.Value}}}";
}
}
10 changes: 10 additions & 0 deletions Streamistry.Core/Observability/Measurements/IMeasurement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability.Measurements;
public interface IMeasurement
{
}
15 changes: 15 additions & 0 deletions Streamistry.Core/Observability/Measurements/RateMeasurement.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability.Measurements;
public readonly record struct RateMeasurement(int Value, TimeSpan window) : IMeasurement
{
public override string ToString()
=> $"{Math.Round(RatePerSecond, 3)} batch/sec";

public double RatePerSecond
=> window.Ticks > 0 ? (Value * 1e7 / window.Ticks ) : double.NaN;
}
30 changes: 30 additions & 0 deletions Streamistry.Core/Observability/MeterAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Diagnostics;
using System.Transactions;
using MethodBoundaryAspect.Fody.Attributes;
using Streamistry.Observability.Meters;

namespace Streamistry.Observability;

public sealed class MeterAttribute : OnMethodBoundaryAspect
{
public override void OnEntry(MethodExecutionArgs args)
{
var observable = args.Instance as IObservablePipe ?? throw new InvalidOperationException();
var meters = observable
?.GetObservabilityProvider()
?.GetMeters(observable);

if (meters is not null && meters.Length>0)
{
var timestamp = DateTime.UtcNow;
foreach (var meter in meters)
meter.Append(args.Arguments[0], timestamp);
}
}

public override void OnExit(MethodExecutionArgs args)
{ }

public override void OnException(MethodExecutionArgs args)
{ }
}
49 changes: 49 additions & 0 deletions Streamistry.Core/Observability/Meters/BaseMeter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability.Measurements;
using Streamistry.Observability.Thresholds;

namespace Streamistry.Observability.Meters;
public abstract class BaseMeter : IMeter
{
private Action<IMeasurement>? Publish { get; }
private IThreshold[] Thresholds { get; }

protected BaseMeter(IThreshold[] thresholds, Action<IMeasurement>[] publishers)
{
if (thresholds is null || thresholds.Length == 0)
thresholds = [new MaxCardinality(1000)];
Thresholds = thresholds;

foreach (var publisher in publishers)
Publish += publisher;
}

protected abstract int Count();
protected abstract void Update(object value, DateTime timestamp);
public void Append(object value, DateTime timestamp)
{
Update(value, timestamp);

foreach (var threshold in Thresholds)
{
if (threshold.Check(Count(), timestamp))
{
Trigger();
break;
}
}
}

protected abstract void Reset();
protected abstract IMeasurement CreateMeasurement();
public void Trigger()
{
Publish?.Invoke(CreateMeasurement());
Reset();
}
}
29 changes: 29 additions & 0 deletions Streamistry.Core/Observability/Meters/Counter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability.Measurements;
using Streamistry.Observability.Thresholds;

namespace Streamistry.Observability.Meters;
internal class Counter : BaseMeter
{
protected int Value { get; set; }

public Counter(IThreshold threshold, Action<IMeasurement> publisher)
: this([threshold], [publisher]) { }

public Counter(IThreshold[] thresholds, Action<IMeasurement>[] publishers)
:base(thresholds, publishers) { }

protected override void Update(object value, DateTime timestamp)
=> Value += 1;

protected override void Reset()
=> Value = 0;
protected override int Count()
=> Value;
protected override IMeasurement CreateMeasurement()
=> new CountMeasurement(Value);
}
45 changes: 45 additions & 0 deletions Streamistry.Core/Observability/Meters/Histogram.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability.Measurements;
using Streamistry.Observability.Thresholds;

namespace Streamistry.Observability.Meters;
internal class Histogram<TInput> : BaseMeter
{
protected Func<TInput, string> Bucketizer { get; }
protected int CountItems { get; set; }
protected Dictionary<string, int> Store { get; } = [];

public Histogram(Func<TInput, string> bucketizer, IThreshold threshold, Action<IMeasurement> publisher)
: this(bucketizer, [threshold], [publisher]) { }

public Histogram(Func<TInput, string> bucketizer, IThreshold[] thresholds, Action<IMeasurement>[] publishers)
: base(thresholds, publishers)
=> Bucketizer = bucketizer;

protected override int Count()
=> CountItems;

protected override void Update(object value, DateTime timestamp)
{
var bucket = Bucketizer.Invoke((TInput)value);
if (Store.TryGetValue(bucket, out var count))
Store[bucket] = ++count;
else
Store.Add(bucket, 1);
CountItems += 1;
}

protected override void Reset()
{
foreach (var bucket in Store.Keys)
Store[bucket] = 0;
CountItems = 0;
}

protected override IMeasurement CreateMeasurement()
=> new HistogramMeasurement([.. Store]);
}
12 changes: 12 additions & 0 deletions Streamistry.Core/Observability/Meters/IMeter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability.Meters;
public interface IMeter
{
void Append(object value, DateTime timestamp);
void Trigger();
}
36 changes: 36 additions & 0 deletions Streamistry.Core/Observability/Meters/RateMeter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability.Measurements;
using Streamistry.Observability.Thresholds;

namespace Streamistry.Observability.Meters;
internal class RateMeter : Counter
{
private DateTime? StartWindow { get; set; }
private DateTime? EndWindow { get; set; }

public RateMeter(IThreshold threshold, Action<IMeasurement> publisher)
: this([threshold], [publisher]) { }

public RateMeter(IThreshold[] thresholds, Action<IMeasurement>[] publishers)
: base(thresholds, publishers) { }

protected override void Update(object value, DateTime timestamp)
{
base.Update(value, timestamp);
StartWindow ??= timestamp;
EndWindow = timestamp;
}

protected override void Reset()
{
base.Reset();
StartWindow = null;
}

protected override IMeasurement CreateMeasurement()
=> new RateMeasurement(Value, StartWindow.HasValue && EndWindow.HasValue ? EndWindow.Value.Subtract(StartWindow.Value) : new TimeSpan(0));
}
15 changes: 15 additions & 0 deletions Streamistry.Core/Observability/ObservabilityProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,29 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability.Meters;

namespace Streamistry.Observability;
public sealed class ObservabilityProvider
{
private ITracer Tracer { get; set; } = new NullTracer();
private Dictionary<IObservablePipe, IMeter[]> Meters { get; } = [];

public ObservabilityProvider(ITracer tracer)
=> Tracer = tracer;

public void AttachMeters(IObservablePipe observable, IMeter[] meters)
{
if (Meters.TryGetValue(observable, out var value))
Meters[observable] = [.. value, .. meters];
else
Meters.Add(observable, meters);
}

public void DetachMeters(IObservablePipe observable)
=> Meters.Remove(observable);

public ITracer GetTracer() => Tracer;
public IMeter[] GetMeters(IObservablePipe observable)
=> Meters.TryGetValue(observable, out var meters) ? meters : [];
}
11 changes: 11 additions & 0 deletions Streamistry.Core/Observability/Thresholds/IThreshold.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability.Thresholds;
public interface IThreshold
{
bool Check(int count, DateTime timestamp);
}
Loading

0 comments on commit e373dd3

Please sign in to comment.