Skip to content

Commit

Permalink
Merge pull request #51 from rock-core/polars
Browse files Browse the repository at this point in the history
feat: add support to create polars frame in the DSL module
  • Loading branch information
doudou authored Oct 23, 2024
2 parents 5dd3693 + 6a004eb commit 2873876
Show file tree
Hide file tree
Showing 12 changed files with 1,295 additions and 0 deletions.
12 changes: 12 additions & 0 deletions lib/syskit/log/polars.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

require "polars"
require "syskit/log/polars/frame_builder"
require "syskit/log/polars/column_builder"
require "syskit/log/polars/column_resolved_field_builder"
require "syskit/log/polars/column_logical_time_builder"
require "syskit/log/polars/path_builder"
require "syskit/log/polars/create_aligned_frame"
require "syskit/log/polars/dsl"

Syskit::Log::DSL.include(Syskit::Log::Polars::DSL)
32 changes: 32 additions & 0 deletions lib/syskit/log/polars/column_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

module Syskit
module Log
module Polars
# Dispatch of data to a single column
class ColumnBuilder
attr_reader :name
attr_reader :value_transform
attr_reader :global_transform

def initialize(name:, value_transform: nil, global_transform: nil)
@name = name
@value_transform = value_transform
@global_transform = global_transform
end

def apply_value_transform(value)
@value_transform ? @value_transform.call(value) : value
end

def create_series(data)
::Polars::Series.new(@name, data, dtype: @dtype)
end

def apply_global_transform(series)
@global_transform ? @global_transform.call(series) : series
end
end
end
end
end
24 changes: 24 additions & 0 deletions lib/syskit/log/polars/column_logical_time_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Syskit
module Log
module Polars
# @api private
#
# Dispatch of a stream's logical time into a single dataframe column
class ColumnLogicalTimeBuilder < ColumnBuilder
def initialize(name:, value_transform: nil, global_transform: nil)
@dtype = :u64
@na_value = nil

super
end

def resolve(time, _sample)
value = time.tv_sec * 1_000_000 + time.tv_usec
apply_value_transform(value)
end
end
end
end
end
61 changes: 61 additions & 0 deletions lib/syskit/log/polars/column_resolved_field_builder.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# frozen_string_literal: true

module Syskit
module Log
module Polars
# @api private
#
# Dispatch of a stream's field data into a single column
class ColumnResolvedFieldBuilder < ColumnBuilder
NA_UNSET = Object.new

def initialize( # rubocop:disable Metrics/ParameterLists
name:, path:, type:, value_transform:, global_transform:,
dtype: nil
)
if dtype && !dtype.respond_to?(:to_sym)
raise ArgumentError,
"'dtype' must be given in symbol form (e.g. :f32 "\
"instead of Polars::Float32)"
end

@path = path
@type = type
@dtype =
dtype || ColumnResolvedFieldBuilder.dtype_from_typelib_type(type)
@na_value =
ColumnResolvedFieldBuilder.na_value_from_dtype(@dtype)

super(name: name, value_transform: value_transform,
global_transform: global_transform)
end

def resolve(_time, value)
v = @path.resolve(value).first.to_ruby
apply_value_transform(v)
end

def self.na_value_from_dtype(dtype)
@na_value = (Float::NAN if dtype.to_s.start_with?("f"))
end

def self.dtype_from_typelib_type(type)
return ::Polars::Object unless type <= Typelib::NumericType

category =
if type.integer?
if type.unsigned?
"u"
else
"i"
end
else
"f"
end

"#{category}#{type.size * 8}"
end
end
end
end
end
114 changes: 114 additions & 0 deletions lib/syskit/log/polars/create_aligned_frame.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# frozen_string_literal: true

module Syskit
module Log
module Polars # :nodoc:
CHUNK_SIZE = 8_192

def self.create_aligned_frame(
center_time, builders, joint_stream, timeout: nil, chunk_size: CHUNK_SIZE
)
state = CreateAlignedFrame.new(
builders, timeout: timeout, chunk_size: chunk_size
)
joint_stream.raw_each do |index, time, sample|
trigger = state.update_current_samples(index, time, sample)
state.push_current_samples if trigger
end

state.push_chunks
state.recenter_time_series(center_time)
state.df
end

# @api private
#
# Implementation of algorithm steps and state for
# {Polars.create_aligned_frame}
class CreateAlignedFrame
attr_reader :df

def initialize(builders, timeout: nil, chunk_size: CHUNK_SIZE)
@builders = builders
@current_samples = Array.new(builders.size)
@chunks = builders.map do |b|
b.create_chunks(CHUNK_SIZE)
end
@chunk_size = chunk_size

@df = ::Polars::DataFrame.new(
builders.flat_map { |b| b.create_series([]) }
)

@row_count = 0
@initialized = false
@master_deadline = nil
@timeout = timeout
end

def update_current_samples(index, time, sample)
deadline = time + @timeout if @timeout
@current_samples[index] = [time, sample, deadline]
@master_deadline = deadline if index == 0
if @initialized
index == 0 && (!@master_deadline || time < @master_deadline)
else
@initialized = !@current_samples.index(nil)
end
end

