Skip to content

Commit

Permalink
Don't take archived log size into account when calculating log size f…
Browse files Browse the repository at this point in the history
…or flush
  • Loading branch information
HypenZou committed Jun 18, 2024
1 parent af50823 commit e975dd1
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 2 deletions.
7 changes: 7 additions & 0 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/types.h"
#include "test_util/sync_point.h"
#include "util/file_checksum_helper.h"
Expand All @@ -29,6 +30,7 @@
namespace ROCKSDB_NAMESPACE {

Status DBImpl::FlushForGetLiveFiles() {
TEST_SYNC_POINT("DBImpl::FlushForGetLiveFiles");
return DBImpl::FlushAllColumnFamilies(FlushOptions(),
FlushReason::kGetLiveFiles);
}
Expand Down Expand Up @@ -217,6 +219,11 @@ Status DBImpl::GetLiveFilesStorageInfo(
// We may be able to cover 2PC case too.
uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) {
// Don't take archived log size into account
// when calculating log size for flush
if (wal->Type() == kArchivedLogFile) {
continue;
}
total_wal_size += wal->SizeFileBytes();
}
if (total_wal_size < opts.wal_size_for_flush) {
Expand Down
3 changes: 3 additions & 0 deletions db/wal_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
Status s = env_->RenameFile(fname, archived_log_name);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
// The sync point below is used in
// (CheckPointTest, CheckpointWithArchievedLog)
TEST_SYNC_POINT("WalManager::ArchiveWALFile");
ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
fname.c_str(), archived_log_name.c_str(),
s.ToString().c_str());
Expand Down
6 changes: 4 additions & 2 deletions include/rocksdb/utilities/checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ class Checkpoint {
// same filesystem as the database, and copied otherwise.
// (2) other required files (like MANIFEST) are always copied.
// log_size_for_flush: if the total log file size is equal or larger than
// this value, then a flush is triggered for all the column families. The
// default value is 0, which means flush is always triggered. If you move
// this value, then a flush is triggered for all the column families.
// The archived log size will not be included when calculating the total log
// size.
// The default value is 0, which means flush is always triggered. If you move
// away from the default, the checkpoint may not contain up-to-date data
// if WAL writing is not always enabled.
// Flush will always trigger if it is 2PC.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
When calculating total log size for the `log_size_for_flush` argument in `CreateCheckpoint` API, the size of the archived log will not be included to avoid unnecessary flush
40 changes: 40 additions & 0 deletions utilities/checkpoint/checkpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <atomic>
#include <iostream>
#include <thread>
#include <utility>
Expand All @@ -23,6 +24,7 @@
#include "port/stack_trace.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
Expand Down Expand Up @@ -970,6 +972,44 @@ TEST_F(CheckpointTest, CheckpointWithDbPath) {
delete checkpoint;
}

TEST_F(CheckpointTest, CheckpointWithArchievedLog) {
std::atomic_bool flushed = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"WalManager::ArchiveWALFile",
"CheckpointTest:CheckpointWithArchievedLog"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushForGetLiveFiles", [&](void*) { flushed = true; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.WAL_ttl_seconds = 3600;
DestroyAndReopen(options);

ASSERT_OK(Put("key1", std::string(1024 * 1024, 'a')));
// flush and archive the first log
ASSERT_OK(Flush());
ASSERT_OK(Put("key2", std::string(1024, 'a')));

Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
TEST_SYNC_POINT("CheckpointTest:CheckpointWithArchievedLog");
// unflushed log size < 1024 * 1024 < total file size including archived log,
// flush shouldn't occur
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 1024 * 1024));
ASSERT_TRUE(!flushed);

DB* snapshot_db;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshot_db));
ReadOptions read_opts;
std::string get_result;
ASSERT_OK(snapshot_db->Get(read_opts, "key1", &get_result));
ASSERT_EQ(std::string(1024 * 1024, 'a'), get_result);
ASSERT_OK(snapshot_db->Get(read_opts, "key2", &get_result));
ASSERT_EQ(std::string(1024, 'a'), get_result);
delete snapshot_db;
}

TEST(test, test) {}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down

0 comments on commit e975dd1

Please sign in to comment.