From bfc1c3dee357ec16ccfb54ac253460fcd7cdbb45 Mon Sep 17 00:00:00 2001 From: Sylvain Date: Mon, 31 May 2021 20:22:53 -0300 Subject: [PATCH] adapt the unit tests --- lib/syskit/log.rb | 2 +- lib/syskit/log/cli/datastore.rb | 127 +++++++++++---------- lib/syskit/log/datastore/dataset.rb | 4 +- lib/syskit/log/datastore/import.rb | 70 +++++++----- lib/syskit/log/datastore/index_build.rb | 12 +- lib/syskit/log/datastore/normalize.rb | 42 +++++-- lib/syskit/log/dsl.rb | 11 +- lib/syskit/log/roby_sql_index/accessors.rb | 12 +- lib/syskit/log/streams.rb | 9 +- lib/syskit/pocolog.rb | 55 --------- test/cli/datastore_test.rb | 87 +++++++------- test/datastore/dataset_test.rb | 49 +++++--- test/datastore/import_test.rb | 69 ++++++----- test/datastore/index_build_test.rb | 32 +++--- test/datastore/normalize_test.rb | 28 +++-- test/streams_test.rb | 53 ++++++--- 16 files changed, 357 insertions(+), 305 deletions(-) delete mode 100644 lib/syskit/pocolog.rb diff --git a/lib/syskit/log.rb b/lib/syskit/log.rb index f892177..ed7758c 100644 --- a/lib/syskit/log.rb +++ b/lib/syskit/log.rb @@ -41,7 +41,7 @@ module Log module Syskit module Log # rubocop:disable Style/Documentation - # Returns the paths of the log files in a given directory + # Returns the paths of the pocolog log files in a given directory # # The returned paths are sorted in 'pocolog' order, i.e. multi-IO files are # following each other in the order of their place in the overall IO diff --git a/lib/syskit/log/cli/datastore.rb b/lib/syskit/log/cli/datastore.rb index 9e17f24..abd5fbb 100644 --- a/lib/syskit/log/cli/datastore.rb +++ b/lib/syskit/log/cli/datastore.rb @@ -153,19 +153,42 @@ def show_dataset_pocolog(pastel, store, dataset) end end - def import_dataset(path, reporter, datastore, metadata, merge: false) + # @api private + # + # Parse a metadata option such as --set some=value some-other=value + def parse_metadata_option(hash) + hash.each_with_object({}) do |arg, metadata| + key, value = arg.split('=') + unless value + raise ArgumentError, + "metadata setters need to be specified as "\ + "key=value (got #{arg})" + end + (metadata[key] ||= Set.new) << value + end + end + + def import_dataset?(datastore, path, reporter:) last_import_digest, last_import_time = Syskit::Log::Datastore::Import.find_import_info(path) - already_imported = last_import_digest && - datastore.has?(last_import_digest) - if already_imported && !options[:force] - reporter.info( - "#{path} already seem to have been imported as "\ - "#{last_import_digest} at #{last_import_time}. Give "\ - "--force to import again" - ) - return - end + already_imported = + last_import_digest && datastore.has?(last_import_digest) + return true if !already_imported || options[:force] + + reporter.info( + "#{path} already seem to have been imported as "\ + "#{last_import_digest} at #{last_import_time}. 
Give "\ + "--force to import again" + ) + false + end + + def dataset_duration(dataset) + dataset.each_pocolog_stream.map(&:duration_lg).max || 0 + end + + def import_dataset(path, reporter, datastore, metadata, merge: false) + return unless import_dataset?(datastore, path, reporter: reporter) paths = if merge @@ -177,45 +200,40 @@ def import_dataset(path, reporter, datastore, metadata, merge: false) datastore.in_incoming do |core_path, cache_path| importer = Syskit::Log::Datastore::Import.new(datastore) dataset = importer.normalize_dataset( - paths, core_path, cache_path: cache_path, - reporter: reporter + paths, core_path, + cache_path: cache_path, reporter: reporter ) metadata.each { |k, v| dataset.metadata_set(k, *v) } dataset.metadata_write_to_file - stream_duration = dataset.each_pocolog_stream - .map(&:duration_lg) - .max - stream_duration ||= 0 - - if already_imported - # --force is implied as otherwise we would have - # skipped earlier + dataset_duration = dataset_duration(dataset) + unless dataset_duration >= options[:min_duration] reporter.info( - "#{path} seem to have already been imported but --force "\ - "is given, overwriting" + "#{path} lasts only %.1fs, ignored" % [dataset_duration] ) - datastore.delete(last_import_digest) + break end - if stream_duration >= options[:min_duration] - begin - final_core_dir = importer.move_dataset_to_store( - path, dataset, - force: options[:force], reporter: reporter - ) - puts File.basename(final_core_dir) - rescue Syskit::Log::Datastore::Import::DatasetAlreadyExists - reporter.info( - "#{path} already seem to have been imported as "\ - "#{dataset.compute_dataset_digest}. Give "\ - "--force to import again" - ) - end - else + begin + importer.validate_dataset_import( + dataset, force: options[:force], reporter: reporter + ) + rescue Syskit::Log::Datastore::Import::DatasetAlreadyExists reporter.info( - "#{path} lasts only %.1fs, ignored" % [stream_duration] + "#{path} already seem to have been imported as "\ + "#{dataset.compute_dataset_digest}. 
Give "\ + "--force to import again" + ) + break + end + + dataset = importer.move_dataset_to_store(dataset) + t = Time.now + paths.each do |p| + Syskit::Log::Datastore::Import.save_import_info( + p, dataset, time: t ) end + dataset end end @@ -370,21 +388,14 @@ def import(root_path, description = nil) datastore = create_store metadata = {} - metadata['description'] = description if description - metadata['tags'] = options[:tags] - options[:metadata].each do |pair| - k, v = pair.split('=') - unless v - raise ArgumentError, - 'expected key=value pair as argument to '\ - "--metadata but got '#{pair}'" - end - (metadata[k] ||= []) << v - end + metadata["description"] = description if description + metadata["tags"] = options[:tags] + metadata.merge!(parse_metadata_option(options[:metadata])) paths.each do |p| - import_dataset(p, reporter, datastore, metadata, - merge: options[:merge]) + dataset = import_dataset(p, reporter, datastore, metadata, + merge: options[:merge]) + puts dataset.digest if dataset end end @@ -483,15 +494,7 @@ def metadata(*query) end if options[:set] - setters = Hash.new - options[:set].map do |arg| - key, value = arg.split('=') - if !value - raise ArgumentError, "metadata setters need to be specified as key=value (got #{arg})" - end - (setters[key] ||= Set.new) << value - end - + setters = parse_metadata_option(options[:set]) datasets.each do |set| setters.each do |k, v| set.metadata_set(k, *v) diff --git a/lib/syskit/log/datastore/dataset.rb b/lib/syskit/log/datastore/dataset.rb index 7f39cf5..ae39945 100644 --- a/lib/syskit/log/datastore/dataset.rb +++ b/lib/syskit/log/datastore/dataset.rb @@ -255,10 +255,10 @@ def compute_dataset_digest( def each_important_file return enum_for(__method__) unless block_given? - Pathname.glob(dataset_path + 'pocolog' + '*.*.log') do |path| + Pathname.glob(dataset_path + "pocolog" + "*.*.log") do |path| yield(path) end - Pathname.glob(dataset_path + '*-events.log') do |path| + Pathname.glob(dataset_path + "roby-events.*.log") do |path| yield(path) end end diff --git a/lib/syskit/log/datastore/import.rb b/lib/syskit/log/datastore/import.rb index ebbbcd6..4d18229 100644 --- a/lib/syskit/log/datastore/import.rb +++ b/lib/syskit/log/datastore/import.rb @@ -46,30 +46,41 @@ def prepare_import(dir_path) # @param [Pathname] dir_path the input directory # @return [Pathname] the directory of the imported dataset in the store def import( - in_path, in_dataset_paths, - force: false, reporter: Pocolog::CLI::NullReporter.new + in_dataset_paths, force: false, reporter: Pocolog::CLI::NullReporter.new ) datastore.in_incoming do |core_path, cache_path| dataset = normalize_dataset( in_dataset_paths, core_path, cache_path: cache_path, reporter: reporter ) - move_dataset_to_store( - in_path, dataset, force: force, reporter: reporter + validate_dataset_import( + dataset, force: force, reporter: reporter ) + move_dataset_to_store(dataset) end end # Find if a directory has already been imported # - # @return [(String,Time),nil] if the directory has already been - # imported, the time and digest of the import. Otherwise, returns nil + # @param [Pathname] path + # @return [(String,Time)] the digest and time of the last import def self.find_import_info(path) info_path = (path + BASENAME_IMPORT_TAG) return unless info_path.exist? 
info = YAML.safe_load(info_path.read, [Time]) - [info['sha2'], info['time']] + [info["digest"], info["time"]] + end + + # Save import info, used by {.find_import_info} + # + # @param [Pathname] path + # @param [ImportInfo] info + def self.save_import_info(path, dataset, time: Time.now) + (path + BASENAME_IMPORT_TAG).open("w") do |io| + h = { "digest" => dataset.digest, "time" => time } + YAML.dump(h, io) + end end # Move the given dataset to the store @@ -80,26 +91,11 @@ def self.find_import_info(path) # @param [Boolean] force if force (the default), the method will fail if # the dataset is already in the store. Otherwise, it will erase the # existing dataset with the new one - # @return [Pathname] the path to the new dataset in the store + # @return [Dataset] the dataset at its final place # @raise DatasetAlreadyExists if a dataset already exists with the same # ID than the new one and 'force' is false - def move_dataset_to_store(in_path, dataset, - force: false, - reporter: Pocolog::CLI::NullReporter.new) - dataset_digest = dataset.compute_dataset_digest - - if datastore.has?(dataset_digest) - if force - datastore.delete(dataset_digest) - reporter.warn "Replacing existing dataset #{dataset_digest} "\ - 'with new one' - else - raise DatasetAlreadyExists, - "a dataset identical to #{dataset.dataset_path} already "\ - "exists in the store (computed digest is #{dataset_digest})" - end - end - + def move_dataset_to_store(dataset) + dataset_digest = dataset.digest final_core_dir = datastore.core_path_of(dataset_digest) FileUtils.mv dataset.dataset_path, final_core_dir final_cache_dir = datastore.cache_path_of(dataset_digest) @@ -107,11 +103,29 @@ def move_dataset_to_store(in_path, dataset, FileUtils.mv dataset.cache_path, final_cache_dir end - (in_path + BASENAME_IMPORT_TAG).open('w') do |io| - YAML.dump(Hash['sha2' => dataset_digest, 'time' => Time.now], io) + Dataset.new(final_core_dir, + digest: dataset_digest, + cache: final_cache_dir) + end + + # @api private + # + # Verifies that the given data should be imported + def validate_dataset_import( + dataset, force: false, reporter: Pocolog::CLI::NullReporter.new + ) + return unless datastore.has?(dataset.digest) + + if force + datastore.delete(dataset.digest) + reporter.warn "Replacing existing dataset #{dataset.digest} "\ + "with new one" + return end - final_core_dir + raise DatasetAlreadyExists, + "a dataset identical to #{dataset.dataset_path} already "\ + "exists in the store (computed digest is #{dataset.digest})" end # Import Roby's info.yml information into the dataset metadata diff --git a/lib/syskit/log/datastore/index_build.rb b/lib/syskit/log/datastore/index_build.rb index a8c32e7..ed768f9 100644 --- a/lib/syskit/log/datastore/index_build.rb +++ b/lib/syskit/log/datastore/index_build.rb @@ -77,7 +77,7 @@ def rebuild_pocolog_indexes( def rebuild_roby_index(force: false, reporter: Pocolog::CLI::NullReporter.new) dataset.cache_path.mkpath event_logs = Syskit::Log.logfiles_in_dir(dataset.dataset_path) - event_logs.each do |roby_log_path| + event_logs = event_logs.find_all do |roby_log_path| rebuild_roby_own_index( roby_log_path, force: force, reporter: reporter ) @@ -89,10 +89,14 @@ def rebuild_roby_index(force: false, reporter: Pocolog::CLI::NullReporter.new) # @api private # # Rebuild Roby's own index file + # + # @return [Boolean] true if the log file is valid and has a valid index, + # false otherwise (e.g. 
if the log file format is too old) def rebuild_roby_own_index( roby_log_path, force: false, reporter: Pocolog::CLI::NullReporter.new ) - roby_index_path = dataset.cache_path + roby_log_path.sub_ext(".idx") + roby_index_path = + dataset.cache_path + roby_log_path.basename.sub_ext(".idx") needs_rebuild = force || !Roby::DRoby::Logfile::Index.valid_file?( @@ -100,7 +104,7 @@ def rebuild_roby_own_index( ) unless needs_rebuild reporter.log " up-to-date: #{roby_log_path.basename}" - return + return true end reporter.log " rebuilding: #{roby_log_path.basename}" @@ -108,9 +112,11 @@ def rebuild_roby_own_index( Roby::DRoby::Logfile::Index.rebuild_file( roby_log_path, roby_index_path ) + true rescue Roby::DRoby::Logfile::InvalidFormatVersion reporter.warn " #{roby_log_path.basename} is in an obsolete Roby "\ "log file format, skipping" + false end end diff --git a/lib/syskit/log/datastore/normalize.rb b/lib/syskit/log/datastore/normalize.rb index 4c16f17..9f77076 100644 --- a/lib/syskit/log/datastore/normalize.rb +++ b/lib/syskit/log/datastore/normalize.rb @@ -62,7 +62,7 @@ def create_block_stream end def add_data_block(rt_time, lg_time, raw_data, raw_payload) - @index_map << @tell << lg_time + @index_map << (@tell + @buffer.size) << lg_time write raw_data[0, 2] write ZERO_BYTE write raw_data[4..-1] @@ -111,23 +111,21 @@ def initialize def normalize( paths, - output_path: paths.first.dirname + 'normalized', + output_path: paths.first.dirname + "normalized", index_dir: output_path, reporter: Pocolog::CLI::NullReporter.new, compute_sha256: false ) output_path.mkpath paths.each do |logfile_path| - e, out_io = normalize_logfile(logfile_path, output_path, reporter: reporter, compute_sha256: compute_sha256) + e, out_io = normalize_logfile( + logfile_path, output_path, + reporter: reporter, compute_sha256: compute_sha256 + ) + if e - warn "normalize: exception caught while processing #{logfile_path}, deleting #{out_io.size} output files: #{out_io.map(&:path).sort.join(", ")}" - out_io.each do |output| - out_files.delete(output.path) - output.close - - Pathname.new(output.path).unlink - index_path = Pathname.new(Pocolog::Logfiles.default_index_filename(output.path, index_dir: index_dir)) - index_path.unlink if index_path.exist? - end + normalize_cleanup_after_error( + reporter, logfile_path, out_io, index_dir + ) raise e end end @@ -156,6 +154,26 @@ def normalize( out_files.each_value(&:close) end + def normalize_cleanup_after_error(reporter, logfile_path, out_io, index_dir) + reporter.warn( + "normalize: exception caught while processing "\ + "#{logfile_path}, deleting #{out_io.size} output files: "\ + "#{out_io.map(&:path).sort.join(', ')}" + ) + out_io.each do |output| + out_files.delete(output.path) + output.close + + Pathname.new(output.path).unlink + index_path = Pathname.new( + Pocolog::Logfiles.default_index_filename( + output.path, index_dir: index_dir + ) + ) + index_path.unlink if index_path.exist? 
+ end + end + NormalizationState = Struct .new(:out_io_streams, :control_blocks, :followup_stream_time) do diff --git a/lib/syskit/log/dsl.rb b/lib/syskit/log/dsl.rb index 8ad0156..e29ecbe 100644 --- a/lib/syskit/log/dsl.rb +++ b/lib/syskit/log/dsl.rb @@ -215,10 +215,10 @@ def __try_resolve_timepoint(obj) # @raise ArgumentError if the object cannot be interpreted as a timepoint # @see __try_resolve_timepoint def __resolve_timepoint(obj, interval_index) - if (result = __try_resolve_timepoint(obj)) - result - elsif (interval = __try_resolve_interval(obj)) + if (interval = __try_resolve_interval(obj)) interval[interval_index] + elsif (result = __try_resolve_timepoint(obj)) + result else raise ArgumentError, "cannot resolve #{obj} as a timepoint" end @@ -351,6 +351,9 @@ def interval_shift_end(offset) TIME_EPSILON = 1/1_000_000r # Convert fields of a data stream into a Daru frame + # + # @param [Array] streams an array if objects that can be converted to + # samples using {#samples_of} def to_daru_frame(*streams, timeout: nil) interval_start, interval_end = streams.map(&:interval_lg).transpose interval_start = interval_start.min @@ -368,7 +371,7 @@ def to_daru_frame(*streams, timeout: nil) if builders.size == 1 builders.first.to_daru_frame( - @interval_zero_time, samples, timeout: timeout + @interval_zero_time, samples.first, timeout: timeout ) else joint_stream = Pocolog::StreamAligner.new(false, *samples) diff --git a/lib/syskit/log/roby_sql_index/accessors.rb b/lib/syskit/log/roby_sql_index/accessors.rb index d25cd8d..d6f5635 100644 --- a/lib/syskit/log/roby_sql_index/accessors.rb +++ b/lib/syskit/log/roby_sql_index/accessors.rb @@ -125,8 +125,8 @@ def method_missing(m, *args, **kw, &block) event_name = m_to_s[0..-7] unless @index.event_with_name?(event_name) - raise NoMethodError.new(m), - "no events named #{event_name} have been emitted" + msg = "no events named #{event_name} have been emitted" + raise NoMethodError.new(msg, m) end has_events = @@ -135,9 +135,9 @@ def method_missing(m, *args, **kw, &block) .exist? 
unless has_events - raise NoMethodError.new("", m), - "there are emitted events named #{event_name}, but "\ + msg = "there are emitted events named #{event_name}, but "\ "not for a task of model #{@name}" + raise NoMethodError.new(msg, m) end EventModel.new(@index, event_name, self) @@ -295,6 +295,10 @@ def ==(other) other.kind_of?(Event) && other.id == id end + def full_name + "#{task.model.name}.#{name}" + end + def initialize(index, id, time, name, task, model) # rubocop:disable Metrics/ParameterLists @index = index @id = id diff --git a/lib/syskit/log/streams.rb b/lib/syskit/log/streams.rb index deb0556..42f2211 100644 --- a/lib/syskit/log/streams.rb +++ b/lib/syskit/log/streams.rb @@ -91,10 +91,13 @@ def each_task( task_m = Syskit::TaskContext .find_model_from_orogen_name(task_model_name) end + + if !task_m && raise_on_missing_task_models + raise OroGen::NotFound, "cannot find #{task_model_name}" + end + if task_m || !skip_tasks_without_models available_tasks[s.metadata['rock_task_name']] << s - elsif raise_on_missing_task_models - raise OroGen::NotFound, "cannot find #{task_model_name}" else ignored_streams[task_model_name] << s end @@ -213,7 +216,7 @@ def self.sanitize_metadata(metadata, stream_name: nil) metadata.delete("rock_task_model") end - if model.start_with?("OroGen.") + if model&.start_with?("OroGen.") normalized_model = model[7..-1].gsub(".", "::") Syskit::Log.warn( diff --git a/lib/syskit/pocolog.rb b/lib/syskit/pocolog.rb deleted file mode 100644 index 72c459f..0000000 --- a/lib/syskit/pocolog.rb +++ /dev/null @@ -1,55 +0,0 @@ -require "pocolog" -require "syskit" - -module Syskit - module Log - extend Logger::Root('Syskit::Log', Logger::WARN) - end -end - -require 'digest/sha2' -require "metaruby/dsls/find_through_method_missing" -require 'pocolog/cli/null_reporter' -require "syskit/log/version" -require "syskit/log/exceptions" -require "syskit/log/lazy_data_stream" -require "syskit/log/streams" -require "syskit/log/task_streams" -require "syskit/log/rock_stream_matcher" - -require "syskit/log/models/deployment" -require "syskit/log/deployment" -require "syskit/log/models/replay_task_context" -require "syskit/log/replay_task_context" -require "syskit/log/replay_manager" - -require 'syskit/log/extensions' -require 'syskit/log/shell_interface' -require 'syskit/log/registration_namespace' -require 'syskit/log/plugin' - -require 'syskit/log/datastore' - -module Syskit - module Log - # Returns the paths of the log files in a given directory - # - # The returned paths are sorted in 'pocolog' order, i.e. 
multi-IO files are - # following each other in the order of their place in the overall IO - # sequence - # - # @param [Pathname] dir_path path to the directory - def self.logfiles_in_dir(dir_path) - path = Pathname.new(dir_path).realpath - - paths = Array.new - Pathname.glob(path + '*.*.log') do |path| - basename = path.basename - if basename.to_s =~ /(.*)\.(\d+)\.log$/ - paths << [$1, Integer($2), path] - end - end - paths.sort.map { |_, _, path| path } - end - end -end diff --git a/test/cli/datastore_test.rb b/test/cli/datastore_test.rb index 0a479f8..386c99d 100644 --- a/test/cli/datastore_test.rb +++ b/test/cli/datastore_test.rb @@ -45,7 +45,7 @@ def call_cli(*args, silent: true) flexmock(datastore_m::Import) .new_instances.should_receive(:normalize_dataset) .with( - logfile_pathname, incoming_path + 'core', + [logfile_pathname], incoming_path + 'core', on do |h| h[:cache_path] == incoming_path + 'cache' && h[:reporter].kind_of?(Pocolog::CLI::NullReporter) @@ -59,13 +59,7 @@ def call_cli(*args, silent: true) end flexmock(datastore_m::Import) .new_instances.should_receive(:move_dataset_to_store) - .with( - logfile_pathname, expected_dataset, - on do |h| - h[:force] == false && - h[:reporter].kind_of?(Pocolog::CLI::NullReporter) - end - ) + .with(expected_dataset) .once.pass_thru call_cli('import', '--min-duration=0', @@ -106,7 +100,7 @@ def call_cli(*args, silent: true) flexmock(datastore_m::Import) .new_instances.should_receive(:normalize_dataset) .with( - logfile_pathname, incoming_path + 'core', + [logfile_pathname], incoming_path + 'core', on do |h| h[:cache_path] == incoming_path + 'cache' && h[:reporter].kind_of?(Pocolog::CLI::NullReporter) @@ -120,14 +114,8 @@ def call_cli(*args, silent: true) end flexmock(datastore_m::Import) - .new_instances.should_receive(:move_dataset_to_store). - with( - logfile_pathname, expected_dataset, - on do |h| - h[:force] == false && - h[:reporter].kind_of?(Pocolog::CLI::NullReporter) - end - ) + .new_instances.should_receive(:move_dataset_to_store) + .with(expected_dataset) .once.pass_thru call_cli('import', '--auto', '--min-duration=0', @@ -146,10 +134,12 @@ def call_cli(*args, silent: true) call_cli('import', '--auto', '--min-duration=0', '--store', datastore_path.to_s, logfile_pathname.dirname.to_s, silent: true) - flexmock(datastore_m::Import).new_instances.should_receive(:normalize_dataset). - never - flexmock(datastore_m::Import).new_instances.should_receive(:move_dataset_to_store). - never + flexmock(datastore_m::Import) + .new_instances.should_receive(:normalize_dataset) + .never + flexmock(datastore_m::Import) + .new_instances.should_receive(:move_dataset_to_store) + .never out, = capture_io do call_cli('import', '--auto', '--min-duration=0', '--store', datastore_path.to_s, @@ -168,17 +158,17 @@ def call_cli(*args, silent: true) call_cli('import', '--auto', '--min-duration=0', '--store', datastore_path.to_s, logfile_pathname.dirname.to_s, silent: true) - flexmock(datastore_m::Import).new_instances.should_receive(:normalize_dataset). - once.pass_thru - flexmock(datastore_m::Import).new_instances.should_receive(:move_dataset_to_store). - once.pass_thru - out, = capture_io do + flexmock(datastore_m::Import) + .new_instances.should_receive(:normalize_dataset) + .once.pass_thru + flexmock(datastore_m::Import) + .new_instances.should_receive(:move_dataset_to_store) + . 
once.pass_thru + capture_io do call_cli('import', '--auto', '--min-duration=0', '--force', '--store', datastore_path.to_s, logfile_pathname.dirname.to_s, silent: false) end - assert_match /#{logfile_pathname} seem to have already been imported but --force is given, overwriting/, - out end it "ignores datasets that do not seem to be already imported, but are" do create_logfile('test.0.log') do @@ -190,11 +180,14 @@ def call_cli(*args, silent: true) call_cli('import', '--auto', '--min-duration=0', '--store', datastore_path.to_s, logfile_pathname.dirname.to_s, silent: true) - (logfile_pathname + datastore_m::Import::BASENAME_IMPORT_TAG).unlink - flexmock(datastore_m::Import).new_instances.should_receive(:normalize_dataset). - once.pass_thru - flexmock(datastore_m::Import).new_instances.should_receive(:move_dataset_to_store). - once.pass_thru + (logfile_pathname + datastore_m::Import::BASENAME_IMPORT_TAG) + .unlink + flexmock(datastore_m::Import) + .new_instances.should_receive(:normalize_dataset) + .once.pass_thru + flexmock(datastore_m::Import) + .new_instances.should_receive(:move_dataset_to_store) + .never out, = capture_io do call_cli('import', '--auto', '--min-duration=0', '--store', datastore_path.to_s, @@ -217,10 +210,12 @@ def call_cli(*args, silent: true) marker_path = datastore.core_path_of(digest) + "marker" FileUtils.touch(marker_path) (logfile_pathname + datastore_m::Import::BASENAME_IMPORT_TAG).unlink - flexmock(datastore_m::Import).new_instances.should_receive(:normalize_dataset). - once.pass_thru - flexmock(datastore_m::Import).new_instances.should_receive(:move_dataset_to_store). - once.pass_thru + flexmock(datastore_m::Import) + .new_instances.should_receive(:normalize_dataset) + .once.pass_thru + flexmock(datastore_m::Import) + .new_instances.should_receive(:move_dataset_to_store) + .once.pass_thru out, = capture_io do call_cli('import', '--auto', '--force', '--min-duration=0', '--store', datastore_path.to_s, @@ -229,20 +224,23 @@ def call_cli(*args, silent: true) assert_match /Replacing existing dataset #{digest} with new one/, out refute marker_path.exist? end - it "ignores an empty dataset if --min-duration is non-zero" do + it "ignores an empty dataset after normalization if --min-duration "\ + "is non-zero" do create_logfile('test.0.log') {} FileUtils.touch logfile_path('test-events.log') - incoming_path = datastore_path + 'incoming' + '0' - flexmock(datastore_m::Import).new_instances.should_receive(:normalize_dataset). - once.pass_thru - flexmock(datastore_m::Import).new_instances.should_receive(:move_dataset_to_store). 
- never + flexmock(datastore_m::Import) + .new_instances.should_receive(:normalize_dataset) + .once.pass_thru + flexmock(datastore_m::Import) + .new_instances.should_receive(:move_dataset_to_store) + .never call_cli('import', '--auto', '--min-duration=1', '--store', datastore_path.to_s, logfile_pathname.dirname.to_s, silent: true) end - it "ignores datasets whose logical duration is lower than --min-duration" do + it "ignores datasets whose logical duration is "\ + "lower than --min-duration" do create_logfile('test.0.log') do create_logfile_stream( 'test', metadata: { 'rock_task_name' => 'task', @@ -252,7 +250,6 @@ def call_cli(*args, silent: true) write_logfile_sample Time.now + 10, Time.now + 1, 20 end FileUtils.touch logfile_path('test-events.log') - incoming_path = datastore_path + 'incoming' + '0' flexmock(datastore_m::Import) .new_instances.should_receive(:normalize_dataset) .once.pass_thru diff --git a/test/datastore/dataset_test.rb b/test/datastore/dataset_test.rb index 31c134f..4928dbb 100644 --- a/test/datastore/dataset_test.rb +++ b/test/datastore/dataset_test.rb @@ -27,7 +27,7 @@ def dataset_pathname(*names) metadata: Hash['rock_task_name' => 'task0', 'rock_task_object_name' => 'port'] end FileUtils.touch dataset_pathname('text', 'test.txt') - dataset_pathname('roby-events.log').open('w') { |io| io.write "ROBY" } + dataset_pathname('roby-events.0.log').open('w') { |io| io.write "ROBY" } FileUtils.touch dataset_pathname('ignored', 'not_recognized_file') dataset_pathname('ignored', 'not_recognized_dir').mkpath FileUtils.touch dataset_pathname('ignored', 'not_recognized_dir', 'test') @@ -58,8 +58,9 @@ def dataset_pathname(*names) it "lists the full paths to the pocolog and roby files" do files = dataset.each_important_file.to_set expected = [ - dataset_pathname('roby-events.log'), - dataset_pathname('pocolog', 'task0::port.0.log')].to_set + dataset_pathname('roby-events.0.log'), + dataset_pathname('pocolog', 'task0::port.0.log') + ].to_set assert_equal expected, files end end @@ -116,7 +117,7 @@ def dataset_pathname(*names) describe "#compute_dataset_identity_from_files" do it "returns a list of entries with full path, size and sha256 digest" do - roby_path = dataset_pathname('roby-events.log') + roby_path = dataset_pathname('roby-events.0.log') roby_digest = Digest::SHA256.hexdigest(roby_path.read) pocolog_path = dataset_pathname('pocolog', 'task0::port.0.log') pocolog_digest = Digest::SHA256.hexdigest( @@ -316,19 +317,25 @@ def write_metadata( dataset.weak_validate_identity_metadata end it "raises if a file is missing on disk" do - dataset_pathname("roby-events.log").unlink + dataset_pathname("roby-events.0.log").unlink assert_raises(Dataset::InvalidIdentityMetadata) do dataset.weak_validate_identity_metadata end end - it "raises if a new important file is added on disk" do - FileUtils.touch dataset_pathname("test-events.log") + it "raises if a new pocolog log file is added on disk" do + FileUtils.touch dataset_pathname("pocolog", "some::Task.0.log") + assert_raises(Dataset::InvalidIdentityMetadata) do + dataset.weak_validate_identity_metadata + end + end + it "raises if a new Roby log file is added on disk" do + FileUtils.touch dataset_pathname("roby-events.1.log") assert_raises(Dataset::InvalidIdentityMetadata) do dataset.weak_validate_identity_metadata end end it "raises if a file size mismatches" do - dataset_pathname("roby-events.log").open('a') { |io| io.write('10') } + dataset_pathname("roby-events.0.log").open('a') { |io| io.write('10') } 
assert_raises(Dataset::InvalidIdentityMetadata) do dataset.weak_validate_identity_metadata end @@ -343,25 +350,31 @@ def write_metadata( dataset.validate_identity_metadata end it "raises if a file is missing on disk" do - dataset_pathname("roby-events.log").unlink + dataset_pathname("roby-events.0.log").unlink assert_raises(Dataset::InvalidIdentityMetadata) do dataset.validate_identity_metadata end end - it "raises if a new important file is added on disk" do - FileUtils.touch dataset_pathname("test-events.log") + it "raises if a new pocolog log file is added on disk" do + FileUtils.touch dataset_pathname("pocolog", "some::Task.0.log") + assert_raises(Dataset::InvalidIdentityMetadata) do + dataset.validate_identity_metadata + end + end + it "raises if a new Roby log file is added on disk" do + FileUtils.touch dataset_pathname("roby-events.1.log") assert_raises(Dataset::InvalidIdentityMetadata) do dataset.validate_identity_metadata end end it "raises if a file size mismatches" do - dataset_pathname("roby-events.log").open('a') { |io| io.write('10') } + dataset_pathname("roby-events.0.log").open('a') { |io| io.write('10') } assert_raises(Dataset::InvalidIdentityMetadata) do dataset.validate_identity_metadata end end it "raises if the contents of a file changed" do - dataset_pathname("roby-events.log").open('a') { |io| io.seek(5); io.write('0') } + dataset_pathname("roby-events.0.log").open('a') { |io| io.seek(5); io.write('0') } assert_raises(Dataset::InvalidIdentityMetadata) do dataset.validate_identity_metadata end @@ -524,7 +537,7 @@ def write_metadata( end it "loads stream information and returns LazyDataStream objects" do - streams = dataset.read_lazy_data_streams + streams = dataset.read_lazy_data_streams.sort_by(&:name).reverse assert_equal ['test', 'other_test'], streams.map(&:name) assert_equal [int32_t, double_t], streams.map(&:type) assert_equal [Hash['rock_task_name' => 'task0', 'rock_task_object_name' => 'port'], @@ -539,9 +552,11 @@ def write_metadata( it "sets up the lazy data stream to load the actual stream properly" do lazy_streams = dataset.read_lazy_data_streams - flexmock(Pocolog::Logfiles).new_instances. 
- should_receive(:rebuild_and_load_index).never - streams = lazy_streams.map(&:syskit_eager_load) + flexmock(Pocolog::Logfiles) + .new_instances + .should_receive(:rebuild_and_load_index).never + streams = lazy_streams.map(&:syskit_eager_load).sort_by(&:name) + .reverse assert_equal ['test', 'other_test'], streams.map(&:name) assert_equal [int32_t, double_t], streams.map(&:type) assert_equal [Hash['rock_task_name' => 'task0', 'rock_task_object_name' => 'port'], diff --git a/test/datastore/import_test.rb b/test/datastore/import_test.rb index 8e47a79..648a148 100644 --- a/test/datastore/import_test.rb +++ b/test/datastore/import_test.rb @@ -24,15 +24,18 @@ class Datastore FileUtils.touch(file0_1 = logfile_pathname('file0.1.log')) FileUtils.touch(file0_0 = logfile_pathname('file0.0.log')) FileUtils.touch(file1_0 = logfile_pathname('file1.0.log')) - assert_equal [[file0_0, file0_1, file1_0], [], nil, []], import.prepare_import(logfile_pathname) + assert_equal [[file0_0, file0_1, file1_0], [], [], []], + import.prepare_import(logfile_pathname) end it "lists the test files that should be copied" do FileUtils.touch(path = logfile_pathname('file0.txt')) - assert_equal [[], [path], nil, []], import.prepare_import(logfile_pathname) + assert_equal [[], [path], [], []], + import.prepare_import(logfile_pathname) end it "lists the Roby log files that should be copied" do FileUtils.touch(path = logfile_pathname('test-events.log')) - assert_equal [[], [], path, []], import.prepare_import(logfile_pathname) + assert_equal [[], [], [path], []], + import.prepare_import(logfile_pathname) end it "raises if more than one file looks like a roby log file" do FileUtils.touch(logfile_pathname('test-events.log')) @@ -45,20 +48,21 @@ class Datastore it "ignores pocolog's index files" do FileUtils.touch(path = logfile_pathname('file0.1.log')) FileUtils.touch(logfile_pathname('file0.1.idx')) - assert_equal [[path], [], nil, []], import.prepare_import(logfile_pathname) + assert_equal [[path], [], [], []], + import.prepare_import(logfile_pathname) end it "ignores Roby index files" do FileUtils.touch(path = logfile_pathname('test-events.log')) FileUtils.touch(logfile_pathname('test-index.log')) - assert_equal [[], [], path, []], import.prepare_import(logfile_pathname) + assert_equal [[], [], [path], []], import.prepare_import(logfile_pathname) end it "lists unrecognized files" do FileUtils.touch(path = logfile_pathname('not_matching')) - assert_equal [[], [], nil, [path]], import.prepare_import(logfile_pathname) + assert_equal [[], [], [], [path]], import.prepare_import(logfile_pathname) end it "lists unrecognized directories" do (path = logfile_pathname('not_matching')).mkpath - assert_equal [[], [], nil, [path]], import.prepare_import(logfile_pathname) + assert_equal [[], [], [], [path]], import.prepare_import(logfile_pathname) end end @@ -81,14 +85,14 @@ def tty_reporter it 'can import an empty folder' do Dir.mktmpdir do |dir| - import.import(Pathname.new(dir)) + import.import([Pathname.new(dir)]) end end it "moves the results under the dataset's ID" do flexmock(Dataset).new_instances.should_receive(:compute_dataset_digest). 
and_return('ABCDEF') - import_dir = import.import(logfile_pathname) + import_dir = import.import([logfile_pathname]).dataset_path assert_equal(datastore_path + 'core' + 'ABCDEF', import_dir) end it 'raises if the target dataset ID already exists' do @@ -96,7 +100,7 @@ def tty_reporter and_return('ABCDEF') (datastore_path + 'core' + 'ABCDEF').mkpath assert_raises(Import::DatasetAlreadyExists) do - import.import(logfile_pathname) + import.import([logfile_pathname]) end end it "replaces the current dataset by the new one if the ID already exists but 'force' is true" do @@ -108,7 +112,7 @@ def tty_reporter FileUtils.touch (datastore_path + 'core' + digest + 'file') out, = capture_io do import.import( - logfile_pathname, reporter: tty_reporter, force: true + [logfile_pathname], reporter: tty_reporter, force: true ) end assert_match /Replacing existing dataset #{digest} with new one/, out @@ -119,7 +123,7 @@ def tty_reporter # path that reports progress, but checks nothing except the lack # of exceptions capture_io do - import.import(logfile_pathname) + import.import([logfile_pathname]) end end it 'normalizes the pocolog logfiles' do @@ -130,21 +134,21 @@ def tty_reporter flexmock(Syskit::Log::Datastore).should_receive(:normalize). with([logfile_pathname('test.0.log')], expected_normalize_args).once. pass_thru - import_dir = import.import(logfile_pathname) - assert (import_dir + 'pocolog' + 'task0::port.0.log').exist? + dataset = import.import([logfile_pathname]) + assert (dataset.dataset_path + 'pocolog' + 'task0::port.0.log').exist? end it "copies the text files" do - import_dir = import.import(logfile_pathname) + import_dir = import.import([logfile_pathname]).dataset_path assert logfile_pathname('test.txt').exist? assert (import_dir + 'text' + 'test.txt').exist? end - it "copies the roby log files into roby-events.log" do - import_dir = import.import(logfile_pathname) + it "copies the roby log files into roby-events.N.log" do + import_dir = import.import([logfile_pathname]).dataset_path assert logfile_pathname('test-events.log').exist? - assert (import_dir + 'roby-events.log').exist? + assert (import_dir + 'roby-events.0.log').exist? end it "copies the unrecognized files" do - import_dir = import.import(logfile_pathname) + import_dir = import.import([logfile_pathname]).dataset_path assert logfile_pathname('not_recognized_file').exist? assert logfile_pathname('not_recognized_dir').exist? 
@@ -159,20 +163,23 @@ def tty_reporter logfile_pathname("info.yml").open('w') do |io| YAML.dump(roby_metadata, io) end - import_dir = import.import(logfile_pathname) - assert_equal Hash['roby:app_name' => Set['test']], Dataset.new(import_dir).metadata + dataset = import.import([logfile_pathname]) + assert_equal({ 'roby:app_name' => Set['test'] }, dataset.metadata) + assert_equal({ 'roby:app_name' => Set['test'] }, + Dataset.new(dataset.dataset_path).metadata) end it "ignores the Roby metadata if it cannot be loaded" do logfile_pathname("info.yml").open('w') do |io| io.write "%invalid_yaml" end - import_dir = nil + imported = nil _out, err = capture_io do - import_dir = import.import(logfile_pathname) + imported = import.import([logfile_pathname]) end assert_match /failed to load Roby metadata/, err - assert_equal Hash[], Dataset.new(import_dir).metadata + assert_equal({}, imported.metadata) + assert_equal({}, Dataset.new(imported.dataset_path).metadata) end end @@ -182,12 +189,14 @@ def tty_reporter end it "returns the import information of an imported directory" do - path = Timecop.freeze(base_time = Time.now) do - import.import(logfile_pathname) - end - digest, time = Import.find_import_info(logfile_pathname) - assert_equal digest, path.basename.to_s - assert_equal base_time, time + dataset = flexmock(dataset_path: logfile_pathname, + digest: "something") + + dir = Pathname(make_tmpdir) + Import.save_import_info(dir, dataset, time: (t = Time.now)) + digest, time = Import.find_import_info(dir) + assert_equal digest, dataset.digest + assert_equal t, time end end end diff --git a/test/datastore/index_build_test.rb b/test/datastore/index_build_test.rb index 196d56a..01639b1 100644 --- a/test/datastore/index_build_test.rb +++ b/test/datastore/index_build_test.rb @@ -98,13 +98,13 @@ def cache_path include Roby::Test::DRobyLogHelpers before do - droby_create_event_log((dataset_path + 'roby-events.log').to_s) do + droby_create_event_log((dataset_path + 'roby-events.0.log').to_s) do droby_write_event :test, 10 end end it 'does nothing if there are no roby indexes' do - (dataset_path + 'roby-events.log').unlink + (dataset_path + 'roby-events.0.log').unlink index_build.rebuild_roby_index end @@ -115,40 +115,42 @@ def cache_path it 'does nothing if a valid index file exists' do cache_path.mkpath Roby::DRoby::Logfile::Index.rebuild_file( - dataset_path + 'roby-events.log', - cache_path + 'roby-events.idx' + dataset_path + 'roby-events.0.log', + cache_path + 'roby-events.0.idx' ) - flexmock(Roby::DRoby::Logfile::Index).should_receive(:rebuild).never + flexmock(Roby::DRoby::Logfile::Index) + .should_receive(:rebuild_file).never index_build.rebuild_roby_index end it 'rebuilds if a valid index file exists but force is true' do cache_path.mkpath Roby::DRoby::Logfile::Index.rebuild_file( - dataset_path + 'roby-events.log', - cache_path + 'roby-events.idx' + dataset_path + 'roby-events.0.log', + cache_path + 'roby-events.0.idx' ) flexmock(Roby::DRoby::Logfile::Index) - .should_receive(:rebuild).once.pass_thru + .should_receive(:rebuild_file).once.pass_thru index_build.rebuild_roby_index(force: true) end it 'rebuilds if no index file exists' do flexmock(Roby::DRoby::Logfile::Index) - .should_receive(:rebuild) + .should_receive(:rebuild_file) .once.pass_thru index_build.rebuild_roby_index assert Roby::DRoby::Logfile::Index.valid_file?( - dataset_path + 'roby-events.log', - cache_path + 'roby-events.idx' + dataset_path + 'roby-events.0.log', + cache_path + 'roby-events.0.idx' ) end it 'skips the roby file if 
its format is not current' do - (dataset_path + 'roby-events.log').open('w') do |io| + (dataset_path + 'roby-events.0.log').open('w') do |io| Roby::DRoby::Logfile.write_header(io, version: 0) end reporter = flexmock(Pocolog::CLI::NullReporter.new) - reporter - .should_receive(:warn).once - .with(/roby-events.log is in an obsolete Roby log file format, skipping/) + reporter.should_receive(:warn) + .with(" roby-events.0.log is in an obsolete "\ + "Roby log file format, skipping") + .once index_build.rebuild_roby_index(reporter: reporter) end diff --git a/test/datastore/normalize_test.rb b/test/datastore/normalize_test.rb index ea2e0de..057b094 100644 --- a/test/datastore/normalize_test.rb +++ b/test/datastore/normalize_test.rb @@ -24,10 +24,16 @@ class Datastore logfile_pathname('normalized').mkdir normalize.normalize([logfile_pathname('file0.0.log')]) normalized_dir = logfile_pathname('normalized') - stream = open_logfile_stream (normalized_dir + "task0::port.0.log"), 'stream0' - assert_equal [[base_time + 2, base_time + 20, 2]], stream.samples.to_a - stream = open_logfile_stream (normalized_dir + "task1::port.0.log"), 'stream1' - assert_equal [[base_time + 1, base_time + 10, 1]], stream.samples.to_a + stream = open_logfile_stream( + normalized_dir + "task0::port.0.log", "task0.port" + ) + assert_equal [[base_time + 2, base_time + 20, 2]], + stream.samples.to_a + stream = open_logfile_stream( + normalized_dir + "task1::port.0.log", "task1.port" + ) + assert_equal [[base_time + 1, base_time + 10, 1]], + stream.samples.to_a end it "generates valid index files for the normalized streams" do logfile_pathname('normalized').mkdir @@ -36,8 +42,8 @@ class Datastore should_receive(:rebuild_and_load_index). never normalized_dir = logfile_pathname('normalized') - open_logfile_stream (normalized_dir + "task0::port.0.log"), 'stream0' - open_logfile_stream (normalized_dir + "task1::port.0.log"), 'stream1' + open_logfile_stream (normalized_dir + "task0::port.0.log"), 'task0.port' + open_logfile_stream (normalized_dir + "task1::port.0.log"), 'task1.port' end it "allows to specify the cache directory" do logfile_pathname('normalized').mkdir @@ -47,8 +53,8 @@ class Datastore should_receive(:rebuild_and_load_index). never normalized_dir = logfile_pathname('normalized') - open_logfile_stream (normalized_dir + "task0::port.0.log"), 'stream0', index_dir: index_dir - open_logfile_stream (normalized_dir + "task1::port.0.log"), 'stream1', index_dir: index_dir + open_logfile_stream (normalized_dir + "task0::port.0.log"), 'task0.port', index_dir: index_dir + open_logfile_stream (normalized_dir + "task1::port.0.log"), 'task1.port', index_dir: index_dir end describe "digest generation" do it "optionally computes the sha256 digest of the generated file, without the prologue" do @@ -66,8 +72,8 @@ class Datastore should_receive(:rebuild_and_load_index). 
never normalized_dir = logfile_pathname('normalized') - open_logfile_stream (normalized_dir + "task0::port.0.log"), 'stream0' - open_logfile_stream (normalized_dir + "task1::port.0.log"), 'stream1' + open_logfile_stream (normalized_dir + "task0::port.0.log"), 'task0.port' + open_logfile_stream (normalized_dir + "task1::port.0.log"), 'task1.port' end end it "detects followup streams" do @@ -77,7 +83,7 @@ class Datastore end normalize.normalize([logfile_pathname('file0.0.log'), logfile_pathname('file0.1.log')]) normalized_dir = logfile_pathname('normalized') - stream = open_logfile_stream (normalized_dir + "task0::port.0.log"), 'stream0' + stream = open_logfile_stream (normalized_dir + "task0::port.0.log"), 'task0.port' assert_equal [[base_time + 2, base_time + 20, 2], [base_time + 3, base_time + 30, 3]], stream.samples.to_a end diff --git a/test/streams_test.rb b/test/streams_test.rb index f39ce70..71157de 100644 --- a/test/streams_test.rb +++ b/test/streams_test.rb @@ -64,8 +64,8 @@ module Syskit::Log end flexmock(Syskit::Log) .should_receive(:warn) - .with('removing empty metadata property "rock_task_model" '\ - 'from /stream0') + .with("removing empty metadata property 'rock_task_model' "\ + "from /stream0") .once stream = open_logfile_stream('test.0.log', '/stream0') subject.add_stream(stream) @@ -238,33 +238,60 @@ def should_warn(matcher) flexmock(app).should_receive(:using_task_library).never should_warn /ignored 2 streams.*project::Task.*\/test0, \/test1/ should_warn /ignored.*other_project::Task.*other_project/ - assert_equal [], subject.each_task(load_models: false).to_a + + tasks = subject.each_task( + load_models: false, skip_tasks_without_models: true + ).to_a + assert_equal [], tasks + end + + it "does enumerate tasks without models if "\ + "skip_tasks_without_models is false" do + flexmock(app).should_receive(:using_task_library).never + tasks = subject.each_task(load_models: false, + skip_tasks_without_models: false) + assert_equal %w[task other_task], tasks.map(&:task_name) + end + + it "does enumerate tasks without models if skip_tasks_without_models is "\ + "false even if it tries and fails to load the model" do + tasks = subject.each_task(load_models: true, + skip_tasks_without_models: false) + assert_equal %w[task other_task], tasks.map(&:task_name) end it "attempts to load the model's project if load_models is true" do - loader = flexmock - loader.should_receive(:project_model_from_name).once. - with('project'). - and_return { Syskit::TaskContext.new_submodel(orogen_model_name: 'project::Task') } - loader.should_receive(:project_model_from_name).once. - with('other_project'). 
- pass_thru + loader = Roby.app.default_loader + project_m = OroGen::Spec::Project.new(loader) + project_m.name "project" + project_m.import_types_from "std" + project_m.task_context "Task" + loader.register_project_model(project_m) + should_warn /ignored 1 stream.*other_project::Task.*other_project/ - assert_equal ['task'], subject.each_task(load_models: true, loader: loader).map(&:task_name) + tasks = subject.each_task( + load_models: true, skip_tasks_without_models: true, loader: loader + ) + + assert_equal ["task"], tasks.map(&:task_name) end it "raises if the task project's cannot be found and raise_on_missing_task_models is true" do loader = OroGen::Loaders::Aggregate.new assert_raises(OroGen::ProjectNotFound) do - subject.each_task(raise_on_missing_task_models: true, loader: loader).to_a + subject.each_task(load_models: true, loader: loader, + raise_on_missing_task_models: true).to_a end end it "raises if the task is not present in its project and raise_on_missing_task_models is true" do loader = flexmock + project_m = OroGen::Spec::Project.new(loader) loader.should_receive(:project_model_from_name) + .and_return(project_m) assert_raises(OroGen::NotFound) do - subject.each_task(raise_on_missing_task_models: true, loader: loader).to_a + subject.each_task(load_models: true, loader: loader, + raise_on_missing_task_models: true).to_a end end
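
Note (illustrative sketch, not part of the patch): the hunks above rework the
Syskit::Log::Datastore::Import API — normalize_dataset now takes an array of
input paths, validation is split out into validate_dataset_import,
move_dataset_to_store returns the relocated Dataset, and the per-directory
import marker is written with Import.save_import_info and read back with
Import.find_import_info. The snippet below strings these calls together the
way the updated CLI import_dataset does. The store/log paths and the
Datastore.create constructor are assumptions made for the example; the other
calls mirror lib/syskit/log/cli/datastore.rb and
lib/syskit/log/datastore/import.rb as modified above.

    require "pathname"
    require "syskit/log"
    require "pocolog/cli/null_reporter"

    # Assumed setup: a datastore rooted at an example path and one raw log
    # directory to import. Both paths are placeholders.
    datastore = Syskit::Log::Datastore.create(Pathname.new("/path/to/store"))
    paths     = [Pathname.new("/path/to/raw/logs")]
    reporter  = Pocolog::CLI::NullReporter.new

    imported = datastore.in_incoming do |core_path, cache_path|
      importer = Syskit::Log::Datastore::Import.new(datastore)
      dataset = importer.normalize_dataset(
        paths, core_path, cache_path: cache_path, reporter: reporter
      )
      # Raises Import::DatasetAlreadyExists if the digest is already in the
      # store and force is false; with force: true the existing dataset is
      # deleted and replaced
      importer.validate_dataset_import(dataset, force: false, reporter: reporter)
      dataset = importer.move_dataset_to_store(dataset)
      # Mark each source directory so a later run can detect the import
      # through Import.find_import_info and skip it
      paths.each do |p|
        Syskit::Log::Datastore::Import.save_import_info(p, dataset)
      end
      dataset
    end
    puts imported.digest if imported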