Skip to content

Commit 20cfc02

Browse files
Bugfix for writer not being able to open again after closing (ros2#1599)
* re-applies fixes from ros2#1590 to rolling. Also removes new message definition in sequential writer test for multiple open operations. Also clears topic_names_to_message_definitions_ and handles message_definitions_s underlying container similarly. Lastly, also avoids reset of factory in the compression writer, adds unit test there too. Signed-off-by: Yannick Schulz <[email protected]> * removes unused compressor_ member from compresser writer class. Also delegates rest of the closing behavior to the base class in close method, as it is handled in the open and write methods of the compression writer Signed-off-by: Yannick Schulz <[email protected]> * Remove unrelated delta - message_definitions_ was intentionally allocated on the stack and should persist between writer close() and open() because it represents cache for message definitions which is not changes. Signed-off-by: Michael Orlov <[email protected]> * Don't call virtual methods from destructors Signed-off-by: Michael Orlov <[email protected]> * Cleanup 'rosbag2_directory_next' after the test run Signed-off-by: Michael Orlov <[email protected]> * Protect Writer::open(..) and Writer::close() with mutex on upper level - Rationale: To avoid race conditions if open(..) and close() could be ever be called from different threads. Signed-off-by: Michael Orlov <[email protected]> * Bugfix for WRITE_SPLIT callback not called for the last compressed file Signed-off-by: Michael Orlov <[email protected]> * Bugfix for lost messages from cache when closing compression writer Signed-off-by: Michael Orlov <[email protected]> * Use dedicated is_open_ flag instead of relying on storage_ Signed-off-by: Michael Orlov <[email protected]> --------- Signed-off-by: Yannick Schulz <[email protected]> Signed-off-by: Michael Orlov <[email protected]> Co-authored-by: Michael Orlov <[email protected]>
1 parent 3cc08f3 commit 20cfc02

File tree

7 files changed

+118
-29
lines changed

7 files changed

+118
-29
lines changed

rosbag2_compression/include/rosbag2_compression/sequential_compression_writer.hpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
182182

183183
private:
184184
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};
185-
std::shared_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};
186185
std::mutex compressor_queue_mutex_;
187186
std::queue<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>
188187
compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
@@ -198,6 +197,7 @@ class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
198197
rosbag2_compression::CompressionOptions compression_options_{};
199198

200199
bool should_compress_last_file_{true};
200+
std::atomic_bool is_open_{false};
201201

202202
// Runs a while loop that pulls data from the compression queue until
203203
// compression_is_running_ is false; should be run in a separate thread

rosbag2_compression/src/rosbag2_compression/sequential_compression_writer.cpp

