feat: write minimal index files during import
doudou committed Aug 30, 2023
1 parent fbcf7fd commit c34da86
Showing 3 changed files with 43 additions and 12 deletions.
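
In short: while normalization copies data blocks into the per-stream output files, it now also counts samples and tracks the real-time/logical-time bounds of each stream, then writes a minimal pocolog index next to each generated file. A runnable sketch of that bookkeeping follows; the class below is a hypothetical stand-in for the Output class patched here, not part of the Syskit::Log API:

    # Hypothetical stand-in for Output's new per-stream counters
    class MinimalIndexAccumulator
        attr_reader :stream_size, :interval_rt, :interval_lg

        def initialize
            @stream_size = 0
            @interval_rt = []
            @interval_lg = []
        end

        # Mirrors what Output#add_data_block does below: count the sample
        # and widen the time bounds (log times arrive in increasing order)
        def add(rt_time, lg_time)
            @stream_size += 1
            @interval_rt[0] ||= rt_time # first sample fixes the lower bound
            @interval_rt[1] = rt_time   # latest sample moves the upper bound
            @interval_lg[0] ||= lg_time
            @interval_lg[1] = lg_time
        end
    end

    acc = MinimalIndexAccumulator.new
    acc.add(10, 12)
    acc.add(20, 22)
    acc.stream_size # => 2
    acc.interval_rt # => [10, 20]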
30 changes: 29 additions & 1 deletion lib/syskit/log/datastore/normalize.rb
@@ -36,10 +36,12 @@ class Output
attr_reader :path
attr_reader :stream_block
attr_reader :digest
attr_reader :stream_size
attr_reader :stream_block_pos
attr_reader :last_data_block_time
attr_reader :tell
attr_reader :interval_rt
attr_reader :interval_lg

WRITE_BLOCK_SIZE = 1024**2

@@ -50,12 +52,32 @@ def initialize(
    @wio = wio
    @stream_block = stream_block
    @stream_block_pos = stream_block_pos
    @stream_size = 0
    @interval_rt = []
    @interval_lg = []
    @digest = digest
    @tell = wio.tell
    @buffer = "".dup
end

def write_pocolog_minimal_index
    index_path = Syskit::Log.minimal_index_path(path)
    Syskit::Log.write_pocolog_minimal_index([index_stream_info], index_path)
end

def index_stream_info
    Pocolog::Format::Current::IndexStreamInfo.new(
        declaration_pos: stream_block_pos,
        index_pos: 0,
        base_time: 0,
        stream_size: stream_size,
        rt_min: interval_rt[0] || 0,
        rt_max: interval_rt[1] || 0,
        lg_min: interval_lg[0] || 0,
        lg_max: interval_lg[1] || 0
    )
end

def write(data)
    if data.size + @buffer.size > WRITE_BLOCK_SIZE
        @wio.write @buffer + data
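
A note on index_stream_info above: for a stream that received no data blocks, interval_rt and interval_lg stay empty, so the || 0 fallbacks zero out the bounds while stream_size stays 0. A self-contained sketch of the record being filled in (the Struct is a hypothetical mirror of the arguments passed to Pocolog::Format::Current::IndexStreamInfo.new; the actual serialization is Pocolog's):

    # Hypothetical mirror of the fields passed to IndexStreamInfo.new
    IndexInfo = Struct.new(
        :declaration_pos, :index_pos, :base_time, :stream_size,
        :rt_min, :rt_max, :lg_min, :lg_max, keyword_init: true
    )

    interval_rt = [] # a stream with no data blocks
    IndexInfo.new(
        declaration_pos: 0, index_pos: 0, base_time: 0, stream_size: 0,
        rt_min: interval_rt[0] || 0, rt_max: interval_rt[1] || 0,
        lg_min: 0, lg_max: 0
    )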
@@ -87,12 +109,16 @@ def create_block_stream
end

def add_data_block(rt_time, lg_time, raw_data, raw_payload)
    @stream_size += 1

    write raw_data[0, 2]
    write ZERO_BYTE
    write raw_data[4..-1]
    write raw_payload
    @interval_rt[0] ||= rt_time
    @interval_rt[1] = rt_time
    @interval_lg[0] ||= lg_time
    @interval_lg[1] = lg_time
    @last_data_block_time = [rt_time, lg_time]
end

@@ -146,8 +172,10 @@ def normalize_logfile_group(
    raise
end

out_files.each_value(&:close)
out_files.each_value.map do |output|
    output.write_pocolog_minimal_index
    output.close

    Dataset::IdentityEntry.new(
        output.path, output.tell, output.string_digest
    )
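Design note on the last hunk: the bulk close via out_files.each_value(&:close) is this commit's single deletion. Writing the per-file minimal index and closing now happen inside the loop that yields one Dataset::IdentityEntry per output, built from the file's final offset and digest.
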
14 changes: 14 additions & 0 deletions test/datastore/import_test.rb
@@ -127,6 +127,20 @@ def tty_reporter
            dataset.dataset_path + "pocolog" + "task0::port.0.log#{file_ext}"
        assert expected_file.exist?
    end
    it "generates valid minimal indexes" do
        dataset = import.normalize_dataset([logfile_pathname])

        pocolog_path = Syskit::Log.find_path_plain_or_compressed(
            dataset.dataset_path + "pocolog" + "task0::port.0.log"
        )
        index_path = dataset.dataset_path + "pocolog" + "task0::port.0.idx"
        assert index_path.exist?

        # This relies on the validation of the stream block
        Syskit::Log.read_single_lazy_data_stream(
            pocolog_path, index_path, Pathname("")
        )
    end
    it "calculates the dataset digest" do
        dataset = import.normalize_dataset([logfile_pathname])
        identity = dataset.compute_dataset_identity_from_files
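Related: the flexmock-based check removed below asserted that rebuild_and_load_index is never called when reopening normalized logs. The new import_test.rb case above appears to supersede it by reading the normalized stream back through Syskit::Log.read_single_lazy_data_stream, which validates the minimal index against the stream block.
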
11 changes: 0 additions & 11 deletions test/datastore/normalize_test.rb
@@ -95,17 +95,6 @@ class Datastore
        refute input0_path.exist?
        assert input1_path.exist?
    end
    it "generates valid index files for the normalized streams" do
        skip if compress?

        logfile_pathname("normalized").mkdir
        normalize.normalize([logfile_pathname("file0.0.log")])
        flexmock(Pocolog::Logfiles).new_instances
            .should_receive(:rebuild_and_load_index)
            .never
        open_logfile_stream ["normalized", "task0::port.0.log"], "task0.port"
        open_logfile_stream ["normalized", "task1::port.0.log"], "task1.port"
    end
    describe "digest generation" do
        it "optionally computes the sha256 digest of the generated file, "\
           "without the prologue" do
