Skip to content

Commit

Permalink
fix: handle multiple ObservabilityProvider at the same time (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Aug 24, 2024
1 parent f3cc1e6 commit 54f8b8c
Show file tree
Hide file tree
Showing 21 changed files with 107 additions and 47 deletions.
7 changes: 4 additions & 3 deletions Streamistry.Core/Aggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using Streamistry.Observability;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Streamistry;
Expand All @@ -21,7 +21,8 @@ public class Aggregator<TSource, TAccumulate, TResult> : ChainablePipe<TResult>,
public Func<TAccumulate?, TResult?> Selector { get; }
private TAccumulate? State { get; set; }

public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?,TResult?> selector, TAccumulate? seed = default)
public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?, TAccumulate?> accumulator, Func<TAccumulate?, TResult?> selector, TAccumulate? seed = default)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
(Accumulator, Selector, State) = (accumulator, selector, seed);
Expand All @@ -30,7 +31,7 @@ public Aggregator(IChainablePipe<TSource> upstream, Func<TAccumulate?, TSource?,
public void Emit(TSource? obj)
=> PushDownstream(Invoke(obj));

[Telemetry]
[Trace]
protected TResult? Invoke(TSource? obj)
{
State = Accumulator.Invoke(State, obj);
Expand Down
14 changes: 13 additions & 1 deletion Streamistry.Core/ChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,27 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Mono.Cecil;
using Streamistry.Observability;

namespace Streamistry;
public abstract class ChainablePipe<T> : IChainablePipe<T>
public abstract class ChainablePipe<T> : IChainablePipe<T>, IObservablePipe
{
private Action<T?>? Downstream { get; set; }
protected ObservabilityProvider? Observability { get; private set; }

public void RegisterDownstream(Action<T?> action)
=> Downstream += action;

protected ChainablePipe(ObservabilityProvider? observability)
=> RegisterObservability(observability);

public void RegisterObservability(ObservabilityProvider? observability)
=> Observability = observability;

public ObservabilityProvider? GetObservabilityProvider()
=> Observability;

protected virtual void PushDownstream(T? obj)
=> Downstream?.Invoke(obj);
}
6 changes: 4 additions & 2 deletions Streamistry.Core/Combinator.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using Streamistry.Observability;

namespace Streamistry;

Expand All @@ -19,6 +20,7 @@ public abstract class Combinator<TFirst, TSecond, TResult> : ChainablePipe<TResu
public Func<TFirst?, TSecond?, TResult?> Function { get; init; }

public Combinator(IChainablePipe<TFirst> firstUpstream, IChainablePipe<TSecond> secondUpstream, Func<TFirst?, TSecond?, TResult?> function)
: base(firstUpstream.GetObservabilityProvider())
{
firstUpstream.RegisterDownstream(EmitFirst);
secondUpstream.RegisterDownstream(EmitSecond);
Expand All @@ -42,7 +44,7 @@ public void EmitSecond(TSecond? second)
Queue(second);
}

[Telemetry]
[Trace]
protected TResult? Invoke(TFirst? first, TSecond? second)
=> Function.Invoke(first, second);

Expand Down
5 changes: 3 additions & 2 deletions Streamistry.Core/Filter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using Streamistry.Observability;
using static System.Runtime.InteropServices.JavaScript.JSType;

namespace Streamistry;
Expand All @@ -18,6 +18,7 @@ public class Filter<TInput> : ChainablePipe<TInput>, IProcessablePipe<TInput>
public Func<TInput?, bool> Predicate { get; init; }

public Filter(IChainablePipe<TInput> upstream, Func<TInput?, bool> predicate)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
Predicate = predicate;
Expand All @@ -29,7 +30,7 @@ public void Emit(TInput? obj)
PushDownstream(obj);
}

[Telemetry]
[Trace]
protected bool Invoke(TInput? input)
=> Predicate.Invoke(input);
}
2 changes: 1 addition & 1 deletion Streamistry.Core/IChainablePipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Threading.Tasks;

