Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async IO for write data #52

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor some methods
Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Little-Wallace committed Sep 25, 2020
commit cc2887579e811359cfd646dd0a77a252807d3291
287 changes: 131 additions & 156 deletions src/engine.rs
Original file line number Diff line number Diff line change
@@ -137,84 +137,35 @@ where
W: EntryExt<E> + 'static,
P: GenericPipeLog,
{
// Recover from disk.
fn recover(
queue: LogQueue,
pipe_log: &P,
memtables: &MemTableAccessor<E, W>,
cache_submitor: &mut CacheSubmitor,
recovery_mode: RecoveryMode,
) -> Result<()> {
// Get first file number and last file number.
let first_file_num = pipe_log.first_file_num(queue);
let active_file_num = pipe_log.active_file_num(queue);

// Iterate and recover from files one by one.
let start = Instant::now();
for file_num in first_file_num..=active_file_num {
// Read a file.
let content = pipe_log.read_whole_file(queue, file_num)?;

// Verify file header.
let mut buf = content.as_slice();
if !buf.starts_with(FILE_MAGIC_HEADER) {
if file_num != active_file_num {
warn!("Raft log header is corrupted at {:?}.{}", queue, file_num);
return Err(box_err!("Raft log file header is corrupted"));
fn apply_to_memtable(&self, item: LogItem<E>, queue: LogQueue, file_num: u64) {
let memtable = self.memtables.get_or_insert(item.raft_group_id);
match item.content {
LogItemContent::Entries(entries_to_add) => {
let entries = entries_to_add.entries;
let entries_index = entries_to_add.entries_index.into_inner();
if queue == LogQueue::Rewrite {
memtable.wl().append_rewrite(entries, entries_index);
} else {
pipe_log.truncate_active_log(queue, Some(0)).unwrap();
break;
memtable.wl().append(entries, entries_index);
}
}

// Iterate all LogBatch in one file.
let start_ptr = buf.as_ptr();
buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len());
let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64;
loop {
match LogBatch::from_bytes(&mut buf, file_num, offset) {
Ok(Some(mut log_batch)) => {
let mut encoded_size = 0;
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
encoded_size += entries.encoded_size.get();
}
}

if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) {
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.attach_cache_tracker(tracker.clone());
}
}
}
cache_submitor.fill_chunk(encoded_size);
apply_to_memtable(memtables, &mut log_batch, queue, file_num);
offset = (buf.as_ptr() as usize - start_ptr as usize) as u64;
}
Ok(None) => {
info!("Recovered raft log {:?}.{}.", queue, file_num);
break;
}
Err(e) => {
warn!(
"Raft log content is corrupted at {:?}.{}:{}, error: {}",
queue, file_num, offset, e
);
// There may be a pre-allocated space at the tail of the active log.
if file_num == active_file_num
&& recovery_mode == RecoveryMode::TolerateCorruptedTailRecords
{
pipe_log.truncate_active_log(queue, Some(offset as usize))?;
break;
}
return Err(box_err!("Raft log content is corrupted"));
LogItemContent::Command(Command::Clean) => {
self.memtables.remove(item.raft_group_id);
}
LogItemContent::Command(Command::Compact { index }) => {
memtable.wl().compact_to(index);
}
LogItemContent::Kv(kv) => match kv.op_type {
OpType::Put => {
let (key, value) = (kv.key, kv.value.unwrap());
match queue {
LogQueue::Append => memtable.wl().put(key, value, file_num),
LogQueue::Rewrite => memtable.wl().put_rewrite(key, value, file_num),
}
}
}
OpType::Del => memtable.wl().delete(kv.key.as_slice()),
},
}
info!("Recover raft log takes {:?}", start.elapsed());
Ok(())
}

// Write a batch needs 3 steps:
@@ -246,7 +197,7 @@ where
if let LogItemContent::Entries(ref entries) = item.content {
entries.update_position(LogQueue::Append, file_num, offset, &tracker);
}
apply_item_to_memtable(&self.memtables, item, LogQueue::Append, file_num);
self.apply_to_memtable(item, LogQueue::Append, file_num);
}
}
return Ok(bytes);
@@ -276,6 +227,10 @@ where
E: Message + Clone,
W: EntryExt<E> + 'static,
{
pub fn new(cfg: Config) -> Engine<E, W, PipeLog> {
Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap()
}

fn new_impl(cfg: Config, chunk_limit: usize) -> Result<Engine<E, W, PipeLog>> {
let cache_limit = cfg.cache_limit.0 as usize;
let global_stats = Arc::new(GlobalStats::default());
@@ -300,35 +255,42 @@ where
);
cache_evict_worker.start(cache_evict_runner, Some(Duration::from_secs(1)));

let recovery_mode = cfg.recovery_mode;
let mut cache_submitor = CacheSubmitor::new(
cache_limit,
chunk_limit,
cache_evict_worker.scheduler(),
global_stats.clone(),
);

let cfg = Arc::new(cfg);
let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone());
let (wal_sender, wal_receiver) = channel();
let engine = Engine {
cfg,
memtables,
pipe_log,
global_stats,
purge_manager,
wal_sender,
workers: Arc::new(RwLock::new(Workers {
cache_evict: cache_evict_worker,
wal: None,
})),
};
cache_submitor.block_on_full();
Engine::recover(
LogQueue::Rewrite,
&pipe_log,
&memtables,
engine.recover(
&mut cache_submitor,
LogQueue::Rewrite,
RecoveryMode::TolerateCorruptedTailRecords,
)?;
Engine::recover(
LogQueue::Append,
&pipe_log,
&memtables,
engine.recover(
&mut cache_submitor,
recovery_mode,
LogQueue::Append,
engine.cfg.recovery_mode,
)?;
cache_submitor.nonblock_on_full();

let cfg = Arc::new(cfg);
let purge_manager = PurgeManager::new(cfg.clone(), memtables.clone(), pipe_log.clone());
let (wal_sender, wal_receiver) = channel();

let mut wal_runner = WalRunner::new(cache_submitor, pipe_log.clone(), wal_receiver);
let mut wal_runner = WalRunner::new(cache_submitor, engine.pipe_log.clone(), wal_receiver);
let th = ThreadBuilder::new()
.name("wal".to_string())
.spawn(move || {
@@ -337,23 +299,89 @@ where
.unwrap_or_else(|e| warn!("run error because: {}", e))
})
.unwrap();

Ok(Engine {
cfg,
memtables,
pipe_log,
global_stats,
purge_manager,
wal_sender,
workers: Arc::new(RwLock::new(Workers {
cache_evict: cache_evict_worker,
wal: Some(th),
})),
})
engine.workers.wl().wal = Some(th);
Ok(engine)
}
// Recover from disk.
fn recover(
&self,
cache_submitor: &mut CacheSubmitor,
queue: LogQueue,
recovery_mode: RecoveryMode,
) -> Result<()> {
// Get first file number and last file number.
let first_file_num = self.pipe_log.first_file_num(queue);
let active_file_num = self.pipe_log.active_file_num(queue);

pub fn new(cfg: Config) -> Engine<E, W, PipeLog> {
Self::new_impl(cfg, DEFAULT_CACHE_CHUNK_SIZE).unwrap()
// Iterate and recover from files one by one.
let start = Instant::now();
for file_num in first_file_num..=active_file_num {
// Read a file.
let content = self.pipe_log.read_whole_file(queue, file_num)?;

// Verify file header.
let mut buf = content.as_slice();
if !buf.starts_with(FILE_MAGIC_HEADER) {
if file_num != active_file_num {
warn!("Raft log header is corrupted at {:?}.{}", queue, file_num);
return Err(box_err!("Raft log file header is corrupted"));
} else {
self.pipe_log.truncate_active_log(queue, Some(0)).unwrap();
break;
}
}

// Iterate all LogBatch in one file.
let start_ptr = buf.as_ptr();
buf.consume(FILE_MAGIC_HEADER.len() + VERSION.len());
let mut offset = (FILE_MAGIC_HEADER.len() + VERSION.len()) as u64;
loop {
match LogBatch::<E, W>::from_bytes(&mut buf, file_num, offset) {
Ok(Some(mut log_batch)) => {
let mut encoded_size = 0;
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
encoded_size += entries.encoded_size.get();
}
}

if let Some(tracker) = cache_submitor.get_cache_tracker(file_num, offset) {
for item in &log_batch.items {
if let LogItemContent::Entries(ref entries) = item.content {
entries.attach_cache_tracker(tracker.clone());
}
}
}
cache_submitor.fill_chunk(encoded_size);
for item in log_batch.items.drain(..) {
self.apply_to_memtable(item, queue, file_num);
}
offset = (buf.as_ptr() as usize - start_ptr as usize) as u64;
}
Ok(None) => {
info!("Recovered raft log {:?}.{}.", queue, file_num);
break;
}
Err(e) => {
warn!(
"Raft log content is corrupted at {:?}.{}:{}, error: {}",
queue, file_num, offset, e
);
// There may be a pre-allocated space at the tail of the active log.
if file_num == active_file_num
&& recovery_mode == RecoveryMode::TolerateCorruptedTailRecords
{
self.pipe_log
.truncate_active_log(queue, Some(offset as usize))?;
break;
}
return Err(box_err!("Raft log content is corrupted"));
}
}
}
}
info!("Recover raft log takes {:?}", start.elapsed());
Ok(())
}
}

