diff --git a/Streamistry.Core/Aggregator.cs b/Streamistry.Core/Aggregator.cs index cb4274e..802134c 100644 --- a/Streamistry.Core/Aggregator.cs +++ b/Streamistry.Core/Aggregator.cs @@ -3,6 +3,8 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Telemetry; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace Streamistry; @@ -26,9 +28,12 @@ public Aggregator(IChainablePipe upstream, Func PushDownstream(Invoke(obj)); + + [Telemetry] + protected TResult? Invoke(TSource? obj) { State = Accumulator.Invoke(State, obj); - var result = Selector.Invoke(State); - PushDownstream(result); + return Selector.Invoke(State); } } diff --git a/Streamistry.Core/ChainablePipe.cs b/Streamistry.Core/ChainablePipe.cs index d7de423..d4d82fa 100644 --- a/Streamistry.Core/ChainablePipe.cs +++ b/Streamistry.Core/ChainablePipe.cs @@ -12,6 +12,6 @@ public abstract class ChainablePipe : IChainablePipe public void RegisterDownstream(Action action) => Downstream += action; - public void PushDownstream(T? obj) + protected virtual void PushDownstream(T? obj) => Downstream?.Invoke(obj); } diff --git a/Streamistry.Core/Combinator.cs b/Streamistry.Core/Combinator.cs index 866fb10..d7e4906 100644 --- a/Streamistry.Core/Combinator.cs +++ b/Streamistry.Core/Combinator.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Telemetry; namespace Streamistry; @@ -28,7 +29,7 @@ public Combinator(IChainablePipe firstUpstream, IChainablePipe public void EmitFirst(TFirst? first) { if (TryGetElement(out var second)) - PushDownstream(Function.Invoke(first, second)); + PushDownstream(Invoke(first, second)); else Queue(first); } @@ -36,11 +37,15 @@ public void EmitFirst(TFirst? first) public void EmitSecond(TSecond? second) { if (TryGetElement(out var first)) - PushDownstream(Function.Invoke(first, second)); + PushDownstream(Invoke(first, second)); else Queue(second); } + [Telemetry] + protected TResult? Invoke(TFirst? first, TSecond? second) + => Function.Invoke(first, second); + protected abstract bool TryGetElement(out T? value); protected abstract void Queue(T value); } diff --git a/Streamistry.Core/Filter.cs b/Streamistry.Core/Filter.cs index bad727f..5811d33 100644 --- a/Streamistry.Core/Filter.cs +++ b/Streamistry.Core/Filter.cs @@ -3,6 +3,8 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Telemetry; +using static System.Runtime.InteropServices.JavaScript.JSType; namespace Streamistry; @@ -23,7 +25,11 @@ public Filter(IChainablePipe upstream, Func predicate) public void Emit(TInput? obj) { - if (Predicate.Invoke(obj)) + if (Invoke(obj)) PushDownstream(obj); } + + [Telemetry] + protected bool Invoke(TInput? input) + => Predicate.Invoke(input); } diff --git a/Streamistry.Core/FodyWeavers.xml b/Streamistry.Core/FodyWeavers.xml new file mode 100644 index 0000000..f217bdf --- /dev/null +++ b/Streamistry.Core/FodyWeavers.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/Streamistry.Core/FodyWeavers.xsd b/Streamistry.Core/FodyWeavers.xsd new file mode 100644 index 0000000..f421e79 --- /dev/null +++ b/Streamistry.Core/FodyWeavers.xsd @@ -0,0 +1,26 @@ + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/Streamistry.Core/Mapper.cs b/Streamistry.Core/Mapper.cs index 5d3e717..67358f4 100644 --- a/Streamistry.Core/Mapper.cs +++ b/Streamistry.Core/Mapper.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Telemetry; namespace Streamistry; @@ -24,8 +25,9 @@ public Mapper(IChainablePipe upstream, Func function) } public void Emit(TInput? obj) - { - var result = Function.Invoke(obj); - PushDownstream(result); - } + => PushDownstream(Invoke(obj)); + + [Telemetry] + protected TOutput? Invoke(TInput? obj) + => Function.Invoke(obj); } diff --git a/Streamistry.Core/Pipes/Sinks/DebugOutputSink.cs b/Streamistry.Core/Pipes/Sinks/DebugOutputSink.cs index 1d91d00..3adda0a 100644 --- a/Streamistry.Core/Pipes/Sinks/DebugOutputSink.cs +++ b/Streamistry.Core/Pipes/Sinks/DebugOutputSink.cs @@ -14,5 +14,8 @@ public DebugOutputSink(IChainablePipe upstream) { } public static void Process(T? obj) - => Console.WriteLine(obj); + { + Console.Write(">>> "); + Console.WriteLine(obj); + } } diff --git a/Streamistry.Core/Sink.cs b/Streamistry.Core/Sink.cs index d973f4c..0568e30 100644 --- a/Streamistry.Core/Sink.cs +++ b/Streamistry.Core/Sink.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Telemetry; namespace Streamistry; public class Sink : IProcessablePipe @@ -16,5 +17,9 @@ public Sink(IChainablePipe upstream, Action function) } public void Emit(T? obj) + => Invoke(obj); + + [Telemetry] + protected void Invoke(T? obj) => Function.Invoke(obj); } diff --git a/Streamistry.Core/Splitter.cs b/Streamistry.Core/Splitter.cs index 185b6a5..84c3b15 100644 --- a/Streamistry.Core/Splitter.cs +++ b/Streamistry.Core/Splitter.cs @@ -3,6 +3,7 @@ using System.Linq; using System.Text; using System.Threading.Tasks; +using Streamistry.Telemetry; namespace Streamistry; @@ -24,11 +25,15 @@ public Splitter(IChainablePipe upstream, Func funct public void Emit(TInput? obj) { - var results = Function.Invoke(obj); + var results = Invoke(obj); if (results is null) PushDownstream(default); else foreach (var result in results) PushDownstream(result); } + + [Telemetry] + protected TOutput[]? Invoke(TInput? obj) + => Function.Invoke(obj); } diff --git a/Streamistry.Core/Streamistry.csproj b/Streamistry.Core/Streamistry.csproj index eff5645..51dcd55 100644 --- a/Streamistry.Core/Streamistry.csproj +++ b/Streamistry.Core/Streamistry.csproj @@ -12,6 +12,11 @@ true + + + + + diff --git a/Streamistry.Core/Telemetry/ConsoleTracer.cs b/Streamistry.Core/Telemetry/ConsoleTracer.cs new file mode 100644 index 0000000..e95955f --- /dev/null +++ b/Streamistry.Core/Telemetry/ConsoleTracer.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Telemetry; +public class ConsoleTracer : ITracer +{ + public IDisposable? StartActiveSpan(string spanName) + { + spanName = $"{spanName} [{Guid.NewGuid()}]"; + Console.WriteLine($"Starting span '{spanName}'"); + return new TracerScope(spanName); + } + + private class TracerScope : IDisposable + { + private string SpanName { get; } + public Stopwatch StopWatch { get; } + + public TracerScope(string spanName) + { + SpanName = spanName; + StopWatch = Stopwatch.StartNew(); + } + + public void Dispose() + { + Console.WriteLine($"Ending span '{SpanName}' in {StopWatch.ElapsedTicks} ticks"); + } + } +} diff --git a/Streamistry.Core/Telemetry/ITracer.cs b/Streamistry.Core/Telemetry/ITracer.cs new file mode 100644 index 0000000..3cae068 --- /dev/null +++ b/Streamistry.Core/Telemetry/ITracer.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Telemetry; +public interface ITracer +{ + IDisposable? StartActiveSpan(string spanName); +} diff --git a/Streamistry.Core/Telemetry/NullTracer.cs b/Streamistry.Core/Telemetry/NullTracer.cs new file mode 100644 index 0000000..7c0fb94 --- /dev/null +++ b/Streamistry.Core/Telemetry/NullTracer.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Telemetry; +public class NullTracer : ITracer +{ + public IDisposable? StartActiveSpan(string spanName) + => null; +} diff --git a/Streamistry.Core/Telemetry/TelemetryAttribute.cs b/Streamistry.Core/Telemetry/TelemetryAttribute.cs new file mode 100644 index 0000000..dc0354d --- /dev/null +++ b/Streamistry.Core/Telemetry/TelemetryAttribute.cs @@ -0,0 +1,25 @@ +using System.Diagnostics; +using System.Transactions; +using MethodBoundaryAspect.Fody.Attributes; + +namespace Streamistry.Telemetry; + +public sealed class TelemetryAttribute : OnMethodBoundaryAspect +{ + public override void OnEntry(MethodExecutionArgs args) + { + args.MethodExecutionTag = TelemetryProvider.GetTracer().StartActiveSpan(args.Instance.GetType().Name.Split('`')[0]); + } + + public override void OnExit(MethodExecutionArgs args) + { + var span = (IDisposable?)args.MethodExecutionTag; + span?.Dispose(); + } + + public override void OnException(MethodExecutionArgs args) + { + var span = (IDisposable?)args.MethodExecutionTag; + span?.Dispose(); + } +} diff --git a/Streamistry.Core/Telemetry/TelemetryProvider.cs b/Streamistry.Core/Telemetry/TelemetryProvider.cs new file mode 100644 index 0000000..7f0e45b --- /dev/null +++ b/Streamistry.Core/Telemetry/TelemetryProvider.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Streamistry.Telemetry; +public sealed class TelemetryProvider +{ + private static ITracer Tracer { get; set; } = new NullTracer(); + + public TelemetryProvider(ITracer tracer) + => Tracer = tracer; + + public static ITracer GetTracer() => Tracer; +} diff --git a/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs b/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs index 3e26273..0091aca 100644 --- a/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs +++ b/Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs @@ -38,7 +38,7 @@ public void Emit_DisplayOneElement_Successful() var sink = new DebugOutputSink(pipeline); sink.Emit(0); - Assert.That(output.GetOuput(), Is.EqualTo("0\r\n")); + Assert.That(output.GetOuput(), Is.EqualTo(">>> 0\r\n")); } [Test] @@ -52,6 +52,6 @@ public void Emit_DisplayThreeElements_Successful() sink.Emit("World"); sink.Emit("!"); - Assert.That(output.GetOuput(), Is.EqualTo("Hello\r\nWorld\r\n!\r\n")); + Assert.That(output.GetOuput(), Is.EqualTo(">>> Hello\r\n>>> World\r\n>>> !\r\n")); } } diff --git a/Streamistry.Testing/Streamistry.Testing.csproj b/Streamistry.Testing/Streamistry.Testing.csproj index 410dd8b..134e139 100644 --- a/Streamistry.Testing/Streamistry.Testing.csproj +++ b/Streamistry.Testing/Streamistry.Testing.csproj @@ -1,7 +1,7 @@ - + - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -19,7 +19,6 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - diff --git a/Streamistry.Testing/Telemetry/ConsoleTracerTests.cs b/Streamistry.Testing/Telemetry/ConsoleTracerTests.cs new file mode 100644 index 0000000..6d5a87e --- /dev/null +++ b/Streamistry.Testing/Telemetry/ConsoleTracerTests.cs @@ -0,0 +1,51 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Microsoft.VisualStudio.TestPlatform.Utilities; +using NUnit.Framework; +using Streamistry.Pipes.Sinks; +using Streamistry.Telemetry; + +namespace Streamistry.Testing.Telemetry; +public class ConsoleTracerTests +{ + private class ConsoleOutput : IDisposable + { + private readonly StringWriter stringWriter = new(); + private readonly TextWriter originalOutput = Console.Out; + + public ConsoleOutput() + => Console.SetOut(stringWriter); + + public string GetOuput() + => stringWriter.ToString(); + + public void Dispose() + { + Console.SetOut(originalOutput); + stringWriter.Dispose(); + } + } + + [Test] + public void Set_ConsoleTracer_Used() + { + _ = new TelemetryProvider(new ConsoleTracer()); + + using var output = new ConsoleOutput(); + var pipeline = new Pipeline(); + var mapper = new Mapper(pipeline, x => ++x); + var sink = new MemorySink(mapper); + mapper.Emit(0); + + Assert.That(sink.State.First(), Is.EqualTo(1)); Assert.Multiple(() => + { + Assert.That(output.GetOuput(), Does.StartWith("Starting span 'Mapper ")); + + Assert.That(output.GetOuput(), Does.Contain("Ending span 'Mapper ")); + Assert.That(output.GetOuput(), Does.EndWith("ticks\r\n")); + }); + } +}