Skip to content

Commit

Permalink
Fix some race conditions in listener_test (#13385)
Browse files Browse the repository at this point in the history
Summary:
There are some data races reported for this test. This PR fixes two races:
1) Test main body and event listener callback race to access a variable `call_count_` in the test's event listener.

2) Test event listener access `ColumnFamilyData::current_` during `OnFlushCompleted` without locking DB mutex and raced with a background compaction job.

Example [run](https://github.com/facebook/rocksdb/actions/runs/13208433475/job/36876956677?fbclid=IwZXh0bgNhZW0CMTEAAR0_gG1Brx7I6bhN3PVD267c2d06GSf7QBEQ8cbNcFHNvn-ZX2JWHtr05qg_aem_915NHkfFh-6cMk83uTHWKw)

Pull Request resolved: #13385

Reviewed By: cbi42

Differential Revision: D69333371

Pulled By: jowlyzhang

fbshipit-source-id: dee4a5f5e161d9b1f5b47b37163ee5b91fe18977
  • Loading branch information
jowlyzhang authored and facebook-github-bot committed Feb 8, 2025
1 parent dd01f73 commit d48af21
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 10 deletions.
4 changes: 2 additions & 2 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,9 @@ class ColumnFamilyData {
return mem()->GetFirstSequenceNumber() == 0 && imm()->NumNotFlushed() == 0;
}

Version* current() { return current_; }
Version* dummy_versions() { return dummy_versions_; }
void SetCurrent(Version* _current);
Version* current() { return current_; } // REQUIRE: DB mutex held
void SetCurrent(Version* _current); // REQUIRE: DB mutex held
uint64_t GetNumLiveVersions() const; // REQUIRE: DB mutex held
uint64_t GetTotalSstFilesSize() const; // REQUIRE: DB mutex held
uint64_t GetLiveSstFilesSize() const; // REQUIRE: DB mutex held
Expand Down
36 changes: 28 additions & 8 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,9 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
EXPECT_NE(cfd, nullptr);

test_->dbfull()->TEST_LockMutex();
Version* const current = cfd->current();
test_->dbfull()->TEST_UnlockMutex();
EXPECT_NE(current, nullptr);

const VersionStorageInfo* const storage_info = current->storage_info();
Expand Down Expand Up @@ -1325,10 +1327,9 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
}

void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
call_count_++;

{
std::lock_guard<std::mutex> lock(mutex_);
IncreaseCallCount(/*mutex_locked*/ true);
flushed_files_.push_back(info.file_path);
}

Expand All @@ -1339,7 +1340,7 @@ class BlobDBJobLevelEventListenerTest : public EventListener {

void OnCompactionCompleted(DB* /*db*/,
const CompactionJobInfo& info) override {
call_count_++;
IncreaseCallCount(/*mutex_locked*/ false);

EXPECT_EQ(info.blob_compression_type, kNoCompression);

Expand All @@ -1355,12 +1356,31 @@ class BlobDBJobLevelEventListenerTest : public EventListener {
}
}

void IncreaseCallCount(bool mutex_locked) {
if (!mutex_locked) {
std::lock_guard<std::mutex> lock(mutex_);
call_count_++;
} else {
call_count_++;
}
}

uint32_t GetCallCount() {
std::lock_guard<std::mutex> lock(mutex_);
return call_count_;
}

void ResetCallCount() {
std::lock_guard<std::mutex> lock(mutex_);
call_count_ = 0;
}

EventListenerTest* test_;
uint32_t call_count_;

private:
std::vector<std::string> flushed_files_;
std::mutex mutex_;
std::vector<std::string> flushed_files_;
uint32_t call_count_;
};

// Test OnFlushCompleted EventListener called for blob files
Expand Down Expand Up @@ -1389,7 +1409,7 @@ TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
ASSERT_EQ(Get("Key2"), "blob_value2");
ASSERT_EQ(Get("Key3"), "blob_value3");

ASSERT_GT(blob_event_listener->call_count_, 0U);
ASSERT_GT(blob_event_listener->GetCallCount(), 0U);
}

// Test OnCompactionCompleted EventListener called for blob files
Expand Down Expand Up @@ -1423,7 +1443,7 @@ TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
ASSERT_OK(Put("Key6", "blob_value6"));
ASSERT_OK(Flush());

blob_event_listener->call_count_ = 0;
blob_event_listener->ResetCallCount();
constexpr Slice* begin = nullptr;
constexpr Slice* end = nullptr;

Expand All @@ -1432,7 +1452,7 @@ TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));

// Make sure, OnCompactionCompleted is called.
ASSERT_GT(blob_event_listener->call_count_, 0U);
ASSERT_GT(blob_event_listener->GetCallCount(), 0U);
}

// Test CompactFiles calls OnCompactionCompleted EventListener for blob files
Expand Down

0 comments on commit d48af21

Please sign in to comment.