Skip to content

Commit

Permalink
chore: move polars-specific DSL functionality into its own file
Browse files Browse the repository at this point in the history
  • Loading branch information
doudou committed Oct 8, 2024
1 parent b393c30 commit 28c4eb6
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 240 deletions.
70 changes: 0 additions & 70 deletions lib/syskit/log/dsl.rb
Original file line number Diff line number Diff line change
Expand Up @@ -425,71 +425,6 @@ def to_daru_frame(*streams, accurate: false, timeout: nil)
end
end

# Convert fields of a data stream into a Polars frame
#
# @param [Array] streams an array if objects that can be converted to
# samples using {#samples_of}
# @param [Boolean] accurate prefer accuracy over speed (see below)
# @param [Float,nil] timeout how long, since the last received sample
# from a stream, the method will start introducing NAs to replace
# the stream's values (NA is either NAN for float values, or nil)
# @yield a {FrameBuilder} object used to describe the frame to be built
#
# This method uses the first given stream as a "master" stream, and
# attempts to load the value of the remaining columns at the same
# real time than the value from the master stream.
#
# How the method deals with resampling (when {#interval_sample_every} has
# been called) depends on the `accurate` parameter. When `false`,
# the streams are first re-sampled and then aligned. When doing coarse
# sampling, this can introduce significant misalignments. When true,
# the method resamples only the first stream, and then aligns the
# other full non-resampled streams. accurante: false is significantly
# faster for very dense streams (w.r.t. the sampling period)
def to_polars_frame(
*streams, accurate: false, timeout: nil, chunk_size: Polars::CHUNK_SIZE
)
return ::Polars::DataFrame.new if streams.empty?

interval_start, interval_end = streams.map(&:interval_lg).transpose
interval_start = interval_start.min
interval_end = interval_end.max
interval_start = [interval_start, @interval[0]].max if @interval[0]
interval_end = [interval_end, @interval[1]].min if @interval[1]

if accurate
first_samples =
samples_of(streams[0], from: interval_start, to: interval_end)

samples = [first_samples] + streams[1..-1].map do |s|
samples_of(s, from: interval_start, to: interval_end,
every_samples: nil, every_seconds: nil)
end
else
samples = streams.map do |s|
samples_of(s, from: interval_start, to: interval_end)
end
end

builders = streams.map { |s| Polars::FrameBuilder.new(s.type) }
yield(*builders)

@interval_zero_time ||= streams.first.interval_lg[0]

if builders.size == 1
builders.first.to_polars_frame(
@interval_zero_time, samples.first,
timeout: timeout, chunk_size: chunk_size
)
else
joint_stream = Pocolog::StreamAligner.new(false, *samples)
Polars.create_aligned_frame(
@interval_zero_time, builders, joint_stream,
timeout: timeout, chunk_size: chunk_size
)
end
end

# Restricts a data stream to the current interval and sample selection
#
# @see interval_select interval_shift_start interval_shift_end
Expand Down Expand Up @@ -648,11 +583,6 @@ def roby_vega_task_timeline_data(*tasks)
end
end

# Convert a Daru frame into a vega data array
def polars_to_vega(frame)
frame.fill_nan(nil).to_a
end

# Convert a Daru frame into a vega data array
def daru_to_vega(frame, every: 1)
data = []
Expand Down
3 changes: 3 additions & 0 deletions lib/syskit/log/polars.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
require "syskit/log/polars/column_logical_time_builder"
require "syskit/log/polars/path_builder"
require "syskit/log/polars/create_aligned_frame"
require "syskit/log/polars/dsl"

Syskit::Log::DSL.include(Syskit::Log::Polars::DSL)
116 changes: 116 additions & 0 deletions lib/syskit/log/polars/dsl.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# frozen_string_literal: true

module Syskit
module Log
module Polars
# Polars-specific functionality from {Log::DSL}
module DSL
# Convert a Daru frame into a vega data array
def polars_to_vega(frame)
frame.fill_nan(nil).to_a
end

