diff --git a/lib/syskit/log/dsl.rb b/lib/syskit/log/dsl.rb index 8ac6bfa..b1d23cc 100644 --- a/lib/syskit/log/dsl.rb +++ b/lib/syskit/log/dsl.rb @@ -425,71 +425,6 @@ def to_daru_frame(*streams, accurate: false, timeout: nil) end end - # Convert fields of a data stream into a Polars frame - # - # @param [Array] streams an array if objects that can be converted to - # samples using {#samples_of} - # @param [Boolean] accurate prefer accuracy over speed (see below) - # @param [Float,nil] timeout how long, since the last received sample - # from a stream, the method will start introducing NAs to replace - # the stream's values (NA is either NAN for float values, or nil) - # @yield a {FrameBuilder} object used to describe the frame to be built - # - # This method uses the first given stream as a "master" stream, and - # attempts to load the value of the remaining columns at the same - # real time than the value from the master stream. - # - # How the method deals with resampling (when {#interval_sample_every} has - # been called) depends on the `accurate` parameter. When `false`, - # the streams are first re-sampled and then aligned. When doing coarse - # sampling, this can introduce significant misalignments. When true, - # the method resamples only the first stream, and then aligns the - # other full non-resampled streams. accurante: false is significantly - # faster for very dense streams (w.r.t. the sampling period) - def to_polars_frame( - *streams, accurate: false, timeout: nil, chunk_size: Polars::CHUNK_SIZE - ) - return ::Polars::DataFrame.new if streams.empty? 
- - interval_start, interval_end = streams.map(&:interval_lg).transpose - interval_start = interval_start.min - interval_end = interval_end.max - interval_start = [interval_start, @interval[0]].max if @interval[0] - interval_end = [interval_end, @interval[1]].min if @interval[1] - - if accurate - first_samples = - samples_of(streams[0], from: interval_start, to: interval_end) - - samples = [first_samples] + streams[1..-1].map do |s| - samples_of(s, from: interval_start, to: interval_end, - every_samples: nil, every_seconds: nil) - end - else - samples = streams.map do |s| - samples_of(s, from: interval_start, to: interval_end) - end - end - - builders = streams.map { |s| Polars::FrameBuilder.new(s.type) } - yield(*builders) - - @interval_zero_time ||= streams.first.interval_lg[0] - - if builders.size == 1 - builders.first.to_polars_frame( - @interval_zero_time, samples.first, - timeout: timeout, chunk_size: chunk_size - ) - else - joint_stream = Pocolog::StreamAligner.new(false, *samples) - Polars.create_aligned_frame( - @interval_zero_time, builders, joint_stream, - timeout: timeout, chunk_size: chunk_size - ) - end - end - # Restricts a data stream to the current interval and sample selection # # @see interval_select interval_shift_start interval_shift_end @@ -648,11 +583,6 @@ def roby_vega_task_timeline_data(*tasks) end end - # Convert a Daru frame into a vega data array - def polars_to_vega(frame) - frame.fill_nan(nil).to_a - end - # Convert a Daru frame into a vega data array def daru_to_vega(frame, every: 1) data = [] diff --git a/lib/syskit/log/polars.rb b/lib/syskit/log/polars.rb index 342f2c3..6968be0 100644 --- a/lib/syskit/log/polars.rb +++ b/lib/syskit/log/polars.rb @@ -7,3 +7,6 @@ require "syskit/log/polars/column_logical_time_builder" require "syskit/log/polars/path_builder" require "syskit/log/polars/create_aligned_frame" +require "syskit/log/polars/dsl" + +Syskit::Log::DSL.include(Syskit::Log::Polars::DSL) diff --git a/lib/syskit/log/polars/dsl.rb 
b/lib/syskit/log/polars/dsl.rb new file mode 100644 index 0000000..2fe2f5a --- /dev/null +++ b/lib/syskit/log/polars/dsl.rb @@ -0,0 +1,116 @@ +# frozen_string_literal: true + +module Syskit + module Log + module Polars + # Polars-specific functionality from {Log::DSL} + module DSL + # Convert a Polars frame into a vega data array + def polars_to_vega(frame) + frame.fill_nan(nil).to_a + end + + # Convert fields of a data stream into a Polars frame + # + # @param [Array] streams an array of objects that can be converted to + # samples using {#samples_of} + # @param [Boolean] accurate prefer accuracy over speed (see below) + # @param [Float,nil] timeout how long, since the last received sample + # from a stream, the method will start introducing NAs to replace + # the stream's values (NA is either NAN for float values, or nil) + # @yield a {FrameBuilder} object used to describe the frame to be built + # + # This method uses the first given stream as a "master" stream, and + # attempts to load the value of the remaining columns at the same + # real time as the value from the master stream. + # + # How the method deals with resampling (when {#interval_sample_every} has + # been called) depends on the `accurate` parameter. When `false`, + # the streams are first re-sampled and then aligned. When doing coarse + # sampling, this can introduce significant misalignments. When true, + # the method resamples only the first stream, and then aligns the + # other full non-resampled streams. accurate: false is significantly + # faster for very dense streams (w.r.t. the sampling period) + def to_polars_frame( + *streams, accurate: false, timeout: nil, + chunk_size: Polars::CHUNK_SIZE + ) + return ::Polars::DataFrame.new if streams.empty? 
+ + samples = + if accurate + to_polars_frame_accurate_samples(streams) + else + to_polars_frame_samples(streams) + end + + builders = streams.map { |s| Polars::FrameBuilder.new(s.type) } + yield(*builders) + + center_time = @interval_zero_time || streams.first.interval_lg[0] + + to_polars_frame_execute( + builders, center_time, samples, + timeout: timeout, chunk_size: chunk_size + ) + end + + # @api private + def to_polars_frame_execute( + builders, center_time, samples, timeout:, chunk_size: + ) + if builders.size == 1 + builders.first.to_polars_frame( + center_time, samples.first, + timeout: timeout, chunk_size: chunk_size + ) + else + joint_stream = Pocolog::StreamAligner.new(false, *samples) + Polars.create_aligned_frame( + center_time, builders, joint_stream, + timeout: timeout, chunk_size: chunk_size + ) + end + end + + # @api private + # + # Create the samples enumeration objects for to_polars_frame in the + # accurate case + def to_polars_frame_accurate_samples(streams) + interval_start, interval_end = to_polars_frame_interval(streams) + first_samples = + samples_of(streams[0], from: interval_start, to: interval_end) + + [first_samples] + streams[1..-1].map do |s| + samples_of(s, from: interval_start, to: interval_end, + every_samples: nil, every_seconds: nil) + end + end + + # @api private + # + # Create the samples enumeration objects for to_polars_frame in the + # normal case + def to_polars_frame_samples(streams) + interval_start, interval_end = to_polars_frame_interval(streams) + streams.map do |s| + samples_of(s, from: interval_start, to: interval_end) + end + end + + # @api private + # + # Compute the frame interval for to_polars_frame + def to_polars_frame_interval(streams) + interval_start, interval_end = streams.map(&:interval_lg).transpose + interval_start = interval_start.min + interval_end = interval_end.max + interval_start = [interval_start, @interval[0]].max if @interval[0] + interval_end = [interval_end, @interval[1]].min if @interval[1] + 
[interval_start, interval_end] + end + end + end + end +end diff --git a/test/dsl_test.rb b/test/dsl_test.rb index caa1756..31be6e5 100644 --- a/test/dsl_test.rb +++ b/test/dsl_test.rb @@ -5,15 +5,6 @@ require "daru" require "iruby" -HAS_POLARS = - begin - require "polars" - true - rescue LoadError # rubocop:disable Lint/SuppressedException - end - -require "syskit/log/polars" if HAS_POLARS - module Syskit module Log # :nodoc: describe DSL do @@ -355,167 +346,6 @@ module Log # :nodoc: end end - describe "#to_polars_frame" do - before do - skip "polars-ruby is not installed" unless HAS_POLARS - - now_nsec = Time.now - now = Time.at(now_nsec.tv_sec, now_nsec.tv_usec) - now_usec = now.tv_sec * 1_000_000 + now.tv_usec - - registry = Typelib::CXXRegistry.new - compound_t = registry.create_compound "/C" do |b| - b.t = "/uint64_t" - b.d = "/double" - b.i = "/int" - end - create_dataset "exists" do - create_logfile "test.0.log" do - create_logfile_stream( - "test", type: compound_t, metadata: { - "rock_task_name" => "task_test", - "rock_task_object_name" => "port_test", - "rock_stream_type" => "port" - } - ) - write_logfile_sample now, now, { t: now_usec - 500, d: 0.1, i: 1 } - write_logfile_sample( - now + 10, now + 1, { t: now_usec + 999_500, d: 0.2, i: 2 } - ) - end - - create_logfile "test1.0.log" do - create_logfile_stream( - "test", type: compound_t, metadata: { - "rock_task_name" => "task_test1", - "rock_task_object_name" => "port_test", - "rock_stream_type" => "port" - } - ) - write_logfile_sample( - now, now + 0.1, { t: now_usec + 100_000, d: 0.15, i: 3 } - ) - write_logfile_sample( - now + 10, now + 0.9, - { t: now_usec + 900_000, d: 0.25, i: 4 } - ) - end - end - - @context = make_context - @context.datastore_select @datastore_path - @context.dataset_select - end - - it "creates a frame from a single stream" do - port = @context.task_test_task.port_test_port - frame = @context.to_polars_frame port do |f| - f.add_logical_time - f.add(&:d) - end - - assert_equal 
[0, 1], frame["time"].to_a - assert_equal [0.1, 0.2], frame[".d"].to_a - end - - it "allows overriding the column type" do - port = @context.task_test_task.port_test_port - port1 = @context.task_test1_task.port_test_port - frame = @context.to_polars_frame port, port1 do |a, b| - a.add_time_field("t", &:t) - a.add("a", dtype: :f32, &:d) - b.add("b", &:d) - end - - assert_equal ::Polars::Float32, frame["a"].dtype - end - - it "centers the time fields" do - port = @context.task_test_task.port_test_port - port1 = @context.task_test1_task.port_test_port - frame = @context.to_polars_frame port, port1 do |a, b| - a.add_time_field("t", &:t) - a.add("a", &:d) - b.add("b", &:d) - end - - expected = ::Polars::DataFrame.new( - { - t: [-0.0005, 0.9995], - a: [0.1, 0.2], - b: [0.15, 0.25] - }, schema: nil - ) # 'schema' option for 2.7 compatibility - assert_polars_frame_near(expected, frame) - end - - def assert_polars_frame_near(expected, actual) - diff = expected - actual - max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max - assert_operator max, :<, 1e-6 - end - - it "aligns different streams in a single frame" do - port = @context.task_test_task.port_test_port - port1 = @context.task_test1_task.port_test_port - frame = @context.to_polars_frame port, port1 do |a, b| - a.add_logical_time("a_time") - a.add("a", &:d) - b.add("b", &:d) - b.add_logical_time("b_time") - end - - expected = ::Polars::DataFrame.new( - { - a_time: [0, 1], - a: [0.1, 0.2], - b: [0.15, 0.25], - b_time: [0.1, 0.9] - }, schema: nil - ) # schema for 2.7 compatibility - diff = expected - frame - max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max - assert_operator max, :<, 1e-6 - end - - it "handles datasets bigger than the chunk size" do - port = @context.task_test_task.port_test_port - port1 = @context.task_test1_task.port_test_port - frame = @context.to_polars_frame port, port1, chunk_size: 1 do |a, b| - a.add_logical_time("a_time") - a.add("a", &:d) - b.add("b", &:d) - 
b.add_logical_time("b_time") - end - - expected = ::Polars::DataFrame.new( - { - a_time: [0, 1], - a: [0.1, 0.2], - b: [0.15, 0.25], - b_time: [0.1, 0.9] - }, schema: nil - ) # schema for 2.7 compatibility - diff = expected - frame - max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max - assert_operator max, :<, 1e-6 - end - end - - describe "polars_to_vega" do - before do - @context = make_context - end - - it "converts NaN into nil" do - frame = ::Polars::DataFrame.new( - { "a" => [Float::NAN, 0.1] }, schema: nil - ) - vega = @context.polars_to_vega(frame) - assert_equal [{ "a" => nil }, { "a" => 0.1 }], vega - end - end - describe "daru_to_vega" do before do @context = make_context diff --git a/test/polars/dsl_test.rb b/test/polars/dsl_test.rb new file mode 100644 index 0000000..7677558 --- /dev/null +++ b/test/polars/dsl_test.rb @@ -0,0 +1,214 @@ +# frozen_string_literal: true + +HAS_POLARS = + begin + require "polars" + true + rescue LoadError # rubocop:disable Lint/SuppressedException + end + +require "syskit/log/polars" if HAS_POLARS + +module Syskit + module Log + module Polars + describe DSL do + before do + @__default_store = ENV["SYSKIT_LOG_STORE"] + ENV.delete("SYSKIT_LOG_STORE") + + @root_path = Pathname.new(Dir.mktmpdir) + @datastore_path = @root_path + "datastore" + create_datastore(@datastore_path) + end + + after do + @root_path.rmtree + if @__default_store + ENV["SYSKIT_LOG_STORE"] = @__default_store + else + ENV.delete("SYSKIT_LOG_STORE") + end + end + + describe "#to_polars_frame" do + before do + skip "polars-ruby is not installed" unless HAS_POLARS + + registry = Typelib::CXXRegistry.new + compound_t = registry.create_compound "/C" do |b| + b.t = "/uint64_t" + b.d = "/double" + b.i = "/int" + end + + create_test_dataset(compound_t) + + @context = make_context + @context.datastore_select @datastore_path + @context.dataset_select + end + + it "creates a frame from a single stream" do + port = @context.task_test_task.port_test_port 
+ frame = @context.to_polars_frame port do |f| + f.add_logical_time + f.add(&:d) + end + + assert_equal [0, 1], frame["time"].to_a + assert_equal [0.1, 0.2], frame[".d"].to_a + end + + it "allows overriding the column type" do + port = @context.task_test_task.port_test_port + port1 = @context.task_test1_task.port_test_port + frame = @context.to_polars_frame port, port1 do |a, b| + a.add_time_field("t", &:t) + a.add("a", dtype: :f32, &:d) + b.add("b", &:d) + end + + assert_equal ::Polars::Float32, frame["a"].dtype + end + + it "centers the time fields" do + port = @context.task_test_task.port_test_port + port1 = @context.task_test1_task.port_test_port + frame = @context.to_polars_frame port, port1 do |a, b| + a.add_time_field("t", &:t) + a.add("a", &:d) + b.add("b", &:d) + end + + expected = ::Polars::DataFrame.new( + { + t: [-0.0005, 0.9995], + a: [0.1, 0.2], + b: [0.15, 0.25] + }, schema: nil + ) # 'schema' option for 2.7 compatibility + assert_polars_frame_near(expected, frame) + end + + def assert_polars_frame_near(expected, actual) + diff = expected - actual + max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max + assert_operator max, :<, 1e-6 + end + + it "aligns different streams in a single frame" do + port = @context.task_test_task.port_test_port + port1 = @context.task_test1_task.port_test_port + frame = @context.to_polars_frame port, port1 do |a, b| + a.add_logical_time("a_time") + a.add("a", &:d) + b.add("b", &:d) + b.add_logical_time("b_time") + end + + expected = ::Polars::DataFrame.new( + { + a_time: [0, 1], + a: [0.1, 0.2], + b: [0.15, 0.25], + b_time: [0.1, 0.9] + }, schema: nil + ) # schema for 2.7 compatibility + diff = expected - frame + max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max + assert_operator max, :<, 1e-6 + end + + it "handles datasets bigger than the chunk size" do + port = @context.task_test_task.port_test_port + port1 = @context.task_test1_task.port_test_port + frame = @context.to_polars_frame( + 
port, port1, chunk_size: 1 + ) do |a, b| + a.add_logical_time("a_time") + a.add("a", &:d) + b.add("b", &:d) + b.add_logical_time("b_time") + end + + expected = ::Polars::DataFrame.new( + { + a_time: [0, 1], + a: [0.1, 0.2], + b: [0.15, 0.25], + b_time: [0.1, 0.9] + }, schema: nil + ) # schema for 2.7 compatibility + diff = expected - frame + max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max + assert_operator max, :<, 1e-6 + end + end + + describe "polars_to_vega" do + before do + @context = make_context + end + + it "converts NaN into nil" do + frame = ::Polars::DataFrame.new( + { "a" => [Float::NAN, 0.1] }, schema: nil + ) + vega = @context.polars_to_vega(frame) + assert_equal [{ "a" => nil }, { "a" => 0.1 }], vega + end + end + + def create_test_dataset(compound_t) # rubocop:disable Metrics/AbcSize + now_nsec = Time.now + now = Time.at(now_nsec.tv_sec, now_nsec.tv_usec) + now_usec = now.tv_sec * 1_000_000 + now.tv_usec + + create_dataset "exists" do # rubocop:disable Metrics/BlockLength + create_logfile "test.0.log" do + create_logfile_stream( + "test", type: compound_t, metadata: { + "rock_task_name" => "task_test", + "rock_task_object_name" => "port_test", + "rock_stream_type" => "port" + } + ) + write_logfile_sample( + now, now, { t: now_usec - 500, d: 0.1, i: 1 } + ) + write_logfile_sample( + now + 10, now + 1, + { t: now_usec + 999_500, d: 0.2, i: 2 } + ) + end + + create_logfile "test1.0.log" do + create_logfile_stream( + "test", type: compound_t, metadata: { + "rock_task_name" => "task_test1", + "rock_task_object_name" => "port_test", + "rock_stream_type" => "port" + } + ) + write_logfile_sample( + now, now + 0.1, + { t: now_usec + 100_000, d: 0.15, i: 3 } + ) + write_logfile_sample( + now + 10, now + 0.9, + { t: now_usec + 900_000, d: 0.25, i: 4 } + ) + end + end + end + + def make_context + context = Object.new + context.extend Syskit::Log::DSL + context + end + end + end + end +end