Commit 3d6f4b9

Merge pull request #35 from rock-core/export_minimal_indexes_during_import

feat: export minimal indexes during import (on top of #33)
doudou authored Aug 30, 2023
2 parents 210e471 + c34da86 commit 3d6f4b9
Showing 5 changed files with 93 additions and 93 deletions.
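
In short: normalization now tracks, per output file, the stream size and the realtime/logical-time intervals while it writes, and exports a minimal pocolog index next to each normalized logfile; Datastore.normalize therefore no longer takes an index_dir: argument. A rough sketch of the new entry point, based only on the signatures visible in this diff (file and directory names are placeholders, not from the commit):

    # Sketch only -- paths below are hypothetical.
    # Minimal .idx files are now written next to the normalized logfiles,
    # so there is no index_dir: argument anymore.
    require "pathname"
    require "syskit/log"

    entries = Syskit::Log::Datastore.normalize(
        [Pathname("file0.0.log")],
        output_path: Pathname("normalized"),
        compress: false, delete_input: false
    )
    entries.each { |entry| puts entry.path }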
26 changes: 19 additions & 7 deletions lib/syskit/log.rb
@@ -243,17 +243,24 @@ def self.generate_pocolog_minimal_index(
block_stream = Pocolog::BlockStream.new(digest_io)
end

stream_info = Pocolog.file_index_builder(block_stream, skip_payload: false)
write_pocolog_minimal_index(stream_info, index_path)
digest_io&.digest
streams_info = Pocolog.file_index_builder(block_stream, skip_payload: false)
index_streams_info = streams_info.map do |info|
Pocolog::Format::Current.index_stream_info(info, 0)
end
write_pocolog_minimal_index(index_streams_info, index_path)
end

def self.write_pocolog_minimal_index(stream_info, index_path)
# Write a pocolog file's minimal index
#
# @param [Array<Pocolog::IndexStreamInfo>] index_stream_info index-specific stream
# information
# @param [Pathname] index_path path to the file that should be written
def self.write_pocolog_minimal_index(index_stream_info, index_path)
index_path.open("w") do |index_io|
Pocolog::Format::Current.write_index_header(index_io, 0, Time.now, 1)
Pocolog::Format::Current.write_index_stream_info(
index_io, stream_info
)
index_stream_info.each do
index_io.write(_1.marshal)
end
end
end

@@ -271,6 +278,11 @@ def self.index_path(logfile_path, cache_path)
Pathname(index_path)
end

# Return the path of the minimal index for a given pocolog file
#
# @param [Pathname] logfile_path the path to the log file (possibly
# compressed)
# @return [Pathname]
def self.minimal_index_path(logfile_path)
basename = logfile_path.basename(".zst").to_s
index = Pocolog::Logfiles.default_index_filename(
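For reference, a minimal sketch of the two helpers changed above, assuming pocolog's IndexStreamInfo keyword constructor as it is used further down in normalize.rb; the numeric values and file name are placeholders:

    # Hypothetical single-stream example; every value here is a placeholder.
    require "pathname"
    require "syskit/log"

    logfile_path = Pathname("task0::port.0.log")
    index_path = Syskit::Log.minimal_index_path(logfile_path) # e.g. task0::port.0.idx
    info = Pocolog::Format::Current::IndexStreamInfo.new(
        declaration_pos: 13, index_pos: 0, base_time: 0, stream_size: 0,
        rt_min: 0, rt_max: 0, lg_min: 0, lg_max: 0
    )
    Syskit::Log.write_pocolog_minimal_index([info], index_path)
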
5 changes: 2 additions & 3 deletions lib/syskit/log/datastore/import.rb
@@ -101,8 +101,8 @@ def self.move_dataset_to_store(dataset, datastore)
final_core_dir = datastore.core_path_of(dataset_digest)
FileUtils.mv dataset.dataset_path, final_core_dir

final_cache_dir = datastore.cache_path_of(dataset_digest)
if dataset.cache_path.exist?
final_cache_dir = datastore.cache_path_of(dataset_digest)
FileUtils.mv dataset.cache_path, final_cache_dir
end

@@ -244,7 +244,6 @@ def normalize_pocolog_files(files, delete_input: false)

out_pocolog_dir = (@output_path + "pocolog")
out_pocolog_dir.mkpath
out_pocolog_cache_dir = (@cache_path + "pocolog")
bytes_total = files.inject(0) { |s, p| s + p.size }
@reporter.reset_progressbar(
"|:bar| :current_byte/:total_byte :eta (:byte_rate/s)",
@@ -253,7 +252,7 @@

entries = Syskit::Log::Datastore.normalize(
files,
output_path: out_pocolog_dir, index_dir: out_pocolog_cache_dir,
output_path: out_pocolog_dir,
delete_input: delete_input, compress: @compress, reporter: @reporter
)
entries.each { |e| e.path = identity_path(e.path) }
110 changes: 57 additions & 53 deletions lib/syskit/log/datastore/normalize.rb
@@ -8,13 +8,11 @@ class Datastore
def self.normalize(
paths,
output_path: paths.first.dirname + "normalized", reporter: NullReporter.new,
index_dir: output_path, delete_input: false,
compress: false
delete_input: false, compress: false
)
Normalize.new(compress: compress).normalize(
paths,
output_path: output_path, reporter: reporter,
index_dir: index_dir, delete_input: delete_input
output_path: output_path, reporter: reporter, delete_input: delete_input
)
end

@@ -35,23 +33,51 @@ class InvalidFollowupStream < RuntimeError; end
#
# Internal representation of the output of a normalization operation
class Output
attr_reader :path, :stream_info, :digest, :stream_block_pos,
:index_map, :last_data_block_time, :tell, :interval_rt
attr_reader :path
attr_reader :stream_block
attr_reader :digest
attr_reader :stream_size
attr_reader :stream_block_pos
attr_reader :last_data_block_time
attr_reader :tell
attr_reader :interval_rt
attr_reader :interval_lg

WRITE_BLOCK_SIZE = 1024**2

def initialize(path, wio, stream_info, digest, stream_block_pos)
def initialize(
path, wio, stream_block, digest, stream_block_pos
)
@path = path
@wio = wio
@stream_info = stream_info
@stream_block = stream_block
@stream_block_pos = stream_block_pos
@stream_size = 0
@interval_rt = []
@interval_lg = []
@digest = digest
@index_map = []
@tell = wio.tell
@buffer = "".dup
end

def write_pocolog_minimal_index
index_path = Syskit::Log.minimal_index_path(path)
Syskit::Log.write_pocolog_minimal_index([index_stream_info], index_path)
end

def index_stream_info
Pocolog::Format::Current::IndexStreamInfo.new(
declaration_pos: stream_block_pos,
index_pos: 0,
base_time: 0,
stream_size: stream_size,
rt_min: interval_rt[0] || 0,
rt_max: interval_rt[1] || 0,
lg_min: interval_lg[0] || 0,
lg_max: interval_lg[1] || 0
)
end

def write(data)
if data.size + @buffer.size > WRITE_BLOCK_SIZE
@wio.write @buffer + data
@@ -83,13 +109,16 @@ def create_block_stream
end

def add_data_block(rt_time, lg_time, raw_data, raw_payload)
@index_map << (@tell + @buffer.size) << lg_time
@stream_size += 1

write raw_data[0, 2]
write ZERO_BYTE
write raw_data[4..-1]
write raw_payload
@interval_rt[0] ||= rt_time
@interval_rt[1] = rt_time
@interval_lg[0] ||= lg_time
@interval_lg[1] = lg_time
@last_data_block_time = [rt_time, lg_time]
end

@@ -111,19 +140,17 @@ def compress?
def normalize(
paths,
output_path: paths.first.dirname + "normalized",
index_dir: output_path, reporter: NullReporter.new,
delete_input: false
reporter: NullReporter.new, delete_input: false
)
output_path.mkpath
index_dir.mkpath
logfile_groups = paths.group_by do
/\.\d+\.log(?:\.zst)?$/.match(_1.basename.to_s).pre_match
end

result = logfile_groups.values.map do |files|
result = logfile_groups.map do |key, files|
reporter.info "Normalizing group #{key}"
group_result = normalize_logfile_group(
files,
output_path: output_path, index_dir: index_dir, reporter: reporter
files, output_path: output_path, reporter: reporter
)

files.each(&:unlink) if delete_input
@@ -134,7 +161,7 @@ def normalize(
end

def normalize_logfile_group(
files, output_path:, index_dir:, reporter: NullReporter.new
files, output_path:, reporter: NullReporter.new
)
files.each do |logfile_path|
normalize_logfile(logfile_path, output_path, reporter: reporter)
@@ -145,12 +172,10 @@ def normalize_logfile_group(
raise
end

# When compressed, we don't have a "plain" logfile to use with the
# index ... it makes little sense to write the index
write_pending_pocolog_indexes(index_dir) unless compress?

out_files.each_value(&:close)
out_files.each_value.map do |output|
output.write_pocolog_minimal_index
output.close

Dataset::IdentityEntry.new(
output.path, output.tell, output.string_digest
)
@@ -166,29 +191,6 @@ def normalize_logfile_group(
out_files.clear
end

def write_pending_pocolog_indexes(index_dir)
indexes = []
# Now write the indexes
out_files.each_value do |output|
block_stream = output.create_block_stream
raw_stream_info = Pocolog::IndexBuilderStreamInfo.new(
output.stream_block_pos, output.index_map
)
stream_info = Pocolog.create_index_from_raw_info(
block_stream, [raw_stream_info], interval_rt: [output.interval_rt]
)
index_path = default_index_pathname(output.path, index_dir: index_dir)
indexes << index_path
index_path.open("w") do |io|
Pocolog::Format::Current
.write_index(io, block_stream.io, stream_info)
end
end
rescue Exception # rubocop:disable Lint/RescueException
indexes.map { _1.unlink if _1.exist? }
raise
end

def default_index_pathname(logfile_path, index_dir:)
logfile_path = logfile_path.sub_ext("") if logfile_path.extname == ".zst"
path = Pocolog::Logfiles.default_index_filename(
@@ -380,9 +382,9 @@ def normalize_logfile_process_data_block(
end

def create_or_reuse_out_io(
output_path, raw_header, stream_info, initial_blocks
output_path, raw_header, stream_block, initial_blocks
)
basename = Streams.normalized_filename(stream_info.metadata)
basename = Streams.normalized_filename(stream_block.metadata)
ext = ".zst" if compress?
out_file_path = output_path + "#{basename}.0.log#{ext}"

@@ -391,20 +393,20 @@ def create_or_reuse_out_io(
if (existing = out_files[out_file_path])
# This is a file we've already seen, reuse its info
# and do some consistency checks
if existing.stream_info.type != stream_info.type
if existing.stream_block.type != stream_block.type
raise InvalidFollowupStream,
"multi-IO stream #{stream_info.name} is not consistent: "\
"multi-IO stream #{stream_block.name} is not consistent: "\
"type mismatch"
end
# Note: normalize_logfile is checking that the files follow
# each other
return existing
end

raw_payload = stream_info.encode
raw_payload = stream_block.encode
raw_header[4, 4] = [raw_payload.size].pack("V")
initialize_out_file(
out_file_path, stream_info, raw_header, raw_payload, initial_blocks
out_file_path, stream_block, raw_header, raw_payload, initial_blocks
)
end

@@ -414,15 +416,17 @@ def create_or_reuse_out_io(
#
# @return [Output]
def initialize_out_file(
out_file_path, stream_info, raw_header, raw_payload, initial_blocks
out_file_path, stream_block, raw_header, raw_payload, initial_blocks
)
wio = Syskit::Log.open_out_stream(out_file_path)

Pocolog::Format::Current.write_prologue(wio)
digest = Digest::SHA256.new
wio = DigestIO.new(wio, digest)

output = Output.new(out_file_path, wio, stream_info, digest, wio.tell)
output = Output.new(
out_file_path, wio, stream_block, digest, wio.tell
)
output.write initial_blocks
output.write raw_header[0, 2]
output.write ZERO_BYTE
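The bookkeeping added to Output above is what feeds the minimal index: each data block increments stream_size and extends the realtime/logical-time intervals, and index_stream_info folds those into a single IndexStreamInfo record. A condensed, standalone sketch of that interval tracking (not code from the commit):

    # Standalone illustration of the interval tracking in Output#add_data_block;
    # the (rt_time, lg_time) pairs are made-up sample values.
    stream_size = 0
    interval_rt = []
    interval_lg = []

    [[100, 110], [105, 112], [120, 130]].each do |rt_time, lg_time|
        stream_size += 1
        interval_rt[0] ||= rt_time # first block sets the minimum
        interval_rt[1] = rt_time   # each later block moves the maximum forward
        interval_lg[0] ||= lg_time
        interval_lg[1] = lg_time
    end

    # index_stream_info would then report stream_size: 3,
    # rt_min: 100, rt_max: 120, lg_min: 110, lg_max: 130
    # (falling back to 0 for a stream with no data blocks)
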
19 changes: 15 additions & 4 deletions test/datastore/import_test.rb
@@ -116,10 +116,7 @@ def tty_reporter
end

it "normalizes the pocolog logfiles" do
expected_normalize_args = hsh(
output_path: @output_path + "pocolog",
index_dir: @cache_path + "pocolog"
)
expected_normalize_args = hsh(output_path: @output_path + "pocolog")

flexmock(Syskit::Log::Datastore)
.should_receive(:normalize)
@@ -130,6 +127,20 @@ def tty_reporter
dataset.dataset_path + "pocolog" + "task0::port.0.log#{file_ext}"
assert expected_file.exist?
end
it "generates valid minimal indexes" do
dataset = import.normalize_dataset([logfile_pathname])

pocolog_path = Syskit::Log.find_path_plain_or_compressed(
dataset.dataset_path + "pocolog" + "task0::port.0.log"
)
index_path = dataset.dataset_path + "pocolog" + "task0::port.0.idx"
assert index_path.exist?

# This relies on the validation of the stream block
Syskit::Log.read_single_lazy_data_stream(
pocolog_path, index_path, Pathname("")
)
end
it "calculates the dataset digest" do
dataset = import.normalize_dataset([logfile_pathname])
identity = dataset.compute_dataset_identity_from_files
26 changes: 0 additions & 26 deletions test/datastore/normalize_test.rb
@@ -95,32 +95,6 @@ class Datastore
refute input0_path.exist?
assert input1_path.exist?
end
it "generates valid index files for the normalized streams" do
skip if compress?

logfile_pathname("normalized").mkdir
normalize.normalize([logfile_pathname("file0.0.log")])
flexmock(Pocolog::Logfiles).new_instances
.should_receive(:rebuild_and_load_index)
.never
open_logfile_stream ["normalized", "task0::port.0.log"], "task0.port"
open_logfile_stream ["normalized", "task1::port.0.log"], "task1.port"
end
it "allows to specify the cache directory" do
skip if compress?

logfile_pathname("normalized").mkdir
index_dir = logfile_pathname("cache")
normalize.normalize(
[logfile_pathname("file0.0.log")], index_dir: index_dir
)
flexmock(Pocolog::Logfiles)
.new_instances
.should_receive(:rebuild_and_load_index)
.never
open_logfile_stream ["normalized", "task0::port.0.log"], "task0.port", index_dir: index_dir
open_logfile_stream ["normalized", "task1::port.0.log"], "task1.port", index_dir: index_dir
end
describe "digest generation" do
it "optionally computes the sha256 digest of the generated file, "\
"without the prologue" do
