Skip to content

Commit

Permalink
scanner/forward: move to next key if accesses a lock with delete type (
Browse files Browse the repository at this point in the history
…tikv#11544)

close tikv#11541

Signed-off-by: youjiali1995 <[email protected]>
  • Loading branch information
youjiali1995 authored Dec 2, 2021
1 parent c04fd41 commit 29299bf
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/storage/mvcc/reader/scanner/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub trait ScanPolicy<S: Snapshot> {
pub enum HandleRes<T> {
Return(T),
Skip(Key),
MoveToNext,
}

pub struct Cursors<S: Snapshot> {
Expand Down Expand Up @@ -270,6 +271,7 @@ impl<S: Snapshot, P: ScanPolicy<S>> ForwardScanner<S, P> {
return Ok(Some(output));
}
HandleRes::Skip(key) => key,
HandleRes::MoveToNext => continue,
};
}
if has_write {
Expand Down Expand Up @@ -391,7 +393,7 @@ impl<S: Snapshot> ScanPolicy<S> for LatestKvPolicy {
)
.map(|val| match val {
Some(v) => HandleRes::Return((current_user_key, v)),
None => HandleRes::Skip(current_user_key),
None => HandleRes::MoveToNext,
})
.map_err(Into::into);
}
Expand Down
62 changes: 41 additions & 21 deletions src/storage/mvcc/reader/scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -769,50 +769,70 @@ mod tests {
test_scan_bypass_locks_impl(true);
}

fn test_scan_access_locks_impl(desc: bool) {
fn test_scan_access_locks_impl(desc: bool, delete_bound: bool) {
let engine = TestEngineBuilder::new().build().unwrap();

for i in 0..=6 {
for i in 0..=8 {
must_prewrite_put(&engine, &[i], &[b'v', i], &[i], 10);
must_commit(&engine, &[i], 10, 20);
}

must_prewrite_put(&engine, &[0], &[b'v', 0, 0], &[0], 30);
must_prewrite_delete(&engine, &[1], &[1], 40);
must_prewrite_lock(&engine, &[2], &[2], 50);
must_prewrite_put(&engine, &[3], &[b'v', 3, 3], &[3], 60);
must_prewrite_put(&engine, &[4], &[b'v', 4, 4], &[4], 70);
must_prewrite_put(&engine, &[5], &[b'v', 5, 5], &[5], 80);
if delete_bound {
must_prewrite_delete(&engine, &[0], &[0], 30); // access delete
} else {
must_prewrite_put(&engine, &[0], &[b'v', 0, 0], &[0], 30); // access put
}
must_prewrite_put(&engine, &[1], &[b'v', 1, 1], &[1], 40); // access put
must_prewrite_delete(&engine, &[2], &[2], 50); // access delete
must_prewrite_lock(&engine, &[3], &[3], 60); // access lock(actually ignored)
must_prewrite_put(&engine, &[4], &[b'v', 4, 4], &[4], 70); // locked
must_prewrite_put(&engine, &[5], &[b'v', 5, 5], &[5], 80); // bypass
must_prewrite_put(&engine, &[6], &[b'v', 6, 6], &[6], 100); // locked with larger ts
if delete_bound {
must_prewrite_delete(&engine, &[8], &[8], 90); // access delete
} else {
must_prewrite_put(&engine, &[8], &[b'v', 8, 8], &[8], 90); // access put
}

let bypass_locks = TsSet::from_u64s(vec![70]);
let access_locks = TsSet::from_u64s(vec![30, 40, 50]);
let bypass_locks = TsSet::from_u64s(vec![80]);
let access_locks = TsSet::from_u64s(vec![30, 40, 50, 60, 90]);

let mut expected_result = vec![
(vec![0], Some(vec![b'v', 0, 0])), /* access */
/* vec![1] access */
(vec![2], Some(vec![b'v', 2])), /* ignore */
(vec![3], None), /* locked */
(vec![4], Some(vec![b'v', 4])), /* bypass */
(vec![5], Some(vec![b'v', 5])), /* ignore */
(vec![6], Some(vec![b'v', 6])), /* no lock */
(vec![0], Some(vec![b'v', 0, 0])), /* access put if not delete_bound */
(vec![1], Some(vec![b'v', 1, 1])), /* access put */
/* vec![2] access delete */
(vec![3], Some(vec![b'v', 3])), /* ignore LockType::Lock */
(vec![4], None), /* locked */
(vec![5], Some(vec![b'v', 5])), /* bypass */
(vec![6], Some(vec![b'v', 6])), /* ignore lock with larger ts */
(vec![7], Some(vec![b'v', 7])), /* no lock */
(vec![8], Some(vec![b'v', 8, 8])), /* access put if not delete_bound*/
];
if desc {
expected_result.reverse();
}
let snapshot = engine.snapshot(Default::default()).unwrap();
let scanner = ScannerBuilder::new(snapshot, 75.into())
let scanner = ScannerBuilder::new(snapshot, 95.into())
.desc(desc)
.bypass_locks(bypass_locks)
.access_locks(access_locks)
.build()
.unwrap();
check_scan_result(scanner, &expected_result);
check_scan_result(
scanner,
if delete_bound {
&expected_result[1..expected_result.len() - 1]
} else {
&expected_result
},
);
}

#[test]
fn test_scan_access_locks() {
test_scan_access_locks_impl(false);
test_scan_access_locks_impl(true);
for (desc, delete_bound) in [(false, false), (false, true), (true, false), (true, true)] {
test_scan_access_locks_impl(desc, delete_bound);
}
}

fn must_met_newer_ts_data<E: Engine>(
Expand Down

0 comments on commit 29299bf

Please sign in to comment.