Skip to content

Commit

Permalink
update CI toolchain and improve scan interfaces
Browse files Browse the repository at this point in the history
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
tabokie committed Jul 15, 2022
1 parent f3c268b commit 9747bcf
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 124 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2022-05-01
toolchain: nightly-2022-07-13
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2022-05-01
toolchain: nightly-2022-07-13
override: true
components: llvm-tools-preview
- uses: Swatinem/rust-cache@v1
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{util::ReadableSize, Result};
const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512;
const MIN_RECOVERY_THREADS: usize = 1;

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RecoveryMode {
AbsoluteConsistency,
Expand Down
139 changes: 120 additions & 19 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,27 +224,31 @@ where
Ok(None)
}

/// Iterates over [start_key, end_key) range of Raft Group key-values and
/// yields messages of the required type. Unparsable items are skipped.
pub fn scan_messages<S, C>(
&self,
region_id: u64,
start_key: Option<&[u8]>,
end_key: Option<&[u8]>,
reverse: bool,
callback: C,
mut callback: C,
) -> Result<()>
where
S: Message,
C: FnMut(&[u8], S) -> bool,
{
let _t = StopWatch::new(&*ENGINE_READ_MESSAGE_DURATION_HISTOGRAM);
if let Some(memtable) = self.memtables.get(region_id) {
memtable
.read()
.scan_messages(start_key, end_key, reverse, callback)?;
}
Ok(())
self.scan_raw_messages(region_id, start_key, end_key, reverse, move |k, raw_v| {
if let Ok(v) = parse_from_bytes(raw_v) {
callback(k, v)
} else {
true
}
})
}

/// Iterates over [start_key, end_key) range of Raft Group key-values and
/// yields all key value pairs as bytes.
pub fn scan_raw_messages<C>(
&self,
region_id: u64,
Expand All @@ -260,7 +264,7 @@ where
if let Some(memtable) = self.memtables.get(region_id) {
memtable
.read()
.scan_raw_messages(start_key, end_key, reverse, callback)?;
.scan(start_key, end_key, reverse, callback)?;
}
Ok(())
}
Expand Down Expand Up @@ -823,6 +827,99 @@ mod tests {
run_steps(&[Some((1, 5)), None]);
}

#[test]
fn test_key_value_scan() {
fn key(i: u64) -> Vec<u8> {
format!("k{}", i).as_bytes().to_vec()
}
fn value(i: u64) -> Vec<u8> {
format!("v{}", i).as_bytes().to_vec()
}
fn rich_value(i: u64) -> RaftLocalState {
RaftLocalState {
last_index: i,
..Default::default()
}
}

let dir = tempfile::Builder::new()
.prefix("test_key_value_scan")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let rid = 1;
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();

engine
.scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
panic!("unexpected message.");
})
.unwrap();

let mut batch = LogBatch::default();
let mut res = Vec::new();
let mut rich_res = Vec::new();
batch.put(rid, key(1), value(1));
batch.put(rid, key(2), value(2));
batch.put(rid, key(3), value(3));
engine.write(&mut batch, false).unwrap();

engine
.scan_raw_messages(rid, None, None, false, |k, v| {
res.push((k.to_vec(), v.to_vec()));
true
})
.unwrap();
assert_eq!(
res,
vec![(key(1), value(1)), (key(2), value(2)), (key(3), value(3))]
);
res.clear();
engine
.scan_raw_messages(rid, None, None, true, |k, v| {
res.push((k.to_vec(), v.to_vec()));
true
})
.unwrap();
assert_eq!(
res,
vec![(key(3), value(3)), (key(2), value(2)), (key(1), value(1))]
);
res.clear();
engine
.scan_messages::<RaftLocalState, _>(rid, None, None, false, |_, _| {
panic!("unexpected message.")
})
.unwrap();

batch.put_message(rid, key(22), &rich_value(22)).unwrap();
batch.put_message(rid, key(33), &rich_value(33)).unwrap();
engine.write(&mut batch, false).unwrap();

engine
.scan_messages(rid, None, None, false, |k, v| {
rich_res.push((k.to_vec(), v));
false
})
.unwrap();
assert_eq!(rich_res, vec![(key(22), rich_value(22))]);
rich_res.clear();
engine
.scan_messages(rid, None, None, true, |k, v| {
rich_res.push((k.to_vec(), v));
false
})
.unwrap();
assert_eq!(rich_res, vec![(key(33), rich_value(33))]);
rich_res.clear();
}

