Skip to content

Commit

Permalink
feat: basic sources to feed pipelines (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Aug 21, 2024
1 parent 8bcbd7a commit ac4b072
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 0 deletions.
19 changes: 19 additions & 0 deletions Streamistry.Core/Pipes/Pipelines/SourcePipeline.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Pipelines;
public class SourcePipeline<TOutput> : Pipeline<TOutput>
{
private Source<TOutput> Source { get; }
public SourcePipeline(Source<TOutput> source)
=> Source = source;

public void Start()
=> Source.Start();

public void Stop()
=> Source.Stop();
}
25 changes: 25 additions & 0 deletions Streamistry.Core/Pipes/Sources/EnumerableSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Sources;
public class EnumerableSource<TOutput> : Source<TOutput>
{
private IEnumerator<TOutput> Enumerator { get; set; }

public EnumerableSource(IEnumerable<TOutput> values)
=> Enumerator = values.GetEnumerator();

protected override bool TryReadNext(out TOutput? item)
{
if (Enumerator.MoveNext())
{
item = Enumerator.Current;
return true;
}
item = default;
return false;
}
}
55 changes: 55 additions & 0 deletions Streamistry.Core/Pipes/Sources/GlobbingSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection.Metadata;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.FileSystemGlobbing;
using Microsoft.Extensions.FileSystemGlobbing.Abstractions;
using static System.Reflection.Metadata.BlobBuilder;

namespace Streamistry.Pipes.Sources;
public class GlobbingSource<TOutput> : Source<TOutput>
{
private DirectoryInfoBase Directory { get; }
private Matcher Matcher { get; } = new();
private Queue<string>? Files { get; set; }

public GlobbingSource(string directory, string glob)
: this(directory, [glob]) { }

public GlobbingSource(string directory, string[] globs)
: this(directory, globs, []) { }

public GlobbingSource(string directory, string[] includeGlobs, string[] excludeGlobs)
{
Directory = new DirectoryInfoWrapper(new DirectoryInfo(directory));
Matcher.AddIncludePatterns(includeGlobs);
Matcher.AddExcludePatterns(excludeGlobs);
}

protected override bool TryReadNext(out TOutput? item)
{
Files ??= QueueFiles(Matcher);

if (Files.TryDequeue(out var file))
{
var fullPath = Path.Combine([Directory.FullName, file]);
using var reader = new StreamReader(fullPath);
item = (TOutput?)Convert.ChangeType(reader.ReadToEnd(), typeof(TOutput));
return true;
}
item = default;
return false;
}

private Queue<string> QueueFiles(Matcher matcher)
{
var files = new Queue<string>();
var results = matcher.Execute(Directory);
foreach (var result in results.Files)
files.Enqueue(result.Path);
return files;
}
}
36 changes: 36 additions & 0 deletions Streamistry.Core/Source.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Streamistry;

/// <summary>
/// Represents a pipeline element that applies a predicate function to each element within a batch of this stream.
/// The output stream is composed of elements that satisfy the predicate; elements that do not satisfy the predicate are excluded from the downstream stream.
/// </summary>
/// <typeparam name="TOutput">The type of the elements in both the input and output streams.</typeparam>
public abstract class Source<TOutput> : ChainablePipe<TOutput>
{
private bool IsStarted { get; set; }

public void Start()
{
IsStarted = true;
Read();
}

public void Stop()
{
IsStarted = false;
}

protected virtual void Read()
{
while (IsStarted && TryReadNext(out var item))
PushDownstream(item);
}

protected abstract bool TryReadNext(out TOutput? item);
}
4 changes: 4 additions & 0 deletions Streamistry.Core/Streamistry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.FileSystemGlobbing" Version="8.0.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Update="DotNet.ReproducibleBuilds" Version="1.2.4">
<PrivateAssets>all</PrivateAssets>
Expand Down
25 changes: 25 additions & 0 deletions Streamistry.Testing/Pipes/Sources/EnumerableSourceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Pipelines;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;

namespace Streamistry.Testing.Pipes.Sources;
public class EnumerableSourceTests
{
[Test]
public void Read_ThreeElements_Successful()
{
var source = new EnumerableSource<int>([1,11,4]);
var pipeline = new SourcePipeline<int>(source);
var sink = new MemorySink<int>(source);

pipeline.Start();
Assert.That(sink.State, Has.Count.EqualTo(3));
Assert.That(sink.State.Last(), Is.EqualTo(4));
}
}
48 changes: 48 additions & 0 deletions Streamistry.Testing/Pipes/Sources/GlobbingSourceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Pipelines;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;

namespace Streamistry.Testing.Pipes.Sources;
public class GlobbingSourceTests
{
private const string Extension = ".glob.txt";

[OneTimeSetUp]
public void OneTimeSetUp()
{
Tuple<string, int>[] files = [new("foo", 1), new("bar", 25), new("qwark", 33)];

foreach (var file in files)
{
using var writer = new StreamWriter($"{file.Item1}{Extension}");
writer.WriteLine(file.Item2);
writer.Close();
}
}

[OneTimeTearDown]
public void OneTimeTearDown()
{
var files = Directory.GetFiles(".", $"*{Extension}");
foreach (var file in files)
File.Delete(file);
}

[Test]
public void Read_ThreeElements_Successful()
{
var source = new GlobbingSource<int>(".", $"*{Extension}");
var pipeline = new SourcePipeline<int>(source);
var sink = new MemorySink<int>(source);

pipeline.Start();
Assert.That(sink.State, Has.Count.EqualTo(3));
Assert.That(sink.State.Last(), Is.EqualTo(33));
}
}

0 comments on commit ac4b072

Please sign in to comment.