# Convert fields of a data stream into a Polars frame
#
# @param [Array] streams an array if objects that can be converted to
# samples using {#samples_of}
# @param [Boolean] accurate prefer accuracy over speed (see below)
# @param [Float,nil] timeout how long, since the last received sample
# from a stream, the method will start introducing NAs to replace
# the stream's values (NA is either NAN for float values, or nil)
# @yield a {FrameBuilder} object used to describe the frame to be built
#
# This method uses the first given stream as a "master" stream, and
# attempts to load the value of the remaining columns at the same
# real time than the value from the master stream.
#
# How the method deals with resampling (when {#interval_sample_every} has
# been called) depends on the `accurate` parameter. When `false`,
# the streams are first re-sampled and then aligned. When doing coarse
# sampling, this can introduce significant misalignments. When true,
# the method resamples only the first stream, and then aligns the
# other full non-resampled streams. accurante: false is significantly
# faster for very dense streams (w.r.t. the sampling period)
def to_polars_frame(
*streams, accurate: false, timeout: nil,
chunk_size: Polars::CHUNK_SIZE
)
return ::Polars::DataFrame.new if streams.empty?

samples =
if accurate
to_polars_frame_accurate_samples(streams)
else
to_polars_frame_samples(streams)
end

builders = streams.map { |s| Polars::FrameBuilder.new(s.type) }
yield(*builders)

center_time = @interval_zero_time || streams.first.interval_lg[0]

to_polars_frame_execute(
builders, center_time, samples,
timeout: timeout, chunk_size: chunk_size
)
end

# @api private
def to_polars_frame_execute(
builders, center_time, samples, timeout:, chunk_size:
)
if builders.size == 1
builders.first.to_polars_frame(
center_time, samples.first,
timeout: timeout, chunk_size: chunk_size
)
else
joint_stream = Pocolog::StreamAligner.new(false, *samples)
Polars.create_aligned_frame(
center_time, builders, joint_stream,
timeout: timeout, chunk_size: chunk_size
)
end
end

# @api private
#
# Create the samples enumeration objects for to_polars_frame in the
# accurate case
def to_polars_frame_accurate_samples(streams)
interval_start, interval_end = to_polars_frame_interval(streams)
first_samples =
samples_of(streams[0], from: interval_start, to: interval_end)

[first_samples] + streams[1..-1].map do |s|
samples_of(s, from: interval_start, to: interval_end,
every_samples: nil, every_seconds: nil)
end
end

# @api private
#
# Create the samples enumeration objects for to_polars_frame in the
# normal case
def to_polars_frame_samples(streams)
interval_start, interval_end = to_polars_frame_interval(streams)
streams.map do |s|
samples_of(s, from: interval_start, to: interval_end)
end
end

# @api private
#
# Compute the frame interval for to_polars_frame
def to_polars_frame_interval(streams)
interval_start, interval_end = streams.map(&:interval_lg).transpose
interval_start = interval_start.min
interval_end = interval_end.max
interval_start = [interval_start, @interval[0]].max if @interval[0]
interval_end = [interval_end, @interval[1]].min if @interval[1]
[interval_start, interval_end]
end
end
end
end
end
170 changes: 0 additions & 170 deletions test/dsl_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@
require "daru"
require "iruby"

HAS_POLARS =
begin
require "polars"
true
rescue LoadError # rubocop:disable Lint/SuppressedException
end

require "syskit/log/polars" if HAS_POLARS

module Syskit
module Log # :nodoc:
describe DSL do
Expand Down Expand Up @@ -355,167 +346,6 @@ module Log # :nodoc:
end
end

describe "#to_polars_frame" do
before do
skip "polars-ruby is not installed" unless HAS_POLARS

now_nsec = Time.now
now = Time.at(now_nsec.tv_sec, now_nsec.tv_usec)
now_usec = now.tv_sec * 1_000_000 + now.tv_usec

registry = Typelib::CXXRegistry.new
compound_t = registry.create_compound "/C" do |b|
b.t = "/uint64_t"
b.d = "/double"
b.i = "/int"
end
create_dataset "exists" do
create_logfile "test.0.log" do
create_logfile_stream(
"test", type: compound_t, metadata: {
"rock_task_name" => "task_test",
"rock_task_object_name" => "port_test",
"rock_stream_type" => "port"
}
)
write_logfile_sample now, now, { t: now_usec - 500, d: 0.1, i: 1 }
write_logfile_sample(
now + 10, now + 1, { t: now_usec + 999_500, d: 0.2, i: 2 }
)
end

