From 5effdc25e256aace2643777324e410b76aa403d6 Mon Sep 17 00:00:00 2001 From: kapeps Date: Thu, 26 Dec 2024 16:52:14 -0300 Subject: [PATCH 1/5] feat: syskit-log import process use the logical time information to rewrite the logical time --- lib/syskit/log/datastore/normalize.rb | 40 +++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index 5ea0d8d..5f3a71e 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -34,6 +34,7 @@ class InvalidFollowupStream < RuntimeError; end # Internal representation of the output of a normalization operation class Output attr_reader :path + attr_reader :logical_time_field attr_reader :stream_block attr_reader :digest attr_reader :stream_size @@ -52,6 +53,7 @@ def initialize( @wio = wio @stream_block = stream_block @stream_block_pos = stream_block_pos + @logical_time_field = resolve_logical_time_field(stream_block) @stream_size = 0 @interval_rt = [] @interval_lg = [] @@ -115,6 +117,10 @@ def add_data_block(rt_time, lg_time, raw_data, raw_payload) write ZERO_BYTE write raw_data[4..-1] write raw_payload + + logical_time = extract_logical_time(raw_payload) + lg_time = logical_time if logical_time + @interval_rt[0] ||= rt_time @interval_rt[1] = rt_time @interval_lg[0] ||= lg_time @@ -122,6 +128,40 @@ def add_data_block(rt_time, lg_time, raw_data, raw_payload) @last_data_block_time = [rt_time, lg_time] end + def resolve_logical_time_field(stream_block) + type = stream_block.type + + return unless type < Typelib::CompoundType + + metadata = type.field_metadata + type.each_field do |field| + return if metadata[field].include?("rock_timestamp_field") + + next unless metadata[field].include?("role") + + role = metadata[field].get("role").first + + return field if role == "logical_time" + end + nil + end + + def extract_logical_time(raw_payload) + return unless @logical_time_field + + # Skip 21 bytes as they belong to the data stream declaration block + # information before the marshalled data. + # See rock-core/tools-pocolog/blob/master/spec/spec-v2.txt + time_to_microseconds( + @stream_block.type + .from_buffer(raw_payload[21..-1])[@logical_time_field] + ) + end + + def time_to_microseconds(time) + time.tv_sec * 1_000_000 + time.tv_usec + end + def string_digest DatasetIdentity.string_digest(@digest) end From 73c060433040ab7986f1c9240b666fa13eb30f30 Mon Sep 17 00:00:00 2001 From: kapeps Date: Fri, 27 Dec 2024 09:42:05 -0300 Subject: [PATCH 2/5] chore: unit test logical time extraction --- lib/syskit/log/datastore/normalize.rb | 2 +- test/datastore/normalize_test.rb | 45 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index 5f3a71e..d285ff6 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -135,7 +135,7 @@ def resolve_logical_time_field(stream_block) metadata = type.field_metadata type.each_field do |field| - return if metadata[field].include?("rock_timestamp_field") + break if metadata[field].include?("rock_timestamp_field") next unless metadata[field].include?("role") diff --git a/test/datastore/normalize_test.rb b/test/datastore/normalize_test.rb index 236865c..e6e59f1 100644 --- a/test/datastore/normalize_test.rb +++ b/test/datastore/normalize_test.rb @@ -217,6 +217,51 @@ class Datastore stream.samples.to_a end end + + describe "logical_time" do + it "extract logical time from payload" do + registry = Typelib::CXXRegistry.new + registry.create_compound "/Time" do |b| + b.microseconds = "uint64_t" + b.tv_sec = "uint64_t" + b.tv_usec = "uint64_t" + end + test_t = registry.create_compound "/Test" do |b| + b.time = "/Time" + b.other_type = "/int" + end + test_t.field_metadata["time"].set("role", "logical_time") + timestamp = Time.new(1998, 12, 22) + timestamp_as_microseconds = timestamp.tv_sec * 1_000_000 + + timestamp.tv_usec + value = test_t.new(time: { microseconds: timestamp_as_microseconds, + tv_sec: timestamp.tv_sec, + tv_usec: timestamp.tv_usec }, + other_type: 42) + + create_logfile "file0.0.log" do + create_logfile_stream( + "stream0", + metadata: { + "rock_task_name" => "task0", + "rock_task_object_name" => "port" + }, + type: test_t + ) + write_logfile_sample base_time, base_time + 5, value + end + + logfile_pathname("normalized").mkdir + input_path = logfile_pathname("file0.0.log") + normalize.normalize([input_path]) + stream = open_logfile_stream( + ["normalized", "task0::port.0.log"], "task0.port" + ) + + assert_equal [[base_time, timestamp, value]], + stream.samples.to_a + end + end end end end From a7d583f2271a8ab67c3e1abe3c1b43020e6f8897 Mon Sep 17 00:00:00 2001 From: kapeps Date: Thu, 2 Jan 2025 10:37:09 -0300 Subject: [PATCH 3/5] fix actually write the logical time data --- lib/syskit/log/datastore/normalize.rb | 30 +++++++++++++++------------ 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index d285ff6..1820eb1 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -116,10 +116,19 @@ def add_data_block(rt_time, lg_time, raw_data, raw_payload) write raw_data[0, 2] write ZERO_BYTE write raw_data[4..-1] - write raw_payload + # Real time are bytes from 0..7 + write raw_payload[0..7] + # Logical time are bytes from 8..15 logical_time = extract_logical_time(raw_payload) - lg_time = logical_time if logical_time + if logical_time + write [logical_time.tv_sec].pack("V") + write [logical_time.tv_usec].pack("V") + lg_time = logical_time.microseconds + else + write raw_payload[8..15] + end + write raw_payload[16..-1] @interval_rt[0] ||= rt_time @interval_rt[1] = rt_time @@ -129,14 +138,14 @@ def add_data_block(rt_time, lg_time, raw_data, raw_payload) end def resolve_logical_time_field(stream_block) - type = stream_block.type + rock_timestamp_field = stream_block.metadata["rock_timestamp_field"] + return rock_timestamp_field if rock_timestamp_field + type = stream_block.type return unless type < Typelib::CompoundType metadata = type.field_metadata type.each_field do |field| - break if metadata[field].include?("rock_timestamp_field") - next unless metadata[field].include?("role") role = metadata[field].get("role").first @@ -152,14 +161,9 @@ def extract_logical_time(raw_payload) # Skip 21 bytes as they belong to the data stream declaration block # information before the marshalled data. # See rock-core/tools-pocolog/blob/master/spec/spec-v2.txt - time_to_microseconds( - @stream_block.type - .from_buffer(raw_payload[21..-1])[@logical_time_field] - ) - end - - def time_to_microseconds(time) - time.tv_sec * 1_000_000 + time.tv_usec + @stream_block.type + .from_buffer(raw_payload[21..-1]) + .raw_get(@logical_time_field) end def string_digest From 53a55942334dd73fa31a742a34ad74ecad70d1ef Mon Sep 17 00:00:00 2001 From: kapeps Date: Thu, 2 Jan 2025 15:41:57 -0300 Subject: [PATCH 4/5] chore: guard the payload logical time update instead of writing many times --- lib/syskit/log/datastore/normalize.rb | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index 1820eb1..884f036 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -110,6 +110,13 @@ def create_block_stream Pocolog::BlockStream.new(@wio.dup) end + def update_raw_payload_logical_time(raw_payload, logical_time) + # Logical time are bytes from 8..15 + raw_payload[8..11] = [logical_time.tv_sec].pack("V") + raw_payload[12..15] = [logical_time.tv_usec].pack("V") + raw_payload + end + def add_data_block(rt_time, lg_time, raw_data, raw_payload) @stream_size += 1 @@ -117,18 +124,14 @@ def add_data_block(rt_time, lg_time, raw_data, raw_payload) write ZERO_BYTE write raw_data[4..-1] - # Real time are bytes from 0..7 - write raw_payload[0..7] - # Logical time are bytes from 8..15 logical_time = extract_logical_time(raw_payload) if logical_time - write [logical_time.tv_sec].pack("V") - write [logical_time.tv_usec].pack("V") lg_time = logical_time.microseconds - else - write raw_payload[8..15] + raw_payload = update_raw_payload_logical_time( + raw_payload, logical_time + ) end - write raw_payload[16..-1] + write raw_payload @interval_rt[0] ||= rt_time @interval_rt[1] = rt_time From 0a47444c670f6a203c3b42bdc1001a94d8efe871 Mon Sep 17 00:00:00 2001 From: kapeps Date: Tue, 28 Jan 2025 16:36:23 -0300 Subject: [PATCH 5/5] fix: minor code improvements --- lib/syskit/log/datastore/normalize.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index 884f036..985ff8e 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -124,8 +124,8 @@ def add_data_block(rt_time, lg_time, raw_data, raw_payload) write ZERO_BYTE write raw_data[4..-1] - logical_time = extract_logical_time(raw_payload) - if logical_time + if @logical_time_field + logical_time = extract_logical_time(raw_payload) lg_time = logical_time.microseconds raw_payload = update_raw_payload_logical_time( raw_payload, logical_time @@ -149,8 +149,6 @@ def resolve_logical_time_field(stream_block) metadata = type.field_metadata type.each_field do |field| - next unless metadata[field].include?("role") - role = metadata[field].get("role").first return field if role == "logical_time"