#[test]
fn test_delete_key_value() {
let dir = tempfile::Builder::new()
Expand All @@ -844,16 +941,28 @@ mod tests {
let mut delete_batch = LogBatch::default();
delete_batch.delete(rid, key.clone());

// put | delete
// ^ rewrite
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
assert_eq!(
engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
None
);
assert_eq!(engine.get(rid, &key), None);

// put | delete
// ^ rewrite
engine.write(&mut batch_1.clone(), true).unwrap();
assert!(engine.get_message::<RaftLocalState>(rid, &key).is_err());
engine.purge_manager.must_rewrite_append_queue(None, None);
engine.write(&mut delete_batch.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key), None);
assert_eq!(
engine.get_message::<RaftLocalState>(rid, &key).unwrap(),
None
);

// Incomplete purge.
engine.write(&mut batch_1.clone(), true).unwrap();
engine
Expand Down Expand Up @@ -910,14 +1019,6 @@ mod tests {
engine.write(&mut batch_2.clone(), true).unwrap();
let engine = engine.reopen();
assert_eq!(engine.get(rid, &key).unwrap(), v2);
let mut res = vec![];
engine
.scan_raw_messages(rid, Some(&key), None, false, |key, value| {
res.push((key.to_vec(), value.to_vec()));
true
})
.unwrap();
assert_eq!(res, vec![(key.clone(), v2.clone())]);

// put | delete | put |
// ^ rewrite
Expand Down
2 changes: 1 addition & 1 deletion src/file_pipe_log/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub(super) fn lock_file_path<P: AsRef<Path>>(dir: P) -> PathBuf {
}

/// Version of log file format.
#[derive(Clone, Copy, Debug, Eq, PartialEq, FromPrimitive, ToPrimitive)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, FromPrimitive, ToPrimitive)]
#[repr(u64)]
pub enum Version {
V1 = 1,
Expand Down
2 changes: 1 addition & 1 deletion src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{Error, Result};

/// `FilterResult` determines how to alter the existing log items in
/// `RhaiFilterMachine`.
#[derive(PartialEq)]
#[derive(PartialEq, Eq)]
enum FilterResult {
/// Apply in the usual way.
Default,
Expand Down
29 changes: 12 additions & 17 deletions src/log_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type SliceReader<'a> = &'a [u8];

// Format:
// { count | first index | [ tail offsets ] }
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct EntryIndexes(pub Vec<EntryIndex>);

impl EntryIndexes {
Expand Down Expand Up @@ -115,7 +115,7 @@ impl EntryIndexes {

// Format:
// { type | (index) }
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Command {
Clean,
Compact { index: u64 },
Expand Down Expand Up @@ -158,7 +158,7 @@ impl Command {
}

#[repr(u8)]
#[derive(Debug, PartialEq, Copy, Clone)]
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum OpType {
Put = 1,
Del = 2,
Expand All @@ -180,7 +180,7 @@ impl OpType {

// Format:
// { op_type | key len | key | ( value len | value ) }
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct KeyValue {
pub op_type: OpType,
pub key: Vec<u8>,
Expand Down Expand Up @@ -239,13 +239,13 @@ impl KeyValue {

// Format:
// { 8 byte region id | 1 byte type | item }
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogItem {
pub raft_group_id: u64,
pub content: LogItemContent,
}

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LogItemContent {
EntryIndexes(EntryIndexes),
Command(Command),
Expand Down Expand Up @@ -343,7 +343,7 @@ pub(crate) type LogItemDrain<'a> = std::vec::Drain<'a, LogItem>;
/// A lean batch of log item, without entry data.
// Format:
// { item count | [items] | crc32 }
#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LogItemBatch {
items: Vec<LogItem>,
item_size: usize,
Expand Down Expand Up @@ -513,7 +513,7 @@ impl LogItemBatch {
}
}

#[derive(Copy, Clone, Debug, PartialEq)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum BufState {
/// Buffer contains header and optionally entries.
/// # Invariants
Expand Down Expand Up @@ -545,7 +545,7 @@ enum BufState {
/// limits.
// Calling protocol:
// Insert log items -> [`finish_populate`] -> [`finish_write`]
#[derive(Clone, PartialEq, Debug)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct LogBatch {
item_batch: LogItemBatch,
buf_state: BufState,
Expand Down Expand Up @@ -737,17 +737,10 @@ impl LogBatch {

#[cfg(feature = "failpoints")]
{
let corrupted_items = || {
fail::fail_point!("log_batch::corrupted_items", |_| true);
false
};
let corrupted_entries = || {
fail::fail_point!("log_batch::corrupted_entries", |_| true);
false
};
if corrupted_items() {
self.buf[footer_roffset] += 1;
}
if corrupted_entries() && footer_roffset > LOG_BATCH_HEADER_LEN {
self.buf[footer_roffset - 1] += 1;
}
Expand Down Expand Up @@ -1121,7 +1114,7 @@ mod tests {

let item_batch = batch.item_batch.clone();
// decode item batch
let mut bytes_slice = &*encoded;
let mut bytes_slice = encoded;
let (offset, compression_type, len) =
LogBatch::decode_header(&mut bytes_slice).unwrap();
assert_eq!(len, encoded.len());
Expand Down Expand Up @@ -1201,6 +1194,8 @@ mod tests {
kvs.push((k, v));
}

batch1.merge(&mut LogBatch::default()).unwrap();

let mut batch2 = LogBatch::default();
entries.push(generate_entries(11, 21, Some(&data)));
batch2
Expand Down
Loading

0 comments on commit 9747bcf

Please sign in to comment.