@@ -568,59 +596,6 @@ where
Ok(e)
}

/// Applies one decoded log item to the memtable of its raft group.
///
/// Entry payloads go to the rewrite or append view depending on `queue`;
/// `Clean` removes the whole memtable, `Compact` truncates it up to the
/// given index, and key-value items are routed to the matching
/// put/put_rewrite/delete call.
fn apply_item_to_memtable<E, W>(
    memtables: &MemTableAccessor<E, W>,
    item: LogItem<E>,
    queue: LogQueue,
    file_num: u64,
) where
    E: Message + Clone,
    W: EntryExt<E>,
{
    let memtable = memtables.get_or_insert(item.raft_group_id);
    match item.content {
        LogItemContent::Entries(entries_to_add) => {
            let entries_index = entries_to_add.entries_index.into_inner();
            let entries = entries_to_add.entries;
            match queue {
                LogQueue::Rewrite => memtable.wl().append_rewrite(entries, entries_index),
                LogQueue::Append => memtable.wl().append(entries, entries_index),
            }
        }
        LogItemContent::Command(Command::Clean) => {
            // Dropping the memtable discards all state for this raft group.
            memtables.remove(item.raft_group_id);
        }
        LogItemContent::Command(Command::Compact { index }) => {
            memtable.wl().compact_to(index);
        }
        LogItemContent::Kv(kv) => {
            if let OpType::Put = kv.op_type {
                let key = kv.key;
                // A Put item always carries a value; absence is a bug upstream.
                let value = kv.value.unwrap();
                if queue == LogQueue::Append {
                    memtable.wl().put(key, value, file_num);
                } else {
                    memtable.wl().put_rewrite(key, value, file_num);
                }
            } else {
                memtable.wl().delete(kv.key.as_slice());
            }
        }
    }
}

/// Drains every item out of `log_batch` and applies each one to the
/// memtables via [`apply_item_to_memtable`], leaving the batch empty.
fn apply_to_memtable<E, W>(
    memtables: &MemTableAccessor<E, W>,
    log_batch: &mut LogBatch<E, W>,
    queue: LogQueue,
    file_num: u64,
) where
    E: Message + Clone,
    W: EntryExt<E>,
{
    log_batch
        .items
        .drain(..)
        .for_each(|item| apply_item_to_memtable(memtables, item, queue, file_num));
}

#[cfg(test)]
mod tests {
use super::*;