def push_current_samples
ref_time = @current_samples[0][0]
@current_samples
.each_with_index do |(v_time, v_sample, v_deadline), v_index|
if v_deadline && (v_deadline < ref_time)
update_current_row_na(v_index)
else
update_current_row(v_index, v_time, v_sample)
end
end

@row_count += 1
push_chunks if @row_count == @chunk_size
end

def update_current_row_na(index)
@builders[index].update_row_na(@chunks[index], @row_count)
end

def update_current_row(index, time, sample)
@builders[index].update_row(@chunks[index], @row_count, time, sample)
end

def self.truncate_chunks(chunks, size)
chunks.map do |builder_chunks|
builder_chunks.map { |a| a[0, size] }
end
end

def self.create_dataframe(builders, chunks)
series = builders.zip(chunks).flat_map do |b, b_chunks|
b.create_series(b_chunks)
end
::Polars::DataFrame.new(series)
end

def push_chunks
return @df if @row_count == 0

chunks = CreateAlignedFrame.truncate_chunks(@chunks, @row_count)
chunk_df = CreateAlignedFrame.create_dataframe(@builders, chunks)

@row_count = 0
@df = @df.vstack(chunk_df)
end

def recenter_time_series(center_time)
@builders.each do |b|
b.recenter_time_series(@df, center_time)
end
end
end
end
end
end
116 changes: 116 additions & 0 deletions lib/syskit/log/polars/dsl.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# frozen_string_literal: true

module Syskit
module Log
module Polars
# Polars-specific functionality from {Log::DSL}
module DSL
# Convert a Daru frame into a vega data array
def polars_to_vega(frame)
frame.fill_nan(nil).to_a
end

# Convert fields of a data stream into a Polars frame
#
# @param [Array] streams an array if objects that can be converted to
# samples using {#samples_of}
# @param [Boolean] accurate prefer accuracy over speed (see below)
# @param [Float,nil] timeout how long, since the last received sample
# from a stream, the method will start introducing NAs to replace
# the stream's values (NA is either NAN for float values, or nil)
# @yield a {FrameBuilder} object used to describe the frame to be built
#
# This method uses the first given stream as a "master" stream, and
# attempts to load the value of the remaining columns at the same
# real time than the value from the master stream.
#
# How the method deals with resampling (when {#interval_sample_every} has
# been called) depends on the `accurate` parameter. When `false`,
# the streams are first re-sampled and then aligned. When doing coarse
# sampling, this can introduce significant misalignments. When true,
# the method resamples only the first stream, and then aligns the
# other full non-resampled streams. accurante: false is significantly
# faster for very dense streams (w.r.t. the sampling period)
def to_polars_frame(
*streams, accurate: false, timeout: nil,
chunk_size: Polars::CHUNK_SIZE
)
return ::Polars::DataFrame.new if streams.empty?

samples =
if accurate
to_polars_frame_accurate_samples(streams)
else
to_polars_frame_samples(streams)
end

builders = streams.map { |s| Polars::FrameBuilder.new(s.type) }
yield(*builders)

center_time = @interval_zero_time || streams.first.interval_lg[0]

to_polars_frame_execute(
builders, center_time, samples,
timeout: timeout, chunk_size: chunk_size
)
end

# @api private
def to_polars_frame_execute(
builders, center_time, samples, timeout:, chunk_size:
)
if builders.size == 1
builders.first.to_polars_frame(
center_time, samples.first,
timeout: timeout, chunk_size: chunk_size
)
else
joint_stream = Pocolog::StreamAligner.new(false, *samples)
Polars.create_aligned_frame(
center_time, builders, joint_stream,
timeout: timeout, chunk_size: chunk_size
)
end
end

# @api private
#
# Create the samples enumeration objects for to_polars_frame in the
# accurate case
def to_polars_frame_accurate_samples(streams)
interval_start, interval_end = to_polars_frame_interval(streams)
first_samples =
samples_of(streams[0], from: interval_start, to: interval_end)

[first_samples] + streams[1..-1].map do |s|
samples_of(s, from: interval_start, to: interval_end,
every_samples: nil, every_seconds: nil)
end
end

# @api private
#
# Create the samples enumeration objects for to_polars_frame in the
# normal case
def to_polars_frame_samples(streams)
interval_start, interval_end = to_polars_frame_interval(streams)
streams.map do |s|
samples_of(s, from: interval_start, to: interval_end)
end
end

# @api private
#
# Compute the frame interval for to_polars_frame
def to_polars_frame_interval(streams)
interval_start, interval_end = streams.map(&:interval_lg).transpose
interval_start = interval_start.min
interval_end = interval_end.max
interval_start = [interval_start, @interval[0]].max if @interval[0]
interval_end = [interval_end, @interval[1]].min if @interval[1]
[interval_start, interval_end]
end
end
end
end
end
Loading

0 comments on commit 2873876

Please sign in to comment.