Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

read entries using AIO #286

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
fix unit test.
Signed-off-by: root <1019495690@qq.com>
ustc-wxy committed Feb 25, 2023
commit aadefae47718093987c94c70ba9f81626c6a73bb
64 changes: 8 additions & 56 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -336,19 +336,13 @@ where
memtable
.read()
.fetch_entries_to(begin, end, max_size, &mut ents_idx)?;
println!(
"[fetch_entries_to_aio] (stage1) time cost: {:?} us",
start.elapsed().as_micros()
);


let bytes = self.pipe_log.async_read_bytes(&mut ents_idx).unwrap();
parse_entries_from_bytes::<M>(bytes, &mut ents_idx, vec);

ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64);
println!(
"[fetch_entries_to_aio] (end) time cost: {:?} us",
start.elapsed().as_micros()
);

return Ok(ents_idx.len());
}

@@ -846,66 +840,24 @@ mod tests {
assert_eq!(d, &data);
});
}

// Recover the engine.
let engine = engine.reopen();
for i in 10..20 {
let rid = i;
let index = i;
engine.scan_entries(rid, index, index + 2, |_, q, d| {
engine.scan_entries_aio(rid, index, index + 2, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
}
}
}

#[test]
fn test_async_read() {
let normal_batch_size = 10;
let compressed_batch_size = 5120;
for &entry_size in &[normal_batch_size] {
if entry_size == normal_batch_size {
println!("[normal_batch_size]");
} else if entry_size == compressed_batch_size {
println!("[compressed_batch_size]");
}
let dir = tempfile::Builder::new()
.prefix("test_get_entry")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize::gb(1),
..Default::default()
};

let engine =
RaftLogEngine::open_with_file_system(cfg.clone(), Arc::new(DefaultFileSystem))
.unwrap();

assert_eq!(engine.path(), dir.path().to_str().unwrap());
let data = vec![b'x'; entry_size];
for i in 10..1010 {
for rid in 10..15 {
let index = i;
engine.append(rid, index, index + 1, Some(&data));
}
}
for i in 10..15 {
// Recover the engine.
let engine = engine.reopen();
for i in 10..20 {
let rid = i;
let index = 10;
println!("[PREAD]");
engine.scan_entries(rid, index, index + 1000, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
println!("[AIO]");
engine.scan_entries_aio(rid, index, index + 1000, |_, q, d| {
let index = i;
engine.scan_entries(rid, index, index + 2, |_, q, d| {
assert_eq!(q, LogQueue::Append);
assert_eq!(d, &data);
});
println!("====================================================================================");
}
}
}