Skip to content

Commit

Permalink
feat: handle compressed pocolog files in the datastore itself
Browse files Browse the repository at this point in the history
  • Loading branch information
doudou committed Aug 22, 2023
1 parent 5125fed commit c924b22
Show file tree
Hide file tree
Showing 17 changed files with 218 additions and 238 deletions.
10 changes: 7 additions & 3 deletions lib/syskit/log/cli/datastore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,10 +223,14 @@ def raw_dataset?(path)
return unless path.directory?

has_pocolog_files =
Pathname.enum_for(:glob, path + "*.0.log").any? { true }
Pathname.enum_for(:glob, path + "*.0.log").any? { true } ||
Pathname.enum_for(:glob, path + "*.0.log.zst").any? { true }
has_roby_events =
Pathname.enum_for(:glob, path + "*-events.log").any? { true }
has_process_server_info_yml = (path + "info.yml").exist?
Pathname.enum_for(:glob, path + "*-events.log").any? { true } ||
Pathname.enum_for(:glob, path + "*-events.log.zst").any? { true }
has_process_server_info_yml =
(path + "info.yml").exist? ||
(path + "info.yml.zst").exist?

has_pocolog_files &&
(has_roby_events || has_process_server_info_yml)
Expand Down
72 changes: 33 additions & 39 deletions lib/syskit/log/datastore/dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ def compute_file_identity(path)
end
sha2 = compute_file_sha2(io)

IdentityEntry.new(path, io.tell, sha2)
identity_path = path.dirname + path.basename(".zst")
IdentityEntry.new(identity_path, io.tell, sha2)
end
end

Expand Down Expand Up @@ -326,12 +327,14 @@ def compute_dataset_digest(
def each_important_file
return enum_for(__method__) unless block_given?

Pathname.glob(dataset_path + "pocolog" + "*.*.log") do |path|
yield(path)
end
["", ".zst"].each do |ext|
Pathname.glob(dataset_path + "pocolog" + "*.*.log#{ext}") do |path|
yield(path)
end

Pathname.glob(dataset_path + "roby-events.*.log") do |path|
yield(path)
Pathname.glob(dataset_path + "roby-events.*.log#{ext}") do |path|
yield(path)
end
end
end

Expand Down Expand Up @@ -566,12 +569,13 @@ def metadata_write_to_file

def pocolog_path(name)
path = dataset_path + "pocolog" + "#{name}.0.log"
unless path.exist?
raise ArgumentError,
"no pocolog file for stream #{name} (expected #{path})"
end
return path if path.exist?

path = path.sub_ext(".log.zst")
return Syskit::Log.decompressed(path, cache_path) if path.exist?

path
raise ArgumentError,
"no pocolog file for stream #{name} (expected #{path})"
end

def each_pocolog_path
Expand All @@ -580,6 +584,10 @@ def each_pocolog_path
Pathname.glob(dataset_path + "pocolog" + "*.log") do |logfile_path|
yield(logfile_path)
end

Pathname.glob(dataset_path + "pocolog" + "*.log.zst") do |logfile_path|
yield(logfile_path)
end
end

# Enumerate the pocolog streams available in this dataset
Expand All @@ -591,6 +599,7 @@ def each_pocolog_stream

pocolog_index_dir = (cache_path + "pocolog").to_s
each_pocolog_path do |logfile_path|
logfile_path = Syskit::Log.decompressed(logfile_path, cache_path)
logfile = Pocolog::Logfiles.open(
logfile_path, index_dir: pocolog_index_dir, silent: true
)
Expand All @@ -602,38 +611,23 @@ def each_pocolog_stream
#
# Load lazy data stream information from disk
def read_lazy_data_streams
pocolog_index_dir = (cache_path + "pocolog").to_s
Pathname.enum_for(:glob, dataset_path + "pocolog" + "*.log").map do |logfile_path|
each_pocolog_path.map do |logfile_path|
raw_logfile_path =
logfile_path.dirname + logfile_path.basename(".zst")

index_path = Pocolog::Logfiles.default_index_filename(
logfile_path.to_s, index_dir: pocolog_index_dir.to_s
raw_logfile_path.to_s, index_dir: logfile_path.dirname.to_s
)
index_path = Pathname.new(index_path)
logfile_path.open do |file_io|
index_path.open do |index_io|
stream_info =
Pocolog::Format::Current
.read_minimal_info(index_io, file_io)
stream_block, index_stream_info = stream_info.first

interval_rt = index_stream_info.interval_rt.map do |t|
Pocolog::StreamIndex.time_from_internal(t, 0)
end
interval_lg = index_stream_info.interval_lg.map do |t|
Pocolog::StreamIndex.time_from_internal(t, 0)
end

LazyDataStream.new(
logfile_path,
pocolog_index_dir,
stream_block.name,
stream_block.type,
stream_block.metadata,
interval_rt,
interval_lg,
index_stream_info.stream_size
)
end
unless index_path.exist?
Syskit::Log.generate_pocolog_minimal_index(
logfile_path, index_path
)
end

Syskit::Log.read_single_lazy_data_stream(
logfile_path, index_path, cache_path + "pocolog"
)
end
end

Expand Down
13 changes: 9 additions & 4 deletions lib/syskit/log/datastore/import.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ def normalize_pocolog_files(files, delete_input: false)
total: bytes_total
)

