Skip to content

Commit

Permalink
feat: implement basic support for telemetry (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Aug 23, 2024
1 parent 77e1cc2 commit f3cc1e6
Show file tree
Hide file tree
Showing 19 changed files with 233 additions and 18 deletions.
9 changes: 7 additions & 2 deletions Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Streamistry;

Expand All @@ -26,9 +28,12 @@ public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?,
}

public void Emit(TSource? obj)
=> PushDownstream(Invoke(obj));

[Telemetry]
protected TResult? Invoke(TSource? obj)
{
State = Accumulator.Invoke(State, obj);
var result = Selector.Invoke(State);
PushDownstream(result);
return Selector.Invoke(State);
}
}
2 changes: 1 addition & 1 deletion Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ public abstract class ChainablePipe<T> : IChainablePipe<T>
public void RegisterDownstream(Action<T?> action)
=> Downstream += action;

public void PushDownstream(T? obj)
protected virtual void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);
}
9 changes: 7 additions & 2 deletions Streamistry.Core/Combinator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;

namespace Streamistry;

Expand All @@ -28,19 +29,23 @@ public Combinator(IChainablePipe<TFirst> firstUpstream, IChainablePipe<TSecond>
public void EmitFirst(TFirst? first)
{
if (TryGetElement<TSecond>(out var second))
PushDownstream(Function.Invoke(first, second));
PushDownstream(Invoke(first, second));
else
Queue(first);
}

public void EmitSecond(TSecond? second)
{
if (TryGetElement<TFirst>(out var first))
PushDownstream(Function.Invoke(first, second));
PushDownstream(Invoke(first, second));
else
Queue(second);
}

[Telemetry]
protected TResult? Invoke(TFirst? first, TSecond? second)
=> Function.Invoke(first, second);

protected abstract bool TryGetElement<T>(out T? value);
protected abstract void Queue<T>(T value);
}
8 changes: 7 additions & 1 deletion Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Streamistry;

Expand All @@ -23,7 +25,11 @@ public Filter(IChainablePipe<TInput> upstream, Func<TInput?, bool> predicate)

public void Emit(TInput? obj)
{
if (Predicate.Invoke(obj))
if (Invoke(obj))
PushDownstream(obj);
}

[Telemetry]
protected bool Invoke(TInput? input)
=> Predicate.Invoke(input);
}
4 changes: 4 additions & 0 deletions Streamistry.Core/FodyWeavers.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<Weavers xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="FodyWeavers.xsd">
<MethodBoundaryAspect />
</Weavers>
26 changes: 26 additions & 0 deletions Streamistry.Core/FodyWeavers.xsd
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- This file was generated by Fody. Manual changes to this file will be lost when your project is rebuilt. -->
<xs:element name="Weavers">
<xs:complexType>
<xs:all>
<xs:element name="MethodBoundaryAspect" minOccurs="0" maxOccurs="1" type="xs:anyType" />
</xs:all>
<xs:attribute name="VerifyAssembly" type="xs:boolean">
<xs:annotation>
<xs:documentation>'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="VerifyIgnoreCodes" type="xs:string">
<xs:annotation>
<xs:documentation>A comma-separated list of error codes that can be safely ignored in assembly verification.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="GenerateXsd" type="xs:boolean">
<xs:annotation>
<xs:documentation>'false' to turn off automatic generation of the XML Schema file.</xs:documentation>
</xs:annotation>
</xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
10 changes: 6 additions & 4 deletions Streamistry.Core/Mapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;

namespace Streamistry;

Expand All @@ -24,8 +25,9 @@ public Mapper(IChainablePipe<TInput> upstream, Func<TInput?, TOutput?> function)
}

public void Emit(TInput? obj)
{
var result = Function.Invoke(obj);
PushDownstream(result);
}
=> PushDownstream(Invoke(obj));

[Telemetry]
protected TOutput? Invoke(TInput? obj)
=> Function.Invoke(obj);
}
5 changes: 4 additions & 1 deletion Streamistry.Core/Pipes/Sinks/DebugOutputSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@ public DebugOutputSink(IChainablePipe<T> upstream)
{ }

public static void Process(T? obj)
=> Console.WriteLine(obj);
{
Console.Write(">>> ");
Console.WriteLine(obj);
}
}
5 changes: 5 additions & 0 deletions Streamistry.Core/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;

namespace Streamistry;
public class Sink<T> : IProcessablePipe<T>
Expand All @@ -16,5 +17,9 @@ public Sink(IChainablePipe<T> upstream, Action<T?> function)
}

public void Emit(T? obj)
=> Invoke(obj);

