Skip to content

Commit

Permalink
Update text/0081-truncate.md
Browse files Browse the repository at this point in the history
Co-authored-by: Lei Zhao <[email protected]>

Update text/0081-truncate.md

Co-authored-by: Lei Zhao <[email protected]>

fix indent

Signed-off-by: longfangsong <[email protected]>
  • Loading branch information
longfangsong committed Dec 6, 2021
1 parent 9dc2a77 commit a2cc55c
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions text/0081-truncate.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ message TruncateResponse {
}
service Tikv {
// …
rpc Truncate(kvrpcpb.TruncateRequest) returns (kvrpcpb.TruncateResponse) {}
// …
rpc Truncate(kvrpcpb.TruncateRequest) returns (kvrpcpb.TruncateResponse) {}
}
```

Expand All @@ -44,45 +44,45 @@ There are several steps to follow when doing truncate:

### 1. Wait all committed raft log are applied

We can just polling each region's `RegionInfo` in a method similar with [`Debugger::region_info`](https://github.com/tikv/tikv/blob/789c99666f2f9faaa6c6e5b021ac0cf7a76ae24e/src/server/debug.rs#L188) does, and wait for `commit_index` to be equal to `apply_index`.
We can just polling each region's `RegionInfo` in a method similar with [`Debugger::region_info`](https://github.com/tikv/tikv/blob/789c99666f2f9faaa6c6e5b021ac0cf7a76ae24e/src/server/debug.rs#L188) does, and wait for `apply_index` to be equal to `commit_index`.

### 2. Clean WriteCF and DefaultCF

Then we can remove all data created by transactions which committed after `checkpoint_ts`, this can be done by remove all `Write` records which `commit_ts > checkpoint_ts` and corresponding data in `DEFAULT_CF` from KVEngine, ie.

```rust
fn scan_next_batch(&mut self, batch_size: usize) -> Option<Vec<(Vec<u8>, Write)>> {
let mut writes = None;
for _ in 0..batch_size {
if let Some((key, write)) = self.next_write()? {
let commit_ts = Key::decode_ts_from(keys::origin_key(&key));
let mut writes = writes.get_or_insert(Vec::new());
if commit_ts > self.ts {
writes.push((key, write));
}
} else {
return writes;
let mut writes = None;
for _ in 0..batch_size {
if let Some((key, write)) = self.next_write()? {
let commit_ts = Key::decode_ts_from(keys::origin_key(&key));
let mut writes = writes.get_or_insert(Vec::new());
if commit_ts > self.ts {
writes.push((key, write));
}
} else {
return writes;
}
writes
}
writes
}

pub fn process_next_batch(
&mut self,
batch_size: usize,
wb: &mut RocksWriteBatch,
&mut self,
batch_size: usize,
wb: &mut RocksWriteBatch,
) -> bool {
let writes = if let Some(writes) = self.scan_next_batch(batch_size) {
writes
} else {
return false;
}
for (key, write) in writes {
let default_key = Key::append_ts(Key::from_raw(&key), write.start_ts).to_raw().unwrap();
box_try!(wb.delete_cf(CF_WRITE, &key));
box_try!(wb.delete_cf(CF_DEFAULT, &default_key));
}
true
let writes = if let Some(writes) = self.scan_next_batch(batch_size) {
writes
} else {
return false;
}
for (key, write) in writes {
let default_key = Key::append_ts(Key::from_raw(&key), write.start_ts).to_raw().unwrap();
box_try!(wb.delete_cf(CF_WRITE, &key));
box_try!(wb.delete_cf(CF_DEFAULT, &default_key));
}
true
}
```

Expand Down

0 comments on commit a2cc55c

Please sign in to comment.