From c924b22a7e3d3075cf882039eb2ab9cd5fdac0db Mon Sep 17 00:00:00 2001 From: Sylvain Date: Mon, 14 Aug 2023 14:14:01 -0300 Subject: [PATCH] feat: handle compressed pocolog files in the datastore itself --- lib/syskit/log/cli/datastore.rb | 10 ++- lib/syskit/log/datastore/dataset.rb | 72 ++++++++--------- lib/syskit/log/datastore/import.rb | 13 ++- lib/syskit/log/lazy_data_stream.rb | 5 +- lib/syskit/log/streams.rb | 2 +- test/datastore/dataset_test.rb | 26 +++--- test/datastore/import_test.rb | 65 --------------- test/datastore/normalize_test.rb | 121 +++++++--------------------- test/deployment_group_test.rb | 4 + test/deployment_test.rb | 4 + test/dsl/summary_test.rb | 4 + test/models/deployment_test.rb | 4 + test/replay_manager_test.rb | 28 +++---- test/rock_stream_matcher_test.rb | 4 + test/streams_test.rb | 4 + test/task_streams_test.rb | 4 + test/test_helper.rb | 86 +++++++++++++++++--- 17 files changed, 218 insertions(+), 238 deletions(-) diff --git a/lib/syskit/log/cli/datastore.rb b/lib/syskit/log/cli/datastore.rb index ff7dccf..b65d3f8 100644 --- a/lib/syskit/log/cli/datastore.rb +++ b/lib/syskit/log/cli/datastore.rb @@ -223,10 +223,14 @@ def raw_dataset?(path) return unless path.directory? has_pocolog_files = - Pathname.enum_for(:glob, path + "*.0.log").any? { true } + Pathname.enum_for(:glob, path + "*.0.log").any? { true } || + Pathname.enum_for(:glob, path + "*.0.log.zst").any? { true } has_roby_events = - Pathname.enum_for(:glob, path + "*-events.log").any? { true } - has_process_server_info_yml = (path + "info.yml").exist? + Pathname.enum_for(:glob, path + "*-events.log").any? { true } || + Pathname.enum_for(:glob, path + "*-events.log.zst").any? { true } + has_process_server_info_yml = + (path + "info.yml").exist? || + (path + "info.yml.zst").exist? has_pocolog_files && (has_roby_events || has_process_server_info_yml) diff --git a/lib/syskit/log/datastore/dataset.rb b/lib/syskit/log/datastore/dataset.rb index 34b85e0..95ad325 100644 --- a/lib/syskit/log/datastore/dataset.rb +++ b/lib/syskit/log/datastore/dataset.rb @@ -164,7 +164,8 @@ def compute_file_identity(path) end sha2 = compute_file_sha2(io) - IdentityEntry.new(path, io.tell, sha2) + identity_path = path.dirname + path.basename(".zst") + IdentityEntry.new(identity_path, io.tell, sha2) end end @@ -326,12 +327,14 @@ def compute_dataset_digest( def each_important_file return enum_for(__method__) unless block_given? - Pathname.glob(dataset_path + "pocolog" + "*.*.log") do |path| - yield(path) - end + ["", ".zst"].each do |ext| + Pathname.glob(dataset_path + "pocolog" + "*.*.log#{ext}") do |path| + yield(path) + end - Pathname.glob(dataset_path + "roby-events.*.log") do |path| - yield(path) + Pathname.glob(dataset_path + "roby-events.*.log#{ext}") do |path| + yield(path) + end end end @@ -566,12 +569,13 @@ def metadata_write_to_file def pocolog_path(name) path = dataset_path + "pocolog" + "#{name}.0.log" - unless path.exist? - raise ArgumentError, - "no pocolog file for stream #{name} (expected #{path})" - end + return path if path.exist? + + path = path.sub_ext(".log.zst") + return Syskit::Log.decompressed(path, cache_path) if path.exist? 
- path + raise ArgumentError, + "no pocolog file for stream #{name} (expected #{path})" end def each_pocolog_path @@ -580,6 +584,10 @@ def each_pocolog_path Pathname.glob(dataset_path + "pocolog" + "*.log") do |logfile_path| yield(logfile_path) end + + Pathname.glob(dataset_path + "pocolog" + "*.log.zst") do |logfile_path| + yield(logfile_path) + end end # Enumerate the pocolog streams available in this dataset @@ -591,6 +599,7 @@ def each_pocolog_stream pocolog_index_dir = (cache_path + "pocolog").to_s each_pocolog_path do |logfile_path| + logfile_path = Syskit::Log.decompressed(logfile_path, cache_path) logfile = Pocolog::Logfiles.open( logfile_path, index_dir: pocolog_index_dir, silent: true ) @@ -602,38 +611,23 @@ def each_pocolog_stream # # Load lazy data stream information from disk def read_lazy_data_streams - pocolog_index_dir = (cache_path + "pocolog").to_s - Pathname.enum_for(:glob, dataset_path + "pocolog" + "*.log").map do |logfile_path| + each_pocolog_path.map do |logfile_path| + raw_logfile_path = + logfile_path.dirname + logfile_path.basename(".zst") + index_path = Pocolog::Logfiles.default_index_filename( - logfile_path.to_s, index_dir: pocolog_index_dir.to_s + raw_logfile_path.to_s, index_dir: logfile_path.dirname.to_s ) index_path = Pathname.new(index_path) - logfile_path.open do |file_io| - index_path.open do |index_io| - stream_info = - Pocolog::Format::Current - .read_minimal_info(index_io, file_io) - stream_block, index_stream_info = stream_info.first - - interval_rt = index_stream_info.interval_rt.map do |t| - Pocolog::StreamIndex.time_from_internal(t, 0) - end - interval_lg = index_stream_info.interval_lg.map do |t| - Pocolog::StreamIndex.time_from_internal(t, 0) - end - - LazyDataStream.new( - logfile_path, - pocolog_index_dir, - stream_block.name, - stream_block.type, - stream_block.metadata, - interval_rt, - interval_lg, - index_stream_info.stream_size - ) - end + unless index_path.exist? 
+ Syskit::Log.generate_pocolog_minimal_index( + logfile_path, index_path + ) end + + Syskit::Log.read_single_lazy_data_stream( + logfile_path, index_path, cache_path + "pocolog" + ) end end diff --git a/lib/syskit/log/datastore/import.rb b/lib/syskit/log/datastore/import.rb index be9920a..fe30df4 100644 --- a/lib/syskit/log/datastore/import.rb +++ b/lib/syskit/log/datastore/import.rb @@ -237,11 +237,12 @@ def normalize_pocolog_files(files, delete_input: false) total: bytes_total ) - Syskit::Log::Datastore.normalize( + entries = Syskit::Log::Datastore.normalize( files, output_path: out_pocolog_dir, index_dir: out_pocolog_cache_dir, delete_input: delete_input, compress: @compress, reporter: @reporter ) + entries.each { |e| e.path = identity_path(e.path) } ensure @reporter.finish end @@ -299,7 +300,7 @@ def copy_roby_event_log(event_log_path, roby_sql_index) # rubocop:disable Metric end @reporter.reset_progressbar( - "#{event_log_path.basename} [:bar]", total: event_log_path.stat.size + "#{event_log_path.basename} [:bar]", total: in_stat.size ) rebuilder = Roby::DRoby::PlanRebuilder.new @@ -315,7 +316,7 @@ def copy_roby_event_log(event_log_path, roby_sql_index) # rubocop:disable Metric end entry = Dataset::IdentityEntry.new( - out_path, out_io.tell, out_io.string_digest + identity_path(out_path), out_io.tell, out_io.string_digest ) out_io.close FileUtils.touch out_path.to_s, mtime: in_stat.mtime @@ -395,7 +396,7 @@ def copy_roby_event_log_no_index(event_log_path) end entry = Dataset::IdentityEntry.new( - out_path, out_io.tell, out_io.string_digest + identity_path(out_path), out_io.tell, out_io.string_digest ) out_io.close FileUtils.touch out_path.to_s, mtime: in_stat.mtime @@ -405,6 +406,10 @@ def copy_roby_event_log_no_index(event_log_path) out_io&.close unless out_io&.closed? end + def identity_path(file_path) + file_path.dirname + file_path.basename(".zst") + end + # @api private # # Initialize the IOs and objects needed for a roby event log copy diff --git a/lib/syskit/log/lazy_data_stream.rb b/lib/syskit/log/lazy_data_stream.rb index 206ecb3..25339dd 100644 --- a/lib/syskit/log/lazy_data_stream.rb +++ b/lib/syskit/log/lazy_data_stream.rb @@ -82,7 +82,10 @@ def registry def syskit_eager_load return @pocolog_stream if @pocolog_stream - file = Pocolog::Logfiles.open(path, index_dir: index_dir) + file = Pocolog::Logfiles.open( + Syskit::Log.decompressed(path, index_dir).to_s, + index_dir: index_dir + ) @pocolog_stream = file.streams.first end diff --git a/lib/syskit/log/streams.rb b/lib/syskit/log/streams.rb index 800762b..40fc41e 100644 --- a/lib/syskit/log/streams.rb +++ b/lib/syskit/log/streams.rb @@ -21,7 +21,7 @@ def self.from_file(file) streams end - # Load the set of streams available from a file + # Load the set of streams available from a dataset def self.from_dataset(dataset) streams = new streams.add_dataset(dataset) diff --git a/test/datastore/dataset_test.rb b/test/datastore/dataset_test.rb index 97cd199..c866273 100644 --- a/test/datastore/dataset_test.rb +++ b/test/datastore/dataset_test.rb @@ -9,8 +9,13 @@ class Datastore attr_reader :root_path, :dataset, :dataset_path, :cache_path, :store attr_reader :roby_digest, :pocolog_digest - def dataset_pathname(*names) - dataset_path + File.join(*names) + def dataset_pathname(*names, compress: compress?) 
+ path = dataset_path.join(*names[0..-2]) + if compress + path.join(names[-1] + ".zst") + else + path.join(names[-1]) + end end before do @@ -31,7 +36,7 @@ def dataset_pathname(*names) FileUtils.touch dataset_pathname("text", "test.txt") dataset_pathname("roby-events.0.log").open("w") { |io| io.write "ROBY" } FileUtils.touch dataset_pathname("ignored", "not_recognized_file") - dataset_pathname("ignored", "not_recognized_dir").mkpath + dataset_pathname("ignored", "not_recognized_dir", compress: false).mkpath FileUtils.touch dataset_pathname("ignored", "not_recognized_dir", "test") end after do @@ -557,10 +562,11 @@ def write_metadata( describe "#each_pocolog_stream" do it "expects the pocolog cache files in the dataset's cache directory" do cache_path.mkpath - open_logfile logfile_path("task0::port.0.log"), index_dir: (cache_path + "pocolog").to_s - flexmock(Pocolog::Logfiles).new_instances - .should_receive(:rebuild_and_load_index) - .never + open_logfile "task0::port.0.log", index_dir: (cache_path + "pocolog").to_s + flexmock(Pocolog::Logfiles) + .new_instances + .should_receive(:rebuild_and_load_index) + .never streams = dataset.each_pocolog_stream.to_a assert_equal ["test"], streams.map(&:name) end @@ -585,8 +591,10 @@ def write_metadata( write_logfile_sample base_time + 100, base_time + 300, 3 end cache_path.mkpath - open_logfile logfile_path("task0::port.0.log"), index_dir: (cache_path + "pocolog").to_s - open_logfile logfile_path("task0::other.0.log"), index_dir: (cache_path + "pocolog").to_s + open_logfile "task0::port.0.log", + index_dir: (cache_path + "pocolog").to_s + open_logfile "task0::other.0.log", + index_dir: (cache_path + "pocolog").to_s end it "loads stream information and returns LazyDataStream objects" do diff --git a/test/datastore/import_test.rb b/test/datastore/import_test.rb index a39af49..cb0b968 100644 --- a/test/datastore/import_test.rb +++ b/test/datastore/import_test.rb @@ -297,71 +297,6 @@ def tty_reporter assert_equal t, time end end - - def logfile_pathname(*path) - return super unless /-events\.log$|\.\d+\.log$/.match?(path.last) - return super unless compress? - - super(*path[0..-2], path.last + ".zst") - end - - def open_logfile_stream(path, stream_name, **kw) - return super unless compress? - - Tempfile.open(["", ".log"]) do |temp_io| - path = path.sub_ext(".log.zst") if path.extname != ".zst" - - temp_io.write Zstd.decompress(path.read) - temp_io.flush - temp_io.rewind - return super(temp_io.path, stream_name, **kw) - end - end - - def read_logfile(*name) - path = logfile_pathname(*name) - data = path.read - return data unless path.extname == ".zst" - - Zstd.decompress(data) - end - - def write_logfile(name, data) - path = logfile_pathname(name) - data = Zstd.compress(data) if path.extname == ".zst" - path.write data - end - - def create_logfile(name, truncate: 0) - path = Pathname.new(super(name)) - path.truncate(path.stat.size - truncate) - return path unless compress? - - compressed = Zstd.compress(path.read) - compressed_path = path.sub_ext(".log.zst") - compressed_path.write(compressed) - path.unlink - compressed_path - end - - def create_roby_logfile(name) - path = Pathname.new(super(name)) - return path unless compress? - - compressed = Zstd.compress(path.read) - compressed_path = path.sub_ext(".log.zst") - compressed_path.write(compressed) - path.unlink - compressed_path - end - - def copy_in_file(in_path, name) - out_path = logfile_pathname(name) - data = in_path.read - data = Zstd.compress(data) if compress? 
- - out_path.write(data) - end end end end diff --git a/test/datastore/normalize_test.rb b/test/datastore/normalize_test.rb index 1302c14..e5d12be 100644 --- a/test/datastore/normalize_test.rb +++ b/test/datastore/normalize_test.rb @@ -12,10 +12,6 @@ class Datastore @normalize = Normalize.new(compress: compress?) end - def compress? - ENV["SYSKIT_LOG_TEST_COMPRESS"] == "1" - end - describe "#normalize" do before do create_logfile "file0.0.log" do @@ -29,14 +25,13 @@ def compress? it "splits the file into a one-file-per-stream scheme" do logfile_pathname("normalized").mkdir normalize.normalize([logfile_pathname("file0.0.log")]) - normalized_dir = logfile_pathname("normalized") stream = open_logfile_stream( - normalized_dir + "task0::port.0.log", "task0.port" + ["normalized", "task0::port.0.log"], "task0.port" ) assert_equal [[base_time + 2, base_time + 20, 2]], stream.samples.to_a stream = open_logfile_stream( - normalized_dir + "task1::port.0.log", "task1.port" + ["normalized", "task1::port.0.log"], "task1.port" ) assert_equal [[base_time + 1, base_time + 10, 1]], stream.samples.to_a @@ -45,14 +40,13 @@ def compress? logfile_pathname("normalized").mkdir input_path = logfile_pathname("file0.0.log") normalize.normalize([input_path], delete_input: true) - normalized_dir = logfile_pathname("normalized") stream = open_logfile_stream( - normalized_dir + "task0::port.0.log", "task0.port" + ["normalized", "task0::port.0.log"], "task0.port" ) assert_equal [[base_time + 2, base_time + 20, 2]], stream.samples.to_a stream = open_logfile_stream( - normalized_dir + "task1::port.0.log", "task1.port" + ["normalized", "task1::port.0.log"], "task1.port" ) assert_equal [[base_time + 1, base_time + 10, 1]], stream.samples.to_a @@ -69,16 +63,20 @@ def compress? assert_raises(e) do normalize.normalize([input_path], delete_input: true) end - normalized_dir = logfile_pathname("normalized") - refute((normalized_dir + "task0::port.0.log").exist?) - refute((normalized_dir + "task1::port.0.log").exist?) + refute logfile_pathname("normalized", "task0::port.0.log").exist? + refute logfile_pathname("normalized", "task1::port.0.log").exist? assert input_path.exist? end it "does delete unrelated input files if they have been already "\ "processed" do logfile_pathname("normalized").mkdir create_logfile "file1.0.log" do - create_logfile_stream "stream2", metadata: Hash["rock_task_name" => "task2", "rock_task_object_name" => "port"] + create_logfile_stream( + "stream2", metadata: { + "rock_task_name" => "task2", + "rock_task_object_name" => "port" + } + ) write_logfile_sample base_time + 2, base_time + 20, 2 end input0_path = logfile_pathname("file0.0.log") @@ -92,8 +90,8 @@ def compress? assert_raises(e) do normalize.normalize([input0_path, input1_path], delete_input: true) end - assert(logfile_pathname("normalized", "task0::port.0.log").exist?) - refute(logfile_pathname("normalized", "task2::port.0.log").exist?) + assert logfile_pathname("normalized", "task0::port.0.log").exist? + refute logfile_pathname("normalized", "task2::port.0.log").exist? refute input0_path.exist? assert input1_path.exist? end @@ -105,9 +103,8 @@ def compress? 
flexmock(Pocolog::Logfiles).new_instances .should_receive(:rebuild_and_load_index) .never - normalized_dir = logfile_pathname("normalized") - open_logfile_stream (normalized_dir + "task0::port.0.log"), "task0.port" - open_logfile_stream (normalized_dir + "task1::port.0.log"), "task1.port" + open_logfile_stream ["normalized", "task0::port.0.log"], "task0.port" + open_logfile_stream ["normalized", "task1::port.0.log"], "task1.port" end it "allows to specify the cache directory" do skip if compress? @@ -121,9 +118,8 @@ def compress? .new_instances .should_receive(:rebuild_and_load_index) .never - normalized_dir = logfile_pathname("normalized") - open_logfile_stream (normalized_dir + "task0::port.0.log"), "task0.port", index_dir: index_dir - open_logfile_stream (normalized_dir + "task1::port.0.log"), "task1.port", index_dir: index_dir + open_logfile_stream ["normalized", "task0::port.0.log"], "task0.port", index_dir: index_dir + open_logfile_stream ["normalized", "task1::port.0.log"], "task1.port", index_dir: index_dir end describe "digest generation" do it "optionally computes the sha256 digest of the generated file, "\ @@ -145,9 +141,12 @@ def compress? create_logfile_stream "stream0", metadata: Hash["rock_task_name" => "task0", "rock_task_object_name" => "port"] write_logfile_sample base_time + 3, base_time + 30, 3 end - normalize.normalize([logfile_pathname("file0.0.log"), logfile_pathname("file0.1.log")]) - normalized_dir = logfile_pathname("normalized") - stream = open_logfile_stream (normalized_dir + "task0::port.0.log"), "task0.port" + normalize.normalize( + [logfile_pathname("file0.0.log"), logfile_pathname("file0.1.log")] + ) + stream = open_logfile_stream( + ["normalized", "task0::port.0.log"], "task0.port" + ) assert_equal [[base_time + 2, base_time + 20, 2], [base_time + 3, base_time + 30, 3]], stream.samples.to_a end @@ -196,22 +195,6 @@ def compress? end end end - it "deletes newly created files if the initialization of a new file fails" do - create_logfile "file0.1.log" do - create_logfile_stream "stream0", - metadata: Hash["rock_task_name" => "task0", "rock_task_object_name" => "port"] - write_logfile_sample base_time + 3, base_time + 30, 3 - end - error_class = Class.new(RuntimeError) - flexmock(File).new_instances.should_receive(:write).and_raise(error_class) - _out, = capture_io do - assert_raises(error_class) do - normalize.normalize([logfile_pathname("file0.0.log"), logfile_pathname("file0.1.log")]) - end - end - normalized_dir = logfile_pathname("normalized") - refute (normalized_dir + "task0::port.0.log").exist? - end end describe "#normalize_logfile" do @@ -232,8 +215,12 @@ def compress? end it "handles truncated files" do create_logfile "file0.0.log", truncate: 1 do - create_logfile_stream "stream0", - metadata: Hash["rock_task_name" => "task0", "rock_task_object_name" => "port"] + create_logfile_stream( + "stream0", metadata: { + "rock_task_name" => "task0", + "rock_task_object_name" => "port" + } + ) write_logfile_sample base_time + 3, base_time + 30, 3 write_logfile_sample base_time + 4, base_time + 40, 4 end @@ -250,58 +237,12 @@ def compress? logfile_pathname("normalized"), reporter: reporter ) stream = open_logfile_stream( - logfile_pathname("normalized", "task0::port.0.log"), - "task0.port" + ["normalized", "task0::port.0.log"], "task0.port" ) assert_equal [[base_time + 3, base_time + 30, 3]], stream.samples.to_a end end - - def logfile_pathname(*path) - return super unless /\.\d+\.log$/.match?(path.last) - return super unless compress? 
- - super(*path[0..-2], path.last + ".zst") - end - - def open_logfile_stream(path, stream_name, **kw) - return super unless compress? - - Tempfile.open(["", ".log"]) do |temp_io| - path = path.sub_ext(".log.zst") if path.extname != ".zst" - - temp_io.write Zstd.decompress(path.read) - temp_io.flush - temp_io.rewind - return super(temp_io.path, stream_name, **kw) - end - end - - def read_logfile(*name) - path = logfile_pathname(*name) - data = path.read - return data unless path.extname == ".zst" - - Zstd.decompress(data) - end - - def write_logfile(name, data) - path = logfile_pathname(name) - data = Zstd.compress(data) if path.extname == ".zst" - path.write data - end - - def create_logfile(name, truncate: 0) - path = Pathname.new(super(name)) - path.truncate(path.stat.size - truncate) - return path unless compress? - - compressed = Zstd.compress(path.read) - path.sub_ext(".log.zst").write(compressed) - path.unlink - path - end end end end diff --git a/test/deployment_group_test.rb b/test/deployment_group_test.rb index 7a61af8..54ebf39 100644 --- a/test/deployment_group_test.rb +++ b/test/deployment_group_test.rb @@ -69,6 +69,10 @@ def self.common_behavior # rubocop:disable Metrics/AbcSize end describe "from_dir" do + def compress? + false + end + def load_logfiles_as_stream Streams.from_dir(logfile_pathname) end diff --git a/test/deployment_test.rb b/test/deployment_test.rb index 38e3f6f..88e4454 100644 --- a/test/deployment_test.rb +++ b/test/deployment_test.rb @@ -145,6 +145,10 @@ def self.common_behavior # rubocop:disable Metrics/AbcSize, Metrics/MethodLength end describe "from_dir" do + def compress? + false + end + def load_logfiles_as_stream Streams.from_dir(logfile_pathname) end diff --git a/test/dsl/summary_test.rb b/test/dsl/summary_test.rb index 23ae5be..d5e9fd9 100644 --- a/test/dsl/summary_test.rb +++ b/test/dsl/summary_test.rb @@ -35,6 +35,10 @@ module Log # :nodoc: ENV["TZ"] = "America/Sao_Paulo" end + def compress? + false + end + after do ENV["TZ"] = @tz end diff --git a/test/models/deployment_test.rb b/test/models/deployment_test.rb index b8279a9..4252905 100644 --- a/test/models/deployment_test.rb +++ b/test/models/deployment_test.rb @@ -119,6 +119,10 @@ def self.common_behavior # rubocop:disable Metrics/AbcSize @streams = @all_streams.find_task_by_name("task") end + def compress? 
+ false + end + common_behavior end diff --git a/test/replay_manager_test.rb b/test/replay_manager_test.rb index ec0bfe2..88bb681 100644 --- a/test/replay_manager_test.rb +++ b/test/replay_manager_test.rb @@ -23,6 +23,11 @@ module Syskit::Log plan = Roby::ExecutablePlan.new @subject = ReplayManager.new(plan.execution_engine) + + @streams = load_logfiles_as_stream.find_task_by_name("task") + @port_stream = streams.find_port_by_name("out") + @deployment_m = Syskit::Log::Deployment + .for_streams(streams, model: task_m, name: "task") end def self.common_behavior # rubocop:disable Metrics/AbcSize, Metrics/MethodLength @@ -149,8 +154,7 @@ def self.common_behavior # rubocop:disable Metrics/AbcSize, Metrics/MethodLength stream0.write Time.at(0), Time.at(2), 2 end - streams = Streams.from_dir(logfile_pathname) - .find_task_by_name("task") + streams = load_logfiles_as_stream.find_task_by_name("task") task_m = Syskit::TaskContext.new_submodel do output_port "out", double_t end @@ -234,25 +238,21 @@ def self.common_behavior # rubocop:disable Metrics/AbcSize, Metrics/MethodLength end describe "from_dir" do - before do - @streams = Streams.from_dir(logfile_pathname) - .find_task_by_name("task") - @port_stream = streams.find_port_by_name("out") - @deployment_m = Syskit::Log::Deployment - .for_streams(streams, model: task_m, name: "task") + def compress? + false + end + + def load_logfiles_as_stream + Streams.from_dir(logfile_pathname) end common_behavior end describe "from_dataset" do - before do + def load_logfiles_as_stream _, dataset = import_logfiles - @streams = Streams.from_dataset(dataset) - .find_task_by_name("task") - @port_stream = streams.find_port_by_name("out") - @deployment_m = Syskit::Log::Deployment - .for_streams(streams, model: task_m, name: "task") + Streams.from_dataset(dataset).find_task_by_name("task") end common_behavior diff --git a/test/rock_stream_matcher_test.rb b/test/rock_stream_matcher_test.rb index 97f090b..5bb2391 100644 --- a/test/rock_stream_matcher_test.rb +++ b/test/rock_stream_matcher_test.rb @@ -20,6 +20,10 @@ module Syskit::Log end subject { RockStreamMatcher.new } + def compress? + false + end + def assert_finds_streams(query, *stream_names) assert_equal stream_names, streams.find_all_streams(query).map(&:name) end diff --git a/test/streams_test.rb b/test/streams_test.rb index ace1cd7..218a174 100644 --- a/test/streams_test.rb +++ b/test/streams_test.rb @@ -6,6 +6,10 @@ module Syskit::Log describe Streams do subject { Streams.new } + def compress? + false + end + describe "#add_file" do it "adds the file's streams to the object" do create_logfile "test.0.log" do diff --git a/test/task_streams_test.rb b/test/task_streams_test.rb index 34e4671..f26657b 100644 --- a/test/task_streams_test.rb +++ b/test/task_streams_test.rb @@ -146,6 +146,10 @@ def self.task_streams_behavior(stream_class) # rubocop:disable Metrics/AbcSize, assert @subject end + def compress? + false + end + task_streams_behavior(::Pocolog::DataStream) describe "#each_port_stream" do diff --git a/test/test_helper.rb b/test/test_helper.rb index db22e18..bcddac6 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -38,10 +38,6 @@ def make_tmppath path end - def logfile_pathname(*basename) - Pathname.new(logfile_path(*basename)) - end - def create_datastore(path) @datastore = Datastore.create(path) end @@ -71,16 +67,12 @@ def create_dataset(digest, metadata: {}) end end - def roby_log_path(name) - Pathname(__dir__) + "roby-logs" + "#{name}-events.log" + def compress? 
+ ENV["SYSKIT_LOG_TEST_COMPRESS"] == "1" end - def create_roby_logfile(name) - path = Pathname(logfile_path(name)) - path.open "w" do |io| - Roby::DRoby::Logfile.write_header(io) - end - path + def roby_log_path(name) + Pathname(__dir__) + "roby-logs" + "#{name}-events.log" end # Create a "ready to use" datastore based on the given data in fixtures/ @@ -98,6 +90,75 @@ def prepare_fixture_datastore(name) [store, set] end + def logfile_pathname(*path) + raw = Pathname.new(logfile_path(*path)) + return raw unless /-events\.log$|\.\d+\.log$/.match?(path.last) + return raw unless compress? + + Pathname.new(logfile_path(*path[0..-2], path.last + ".zst")) + end + + def open_logfile(path, index_dir: nil, **kw) + path = logfile_pathname(*path) + index_dir ||= path.dirname + return super(path, index_dir: index_dir.to_s, **kw) unless compress? + + decompressed = Syskit::Log.decompressed(path, Pathname(index_dir)) + super(decompressed, index_dir: index_dir.to_s, **kw) + end + + def open_logfile_stream(path, stream_name, **kw) + open_logfile(path, **kw).stream(stream_name) + end + + def read_logfile(*name) + path = logfile_pathname(*name) + data = path.read + return data unless path.extname == ".zst" + + Zstd.decompress(data) + end + + def write_logfile(name, data) + path = logfile_pathname(name) + data = Zstd.compress(data) if path.extname == ".zst" + path.write data + end + + def create_logfile(name, truncate: 0) + path = Pathname.new(super(name)) + path.truncate(path.stat.size - truncate) + return path unless compress? + + compressed = Zstd.compress(path.read) + compressed_path = path.sub_ext(".log.zst") + compressed_path.write(compressed) + path.unlink + compressed_path + end + + def create_roby_logfile(name) + path = Pathname(logfile_path(name)) + path.open "w" do |io| + Roby::DRoby::Logfile.write_header(io) + end + return path unless compress? + + compressed = Zstd.compress(path.read) + compressed_path = path.sub_ext(".log.zst") + compressed_path.write(compressed) + path.unlink + compressed_path + end + + def copy_in_file(in_path, name) + out_path = logfile_pathname(name) + data = in_path.read + data = Zstd.compress(data) if compress? + + out_path.write(data) + end + def import_logfiles(path = logfile_pathname) root_path = make_tmppath datastore_path = root_path + "datastore" @@ -106,6 +167,7 @@ def import_logfiles(path = logfile_pathname) import = Datastore::Import.new( root_path + "import-core", + compress: compress?, cache_path: root_path + "import-cache" ) dataset = import.normalize_dataset([path])
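# Illustrative sketch only -- not part of the patch above. The new call sites
# (Dataset#pocolog_path, #each_pocolog_stream, LazyDataStream#syskit_eager_load,
# and the test helper open_logfile) all rely on a Syskit::Log.decompressed
# helper whose implementation is not shown in this diff. The sketch below only
# illustrates the behavior those callers assume: given a possibly
# zstd-compressed log file and a cache directory, return a plain-file path that
# Pocolog::Logfiles.open can read. The real helper's signature, caching policy
# and file naming may differ.
require "pathname"
require "zstd-ruby"

module Syskit
    module Log
        # Hypothetical stand-in for the helper used throughout the patch
        def self.decompressed(path, cache_dir)
            # Plain logfiles are returned untouched
            return path unless path.extname == ".zst"

            cache_dir.mkpath
            target = cache_dir + path.basename(".zst")
            unless target.exist?
                # Decompress into the cache on first access; Zstd.decompress is
                # the same zstd-ruby API the test helpers above already use
                target.write(Zstd.decompress(path.read))
            end
            target
        end
    end
end

# Example use mirroring the call sites introduced by the patch, e.g.
#   Syskit::Log.decompressed(logfile_path, cache_path + "pocolog")
# With the updated test helpers, the suite can be exercised against compressed
# datasets by setting SYSKIT_LOG_TEST_COMPRESS=1 when running the tests.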