namespace Streamistry;
public interface IChainablePipe<T>
public interface IChainablePipe<T> : IObservablePipe
{
void RegisterDownstream(Action<T?> action);
}
13 changes: 13 additions & 0 deletions Streamistry.Core/IObservablePipe.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;
public interface IObservablePipe
{
void RegisterObservability(ObservabilityProvider? provider);
ObservabilityProvider? GetObservabilityProvider();
}
5 changes: 3 additions & 2 deletions Streamistry.Core/Mapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using Streamistry.Observability;

namespace Streamistry;

Expand All @@ -19,6 +19,7 @@ public class Mapper<TInput, TOutput> : ChainablePipe<TOutput>, IProcessablePipe<
public Func<TInput?, TOutput?> Function { get; init; }

public Mapper(IChainablePipe<TInput> upstream, Func<TInput?, TOutput?> function)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
Function = function;
Expand All @@ -27,7 +28,7 @@ public Mapper(IChainablePipe<TInput> upstream, Func<TInput?, TOutput?> function)
public void Emit(TInput? obj)
=> PushDownstream(Invoke(obj));

[Telemetry]
[Trace]
protected TOutput? Invoke(TInput? obj)
=> Function.Invoke(obj);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
namespace Streamistry.Observability;
public class ConsoleTracer : ITracer
{
public IDisposable? StartActiveSpan(string spanName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
namespace Streamistry.Observability;
public interface ITracer
{
IDisposable? StartActiveSpan(string spanName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Telemetry;
namespace Streamistry.Observability;
public class NullTracer : ITracer
{
public IDisposable? StartActiveSpan(string spanName)
Expand Down
16 changes: 16 additions & 0 deletions Streamistry.Core/Observability/ObservabilityProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Observability;
public sealed class ObservabilityProvider
{
private ITracer Tracer { get; set; } = new NullTracer();

public ObservabilityProvider(ITracer tracer)
=> Tracer = tracer;

public ITracer GetTracer() => Tracer;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
using System.Transactions;
using MethodBoundaryAspect.Fody.Attributes;

namespace Streamistry.Telemetry;
namespace Streamistry.Observability;

public sealed class TelemetryAttribute : OnMethodBoundaryAspect
public sealed class TraceAttribute : OnMethodBoundaryAspect
{
public override void OnEntry(MethodExecutionArgs args)
{
args.MethodExecutionTag = TelemetryProvider.GetTracer().StartActiveSpan(args.Instance.GetType().Name.Split('`')[0]);
args.MethodExecutionTag = (args.Instance as IObservablePipe)?.GetObservabilityProvider()?.GetTracer().StartActiveSpan(args.Instance.GetType().Name.Split('`')[0]);
}

public override void OnExit(MethodExecutionArgs args)
Expand Down
5 changes: 5 additions & 0 deletions Streamistry.Core/Pipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;

public class Pipeline<T> : ChainablePipe<T>, IProcessablePipe<T>
{
public void Emit(T? obj)
=> PushDownstream(obj);

public Pipeline(ObservabilityProvider? provider = null)
: base(provider)
=> RegisterObservability(provider);
}
9 changes: 7 additions & 2 deletions Streamistry.Core/Pipes/Pipelines/SourcePipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry.Pipes.Pipelines;
public class SourcePipeline<TOutput> : Pipeline<TOutput>
{
private Source<TOutput> Source { get; }
public SourcePipeline(Source<TOutput> source)
=> Source = source;
public SourcePipeline(Source<TOutput> source, ObservabilityProvider? provider = null)
: base(provider)
{
Source = source;
Source.RegisterObservability(provider);
}

public void Start()
=> Source.Start();
Expand Down
4 changes: 3 additions & 1 deletion Streamistry.Core/Pipes/Sources/EnumerableSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry.Pipes.Sources;
public class EnumerableSource<TOutput> : Source<TOutput>
{
private IEnumerator<TOutput> Enumerator { get; set; }

public EnumerableSource(IEnumerable<TOutput> values)
public EnumerableSource(IEnumerable<TOutput> values, ObservabilityProvider? provider = null)
: base(provider)
=> Enumerator = values.GetEnumerator();

protected override bool TryReadNext(out TOutput? item)
Expand Down
4 changes: 3 additions & 1 deletion Streamistry.Core/Pipes/Sources/GlobbingSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.FileSystemGlobbing;
using Microsoft.Extensions.FileSystemGlobbing.Abstractions;
using Streamistry.Observability;
using static System.Reflection.Metadata.BlobBuilder;

namespace Streamistry.Pipes.Sources;
Expand All @@ -22,7 +23,8 @@ public GlobbingSource(string directory, string glob)
public GlobbingSource(string directory, string[] globs)
: this(directory, globs, []) { }

public GlobbingSource(string directory, string[] includeGlobs, string[] excludeGlobs)
public GlobbingSource(string directory, string[] includeGlobs, string[] excludeGlobs, ObservabilityProvider? provider = null)
: base(provider)
{
Directory = new DirectoryInfoWrapper(new DirectoryInfo(directory));
Matcher.AddIncludePatterns(includeGlobs);
Expand Down
16 changes: 13 additions & 3 deletions Streamistry.Core/Sink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,33 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using Streamistry.Observability;

namespace Streamistry;
public class Sink<T> : IProcessablePipe<T>
public class Sink<T> : IProcessablePipe<T>, IObservablePipe
{
public Action<T?> Function { get; }
protected ObservabilityProvider? Observability { get; private set; }

public Sink(IChainablePipe<T> upstream, Action<T?> function)
{
upstream.RegisterDownstream(Emit);
RegisterObservability(upstream.GetObservabilityProvider());
Function = function;
}

public void Emit(T? obj)
=> Invoke(obj);

[Telemetry]
[Trace]
protected void Invoke(T? obj)
=> Function.Invoke(obj);

public void RegisterObservability(ObservabilityProvider? observability)
=> Observability = observability;
public void CascadeObservability(IObservablePipe pipe)
=> pipe.RegisterObservability(Observability);

public ObservabilityProvider? GetObservabilityProvider()
=> Observability;
}
6 changes: 6 additions & 0 deletions Streamistry.Core/Source.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Observability;

namespace Streamistry;

Expand All @@ -13,6 +15,10 @@ namespace Streamistry;
/// <typeparam name="TOutput">The type of the elements in both the input and output streams.</typeparam>
public abstract class Source<TOutput> : ChainablePipe<TOutput>
{
protected Source(ObservabilityProvider? provider)
: base(provider)
{ }

private bool IsStarted { get; set; }

public void Start()
Expand Down
5 changes: 3 additions & 2 deletions Streamistry.Core/Splitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Streamistry.Telemetry;
using Streamistry.Observability;

namespace Streamistry;

Expand All @@ -18,6 +18,7 @@ internal class Splitter<TInput, TOutput> : ChainablePipe<TOutput>, IProcessableP
public Func<TInput?, TOutput[]?> Function { get; init; }

public Splitter(IChainablePipe<TInput> upstream, Func<TInput?, TOutput[]?> function)
: base(upstream.GetObservabilityProvider())
{
upstream.RegisterDownstream(Emit);
Function = function;
Expand All @@ -33,7 +34,7 @@ public void Emit(TInput? obj)
PushDownstream(result);
}

[Telemetry]
[Trace]
protected TOutput[]? Invoke(TInput? obj)
=> Function.Invoke(obj);
}
16 changes: 0 additions & 16 deletions Streamistry.Core/Telemetry/TelemetryProvider.cs

This file was deleted.

Loading

0 comments on commit 54f8b8c

Please sign in to comment.