Output on aggregations and partition reset #152

Open
octmar opened this issue Apr 29, 2021 · 0 comments

Hello,
I need help with another scenario on partitioned streams. I have some devices that raise events into a partitioned ingress stream, and I have defined an aggregated output query (a sum). My issue is that output is generated only from the second pushed event onward, even though each output clearly corresponds to the previous input. Example:
Input A {t1, value = 2}
no output
Input B {t2, value = 2}
Sum output: {value = 2, t = t1} (the output time depends on how event times are manipulated; in my case it is t1)
Code:

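    // Assumed namespaces: Rx (System.Reactive) and Trill (Microsoft.StreamProcessing).
    using System;
    using System.Reactive.Linq;
    using Microsoft.StreamProcessing;

    public sealed class Program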
    {
    private static event Action<IndexChangeEvent> PushIndexChangeEvent;

    public struct IndexChangeEvent
    {
        public string IndicatorId;
        public string IndexId;
        public DateTime Date;
        public int Value;
    }

    private static void Process<T>(long s, long e, string key, T data)
    {
        var indexName = data.GetType().GetProperty("Name").GetValue(data).ToString();
        var indexValue = data.GetType().GetProperty("Value").GetValue(data).ToString();
        Console.WriteLine($"Key: {key} Start: {new DateTime(s).ToString("dd/MM/yyyy HH:mm:ss.fffffffZ")} End: {new DateTime(e).ToString("dd/MM/yyyy HH:mm:ss.fffffffZ")} Name: {indexName} Value: {indexValue}");
    }

    public static void Main(string[] args)
    {
        Config.ClearColumnsOnReturn = true;
        var observable = Observable.FromEvent<IndexChangeEvent>(
            onNext => PushIndexChangeEvent += onNext,
            onNext => PushIndexChangeEvent -= onNext);

        var container = new QueryContainer();
  
        var ingressStream = container.RegisterInput(
            streamEvents: observable.Synchronize().Select(e => PartitionedStreamEvent.CreatePoint(e.IndicatorId, e.Date.Ticks, e)),
            flushPolicy: PartitionedFlushPolicy.None,
            onCompletedPolicy: OnCompletedPolicy.None,
            disorderPolicy: DisorderPolicy.Drop()
            );

        var ingressStreamMulti = ingressStream.Multicast(2);

        var queryInput = ingressStreamMulti[0]
            .Select(e => new { Name = $"{e.IndexId}.{e.Value}", Value = e.Date.ToString("dd/MM/yyyy HH:mm:ss.fffffffZ") });

        var egressStreamInput = container
            .RegisterOutput(queryInput, identifier: "queryInput")
            .ForEachAsync(e => Process(e.StartTime, e.EndTime, e.PartitionKey, e.Payload));

        var queryQtyProduced = ingressStreamMulti[1]
            .AlterEventDuration(PartitionedStreamEvent.InfinitySyncTime)
            .Sum(e => e.Value)
            .Select(e => new { Name = "QtyProduced", Value = e.ToString() })
            .AlterEventDuration(TimeSpan.FromTicks(1).Ticks);

        var egressStreamQtyProduced = container
            .RegisterOutput(queryQtyProduced, identifier: "queryQtyProduced")
            .ForEachAsync(e => Process(e.StartTime, e.EndTime, e.PartitionKey, e.Payload));

        var pipe = container.Restore();

        var data = DateTime.Now;
        var eve11 = new IndexChangeEvent { IndicatorId = "1", IndexId = "1", Date = data, Value = 2 };
        PushIndexChangeEvent?.Invoke(eve11);
        pipe.Flush();
        var eve12 = new IndexChangeEvent { IndicatorId = "1", IndexId = "1", Date = data.AddSeconds(1), Value = 2 };
        PushIndexChangeEvent?.Invoke(eve12);
        pipe.Flush();
        var eve13 = new IndexChangeEvent { IndicatorId = "1", IndexId = "1", Date = data.AddSeconds(2), Value = 2 };
        PushIndexChangeEvent?.Invoke(eve13);
        pipe.Flush();

        Console.ReadLine();
    }
    }

From this code you can see that the first query produces output for each input, while the second produces output only from the second input event onward. In another piece of code, where I don't use partitioned streams, this same query produces output for every input.
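
To make the observed pattern concrete, here is a toy model in plain C#. It is not Trill code and makes no claim about how Trill works internally; the class name `DelayedPartitionSum` and its `Push` method are hypothetical. It only assumes that a partition's sum for a given timestamp is released once a later event moves that partition's time forward, which reproduces the output sequence described above.

```csharp
using System;
using System.Collections.Generic;

// Toy model only: it mimics the observed output pattern and is not Trill code.
public sealed class DelayedPartitionSum
{
    // Per-partition state: timestamp of the latest event and the running sum.
    private readonly Dictionary<string, (long Time, int Sum)> state =
        new Dictionary<string, (long Time, int Sum)>();

    // Pushes one event. Returns the (time, sum) pair that becomes final because
    // this event moved the partition's time forward, or null if nothing is final yet.
    public (long Time, int Sum)? Push(string key, long time, int value)
    {
        if (!state.TryGetValue(key, out var pending))
        {
            // First event of the partition: nothing earlier to finalize.
            state[key] = (time, value);
            return null;
        }

        if (time < pending.Time)
        {
            // Out-of-order event: dropped, similar in spirit to DisorderPolicy.Drop().
            return null;
        }

        // Emit the previous sum only when time strictly advances.
        (long Time, int Sum)? output = null;
        if (time > pending.Time)
        {
            output = pending;
        }

        state[key] = (time, pending.Sum + value);
        return output;
    }
}
```

Pushing three events with value 2 at times t1 < t2 < t3 into partition "1" yields nothing for the first event, then (t1, 2), then (t2, 4). That matches what I see from the aggregated query.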

I also have another question regarding partitioned streams. Is it possible to reset all accumulations for a specific partition? I have scenarios where I want to reset the data for one partition, and I don't want to reinitialize the stream, so as not to affect the other partitions.
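
As an illustration of the semantics I am asking for, here is a minimal sketch in plain C#. The `PartitionedSums` class and its `Add` and `Reset` methods are hypothetical and are not part of Trill; they only show the desired behaviour, where resetting one key leaves every other key's accumulation intact.

```csharp
using System.Collections.Generic;

// Hypothetical illustration of the requested semantics; not a Trill API.
public sealed class PartitionedSums
{
    private readonly Dictionary<string, int> sums = new Dictionary<string, int>();

    // Adds a value to the partition's running sum and returns the new sum.
    public int Add(string key, int value)
    {
        sums.TryGetValue(key, out var current); // current is 0 when the key is absent
        return sums[key] = current + value;
    }

    // Clears the accumulated state of one partition only.
    public void Reset(string key) => sums.Remove(key);
}
```

With this sketch, after `Add("1", 2)`, `Add("2", 5)` and `Reset("1")`, a later `Add("1", 3)` returns 3 while `Add("2", 1)` returns 6.
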
Thank you
