diff --git a/Streamistry.Core/Mapper.cs b/Streamistry.Core/Mapper.cs index aacef0d..5a3e5f4 100644 --- a/Streamistry.Core/Mapper.cs +++ b/Streamistry.Core/Mapper.cs @@ -25,6 +25,7 @@ public Mapper(IChainablePipe upstream, Func function) Function = function; } + [Meter] public void Emit(TInput? obj) => PushDownstream(Invoke(obj)); diff --git a/Streamistry.Core/Observability/BaseTracer.cs b/Streamistry.Core/Observability/BaseTracer.cs new file mode 100644 index 0000000..61deb87 --- /dev/null +++ b/Streamistry.Core/Observability/BaseTracer.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability; +public abstract class BaseTracer : ITracer +{ + public abstract IDisposable? StartActiveSpan(string spanName); + + protected class TracerScope : IDisposable + { + private string SpanName { get; } + public Stopwatch StopWatch { get; } + + public TracerScope(string spanName) + { + SpanName = spanName; + StopWatch = Stopwatch.StartNew(); + } + + public void Dispose() + { + Console.WriteLine($"Ending span '{SpanName}' in {StopWatch.ElapsedTicks} ticks"); + } + } +} diff --git a/Streamistry.Core/Observability/ConsolePublisher.cs b/Streamistry.Core/Observability/ConsolePublisher.cs new file mode 100644 index 0000000..ff00f74 --- /dev/null +++ b/Streamistry.Core/Observability/ConsolePublisher.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability.Measurements; + +namespace Streamistry.Observability; +public class ConsolePublisher +{ + public void Publish(IMeasurement measurement) + { + Console.WriteLine($"[{DateTime.UtcNow}] '{measurement}'"); + } +} diff --git a/Streamistry.Core/Observability/ConsoleTracer.cs b/Streamistry.Core/Observability/ConsoleTracer.cs index 27daddd..fa5bf74 100644 --- a/Streamistry.Core/Observability/ConsoleTracer.cs +++ b/Streamistry.Core/Observability/ConsoleTracer.cs @@ -6,29 +6,13 @@ using System.Threading.Tasks; namespace Streamistry.Observability; -public class ConsoleTracer : ITracer + +public class ConsoleTracer : BaseTracer { - public IDisposable? StartActiveSpan(string spanName) + public override IDisposable? StartActiveSpan(string spanName) { spanName = $"{spanName} [{Guid.NewGuid()}]"; Console.WriteLine($"Starting span '{spanName}'"); return new TracerScope(spanName); } - - private class TracerScope : IDisposable - { - private string SpanName { get; } - public Stopwatch StopWatch { get; } - - public TracerScope(string spanName) - { - SpanName = spanName; - StopWatch = Stopwatch.StartNew(); - } - - public void Dispose() - { - Console.WriteLine($"Ending span '{SpanName}' in {StopWatch.ElapsedTicks} ticks"); - } - } } diff --git a/Streamistry.Core/Observability/Measurements/CountMeasurement.cs b/Streamistry.Core/Observability/Measurements/CountMeasurement.cs new file mode 100644 index 0000000..bc36b8a --- /dev/null +++ b/Streamistry.Core/Observability/Measurements/CountMeasurement.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Measurements; +public readonly record struct CountMeasurement(int Value) : IMeasurement +{ + public override string ToString() + => $"{Value} batches"; +} diff --git a/Streamistry.Core/Observability/Measurements/HistogramMeasurement.cs b/Streamistry.Core/Observability/Measurements/HistogramMeasurement.cs new file mode 100644 index 0000000..63f8fbf --- /dev/null +++ b/Streamistry.Core/Observability/Measurements/HistogramMeasurement.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Measurements; +public readonly record struct HistogramMeasurement(KeyValuePair[] Buckets) : IMeasurement +{ + public override string ToString() + => $"[{string.Join(", ", BucketsToString(Buckets))}]"; + + private static IEnumerable BucketsToString(KeyValuePair[] buckets) + { + foreach (var bucket in buckets) + yield return $"{{{bucket.Key} => {bucket.Value}}}"; + } +} diff --git a/Streamistry.Core/Observability/Measurements/IMeasurement.cs b/Streamistry.Core/Observability/Measurements/IMeasurement.cs new file mode 100644 index 0000000..c7897ff --- /dev/null +++ b/Streamistry.Core/Observability/Measurements/IMeasurement.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Measurements; +public interface IMeasurement +{ +} diff --git a/Streamistry.Core/Observability/Measurements/RateMeasurement.cs b/Streamistry.Core/Observability/Measurements/RateMeasurement.cs new file mode 100644 index 0000000..dd07a10 --- /dev/null +++ b/Streamistry.Core/Observability/Measurements/RateMeasurement.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Measurements; +public readonly record struct RateMeasurement(int Value, TimeSpan window) : IMeasurement +{ + public override string ToString() + => $"{Math.Round(RatePerSecond, 3)} batch/sec"; + + public double RatePerSecond + => window.Ticks > 0 ? (Value * 1e7 / window.Ticks ) : double.NaN; +} diff --git a/Streamistry.Core/Observability/MeterAttribute.cs b/Streamistry.Core/Observability/MeterAttribute.cs new file mode 100644 index 0000000..9b04e2f --- /dev/null +++ b/Streamistry.Core/Observability/MeterAttribute.cs @@ -0,0 +1,30 @@ +using System.Diagnostics; +using System.Transactions; +using MethodBoundaryAspect.Fody.Attributes; +using Streamistry.Observability.Meters; + +namespace Streamistry.Observability; + +public sealed class MeterAttribute : OnMethodBoundaryAspect +{ + public override void OnEntry(MethodExecutionArgs args) + { + var observable = args.Instance as IObservablePipe ?? throw new InvalidOperationException(); + var meters = observable + ?.GetObservabilityProvider() + ?.GetMeters(observable); + + if (meters is not null && meters.Length>0) + { + var timestamp = DateTime.UtcNow; + foreach (var meter in meters) + meter.Append(args.Arguments[0], timestamp); + } + } + + public override void OnExit(MethodExecutionArgs args) + { } + + public override void OnException(MethodExecutionArgs args) + { } +} diff --git a/Streamistry.Core/Observability/Meters/BaseMeter.cs b/Streamistry.Core/Observability/Meters/BaseMeter.cs new file mode 100644 index 0000000..3f5c546 --- /dev/null +++ b/Streamistry.Core/Observability/Meters/BaseMeter.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability.Measurements; +using Streamistry.Observability.Thresholds; + +namespace Streamistry.Observability.Meters; +public abstract class BaseMeter : IMeter +{ + private Action? Publish { get; } + private IThreshold[] Thresholds { get; } + + protected BaseMeter(IThreshold[] thresholds, Action[] publishers) + { + if (thresholds is null || thresholds.Length == 0) + thresholds = [new MaxCardinality(1000)]; + Thresholds = thresholds; + + foreach (var publisher in publishers) + Publish += publisher; + } + + protected abstract int Count(); + protected abstract void Update(object value, DateTime timestamp); + public void Append(object value, DateTime timestamp) + { + Update(value, timestamp); + + foreach (var threshold in Thresholds) + { + if (threshold.Check(Count(), timestamp)) + { + Trigger(); + break; + } + } + } + + protected abstract void Reset(); + protected abstract IMeasurement CreateMeasurement(); + public void Trigger() + { + Publish?.Invoke(CreateMeasurement()); + Reset(); + } +} diff --git a/Streamistry.Core/Observability/Meters/Counter.cs b/Streamistry.Core/Observability/Meters/Counter.cs new file mode 100644 index 0000000..6415d62 --- /dev/null +++ b/Streamistry.Core/Observability/Meters/Counter.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability.Measurements; +using Streamistry.Observability.Thresholds; + +namespace Streamistry.Observability.Meters; +internal class Counter : BaseMeter +{ + protected int Value { get; set; } + + public Counter(IThreshold threshold, Action publisher) + : this([threshold], [publisher]) { } + + public Counter(IThreshold[] thresholds, Action[] publishers) + :base(thresholds, publishers) { } + + protected override void Update(object value, DateTime timestamp) + => Value += 1; + + protected override void Reset() + => Value = 0; + protected override int Count() + => Value; + protected override IMeasurement CreateMeasurement() + => new CountMeasurement(Value); +} diff --git a/Streamistry.Core/Observability/Meters/Histogram.cs b/Streamistry.Core/Observability/Meters/Histogram.cs new file mode 100644 index 0000000..f5a52c7 --- /dev/null +++ b/Streamistry.Core/Observability/Meters/Histogram.cs @@ -0,0 +1,45 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability.Measurements; +using Streamistry.Observability.Thresholds; + +namespace Streamistry.Observability.Meters; +internal class Histogram : BaseMeter +{ + protected Func Bucketizer { get; } + protected int CountItems { get; set; } + protected Dictionary Store { get; } = []; + + public Histogram(Func bucketizer, IThreshold threshold, Action publisher) + : this(bucketizer, [threshold], [publisher]) { } + + public Histogram(Func bucketizer, IThreshold[] thresholds, Action[] publishers) + : base(thresholds, publishers) + => Bucketizer = bucketizer; + + protected override int Count() + => CountItems; + + protected override void Update(object value, DateTime timestamp) + { + var bucket = Bucketizer.Invoke((TInput)value); + if (Store.TryGetValue(bucket, out var count)) + Store[bucket] = ++count; + else + Store.Add(bucket, 1); + CountItems += 1; + } + + protected override void Reset() + { + foreach (var bucket in Store.Keys) + Store[bucket] = 0; + CountItems = 0; + } + + protected override IMeasurement CreateMeasurement() + => new HistogramMeasurement([.. Store]); +} diff --git a/Streamistry.Core/Observability/Meters/IMeter.cs b/Streamistry.Core/Observability/Meters/IMeter.cs new file mode 100644 index 0000000..6a289dc --- /dev/null +++ b/Streamistry.Core/Observability/Meters/IMeter.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Meters; +public interface IMeter +{ + void Append(object value, DateTime timestamp); + void Trigger(); +} diff --git a/Streamistry.Core/Observability/Meters/RateMeter.cs b/Streamistry.Core/Observability/Meters/RateMeter.cs new file mode 100644 index 0000000..5b69b89 --- /dev/null +++ b/Streamistry.Core/Observability/Meters/RateMeter.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Streamistry.Observability.Measurements; +using Streamistry.Observability.Thresholds; + +namespace Streamistry.Observability.Meters; +internal class RateMeter : Counter +{ + private DateTime? StartWindow { get; set; } + private DateTime? EndWindow { get; set; } + + public RateMeter(IThreshold threshold, Action publisher) + : this([threshold], [publisher]) { } + + public RateMeter(IThreshold[] thresholds, Action[] publishers) + : base(thresholds, publishers) { } + + protected override void Update(object value, DateTime timestamp) + { + base.Update(value, timestamp); + StartWindow ??= timestamp; + EndWindow = timestamp; + } + + protected override void Reset() + { + base.Reset(); + StartWindow = null; + } + + protected override IMeasurement CreateMeasurement() + => new RateMeasurement(Value, StartWindow.HasValue && EndWindow.HasValue ? EndWindow.Value.Subtract(StartWindow.Value) : new TimeSpan(0)); +} diff --git a/Streamistry.Core/Observability/ObservabilityProvider.cs b/Streamistry.Core/Observability/ObservabilityProvider.cs index afaad73..fa6f69e 100644 --- a/Streamistry.Core/Observability/ObservabilityProvider.cs +++ b/Streamistry.Core/Observability/ObservabilityProvider.cs @@ -3,14 +3,29 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Observability.Meters; namespace Streamistry.Observability; public sealed class ObservabilityProvider { private ITracer Tracer { get; set; } = new NullTracer(); + private Dictionary Meters { get; } = []; public ObservabilityProvider(ITracer tracer) => Tracer = tracer; + public void AttachMeters(IObservablePipe observable, IMeter[] meters) + { + if (Meters.TryGetValue(observable, out var value)) + Meters[observable] = [.. value, .. meters]; + else + Meters.Add(observable, meters); + } + + public void DetachMeters(IObservablePipe observable) + => Meters.Remove(observable); + public ITracer GetTracer() => Tracer; + public IMeter[] GetMeters(IObservablePipe observable) + => Meters.TryGetValue(observable, out var meters) ? meters : []; } diff --git a/Streamistry.Core/Observability/Thresholds/IThreshold.cs b/Streamistry.Core/Observability/Thresholds/IThreshold.cs new file mode 100644 index 0000000..50ceffb --- /dev/null +++ b/Streamistry.Core/Observability/Thresholds/IThreshold.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Thresholds; +public interface IThreshold +{ + bool Check(int count, DateTime timestamp); +} diff --git a/Streamistry.Core/Observability/Thresholds/MaxCardinality.cs b/Streamistry.Core/Observability/Thresholds/MaxCardinality.cs new file mode 100644 index 0000000..486adfd --- /dev/null +++ b/Streamistry.Core/Observability/Thresholds/MaxCardinality.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Observability.Thresholds; +internal readonly struct MaxCardinality(int Value) : IThreshold +{ + public bool Check(int count, DateTime timestamp) + => count >= Value; +} diff --git a/Streamistry.Core/Observability/TraceAttribute.cs b/Streamistry.Core/Observability/TraceAttribute.cs index 6609f05..a1b5f20 100644 --- a/Streamistry.Core/Observability/TraceAttribute.cs +++ b/Streamistry.Core/Observability/TraceAttribute.cs @@ -8,7 +8,10 @@ public sealed class TraceAttribute : OnMethodBoundaryAspect { public override void OnEntry(MethodExecutionArgs args) { - args.MethodExecutionTag = (args.Instance as IObservablePipe)?.GetObservabilityProvider()?.GetTracer().StartActiveSpan(args.Instance.GetType().Name.Split('`')[0]); + args.MethodExecutionTag = (args.Instance as IObservablePipe) + ?.GetObservabilityProvider() + ?.GetTracer() + .StartActiveSpan(args.Instance.GetType().Name.Split('`')[0]); } public override void OnExit(MethodExecutionArgs args) diff --git a/Streamistry.Testing/Observability/MeterTests.cs b/Streamistry.Testing/Observability/MeterTests.cs new file mode 100644 index 0000000..d995b83 --- /dev/null +++ b/Streamistry.Testing/Observability/MeterTests.cs @@ -0,0 +1,103 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using NUnit.Framework; +using Streamistry.Observability; +using Streamistry.Observability.Meters; +using Streamistry.Observability.Thresholds; +using Streamistry.Pipes.Sinks; +using Streamistry.Pipes.Sources; + +namespace Streamistry.Testing.Observability; +public class MeterTests +{ + private class ConsoleOutput : IDisposable + { + private readonly StringWriter stringWriter = new(); + private readonly TextWriter originalOutput = Console.Out; + + public ConsoleOutput() + => Console.SetOut(stringWriter); + + public string GetOuput() + => stringWriter.ToString(); + + public int CountSubstring(string value) + { + var text = GetOuput(); + int count = 0, minIndex = text.IndexOf(value, 0); + while (minIndex != -1) + { + minIndex = text.IndexOf(value, minIndex + value.Length); + count++; + } + return count; + } + + public void Dispose() + { + Console.SetOut(originalOutput); + stringWriter.Dispose(); + } + } + + [Test] + public void Counter_Mapper_ReturnCount() + { + var observability = new ObservabilityProvider(new NullTracer()); + var publisher = new ConsolePublisher().Publish; + + var source = new EnumerableSource(Enumerable.Range(1, 30), observability); + var mapper = new Mapper(source, x => ++x); + var sink = new MemorySink(mapper); + + var counter = new Counter(new MaxCardinality(10), publisher); + observability.AttachMeters(mapper, [counter]); + + using var output = new ConsoleOutput(); + source.Start(); + + Assert.That(output.CountSubstring("10 batches"), Is.EqualTo(3)); + } + + [Test] + public void Rate_Mapper_ReturnRate() + { + var observability = new ObservabilityProvider(new NullTracer()); + var publisher = new ConsolePublisher().Publish; + + var source = new EnumerableSource(Enumerable.Range(1, 30), observability); + var mapper = new Mapper(source, x => ++x); + var sink = new MemorySink(mapper); + + var rateMeter = new RateMeter(new MaxCardinality(10), publisher); + observability.AttachMeters(mapper, [rateMeter]); + + using var output = new ConsoleOutput(); + source.Start(); + + Assert.That(output.CountSubstring("batch/sec"), Is.EqualTo(3)); + } + + [Test] + public void Histogram_Mapper_ReturnRate() + { + var observability = new ObservabilityProvider(new NullTracer()); + var publisher = new ConsolePublisher().Publish; + + var source = new EnumerableSource(Enumerable.Range(1, 30), observability); + var mapper = new Mapper(source, x => ++x); + var sink = new MemorySink(mapper); + + static string bucketizer(int x) => x % 2 == 0 ? "even" : "odd"; + var histogram = new Histogram(bucketizer, new MaxCardinality(10), publisher); + observability.AttachMeters(mapper, [histogram]); + + using var output = new ConsoleOutput(); + source.Start(); + + Assert.That(output.CountSubstring("[{odd => 5}, {even => 5}]"), Is.EqualTo(3)); + } +}