From 618eea7af842a0301945d2998062bb95d0e91aea Mon Sep 17 00:00:00 2001
From: Xinye Tao
Date: Tue, 6 Sep 2022 13:55:21 +0800
Subject: [PATCH] hide more file writing details from engine (#269)

This is preparatory work for #258. Changes:

- Move `fsync` from the writer to the file handle, so that it can be called
  concurrently in the future. In doing so, sync offset tracking is removed and
  `bytes_per_sync` support is deprecated.
- Move `prepare_write` into pipe_log. Introduce a trait `ReactiveBytes` for
  this purpose (see the sketch after this list).
- Rotate the file during write instead of once per write group. Rotation is
  also delayed to the first write after the size limit is exceeded, rather
  than the write that exceeds it.
- Differentiate two types of I/O errors: unrecoverable errors from fsync and
  retriable errors from pwrite. Only bubble up errors in the latter case;
  panic early on unrecoverable ones.
- Also refactor the purge code and fix two cases where force-compact is not
  triggered.
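To make the new `ReactiveBytes` contract concrete, here is a compilable toy model. The trait and blanket impl mirror the ones added to `src/pipe_log.rs` below; the `append` function is a stand-in for the real pipe, not code from this patch:

```rust
/// Stand-in for the real `LogFileContext` in `src/pipe_log.rs`.
struct LogFileContext;

/// Bytes whose final value may depend on the file they are written to.
trait ReactiveBytes {
    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
}

/// Blanket impl: plain byte slices ignore the file context, mirroring
/// `impl<T: AsRef<[u8]> + ?Sized> ReactiveBytes for &T` in this patch.
impl<T> ReactiveBytes for &T
where
    T: AsRef<[u8]> + ?Sized,
{
    fn as_bytes(&mut self, _ctx: &LogFileContext) -> &[u8] {
        (*self).as_ref()
    }
}

/// Toy pipe: binds the payload to the active file context, then "writes" it.
fn append<T: ReactiveBytes + ?Sized>(bytes: &mut T) -> usize {
    bytes.as_bytes(&LogFileContext).len()
}

fn main() {
    let content = vec![b'a'; 16];
    assert_eq!(append(&mut &content), 16); // raw bytes via the blanket impl
    assert_eq!(append(&mut "abc"), 3); // any AsRef<[u8]> works, e.g. &str
    // The real `LogBatch` implements the trait by running its own
    // `prepare_write(ctx)` before exposing `encoded_bytes()`.
}
```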
Signed-off-by: tabokie
---
 CHANGELOG.md                      |  2 +
 src/config.rs                     | 18 +++----
 src/engine.rs                     | 28 +++++-----
 src/env/default.rs                | 37 ++++++-------
 src/env/mod.rs                    |  3 +-
 src/env/obfuscated.rs             |  4 --
 src/file_pipe_log/log_file.rs     | 17 ++----
 src/file_pipe_log/pipe.rs         | 89 ++++++++++++++-----------------
 src/log_batch.rs                  |  9 +++-
 src/memtable.rs                   | 16 +++---
 src/pipe_log.rs                   | 38 +++++++++----
 src/purge.rs                      | 65 ++++++++++++----------
 tests/failpoints/test_engine.rs   |  4 +-
 tests/failpoints/test_io_error.rs |  7 ++-
 14 files changed, 171 insertions(+), 166 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d2b9962..8dd8aca4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,8 @@
 * Add metadata deletion capability to `FileSystem` trait. Users can implement `exists_metadata` and `delete_metadata` to clean up obsolete metadata from older versions of Raft Engine.
 * Add `Engine::scan_messages` and `Engine::scan_raw_messages` for iterating over written key-values.
 * Add `Engine::get` for getting raw value.
+* Move `sync` from `env::WriteExt` to `env::Handle`.
+* Deprecate `bytes_per_sync`.

 ### Behavior Changes

diff --git a/src/config.rs b/src/config.rs
index 7ae133fe..5851cb71 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -49,11 +49,12 @@ pub struct Config {
     ///
     /// Default: "8KB"
     pub batch_compression_threshold: ReadableSize,
+    /// Deprecated.
     /// Incrementally sync log files after specified bytes have been written.
     /// Setting it to zero disables incremental sync.
     ///
     /// Default: "4MB"
-    pub bytes_per_sync: ReadableSize,
+    pub bytes_per_sync: Option<ReadableSize>,
     /// Version of the log file.
     ///
@@ -101,7 +102,7 @@ impl Default for Config {
             recovery_read_block_size: ReadableSize::kb(16),
             recovery_threads: 4,
             batch_compression_threshold: ReadableSize::kb(8),
-            bytes_per_sync: ReadableSize::mb(4),
+            bytes_per_sync: None,
             format_version: Version::V2,
             target_file_size: ReadableSize::mb(128),
             purge_threshold: ReadableSize::gb(10),
@@ -114,7 +115,6 @@ impl Default for Config {
         #[cfg(test)]
        {
             cfg.memory_limit = Some(ReadableSize(0));
-            cfg.format_version = Version::V2;
             cfg.enable_log_recycle = true;
         }
         cfg
@@ -132,8 +132,8 @@ impl Config {
                 self.target_file_size.0,
             )));
         }
-        if self.bytes_per_sync.0 == 0 {
-            self.bytes_per_sync = ReadableSize(u64::MAX);
+        if self.bytes_per_sync.is_some() {
+            warn!("bytes-per-sync has been deprecated.");
         }
         let min_recovery_read_block_size = ReadableSize(MIN_RECOVERY_READ_BLOCK_SIZE as u64);
         if self.recovery_read_block_size < min_recovery_read_block_size {
@@ -204,14 +204,16 @@ mod tests {
             target-file-size = "1MB"
             purge-threshold = "3MB"
             format-version = 1
+            enable-log-recycle = false
         "#;
-        let load: Config = toml::from_str(custom).unwrap();
+        let mut load: Config = toml::from_str(custom).unwrap();
         assert_eq!(load.dir, "custom_dir");
         assert_eq!(load.recovery_mode, RecoveryMode::TolerateTailCorruption);
-        assert_eq!(load.bytes_per_sync, ReadableSize::kb(2));
+        assert_eq!(load.bytes_per_sync, Some(ReadableSize::kb(2)));
         assert_eq!(load.target_file_size, ReadableSize::mb(1));
         assert_eq!(load.purge_threshold, ReadableSize::mb(3));
         assert_eq!(load.format_version, Version::V1);
+        load.sanitize().unwrap();
     }

     #[test]
@@ -226,7 +228,6 @@ mod tests {
         let soft_error = r#"
             recovery-read-block-size = "1KB"
             recovery-threads = 0
-            bytes-per-sync = "0KB"
             target-file-size = "5000MB"
             format-version = 2
             enable-log-recycle = true
@@ -236,7 +237,6 @@ mod tests {
         soft_sanitized.sanitize().unwrap();
         assert!(soft_sanitized.recovery_read_block_size.0 >= MIN_RECOVERY_READ_BLOCK_SIZE as u64);
         assert!(soft_sanitized.recovery_threads >= MIN_RECOVERY_THREADS);
-        assert_eq!(soft_sanitized.bytes_per_sync.0, u64::MAX);
         assert_eq!(
             soft_sanitized.purge_rewrite_threshold.unwrap(),
             soft_sanitized.target_file_size
diff --git a/src/engine.rs b/src/engine.rs
index 78e28c2e..bf9df9ef 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -147,15 +147,12 @@ where
         if let Some(mut group) = self.write_barrier.enter(&mut writer) {
             let now = Instant::now();
             let _t = StopWatch::new_with(&*ENGINE_WRITE_LEADER_DURATION_HISTOGRAM, now);
-            let file_context = self.pipe_log.fetch_active_file(LogQueue::Append);
             for writer in group.iter_mut() {
                 writer.entered_time = Some(now);
                 sync |= writer.sync;
                 let log_batch = writer.mut_payload();
                 let res = if !log_batch.is_empty() {
-                    log_batch.prepare_write(&file_context)?;
-                    self.pipe_log
-                        .append(LogQueue::Append, log_batch.encoded_bytes())
+                    self.pipe_log.append(LogQueue::Append, log_batch)
                 } else {
                     // TODO(tabokie): use Option<FileBlockHandle> instead.
                     Ok(FileBlockHandle {
@@ -166,16 +163,11 @@ where
                 };
                 writer.set_output(res);
             }
-            debug_assert!(
-                file_context.id == self.pipe_log.fetch_active_file(LogQueue::Append).id
-            );
             perf_context!(log_write_duration).observe_since(now);
-            if let Err(e) = self.pipe_log.maybe_sync(LogQueue::Append, sync) {
-                panic!(
-                    "Cannot sync {:?} queue due to IO error: {}",
-                    LogQueue::Append,
-                    e
-                );
+            if sync {
+                // As per the trait protocol, this error should be retriable. But we panic
+                // anyway to save the trouble of propagating it to other group members.
+                self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
             }
             // Pass the perf context diff to all the writers.
             let diff = get_perf_context();
@@ -1258,8 +1250,12 @@ mod tests {
             check_purge(vec![1, 2, 3]);
         }
-        // 10th, rewrited
-        check_purge(vec![]);
+        // 10th, rewritten, but still needs to be compacted.
+        check_purge(vec![1, 2, 3]);
+        for rid in 1..=3 {
+            let memtable = engine.memtables.get(rid).unwrap();
+            assert_eq!(memtable.read().rewrite_count(), 50);
+        }

         // compact and write some new data to trigger compact again.
         for rid in 2..=50 {
@@ -1454,7 +1450,7 @@ mod tests {
         assert_eq!(engine.file_span(LogQueue::Append).0, old_active_file + 1);
         let old_active_file = engine.file_span(LogQueue::Rewrite).1;
         engine.purge_manager.must_rewrite_rewrite_queue();
-        assert_eq!(engine.file_span(LogQueue::Rewrite).0, old_active_file + 1);
+        assert!(engine.file_span(LogQueue::Rewrite).0 > old_active_file);

         let engine = engine.reopen();
         for rid in 1..=3 {
diff --git a/src/env/default.rs b/src/env/default.rs
index 4c5de77d..451ee9fe 100644
--- a/src/env/default.rs
+++ b/src/env/default.rs
@@ -74,22 +74,6 @@ impl LogFd {
         close(self.0).map_err(|e| from_nix_error(e, "close"))
     }

-    /// Synchronizes all in-memory data of the file except metadata to the
-    /// filesystem.
-    pub fn sync(&self) -> IoResult<()> {
-        fail_point!("log_fd::sync::err", |_| {
-            Err(from_nix_error(nix::Error::EINVAL, "fp"))
-        });
-        #[cfg(target_os = "linux")]
-        {
-            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
-        }
-        #[cfg(not(target_os = "linux"))]
-        {
-            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
-        }
-    }
-
     /// Reads some bytes starting at `offset` from this file into the specified
     /// buffer. Returns how many bytes were read.
     pub fn read(&self, mut offset: usize, buf: &mut [u8]) -> IoResult<usize> {
@@ -168,6 +152,7 @@ impl LogFd {
 }

 impl Handle for LogFd {
+    #[inline]
     fn truncate(&self, offset: usize) -> IoResult<()> {
         fail_point!("log_fd::truncate::err", |_| {
             Err(from_nix_error(nix::Error::EINVAL, "fp"))
@@ -175,6 +160,7 @@ impl Handle for LogFd {
         ftruncate(self.0, offset as i64).map_err(|e| from_nix_error(e, "ftruncate"))
     }

+    #[inline]
     fn file_size(&self) -> IoResult<usize> {
         fail_point!("log_fd::file_size::err", |_| {
             Err(from_nix_error(nix::Error::EINVAL, "fp"))
@@ -183,6 +169,21 @@ impl Handle for LogFd {
         .map(|n| n as usize)
         .map_err(|e| from_nix_error(e, "lseek"))
     }
+
+    #[inline]
+    fn sync(&self) -> IoResult<()> {
+        fail_point!("log_fd::sync::err", |_| {
+            Err(from_nix_error(nix::Error::EINVAL, "fp"))
+        });
+        #[cfg(target_os = "linux")]
+        {
+            nix::unistd::fdatasync(self.0).map_err(|e| from_nix_error(e, "fdatasync"))
+        }
+        #[cfg(not(target_os = "linux"))]
+        {
+            nix::unistd::fsync(self.0).map_err(|e| from_nix_error(e, "fsync"))
+        }
+    }
 }

 impl Drop for LogFd {
@@ -251,10 +252,6 @@ impl WriteExt for LogFile {
         Ok(())
     }

-    fn sync(&mut self) -> IoResult<()> {
-        self.inner.sync()
-    }
-
     fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
         self.inner.allocate(offset, size)
     }
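A note on the `Handle::sync` move above: once sync lives on the shared handle rather than the exclusive writer, it can be issued from another thread while appends continue. A minimal sketch of that ownership point with plain `std` types (illustrative only, not raft-engine code):

```rust
use std::fs::OpenOptions;
use std::io::Write;
use std::sync::Arc;
use std::thread;

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("handle_sync_demo.log");
    let mut writer = OpenOptions::new().create(true).append(true).open(&path)?;
    // A shared handle to the same open file, analogous to `Arc<F::Handle>`.
    let handle = Arc::new(writer.try_clone()?);

    // Sync runs on another thread while the writer keeps appending.
    let syncer = thread::spawn(move || handle.sync_data());
    writer.write_all(b"more bytes")?;
    syncer.join().unwrap()?;
    Ok(())
}
```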
diff --git a/src/env/mod.rs b/src/env/mod.rs
index 50cce189..6ae4bf9d 100644
--- a/src/env/mod.rs
+++ b/src/env/mod.rs
@@ -56,11 +56,12 @@ pub trait Handle {
     /// Returns the current size of this file.
     fn file_size(&self) -> Result<usize>;
+
+    fn sync(&self) -> Result<()>;
 }

 /// WriteExt is writer extension api
 pub trait WriteExt {
     fn truncate(&mut self, offset: usize) -> Result<()>;
-    fn sync(&mut self) -> Result<()>;
     fn allocate(&mut self, offset: usize, size: usize) -> Result<()>;
 }
diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs
index 18057696..831f5343 100644
--- a/src/env/obfuscated.rs
+++ b/src/env/obfuscated.rs
@@ -57,10 +57,6 @@ impl WriteExt for ObfuscatedWriter {
         self.0.truncate(offset)
     }

-    fn sync(&mut self) -> IoResult<()> {
-        self.0.sync()
-    }
-
     fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> {
         self.0.allocate(offset, size)
     }
diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs
index 5a621ff6..f977f695 100644
--- a/src/file_pipe_log/log_file.rs
+++ b/src/file_pipe_log/log_file.rs
@@ -37,10 +37,10 @@ pub(super) fn build_file_writer(

 /// Append-only writer for log file. It also handles the file header write.
 pub struct LogFileWriter<F: FileSystem> {
+    handle: Arc<F::Handle>,
     writer: F::Writer,
     written: usize,
     capacity: usize,
-    last_sync: usize,
 }

 impl<F: FileSystem> LogFileWriter<F> {
@@ -52,10 +52,10 @@ impl<F: FileSystem> LogFileWriter<F> {
     ) -> Result<Self> {
         let file_size = handle.file_size()?;
         let mut f = Self {
+            handle,
             writer,
             written: file_size,
             capacity: file_size,
-            last_sync: file_size,
         };
         // TODO: add tests for file_size in [header_len, max_encode_len].
         if file_size < LogFileFormat::encode_len(format.version) || force_reset {
@@ -68,7 +68,6 @@

     fn write_header(&mut self, format: LogFileFormat) -> Result<()> {
         self.writer.seek(SeekFrom::Start(0))?;
-        self.last_sync = 0;
         self.written = 0;
         let mut buf = Vec::with_capacity(LogFileFormat::encode_len(format.version));
         format.encode(&mut buf)?;
@@ -121,19 +120,11 @@
     }

     pub fn sync(&mut self) -> Result<()> {
-        if self.last_sync < self.written {
-            let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
-            self.writer.sync()?;
-            self.last_sync = self.written;
-        }
+        let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
+        self.handle.sync()?;
         Ok(())
     }

-    #[inline]
-    pub fn since_last_sync(&self) -> usize {
-        self.written - self.last_sync
-    }
-
     #[inline]
     pub fn offset(&self) -> usize {
         self.written
diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs
index 870d568a..81f65c72 100644
--- a/src/file_pipe_log/pipe.rs
+++ b/src/file_pipe_log/pipe.rs
@@ -14,7 +14,9 @@ use crate::config::Config;
 use crate::env::FileSystem;
 use crate::event_listener::EventListener;
 use crate::metrics::*;
-use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog};
+use crate::pipe_log::{
+    FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes,
+};
 use crate::{perf_context, Error, Result};

 use super::format::{FileNameExt, LogFileFormat};
@@ -119,7 +121,6 @@ pub(super) struct SinglePipe<F: FileSystem> {
     dir: String,
     file_format: LogFileFormat,
     target_file_size: usize,
-    bytes_per_sync: usize,
     file_system: Arc<F>,
     listeners: Vec<Arc<dyn EventListener>>,
@@ -223,7 +224,6 @@ impl<F: FileSystem> SinglePipe<F> {
             dir: cfg.dir.clone(),
             file_format: LogFileFormat::new(cfg.format_version, alignment),
             target_file_size: cfg.target_file_size.0 as usize,
-            bytes_per_sync: cfg.bytes_per_sync.0 as usize,
             file_system,
             listeners,
@@ -349,12 +349,24 @@ impl<F: FileSystem> SinglePipe<F> {
         reader.read(handle)
     }

-    fn append(&self, bytes: &[u8]) -> Result<FileBlockHandle> {
+    fn append<T: ReactiveBytes + ?Sized>(&self, bytes: &mut T) -> Result<FileBlockHandle> {
         fail_point!("file_pipe_log::append");
         let mut active_file = self.active_file.lock();
+        if active_file.writer.offset() >= self.target_file_size {
+            if let Err(e) = self.rotate_imp(&mut active_file) {
+                panic!(
+                    "error when rotate [{:?}:{}]: {}",
+                    self.queue, active_file.seq, e
+                );
+            }
+        }
+
         let seq = active_file.seq;
-        #[cfg(feature = "failpoints")]
         let format = active_file.format;
+        let ctx = LogFileContext {
+            id: FileId::new(self.queue, seq),
+            version: format.version,
+        };
         let writer = &mut active_file.writer;

         #[cfg(feature = "failpoints")]
@@ -378,7 +390,7 @@
             }
         }
         let start_offset = writer.offset();
-        if let Err(e) = writer.write(bytes, self.target_file_size) {
+        if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
             if let Err(te) = writer.truncate() {
                 panic!(
                     "error when truncate {} after error: {}, get: {}",
@@ -401,15 +413,11 @@
         Ok(handle)
     }

-    fn maybe_sync(&self, force: bool) -> Result<()> {
+    fn sync(&self) -> Result<()> {
         let mut active_file = self.active_file.lock();
         let seq = active_file.seq;
         let writer = &mut active_file.writer;
-        if writer.offset() >= self.target_file_size {
-            if let Err(e) = self.rotate_imp(&mut active_file) {
-                panic!("error when rotate [{:?}:{}]: {}", self.queue, seq, e);
-            }
-        } else if writer.since_last_sync() >= self.bytes_per_sync || force {
+        {
             let _t = StopWatch::new(perf_context!(log_sync_duration));
             if let Err(e) = writer.sync() {
                 panic!("error when sync [{:?}:{}]: {}", self.queue, seq, e,);
             }
         }
@@ -468,14 +476,6 @@
         self.flush_metrics(current.total_len);
         Ok((current.first_seq_in_use - prev.first_seq_in_use) as usize)
     }
-
-    fn fetch_active_file(&self) -> LogFileContext {
-        let files = self.files.read();
-        LogFileContext {
-            id: FileId::new(self.queue, files.first_seq + files.fds.len() as u64 - 1),
-            version: files.fds.back().unwrap().format.version,
-        }
-    }
 }

 /// A [`PipeLog`] implementation that stores data in filesystem.
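A note on the rotation timing above: `append` now rotates lazily, so a file can temporarily exceed `target_file_size` and is only rotated by the next write. A self-contained toy of that rule (no locking, headers, or real I/O; not the actual `SinglePipe`):

```rust
struct Pipe {
    written: usize,
    target: usize,
    seq: u64,
}

impl Pipe {
    fn append(&mut self, bytes: &[u8]) -> u64 {
        // Rotation is checked *before* writing, so the previous write may
        // have overshot `target`; this write then lands in a fresh file.
        if self.written >= self.target {
            self.seq += 1;
            self.written = 0;
        }
        self.written += bytes.len();
        self.seq
    }
}

fn main() {
    let mut p = Pipe { written: 0, target: 1024, seq: 1 };
    assert_eq!(p.append(&[b'a'; 1024]), 1); // reaches the limit but stays in file 1
    assert_eq!(p.append(&[b'a'; 16]), 2); // the next write triggers the rotation
}
```

This is why the tests below now see `file_span(queue).1 == 1` right after the first oversized append, and call `rotate()` explicitly where the old code relied on `maybe_sync` to rotate.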
@@ -516,13 +516,17 @@
     }

     #[inline]
-    fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle> {
+    fn append<T: ReactiveBytes + ?Sized>(
+        &self,
+        queue: LogQueue,
+        bytes: &mut T,
+    ) -> Result<FileBlockHandle> {
         self.pipes[queue as usize].append(bytes)
     }

     #[inline]
-    fn maybe_sync(&self, queue: LogQueue, force: bool) -> Result<()> {
-        self.pipes[queue as usize].maybe_sync(force)
+    fn sync(&self, queue: LogQueue) -> Result<()> {
+        self.pipes[queue as usize].sync()
     }

     #[inline]
@@ -544,11 +548,6 @@
     fn purge_to(&self, file_id: FileId) -> Result<usize> {
         self.pipes[file_id.queue as usize].purge_to(file_id.seq)
     }
-
-    #[inline]
-    fn fetch_active_file(&self, queue: LogQueue) -> LogFileContext {
-        self.pipes[queue as usize].fetch_active_file()
-    }
 }

 #[cfg(test)]
@@ -614,7 +613,6 @@ mod tests {
         let cfg = Config {
             dir: path.to_owned(),
             target_file_size: ReadableSize::kb(1),
-            bytes_per_sync: ReadableSize::kb(32),
             ..Default::default()
         };
         let queue = LogQueue::Append;
@@ -626,17 +624,17 @@ mod tests {
         // generate file 1, 2, 3
         let content: Vec<u8> = vec![b'a'; 1024];
-        let file_handle = pipe_log.append(queue, &content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &content).unwrap();
         assert_eq!(file_handle.id.seq, 1);
         assert_eq!(file_handle.offset, header_size);
-        assert_eq!(pipe_log.file_span(queue).1, 2);
+        assert_eq!(pipe_log.file_span(queue).1, 1);

-        let file_handle = pipe_log.append(queue, &content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &content).unwrap();
         assert_eq!(file_handle.id.seq, 2);
         assert_eq!(file_handle.offset, header_size);
-        assert_eq!(pipe_log.file_span(queue).1, 3);
+        assert_eq!(pipe_log.file_span(queue).1, 2);
+
+        pipe_log.rotate(queue).unwrap();

         // purge file 1
         assert_eq!(pipe_log.purge_to(FileId { queue, seq: 2 }).unwrap(), 1);
@@ -647,13 +645,11 @@ mod tests {
         // append position
         let s_content = b"short content".to_vec();
-        let file_handle = pipe_log.append(queue, &s_content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &s_content).unwrap();
         assert_eq!(file_handle.id.seq, 3);
         assert_eq!(file_handle.offset, header_size);

-        let file_handle = pipe_log.append(queue, &s_content).unwrap();
-        pipe_log.maybe_sync(queue, false).unwrap();
+        let file_handle = pipe_log.append(queue, &mut &s_content).unwrap();
         assert_eq!(file_handle.id.seq, 3);
         assert_eq!(
             file_handle.offset,
@@ -679,11 +675,6 @@ mod tests {
         // leave only 1 file to truncate
         pipe_log.purge_to(FileId { queue, seq: 3 }).unwrap();
         assert_eq!(pipe_log.file_span(queue), (3, 3));
-
-        // fetch active file
-        let file_context = pipe_log.fetch_active_file(LogQueue::Append);
-        assert_eq!(file_context.version, cfg.format_version);
-        assert_eq!(file_context.id.seq, 3);
     }

     #[test]
@@ -770,7 +761,6 @@ mod tests {
         let cfg = Config {
             dir: path.to_owned(),
             target_file_size: ReadableSize(1),
-            bytes_per_sync: ReadableSize::kb(32),
             // super large capacity for recycling
             purge_threshold: ReadableSize::mb(100),
             enable_log_recycle: true,
@@ -787,9 +777,10 @@ mod tests {
         }
         let mut handles = Vec::new();
         for i in 0..10 {
-            handles.push(pipe_log.append(&content(i)).unwrap());
-            pipe_log.maybe_sync(true).unwrap();
+            handles.push(pipe_log.append(&mut &content(i)).unwrap());
+            pipe_log.sync().unwrap();
         }
+        pipe_log.rotate().unwrap();
         let (first, last) = pipe_log.file_span();
         assert_eq!(pipe_log.purge_to(last).unwrap() as u64, last - first);
         // Try to read stale file.
@@ -808,8 +799,8 @@
         // Try to reuse.
         let mut handles = Vec::new();
         for i in 0..10 {
-            handles.push(pipe_log.append(&content(i + 1)).unwrap());
-            pipe_log.maybe_sync(true).unwrap();
+            handles.push(pipe_log.append(&mut &content(i + 1)).unwrap());
+            pipe_log.sync().unwrap();
         }
         // Verify the data.
         for (i, handle) in handles.into_iter().enumerate() {
diff --git a/src/log_batch.rs b/src/log_batch.rs
index 5d3f0676..714120cd 100644
--- a/src/log_batch.rs
+++ b/src/log_batch.rs
@@ -11,7 +11,7 @@ use protobuf::Message;
 use crate::codec::{self, NumberEncoder};
 use crate::memtable::EntryIndex;
 use crate::metrics::StopWatch;
-use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext};
+use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext, ReactiveBytes};
 use crate::util::{crc32, lz4};
 use crate::{perf_context, Error, Result};

@@ -902,6 +902,13 @@ impl LogBatch {
     }
 }

+impl ReactiveBytes for LogBatch {
+    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8] {
+        self.prepare_write(ctx).unwrap();
+        self.encoded_bytes()
+    }
+}
+
 /// Verifies the checksum of a slice of bytes that sequentially holds data and
 /// checksum. The checksum field may be signed by XOR-ing with a u32.
 fn verify_checksum_with_signature(buf: &[u8], signature: Option<u32>) -> Result<()> {
diff --git a/src/memtable.rs b/src/memtable.rs
index 2c6fc055..0cb80fbc 100644
--- a/src/memtable.rs
+++ b/src/memtable.rs
@@ -721,16 +721,12 @@ impl MemTable {
         }
     }

-    /// Returns the number of entries smaller than or equal to `gate`.
-    pub fn entries_count_before(&self, mut gate: FileId) -> usize {
-        gate.seq += 1;
-        let idx = self
-            .entry_indexes
-            .binary_search_by_key(&gate, |ei| ei.entries.unwrap().id);
-        match idx {
-            Ok(idx) => idx,
-            Err(idx) => idx,
-        }
+    /// Returns true if there are at least `count` entries in files up to and
+    /// including `gate`.
+    #[inline]
+    pub fn has_at_least_some_entries_before(&self, gate: FileId, count: usize) -> bool {
+        debug_assert!(count > 0);
+        self.entry_indexes
+            .get(count - 1)
+            .map_or(false, |ei| ei.entries.unwrap().id.seq <= gate.seq)
     }

     /// Returns the region ID.
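A note on the new predicate: `entry_indexes` is ordered oldest-first, so checking the `count`-th oldest entry against the gate answers "are there at least `count` entries before `gate`?" without counting them all. A compilable reduction with simplified fields (illustrative, not the real `MemTable`):

```rust
struct EntryIndex {
    file_seq: u64,
}

fn has_at_least_some_entries_before(indexes: &[EntryIndex], gate_seq: u64, count: usize) -> bool {
    assert!(count > 0);
    // If the `count`-th oldest entry (0-based index `count - 1`) lives in a
    // file no newer than the gate, then at least `count` entries precede it.
    indexes
        .get(count - 1)
        .map_or(false, |ei| ei.file_seq <= gate_seq)
}

fn main() {
    // 100 entries, the i-th oldest living in file i.
    let indexes: Vec<EntryIndex> = (1u64..=100).map(|s| EntryIndex { file_seq: s }).collect();
    assert!(has_at_least_some_entries_before(&indexes, 50, 50)); // 50th entry is in file 50
    assert!(!has_at_least_some_entries_before(&indexes, 50, 51)); // 51st entry is past the gate
}
```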
diff --git a/src/pipe_log.rs b/src/pipe_log.rs
index ff392cb6..aa9b7925 100644
--- a/src/pipe_log.rs
+++ b/src/pipe_log.rs
@@ -146,23 +146,45 @@ impl LogFileContext {
     }
 }

+/// Some bytes whose value might be dependent on the file it is written to.
+pub trait ReactiveBytes {
+    fn as_bytes(&mut self, ctx: &LogFileContext) -> &[u8];
+}
+
+impl<T> ReactiveBytes for &T
+where
+    T: AsRef<[u8]> + ?Sized,
+{
+    fn as_bytes(&mut self, _ctx: &LogFileContext) -> &[u8] {
+        (*self).as_ref()
+    }
+}
+
 /// A `PipeLog` serves reads and writes over multiple queues of log files.
+///
+/// # Safety
+///
+/// The pipe will panic if it encounters an unrecoverable failure. Otherwise
+/// its operations are atomic: a failed operation will not affect other ones,
+/// and the user can still use it afterwards without breaking consistency.
 pub trait PipeLog: Sized {
     /// Reads some bytes from the specified position.
     fn read_bytes(&self, handle: FileBlockHandle) -> Result<Vec<u8>>;

     /// Appends some bytes to the specified log queue. Returns file position of
     /// the written bytes.
-    ///
-    /// The result of `fetch_active_file` will not be affected by this method.
-    fn append(&self, queue: LogQueue, bytes: &[u8]) -> Result<FileBlockHandle>;
+    fn append<T: ReactiveBytes + ?Sized>(
+        &self,
+        queue: LogQueue,
+        bytes: &mut T,
+    ) -> Result<FileBlockHandle>;

-    /// Hints it to synchronize buffered writes. The synchronization is
-    /// mandotory when `sync` is true.
+    /// Synchronizes all buffered writes.
     ///
     /// This operation might incur a great latency overhead. It's advised to
     /// call it once every batch of writes.
-    fn maybe_sync(&self, queue: LogQueue, sync: bool) -> Result<()>;
+    fn sync(&self, queue: LogQueue) -> Result<()>;

     /// Returns the smallest and largest file sequence number, still in use,
     /// of the specified log queue.
@@ -194,8 +216,4 @@ pub trait PipeLog: Sized {
     ///
     /// Returns the number of deleted files.
     fn purge_to(&self, file_id: FileId) -> Result<usize>;
-
-    /// Returns [`LogFileContext`] of the active file in the specific
-    /// log queue.
-    fn fetch_active_file(&self, queue: LogQueue) -> LogFileContext;
 }
diff --git a/src/purge.rs b/src/purge.rs
index 9be81c5b..cfefd219 100644
--- a/src/purge.rs
+++ b/src/purge.rs
@@ -76,7 +76,7 @@ where
         let mut should_compact = HashSet::new();
         if self.needs_rewrite_log_files(LogQueue::Rewrite) {
             should_compact.extend(self.rewrite_rewrite_queue()?);
-            self.purge_to(
+            self.rescan_memtables_and_purge_stale_files(
                 LogQueue::Rewrite,
                 self.pipe_log.file_span(LogQueue::Rewrite).1,
             )?;
@@ -110,7 +110,10 @@ where
             if append_queue_barrier == first_append && first_append < latest_append {
                 warn!("Unable to purge expired files: blocked by barrier");
             }
-            self.purge_to(LogQueue::Append, append_queue_barrier)?;
+            self.rescan_memtables_and_purge_stale_files(
+                LogQueue::Append,
+                append_queue_barrier,
+            )?;
         }
     }
     Ok(should_compact.into_iter().collect())
@@ -139,7 +142,7 @@ where
         if exit_after_step == Some(2) {
             return;
         }
-        self.purge_to(
+        self.rescan_memtables_and_purge_stale_files(
             LogQueue::Append,
             self.pipe_log.file_span(LogQueue::Append).1,
         )
@@ -150,7 +153,7 @@ where
     pub fn must_rewrite_rewrite_queue(&self) {
         let _lk = self.force_rewrite_candidates.try_lock().unwrap();
         self.rewrite_rewrite_queue().unwrap();
-        self.purge_to(
+        self.rescan_memtables_and_purge_stale_files(
             LogQueue::Rewrite,
             self.pipe_log.file_span(LogQueue::Rewrite).1,
         )
@@ -208,23 +211,32 @@ where
         let mut new_candidates = HashMap::with_capacity(rewrite_candidates.len());
         let memtables = self.memtables.collect(|t| {
-            if let Some(f) = t.min_file_seq(LogQueue::Append) {
-                let sparse = t
-                    .entries_count_before(FileId::new(LogQueue::Append, rewrite_watermark))
-                    < MAX_REWRITE_ENTRIES_PER_REGION;
-                // counter is the times that target region triggers force compact.
-                let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
-                if f < compact_watermark
-                    && !sparse
-                    && *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE
-                {
+            let min_append_seq = t.min_file_seq(LogQueue::Append).unwrap_or(u64::MAX);
+            let old = min_append_seq < compact_watermark || t.rewrite_count() > 0;
+            let has_something_to_rewrite = min_append_seq <= rewrite_watermark;
+            let append_heavy = t.has_at_least_some_entries_before(
+                FileId::new(LogQueue::Append, rewrite_watermark),
+                MAX_REWRITE_ENTRIES_PER_REGION + t.rewrite_count(),
+            );
+            let full_heavy = t.has_at_least_some_entries_before(
+                FileId::new(LogQueue::Append, rewrite_watermark),
+                MAX_REWRITE_ENTRIES_PER_REGION,
+            );
+            // counter is the number of times the target region has triggered force compact.
+            let compact_counter = rewrite_candidates.get(&t.region_id()).unwrap_or(&0);
+            if old && full_heavy {
+                if *compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE {
+                    // repeatedly ask the user to compact these heavy regions.
                     should_compact.push(t.region_id());
                     new_candidates.insert(t.region_id(), *compact_counter + 1);
-                } else if f < rewrite_watermark {
-                    return sparse || *compact_counter >= MAX_COUNT_BEFORE_FORCE_REWRITE;
+                    return false;
+                } else {
+                    // the user is not responsive, do the rewrite ourselves.
+                    should_compact.push(t.region_id());
+                    return has_something_to_rewrite;
                 }
             }
-            false
+            !append_heavy && has_something_to_rewrite
         });

         self.rewrite_memtables(
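The rewrite/force-compact logic above can be read as a small decision table. A compilable reduction (the shape follows the closure in this patch; the constant value here is hypothetical, the real ones live in `src/purge.rs`):

```rust
const MAX_COUNT_BEFORE_FORCE_REWRITE: u32 = 9; // hypothetical value for illustration

/// Returns (ask_user_to_compact, rewrite_now), mirroring the closure above.
fn decide(
    old: bool,
    full_heavy: bool,
    append_heavy: bool,
    has_something_to_rewrite: bool,
    compact_counter: u32,
) -> (bool, bool) {
    if old && full_heavy {
        if compact_counter < MAX_COUNT_BEFORE_FORCE_REWRITE {
            (true, false) // nudge the user to compact first
        } else {
            (true, has_something_to_rewrite) // unresponsive user: force the rewrite
        }
    } else {
        (false, !append_heavy && has_something_to_rewrite)
    }
}

fn main() {
    // A heavy, old region is first asked to compact itself...
    assert_eq!(decide(true, true, true, true, 0), (true, false));
    // ...and is force-rewritten once it has been asked enough times.
    assert_eq!(decide(true, true, true, true, 9), (true, true));
}
```

Unlike the old code, a region that keeps ignoring compaction requests is eventually rewritten anyway, which is one of the two force-compact cases this patch fixes.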
@@ -266,7 +278,7 @@ where
     }

     // Exclusive.
-    fn purge_to(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
+    fn rescan_memtables_and_purge_stale_files(&self, queue: LogQueue, seq: FileSeq) -> Result<()> {
         let min_seq = self.memtables.fold(seq, |min, t| {
             t.min_file_seq(queue).map_or(min, |m| std::cmp::min(min, m))
         });
@@ -347,16 +359,15 @@ where
         rewrite_watermark: Option<FileSeq>,
         sync: bool,
     ) -> Result<()> {
-        let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
-        if len == 0 {
-            return self.pipe_log.maybe_sync(LogQueue::Rewrite, sync);
+        if log_batch.is_empty() {
+            debug_assert!(sync);
+            return self.pipe_log.sync(LogQueue::Rewrite);
+        }
+        log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
+        let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
+        if sync {
+            self.pipe_log.sync(LogQueue::Rewrite)?
         }
-        let file_context = self.pipe_log.fetch_active_file(LogQueue::Rewrite);
-        log_batch.prepare_write(&file_context)?;
-        let file_handle = self
-            .pipe_log
-            .append(LogQueue::Rewrite, log_batch.encoded_bytes())?;
-        self.pipe_log.maybe_sync(LogQueue::Rewrite, sync)?;
         log_batch.finish_write(file_handle);
         self.memtables.apply_rewrite_writes(
             log_batch.drain(),
diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs
index d042f35a..7a51d717 100644
--- a/tests/failpoints/test_engine.rs
+++ b/tests/failpoints/test_engine.rs
@@ -127,7 +127,7 @@ fn test_pipe_log_listeners() {
             assert_eq!(hook.0[&LogQueue::Append].appends(), i);
             assert_eq!(hook.0[&LogQueue::Append].applys(), i);
         }
-        assert_eq!(hook.0[&LogQueue::Append].files(), 11);
+        assert_eq!(hook.0[&LogQueue::Append].files(), 10);

         engine.purge_expired_files().unwrap();
         assert_eq!(hook.0[&LogQueue::Append].purged(), 8);
@@ -154,7 +154,7 @@ fn test_pipe_log_listeners() {
         assert_eq!(hook.0[&LogQueue::Append].applys(), 32);

         engine.purge_expired_files().unwrap();
-        assert_eq!(hook.0[&LogQueue::Append].purged(), 13);
+        assert_eq!(hook.0[&LogQueue::Append].purged(), 14);
         assert_eq!(hook.0[&LogQueue::Rewrite].purged(), rewrite_files as u64);

         // Write region 3 without applying.
diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs
index 219cc630..52302327 100644
--- a/tests/failpoints/test_io_error.rs
+++ b/tests/failpoints/test_io_error.rs
@@ -84,7 +84,6 @@ fn test_file_write_error() {
         .unwrap();
     let cfg = Config {
         dir: dir.path().to_str().unwrap().to_owned(),
-        bytes_per_sync: ReadableSize::kb(1024),
         target_file_size: ReadableSize::kb(1024),
         ..Default::default()
     };
@@ -133,7 +132,6 @@ fn test_file_rotate_error() {
         .unwrap();
     let cfg = Config {
         dir: dir.path().to_str().unwrap().to_owned(),
-        bytes_per_sync: ReadableSize::kb(1024),
         target_file_size: ReadableSize::kb(4),
         ..Default::default()
     };
@@ -150,6 +148,9 @@ fn test_file_rotate_error() {
     engine
         .write(&mut generate_batch(1, 3, 4, Some(&entry)), false)
         .unwrap();
+    engine
+        .write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
+        .unwrap();
     assert_eq!(engine.file_span(LogQueue::Append).1, 1);
     // The next write will be followed by a rotate.
{ @@ -209,7 +210,6 @@ fn test_concurrent_write_error() { .unwrap(); let cfg = Config { dir: dir.path().to_str().unwrap().to_owned(), - bytes_per_sync: ReadableSize::kb(1024), target_file_size: ReadableSize::kb(1024), ..Default::default() }; @@ -295,7 +295,6 @@ fn test_non_atomic_write_error() { .unwrap(); let cfg = Config { dir: dir.path().to_str().unwrap().to_owned(), - bytes_per_sync: ReadableSize::kb(1024), target_file_size: ReadableSize::kb(1024), ..Default::default() };
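As a closing note on the failpoint tests: the unrecoverable-sync path can be exercised with the `log_fd::sync::err` failpoint kept by this patch. A hedged sketch of the pattern, not a test from this patch (assumes a failpoints-enabled build with an `engine` and a filled `batch` in scope, as in the tests above):

```rust
// `log_fd::sync::err` is defined in src/env/default.rs.
fail::cfg("log_fd::sync::err", "return").unwrap();
let res = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
    // sync = true: a failing fsync is unrecoverable, so the engine panics
    // instead of bubbling the error up to the caller.
    engine.write(&mut batch, true)
}));
assert!(res.is_err());
fail::remove("log_fd::sync::err");
```

Retriable pwrite errors, by contrast, surface as `Err(...)` from `write` and leave the engine usable, which is what `test_file_write_error` and `test_non_atomic_write_error` above verify.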