create_logfile "test1.0.log" do
create_logfile_stream(
"test", type: compound_t, metadata: {
"rock_task_name" => "task_test1",
"rock_task_object_name" => "port_test",
"rock_stream_type" => "port"
}
)
write_logfile_sample(
now, now + 0.1, { t: now_usec + 100_000, d: 0.15, i: 3 }
)
write_logfile_sample(
now + 10, now + 0.9,
{ t: now_usec + 900_000, d: 0.25, i: 4 }
)
end
end

@context = make_context
@context.datastore_select @datastore_path
@context.dataset_select
end

it "creates a frame from a single stream" do
port = @context.task_test_task.port_test_port
frame = @context.to_polars_frame port do |f|
f.add_logical_time
f.add(&:d)
end

assert_equal [0, 1], frame["time"].to_a
assert_equal [0.1, 0.2], frame[".d"].to_a
end

it "allows overriding the column type" do
port = @context.task_test_task.port_test_port
port1 = @context.task_test1_task.port_test_port
frame = @context.to_polars_frame port, port1 do |a, b|
a.add_time_field("t", &:t)
a.add("a", dtype: :f32, &:d)
b.add("b", &:d)
end

assert_equal ::Polars::Float32, frame["a"].dtype
end

it "centers the time fields" do
port = @context.task_test_task.port_test_port
port1 = @context.task_test1_task.port_test_port
frame = @context.to_polars_frame port, port1 do |a, b|
a.add_time_field("t", &:t)
a.add("a", &:d)
b.add("b", &:d)
end

expected = ::Polars::DataFrame.new(
{
t: [-0.0005, 0.9995],
a: [0.1, 0.2],
b: [0.15, 0.25]
}, schema: nil
) # 'schema' option for 2.7 compatibility
assert_polars_frame_near(expected, frame)
end

def assert_polars_frame_near(expected, actual)
diff = expected - actual
max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max
assert_operator max, :<, 1e-6
end

it "aligns different streams in a single frame" do
port = @context.task_test_task.port_test_port
port1 = @context.task_test1_task.port_test_port
frame = @context.to_polars_frame port, port1 do |a, b|
a.add_logical_time("a_time")
a.add("a", &:d)
b.add("b", &:d)
b.add_logical_time("b_time")
end

expected = ::Polars::DataFrame.new(
{
a_time: [0, 1],
a: [0.1, 0.2],
b: [0.15, 0.25],
b_time: [0.1, 0.9]
}, schema: nil
) # schema for 2.7 compatibility
diff = expected - frame
max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max
assert_operator max, :<, 1e-6
end

it "handles datasets bigger than the chunk size" do
port = @context.task_test_task.port_test_port
port1 = @context.task_test1_task.port_test_port
frame = @context.to_polars_frame port, port1, chunk_size: 1 do |a, b|
a.add_logical_time("a_time")
a.add("a", &:d)
b.add("b", &:d)
b.add_logical_time("b_time")
end

expected = ::Polars::DataFrame.new(
{
a_time: [0, 1],
a: [0.1, 0.2],
b: [0.15, 0.25],
b_time: [0.1, 0.9]
}, schema: nil
) # schema for 2.7 compatibility
diff = expected - frame
max = (diff.max.row(0).to_a + diff.min.row(0).to_a).map(&:abs).max
assert_operator max, :<, 1e-6
end
end

describe "polars_to_vega" do
before do
@context = make_context
end

it "converts NaN into nil" do
frame = ::Polars::DataFrame.new(
{ "a" => [Float::NAN, 0.1] }, schema: nil
)
vega = @context.polars_to_vega(frame)
assert_equal [{ "a" => nil }, { "a" => 0.1 }], vega
end
end

describe "daru_to_vega" do
before do
@context = make_context
Expand Down
Loading

0 comments on commit 28c4eb6

Please sign in to comment.