+25-17
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ SequentialCompressionWriter::SequentialCompressionWriter(
6565

6666
SequentialCompressionWriter::~SequentialCompressionWriter()
6767
{
68-
close();
68+
SequentialCompressionWriter::close();
6969
}
7070

7171
void SequentialCompressionWriter::compression_thread_fn()
@@ -225,13 +225,22 @@ void SequentialCompressionWriter::open(
225225
const rosbag2_storage::StorageOptions & storage_options,
226226
const rosbag2_cpp::ConverterOptions & converter_options)
227227
{
228+
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
229+
if (this->is_open_) {
230+
return; // The writer already opened.
231+
}
228232
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
229233
SequentialWriter::open(storage_options, converter_options);
230234
setup_compression();
235+
this->is_open_ = true;
231236
}
232237

233238
void SequentialCompressionWriter::close()
234239
{
240+
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
241+
if (!this->is_open_.exchange(false)) {
242+
return; // The writer is not open
243+
}
235244
if (!base_folder_.empty()) {
236245
// Reset may be called before initializing the compressor (ex. bad options).
237246
// We compress the last file only if it hasn't been compressed earlier (ex. in split_bagfile()).
@@ -241,7 +250,18 @@ void SequentialCompressionWriter::close()
241250
std::lock_guard<std::recursive_mutex> lock(storage_mutex_);
242251
std::lock_guard<std::mutex> compressor_lock(compressor_queue_mutex_);
243252
try {
244-
storage_.reset(); // Storage must be closed before it can be compressed.
253+
// Storage must be closed before file can be compressed.
254+
if (use_cache_) {
255+
// destructor will flush message cache
256+
cache_consumer_.reset();
257+
message_cache_.reset();
258+
}
259+
finalize_metadata();
260+
if (storage_) {
261+
storage_->update_metadata(metadata_);
262+
storage_.reset(); // Storage will be closed in storage_ destructor
263+
}
264+
245265
if (!metadata_.relative_file_paths.empty()) {
246266
std::string file = metadata_.relative_file_paths.back();
247267
compressor_file_queue_.push(file);
@@ -251,22 +271,10 @@ void SequentialCompressionWriter::close()
251271
ROSBAG2_COMPRESSION_LOG_WARN_STREAM("Could not compress the last bag file.\n" << e.what());
252272
}
253273
}
254-
255-
stop_compressor_threads();
256-
257-
finalize_metadata();
258-
if (storage_) {
259-
storage_->update_metadata(metadata_);
260-
}
261-
metadata_io_->write_metadata(base_folder_, metadata_);
262-
}
263-
264-
if (use_cache_) {
265-
cache_consumer_.reset();
266-
message_cache_.reset();
267274
}
268-
storage_.reset(); // Necessary to ensure that the storage is destroyed before the factory
269-
storage_factory_.reset();
275+
stop_compressor_threads(); // Note: The metadata_.relative_file_paths will be updated with
276+
// compressed filename when compressor threads will finish.
277+
SequentialWriter::close();
270278
}
271279

272280
void SequentialCompressionWriter::create_topic(

rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp

+25
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,31 @@ TEST_F(SequentialCompressionWriterTest, open_succeeds_on_supported_compression_f
222222
writer_->open(tmp_dir_storage_options_, {serialization_format_, serialization_format_}));
223223
}
224224

225+
TEST_F(SequentialCompressionWriterTest, open_succeeds_twice)
226+
{
227+
rosbag2_compression::CompressionOptions compression_options{
228+
DefaultTestCompressor, rosbag2_compression::CompressionMode::FILE,
229+
kDefaultCompressionQueueSize, kDefaultCompressionQueueThreads,
230+
kDefaultCompressionQueueThreadsPriority};
231+
initializeWriter(compression_options);
232+
233+
auto tmp_dir = fs::temp_directory_path() / "path_not_empty";
234+
auto tmp_dir_next = fs::temp_directory_path() / "path_not_empty_next";
235+
236+
auto storage_options = rosbag2_storage::StorageOptions();
237+
auto storage_options_next = rosbag2_storage::StorageOptions();
238+
239+
storage_options.uri = tmp_dir.string();
240+
storage_options_next.uri = tmp_dir_next.string();
241+
242+
EXPECT_NO_THROW(
243+
writer_->open(storage_options, {serialization_format_, serialization_format_}));
244+
245+
writer_->close();
246+
EXPECT_NO_THROW(
247+
writer_->open(storage_options_next, {serialization_format_, serialization_format_}));
248+
}
249+
225250
TEST_F(SequentialCompressionWriterTest, writer_calls_create_compressor)
226251
{
227252
rosbag2_compression::CompressionOptions compression_options{

rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
162162

163163
// Used to track topic -> message count. If cache is present, it is updated by CacheConsumer
164164
std::unordered_map<std::string, rosbag2_storage::TopicInformation> topics_names_to_info_;
165+
// Note: topics_names_to_info_ needs to be protected with mutex only when we are explicitly
166+
// adding or deleting items (create_topic(..)/remove_topic(..)) and when we access it from
167+
// CacheConsumer callback i.e., write_messages(..)
165168
std::mutex topics_info_mutex_;
166169

167170
LocalMessageDefinitionSource message_definitions_;
@@ -197,6 +200,7 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter
197200
void write_messages(
198201
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & messages);
199202
bool is_first_message_ {true};
203+
std::atomic_bool is_open_{false};
200204

201205
bag_events::EventCallbackManager callback_manager_;
202206
};

rosbag2_cpp/src/rosbag2_cpp/writer.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@ void Writer::open(
6262
const rosbag2_storage::StorageOptions & storage_options,
6363
const ConverterOptions & converter_options)
6464
{
65+
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
6566
writer_impl_->open(storage_options, converter_options);
6667
}
6768

6869
void Writer::close()
6970
{
71+
std::lock_guard<std::mutex> writer_lock(writer_mutex_);
7072
writer_impl_->close();
7173
}
7274

rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp

+28-10
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ SequentialWriter::SequentialWriter(
5959
metadata_io_(std::move(metadata_io)),
6060
converter_(nullptr),
6161
topics_names_to_info_(),
62-
message_definitions_(),
62+
topic_names_to_message_definitions_(),
6363
metadata_()
6464
{}
6565

@@ -69,7 +69,7 @@ SequentialWriter::~SequentialWriter()
6969
// Callbacks likely was created after SequentialWriter object and may point to the already
7070
// destructed objects.
7171
callback_manager_.delete_all_callbacks();
72-
close();
72+
SequentialWriter::close();
7373
}
7474

7575
void SequentialWriter::init_metadata()
@@ -97,8 +97,13 @@ void SequentialWriter::open(
9797
const rosbag2_storage::StorageOptions & storage_options,
9898
const ConverterOptions & converter_options)
9999
{
100+
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
101+
if (is_open_) {
102+
return; // The writer already opened
103+
}
100104
base_folder_ = storage_options.uri;
101105
storage_options_ = storage_options;
106+
102107
if (storage_options_.storage_id.empty()) {
103108
storage_options_.storage_id = rosbag2_storage::get_default_storage_id();
104109
}
@@ -161,10 +166,15 @@ void SequentialWriter::open(
161166

162167
init_metadata();
163168
storage_->update_metadata(metadata_);
169+
is_open_ = true;
164170
}
165171

166172
void SequentialWriter::close()
167173
{
174+
// Note. close and open methods protected with mutex on upper rosbag2_cpp::writer level.
175+
if (!is_open_.exchange(false)) {
176+
return; // The writer is not open
177+
}
168178
if (use_cache_) {
169179
// destructor will flush message cache
170180
cache_consumer_.reset();
@@ -180,13 +190,22 @@ void SequentialWriter::close()
180190
}
181191

182192
if (storage_) {
183-
auto info = std::make_shared<bag_events::BagSplitInfo>();
184-
info->closed_file = storage_->get_relative_file_path();
185193
storage_.reset(); // Destroy storage before calling WRITE_SPLIT callback to make sure that
186194
// bag file was closed before callback call.
195+
}
196+
if (!metadata_.relative_file_paths.empty()) {
197+
auto info = std::make_shared<bag_events::BagSplitInfo>();
198+
// Take the latest file name from metadata in case if it was updated after compression in
199+
// derived class
200+
info->closed_file =
201+
(fs::path(base_folder_) / metadata_.relative_file_paths.back()).generic_string();
187202
callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info);
188203
}
189-
storage_factory_.reset();
204+
205+
topics_names_to_info_.clear();
206+
topic_names_to_message_definitions_.clear();
207+
208+
converter_.reset();
190209
}
191210

192211
void SequentialWriter::create_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
@@ -226,7 +245,7 @@ void SequentialWriter::create_topic(
226245
return;
227246
}
228247

229-
if (!storage_) {
248+
if (!is_open_) {
230249
throw std::runtime_error("Bag is not open. Call open() before writing.");
231250
}
232251

@@ -260,7 +279,7 @@ void SequentialWriter::create_topic(
260279

261280
void SequentialWriter::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type)
262281
{
263-
if (!storage_) {
282+
if (!is_open_) {
264283
throw std::runtime_error("Bag is not open. Call open() before removing.");
265284
}
266285

@@ -308,14 +327,13 @@ void SequentialWriter::switch_to_next_storage()
308327
base_folder_,
309328
metadata_.relative_file_paths.size());
310329
storage_ = storage_factory_->open_read_write(storage_options_);
311-
storage_->update_metadata(metadata_);
312-
313330
if (!storage_) {
314331
std::stringstream errmsg;
315332
errmsg << "Failed to rollover bagfile to new file: \"" << storage_options_.uri << "\"!";
316333

317334
throw std::runtime_error(errmsg.str());
318335
}
336+
storage_->update_metadata(metadata_);
319337
// Re-register all topics since we rolled-over to a new bagfile.
320338
for (const auto & topic : topics_names_to_info_) {
321339
auto const & md = topic_names_to_message_definitions_[topic.first];
@@ -348,7 +366,7 @@ void SequentialWriter::split_bagfile()
348366

349367
void SequentialWriter::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
350368
{
351-
if (!storage_) {
369+
if (!is_open_) {
352370
throw std::runtime_error("Bag is not open. Call open() before writing.");
353371
}
354372

rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp

+33-1
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
5252
serialization.serialize_message(&test_msg, &serialized_msg);
5353

5454
auto rosbag_directory = fs::path("test_rosbag2_writer_api_bag");
55+
auto rosbag_directory_next = fs::path("test_rosbag2_writer_api_bag_next");
5556
// in case the bag was previously not cleaned up
5657
fs::remove_all(rosbag_directory);
58+
fs::remove_all(rosbag_directory_next);
5759

5860
{
5961
rosbag2_cpp::Writer writer;
@@ -99,7 +101,18 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
99101
// writing a non-serialized message
100102
writer.write(test_msg, "/a/ros2/message", rclcpp::Clock().now());
101103

102-
// close on scope exit
104+
// close as prompted
105+
writer.close();
106+
107+
// open a new bag with the same writer
108+
writer.open(rosbag_directory_next.string());
109+
110+
// write same topic to different bag
111+
writer.write(
112+
serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes",
113+
rclcpp::Clock().now());
114+
115+
// close by scope
103116
}
104117

105118
{
@@ -126,6 +139,24 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
126139
// close on scope exit
127140
}
128141

142+
{
143+
rosbag2_cpp::Reader reader;
144+
std::string topic;
145+
reader.open(rosbag_directory_next.string());
146+
ASSERT_TRUE(reader.has_next());
147+
148+
auto bag_message = reader.read_next();
149+
topic = bag_message->topic_name;
150+
151+
TestMsgT extracted_test_msg;
152+
rclcpp::SerializedMessage extracted_serialized_msg(*bag_message->serialized_data);
153+
serialization.deserialize_message(
154+
&extracted_serialized_msg, &extracted_test_msg);
155+
156+
EXPECT_EQ(test_msg, extracted_test_msg);
157+
EXPECT_EQ("/yet/another/topic", topic);
158+
}
159+
129160
// alternative reader
130161
{
131162
rosbag2_cpp::Reader reader;
@@ -140,6 +171,7 @@ TEST_P(TestRosbag2CPPAPI, minimal_writer_example)
140171

141172
// remove the rosbag again after the test
142173
EXPECT_TRUE(fs::remove_all(rosbag_directory));
174+
EXPECT_TRUE(fs::remove_all(rosbag_directory_next));
143175
}
144176

145177
INSTANTIATE_TEST_SUITE_P(

0 commit comments

Comments
 (0)