Syskit::Log::Datastore.normalize(
entries = Syskit::Log::Datastore.normalize(
files,
output_path: out_pocolog_dir, index_dir: out_pocolog_cache_dir,
delete_input: delete_input, compress: @compress, reporter: @reporter
)
entries.each { |e| e.path = identity_path(e.path) }
ensure
@reporter.finish
end
Expand Down Expand Up @@ -299,7 +300,7 @@ def copy_roby_event_log(event_log_path, roby_sql_index) # rubocop:disable Metric
end

@reporter.reset_progressbar(
"#{event_log_path.basename} [:bar]", total: event_log_path.stat.size
"#{event_log_path.basename} [:bar]", total: in_stat.size
)

rebuilder = Roby::DRoby::PlanRebuilder.new
Expand All @@ -315,7 +316,7 @@ def copy_roby_event_log(event_log_path, roby_sql_index) # rubocop:disable Metric
end

entry = Dataset::IdentityEntry.new(
out_path, out_io.tell, out_io.string_digest
identity_path(out_path), out_io.tell, out_io.string_digest
)
out_io.close
FileUtils.touch out_path.to_s, mtime: in_stat.mtime
Expand Down Expand Up @@ -395,7 +396,7 @@ def copy_roby_event_log_no_index(event_log_path)
end

entry = Dataset::IdentityEntry.new(
out_path, out_io.tell, out_io.string_digest
identity_path(out_path), out_io.tell, out_io.string_digest
)
out_io.close
FileUtils.touch out_path.to_s, mtime: in_stat.mtime
Expand All @@ -405,6 +406,10 @@ def copy_roby_event_log_no_index(event_log_path)
out_io&.close unless out_io&.closed?
end

# Path under which a file is recorded in identity entries
#
# The result is `file_path` with a trailing ".zst" extension removed from
# its last component; a path that does not end in ".zst" comes back
# unchanged.
#
# @param [Pathname] file_path
# @return [Pathname]
def identity_path(file_path)
  uncompressed_name = file_path.basename(".zst")
  file_path.dirname.join(uncompressed_name)
end

# @api private
#
# Initialize the IOs and objects needed for a roby event log copy
Expand Down
5 changes: 4 additions & 1 deletion lib/syskit/log/lazy_data_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ def registry
def syskit_eager_load
return @pocolog_stream if @pocolog_stream

file = Pocolog::Logfiles.open(path, index_dir: index_dir)
file = Pocolog::Logfiles.open(
Syskit::Log.decompressed(path, index_dir).to_s,
index_dir: index_dir
)
@pocolog_stream = file.streams.first
end

Expand Down
2 changes: 1 addition & 1 deletion lib/syskit/log/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def self.from_file(file)
streams
end

# Load the set of streams available from a file
# Load the set of streams available from a dataset
def self.from_dataset(dataset)
streams = new
streams.add_dataset(dataset)
Expand Down
26 changes: 17 additions & 9 deletions test/datastore/dataset_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,13 @@ class Datastore
attr_reader :root_path, :dataset, :dataset_path, :cache_path, :store
attr_reader :roby_digest, :pocolog_digest

def dataset_pathname(*names)
dataset_path + File.join(*names)
def dataset_pathname(*names, compress: compress?)
path = dataset_path.join(*names[0..-2])
if compress
path.join(names[-1] + ".zst")
else
path.join(names[-1])
end
end

before do
Expand All @@ -31,7 +36,7 @@ def dataset_pathname(*names)
FileUtils.touch dataset_pathname("text", "test.txt")
dataset_pathname("roby-events.0.log").open("w") { |io| io.write "ROBY" }
FileUtils.touch dataset_pathname("ignored", "not_recognized_file")
dataset_pathname("ignored", "not_recognized_dir").mkpath
dataset_pathname("ignored", "not_recognized_dir", compress: false).mkpath
FileUtils.touch dataset_pathname("ignored", "not_recognized_dir", "test")
end
after do
Expand Down Expand Up @@ -557,10 +562,11 @@ def write_metadata(
describe "#each_pocolog_stream" do
it "expects the pocolog cache files in the dataset's cache directory" do
cache_path.mkpath
open_logfile logfile_path("task0::port.0.log"), index_dir: (cache_path + "pocolog").to_s
flexmock(Pocolog::Logfiles).new_instances
.should_receive(:rebuild_and_load_index)
.never
open_logfile "task0::port.0.log", index_dir: (cache_path + "pocolog").to_s
flexmock(Pocolog::Logfiles)
.new_instances
.should_receive(:rebuild_and_load_index)
.never
streams = dataset.each_pocolog_stream.to_a
assert_equal ["test"], streams.map(&:name)
end
Expand All @@ -585,8 +591,10 @@ def write_metadata(
write_logfile_sample base_time + 100, base_time + 300, 3
end
cache_path.mkpath
open_logfile logfile_path("task0::port.0.log"), index_dir: (cache_path + "pocolog").to_s
open_logfile logfile_path("task0::other.0.log"), index_dir: (cache_path + "pocolog").to_s
open_logfile "task0::port.0.log",
index_dir: (cache_path + "pocolog").to_s
open_logfile "task0::other.0.log",
index_dir: (cache_path + "pocolog").to_s
end

it "loads stream information and returns LazyDataStream objects" do
Expand Down
65 changes: 0 additions & 65 deletions test/datastore/import_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -297,71 +297,6 @@ def tty_reporter
assert_equal t, time
end
end

# Resolve a log file path, transparently pointing to the compressed file
#
# When the last path component ends in "-events.log" or in ".<digits>.log"
# and `compress?` is true, ".zst" is appended to that component before
# delegating to the parent implementation. In every other case the
# arguments are forwarded untouched.
#
# NOTE(review): `compress?` and the parent `logfile_pathname` are defined
# outside of this block; presumably `compress?` tells whether the test
# runs against compressed fixtures - confirm
def logfile_pathname(*path)
  *parents, leaf = path
  log_file = /-events\.log$|\.\d+\.log$/.match?(leaf)

  if log_file && compress?
    super(*parents, leaf + ".zst")
  else
    super
  end
end

# Open a log stream, decompressing the file first when `compress?` is true
#
# Without compression, the call is forwarded as-is to the parent
# implementation. With compression, the ".log.zst" counterpart of `path`
# (or `path` itself if it already ends in ".zst") is decompressed into a
# temporary ".log" file, and the parent implementation is called on that
# temporary file instead.
#
# NOTE(review): the temporary file only exists for the duration of the
# parent call - confirm the returned stream does not need it afterwards
def open_logfile_stream(path, stream_name, **kw)
  return super unless compress?

  Tempfile.open(["", ".log"]) do |scratch|
    compressed = path.extname == ".zst" ? path : path.sub_ext(".log.zst")

    scratch.write(Zstd.decompress(compressed.read))
    scratch.flush
    scratch.rewind
    return super(scratch.path, stream_name, **kw)
  end
end

# Read the content of a log file, decompressing it if needed
#
# The path is resolved through `logfile_pathname`. The data is passed
# through Zstd.decompress only when the resolved path has a ".zst"
# extension.
#
# @return [String] the (decompressed) file content
def read_logfile(*name)
  logfile = logfile_pathname(*name)
  contents = logfile.read
  logfile.extname == ".zst" ? Zstd.decompress(contents) : contents
end

# Write data to a log file, compressing it if needed
#
# The path is resolved through `logfile_pathname`. The data is passed
# through Zstd.compress only when the resolved path has a ".zst"
# extension.
#
# @return the result of Pathname#write on the resolved path
def write_logfile(name, data)
  target = logfile_pathname(name)
  payload = target.extname == ".zst" ? Zstd.compress(data) : data
  target.write(payload)
end

# Create a log file through the parent implementation, optionally
# truncating and compressing it
#
# The file returned by the parent `create_logfile` is shortened by
# `truncate` bytes. If `compress?` is true, its content is then written
# Zstd-compressed to a sibling ".log.zst" file and the uncompressed file
# is removed.
#
# @param [Integer] truncate number of bytes to remove from the end of
#   the generated file
# @return [Pathname] the path to the file left on disk
def create_logfile(name, truncate: 0)
  raw_path = Pathname.new(super(name))
  raw_path.truncate(raw_path.stat.size - truncate)

  if compress?
    zst_path = raw_path.sub_ext(".log.zst")
    zst_path.write(Zstd.compress(raw_path.read))
    raw_path.unlink
    zst_path
  else
    raw_path
  end
end

# Create a Roby log file through the parent implementation, optionally
# compressing it
#
# If `compress?` is true, the content of the file returned by the parent
# `create_roby_logfile` is written Zstd-compressed to a sibling
# ".log.zst" file and the uncompressed file is removed.
#
# @return [Pathname] the path to the file left on disk
def create_roby_logfile(name)
  raw_path = Pathname.new(super(name))

  if compress?
    zst_path = raw_path.sub_ext(".log.zst")
    zst_path.write(Zstd.compress(raw_path.read))
    raw_path.unlink
    zst_path
  else
    raw_path
  end
end

# Copy the content of a file to a log file, compressing it if needed
#
# The destination is resolved through `logfile_pathname`. The content of
# `in_path` is passed through Zstd.compress when `compress?` is true.
#
# @param [Pathname] in_path the file whose content is copied
# @param name the log file name, as expected by `logfile_pathname`
# @return the result of Pathname#write on the destination
def copy_in_file(in_path, name)
  destination = logfile_pathname(name)
  raw = in_path.read
  payload = compress? ? Zstd.compress(raw) : raw

  destination.write(payload)
end
end
end
end
Loading

0 comments on commit c924b22

Please sign in to comment.