Skip to content

Commit

Permalink
Add some comments
Browse files Browse the repository at this point in the history
ref tikv#11555

Signed-off-by: longfangsong <[email protected]>
  • Loading branch information
longfangsong committed Jan 27, 2022
1 parent 7a47325 commit 04db2d2
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
6 changes: 1 addition & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ procinfo = { git = "https://github.com/tikv/procinfo-rs", rev = "5125fc1a69496b7
# kvproto at the same time.
# After the PR to kvproto is merged, remember to comment this out and run `cargo update -p kvproto`.
[patch.'https://github.com/pingcap/kvproto']
kvproto = {git = "https://github.com/longfangsong/kvproto", rev = "b7f0ce4e3945d2f5c7081372dd7d23487c1a659b"}
kvproto = {git = "https://github.com/longfangsong/kvproto", rev = "00db780adc9e10dbd0a00159be39fe917cd62ca9"}

[workspace]
# See https://github.com/rust-lang/rfcs/blob/master/text/2957-cargo-features2.md
Expand Down
48 changes: 38 additions & 10 deletions src/server/reset_to_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,42 @@ use txn_types::{Key, TimeStamp, Write, WriteRef};

const BATCH_SIZE: usize = 256;

/// `ResetToVersionState` is the current state of the reset-to-version process.
/// We can report this to the user.
#[derive(Debug, Clone)]
pub enum ResetToVersionState {
RemoveWrite { scanned: usize },
RemoveLock { scanned: usize },
/// `RemovingWrite` means we are removing stale data in the `WRITE` and `DEFAULT` cf
RemovingWrite { scanned: usize },
/// `RemovingWrite` means we are removing stale data in the `LOCK` cf
RemovingLock { scanned: usize },
/// `Done` means we have finshed the task
Done,
}

impl ResetToVersionState {
pub fn scanned(&mut self) -> &mut usize {
match self {
ResetToVersionState::RemoveWrite { scanned } => scanned,
ResetToVersionState::RemoveLock { scanned } => scanned,
ResetToVersionState::RemovingWrite { scanned } => scanned,
ResetToVersionState::RemovingLock { scanned } => scanned,
_ => unreachable!(),
}
}
}

/// `ResetToVersionWorker` is the worker that does the actual reset-to-version work.
pub struct ResetToVersionWorker {
/// `ts` is the timestamp to reset to.
ts: TimeStamp,
/// `write_iter` is the iterator to scan through the WRITE cf
write_iter: RocksEngineIterator,
/// `lock_iter` is the iterator to scan through the LOCK cf
lock_iter: RocksEngineIterator,
/// `state` is current state of this task
state: Arc<Mutex<ResetToVersionState>>,
}

/// `Batch` means a batch of writes load from the engine.
/// We scan writes in batches to prevent huge memory usage.
struct Batch {
writes: Vec<(Vec<u8>, Write)>,
has_more: bool,
Expand All @@ -55,7 +67,7 @@ impl ResetToVersionWorker {
*state
.lock()
.expect("failed to lock `state` in `ResetToVersionWorker::new`") =
ResetToVersionState::RemoveWrite { scanned: 0 };
ResetToVersionState::RemovingWrite { scanned: 0 };
write_iter.seek(SeekKey::Start).unwrap();
lock_iter.seek(SeekKey::Start).unwrap();
Self {
Expand All @@ -74,7 +86,7 @@ impl ResetToVersionWorker {
.expect("failed to lock ResetToVersionWorker::state");
debug_assert!(matches!(
*state,
ResetToVersionState::RemoveWrite { scanned: _ }
ResetToVersionState::RemovingWrite { scanned: _ }
));
*state.scanned() += 1;
drop(state);
Expand Down Expand Up @@ -136,7 +148,7 @@ impl ResetToVersionWorker {
.expect("failed to lock ResetToVersionWorker::state");
debug_assert!(matches!(
*state,
ResetToVersionState::RemoveLock { scanned: _ }
ResetToVersionState::RemovingLock { scanned: _ }
));
*state.scanned() += 1;
drop(state);
Expand All @@ -152,9 +164,15 @@ impl ResetToVersionWorker {
Ok(has_more)
}
}

/// `ResetToVersionManager` is the manager that manages the reset-to-version process.
/// User should interact with `ResetToVersionManager` instead of using `ResetToVersionWorker` directly.
pub struct ResetToVersionManager {
/// Current state of the reset-to-version process.
state: Arc<Mutex<ResetToVersionState>>,
/// The engine we are working on
engine: RocksEngine,
/// Current working worker
worker_handle: RefCell<Option<JoinHandle<()>>>,
}

Expand All @@ -171,14 +189,15 @@ impl Clone for ResetToVersionManager {
#[allow(dead_code)]
impl ResetToVersionManager {
pub fn new(engine: RocksEngine) -> Self {
let state = Arc::new(Mutex::new(ResetToVersionState::RemoveWrite { scanned: 0 }));
let state = Arc::new(Mutex::new(ResetToVersionState::RemovingWrite { scanned: 0 }));
ResetToVersionManager {
state,
engine,
worker_handle: RefCell::new(None),
}
}

/// Start a reset-to-version process which reset version to `ts`.
pub fn start(&self, ts: TimeStamp) {
let readopts = IterOptions::new(None, None, false);
let write_iter = self
Expand All @@ -189,6 +208,10 @@ impl ResetToVersionManager {
let mut worker = ResetToVersionWorker::new(write_iter, lock_iter, ts, self.state.clone());
let mut wb = self.engine.write_batch();
let props = tikv_util::thread_group::current_properties();
if self.worker_handle.borrow().is_some() {
warn!("A reset-to-version process is already in progress! Wait until it finish first.");
self.wait();
}
*self.worker_handle.borrow_mut() = Some(std::thread::Builder::new()
.name("reset_to_version".to_string())
.spawn(move || {
Expand All @@ -199,7 +222,7 @@ impl ResetToVersionManager {
}
*worker.state.lock()
.expect("failed to lock `ResetToVersionWorker::state` in `ResetToVersionWorker::process_next_batch`")
= ResetToVersionState::RemoveLock { scanned: 0 };
= ResetToVersionState::RemovingLock { scanned: 0 };
while worker.process_next_batch_lock(BATCH_SIZE, &mut wb).expect("reset_to_version failed when removing invalid locks") {
}
*worker.state.lock()
Expand All @@ -211,14 +234,15 @@ impl ResetToVersionManager {
.expect("failed to spawn reset_to_version thread"));
}

/// Current process state.
pub fn state(&self) -> ResetToVersionState {
self.state
.lock()
.expect("failed to lock `state` in `ResetToVersionManager::state`")
.clone()
}

#[cfg(test)]
/// Wait until the process finished.
pub fn wait(&mut self) {
self.worker_handle.take().unwrap().join().unwrap();
}
Expand Down Expand Up @@ -354,18 +378,22 @@ mod tests {
remaining_locks.push((key, lock));
}

// Writes which start_ts >= 100 should be removed.
assert_eq!(remaining_writes.len(), 1);
let (key, _) = &remaining_writes[0];
// So the only write left is the one with start_ts = 99
assert_eq!(
Key::from_encoded(key.clone()).decode_ts().unwrap(),
99.into()
);
// Defaults corresponding to the removed writes should be removed.
assert_eq!(remaining_defaults.len(), 1);
let (key, _) = &remaining_defaults[0];
assert_eq!(
Key::from_encoded(key.clone()).decode_ts().unwrap(),
98.into()
);
// All locks should be removed.
assert!(remaining_locks.is_empty());
}
}

0 comments on commit 04db2d2

Please sign in to comment.