Skip to content

Commit

Permalink
Merge pull request #53 from kapeps/logical_time
Browse files Browse the repository at this point in the history
feat: syskit-log import process use the logical time information to rewrite the logical time
  • Loading branch information
kapeps authored Jan 29, 2025
2 parents 095ea7c + 0a47444 commit eca1cc5
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
45 changes: 45 additions & 0 deletions lib/syskit/log/datastore/normalize.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class InvalidFollowupStream < RuntimeError; end
# Internal representation of the output of a normalization operation
class Output
attr_reader :path
attr_reader :logical_time_field
attr_reader :stream_block
attr_reader :digest
attr_reader :stream_size
Expand All @@ -52,6 +53,7 @@ def initialize(
@wio = wio
@stream_block = stream_block
@stream_block_pos = stream_block_pos
@logical_time_field = resolve_logical_time_field(stream_block)
@stream_size = 0
@interval_rt = []
@interval_lg = []
Expand Down Expand Up @@ -108,20 +110,63 @@ def create_block_stream
Pocolog::BlockStream.new(@wio.dup)
end

def update_raw_payload_logical_time(raw_payload, logical_time)
# Logical time are bytes from 8..15
raw_payload[8..11] = [logical_time.tv_sec].pack("V")
raw_payload[12..15] = [logical_time.tv_usec].pack("V")
raw_payload
end

def add_data_block(rt_time, lg_time, raw_data, raw_payload)
@stream_size += 1

write raw_data[0, 2]
write ZERO_BYTE
write raw_data[4..-1]

if @logical_time_field
logical_time = extract_logical_time(raw_payload)
lg_time = logical_time.microseconds
raw_payload = update_raw_payload_logical_time(
raw_payload, logical_time
)
end
write raw_payload

@interval_rt[0] ||= rt_time
@interval_rt[1] = rt_time
@interval_lg[0] ||= lg_time
@interval_lg[1] = lg_time
@last_data_block_time = [rt_time, lg_time]
end

def resolve_logical_time_field(stream_block)
rock_timestamp_field = stream_block.metadata["rock_timestamp_field"]
return rock_timestamp_field if rock_timestamp_field

type = stream_block.type
return unless type < Typelib::CompoundType

metadata = type.field_metadata
type.each_field do |field|
role = metadata[field].get("role").first

return field if role == "logical_time"
end
nil
end

def extract_logical_time(raw_payload)
return unless @logical_time_field

# Skip 21 bytes as they belong to the data stream declaration block
# information before the marshalled data.
# See rock-core/tools-pocolog/blob/master/spec/spec-v2.txt
@stream_block.type
.from_buffer(raw_payload[21..-1])
.raw_get(@logical_time_field)
end

def string_digest
DatasetIdentity.string_digest(@digest)
end
Expand Down
45 changes: 45 additions & 0 deletions test/datastore/normalize_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,51 @@ class Datastore
stream.samples.to_a
end
end

describe "logical_time" do
it "extract logical time from payload" do
registry = Typelib::CXXRegistry.new
registry.create_compound "/Time" do |b|
b.microseconds = "uint64_t"
b.tv_sec = "uint64_t"
b.tv_usec = "uint64_t"
end
test_t = registry.create_compound "/Test" do |b|
b.time = "/Time"
b.other_type = "/int"
end
test_t.field_metadata["time"].set("role", "logical_time")
timestamp = Time.new(1998, 12, 22)
timestamp_as_microseconds = timestamp.tv_sec * 1_000_000 +
timestamp.tv_usec
value = test_t.new(time: { microseconds: timestamp_as_microseconds,
tv_sec: timestamp.tv_sec,
tv_usec: timestamp.tv_usec },
other_type: 42)

create_logfile "file0.0.log" do
create_logfile_stream(
"stream0",
metadata: {
"rock_task_name" => "task0",
"rock_task_object_name" => "port"
},
type: test_t
)
write_logfile_sample base_time, base_time + 5, value
end

logfile_pathname("normalized").mkdir
input_path = logfile_pathname("file0.0.log")
normalize.normalize([input_path])
stream = open_logfile_stream(
["normalized", "task0::port.0.log"], "task0.port"
)

assert_equal [[base_time, timestamp, value]],
stream.samples.to_a
end
end
end
end
end

0 comments on commit eca1cc5

Please sign in to comment.