[Telemetry]
protected void Invoke(T? obj)
=> Function.Invoke(obj);
}
7 changes: 6 additions & 1 deletion Streamistry.Core/Splitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;

namespace Streamistry;

Expand All @@ -24,11 +25,15 @@ public Splitter(IChainablePipe<TInput> upstream, Func<TInput?, TOutput[]?> funct

public void Emit(TInput? obj)
{
var results = Function.Invoke(obj);
var results = Invoke(obj);
if (results is null)
PushDownstream(default);
else
foreach (var result in results)
PushDownstream(result);
}

[Telemetry]
protected TOutput[]? Invoke(TInput? obj)
=> Function.Invoke(obj);
}
5 changes: 5 additions & 0 deletions Streamistry.Core/Streamistry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="MethodBoundaryAspect.Fody" Version="2.0.149" />
<PackageReference Include="Mono.Cecil" Version="0.11.5" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.FileSystemGlobbing" Version="8.0.0" />
</ItemGroup>
Expand Down
34 changes: 34 additions & 0 deletions Streamistry.Core/Telemetry/ConsoleTracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
public class ConsoleTracer : ITracer
{
public IDisposable? StartActiveSpan(string spanName)
{
spanName = $"{spanName} [{Guid.NewGuid()}]";
Console.WriteLine($"Starting span '{spanName}'");
return new TracerScope(spanName);
}

private class TracerScope : IDisposable
{
private string SpanName { get; }
public Stopwatch StopWatch { get; }

public TracerScope(string spanName)
{
SpanName = spanName;
StopWatch = Stopwatch.StartNew();
}

public void Dispose()
{
Console.WriteLine($"Ending span '{SpanName}' in {StopWatch.ElapsedTicks} ticks");
}
}
}
11 changes: 11 additions & 0 deletions Streamistry.Core/Telemetry/ITracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
public interface ITracer
{
IDisposable? StartActiveSpan(string spanName);
}
13 changes: 13 additions & 0 deletions Streamistry.Core/Telemetry/NullTracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
public class NullTracer : ITracer
{
public IDisposable? StartActiveSpan(string spanName)
=> null;
}
25 changes: 25 additions & 0 deletions Streamistry.Core/Telemetry/TelemetryAttribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System.Diagnostics;
using System.Transactions;
using MethodBoundaryAspect.Fody.Attributes;

namespace Streamistry.Telemetry;

public sealed class TelemetryAttribute : OnMethodBoundaryAspect
{
public override void OnEntry(MethodExecutionArgs args)
{
args.MethodExecutionTag = TelemetryProvider.GetTracer().StartActiveSpan(args.Instance.GetType().Name.Split('`')[0]);
}

public override void OnExit(MethodExecutionArgs args)
{
var span = (IDisposable?)args.MethodExecutionTag;
span?.Dispose();
}

public override void OnException(MethodExecutionArgs args)
{
var span = (IDisposable?)args.MethodExecutionTag;
span?.Dispose();
}
}
16 changes: 16 additions & 0 deletions Streamistry.Core/Telemetry/TelemetryProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
public sealed class TelemetryProvider
{
private static ITracer Tracer { get; set; } = new NullTracer();

public TelemetryProvider(ITracer tracer)
=> Tracer = tracer;

public static ITracer GetTracer() => Tracer;
}
4 changes: 2 additions & 2 deletions Streamistry.Testing/Pipes/Sinks/DebugOutputSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void Emit_DisplayOneElement_Successful()
var sink = new DebugOutputSink<int>(pipeline);
sink.Emit(0);

Assert.That(output.GetOuput(), Is.EqualTo("0\r\n"));
Assert.That(output.GetOuput(), Is.EqualTo(">>> 0\r\n"));
}

[Test]
Expand All @@ -52,6 +52,6 @@ public void Emit_DisplayThreeElements_Successful()
sink.Emit("World");
sink.Emit("!");

Assert.That(output.GetOuput(), Is.EqualTo("Hello\r\nWorld\r\n!\r\n"));
Assert.That(output.GetOuput(), Is.EqualTo(">>> Hello\r\n>>> World\r\n>>> !\r\n"));
}
}
7 changes: 3 additions & 4 deletions Streamistry.Testing/Streamistry.Testing.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="NUnit" Version="4.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.0" />
<PackageReference Include="NUnit" Version="4.2.0" />
<PackageReference Include="NUnit.Analyzers" Version="4.3.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand All @@ -19,7 +19,6 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Vertica.Data" Version="24.2.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Streamistry\Streamistry.Core\Streamistry.csproj" />
Expand Down
Loading

0 comments on commit f3cc1e6

Please sign in to comment.