From 9880e74aa657bcfca499199e77c9068d99674fdc Mon Sep 17 00:00:00 2001
From: zaidoon
Date: Wed, 20 Mar 2024 00:12:14 -0400
Subject: [PATCH] Expose rate limiter with mode feature

---
 src/db_options.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++++-
 src/lib.rs        |  2 +-
 tests/test_db.rs  | 38 +++++++++++++++++++++++++++---
 3 files changed, 102 insertions(+), 5 deletions(-)

diff --git a/src/db_options.rs b/src/db_options.rs
index 462683f..56a81cb 100644
--- a/src/db_options.rs
+++ b/src/db_options.rs
@@ -668,7 +668,7 @@ impl BlockBasedOptions {
     /// See full [list](https://github.com/facebook/rocksdb/blob/v8.6.7/include/rocksdb/table.h#L493-L521)
     /// of the supported versions.
     ///
-    /// Default: 5.
+    /// Default: 6.
     pub fn set_format_version(&mut self, version: i32) {
         unsafe {
             ffi::rocksdb_block_based_options_set_format_version(self.inner, version);
@@ -3154,6 +3154,56 @@ impl Options {
         }
     }
 
+    /// Creates a RateLimiter with the given mode and installs it on these options. A
+    /// rate limiter controls the write rate of flush and compaction.
+    ///
+    /// rate_bytes_per_sec: this is the only parameter you want to set most of the
+    /// time. It controls the total write rate of compaction and flush in bytes per
+    /// second. Currently, RocksDB does not enforce rate limit for anything other
+    /// than flush and compaction, e.g. write to WAL.
+    ///
+    /// refill_period_us: this controls how often tokens are refilled. For example,
+    /// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
+    /// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
+    /// burstier writes while smaller value introduces more CPU overhead.
+    /// The default should work for most cases.
+    ///
+    /// fairness: RateLimiter accepts high-pri requests and low-pri requests.
+    /// A low-pri request is usually blocked in favor of hi-pri request. Currently,
+    /// RocksDB assigns low-pri to request from compaction and high-pri to request
+    /// from flush. Low-pri requests can get blocked if flush requests come in
+    /// continuously. This fairness parameter grants low-pri requests permission by
+    /// 1/fairness chance even though high-pri requests exist to avoid starvation.
+    /// You should be good by leaving it at default 10.
+    ///
+    /// mode: Mode indicates which types of operations count against the limit.
+    ///
+    /// auto_tuned: Enables dynamic adjustment of rate limit within the range
+    /// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
+    /// the recent demand for background I/O.
+    pub fn set_ratelimiter_with_mode(
+        &mut self,
+        rate_bytes_per_sec: i64,
+        refill_period_us: i64,
+        fairness: i32,
+        mode: RateLimiterMode,
+        auto_tuned: bool,
+    ) {
+        unsafe {
+            let ratelimiter = ffi::rocksdb_ratelimiter_create_with_mode(
+                rate_bytes_per_sec,
+                refill_period_us,
+                fairness,
+                mode as c_int,
+                auto_tuned,
+            );
+            // The options keep their own copy of the limiter's shared_ptr, so the C
+            // wrapper handle created above must still be destroyed or it is leaked.
+            ffi::rocksdb_options_set_ratelimiter(self.inner, ratelimiter);
+            ffi::rocksdb_ratelimiter_destroy(ratelimiter);
+        }
+    }
+
     /// Sets the maximal size of the info log file.
     ///
     /// If the log file is larger than `max_log_file_size`, a new info log file
@@ -4036,6 +4086,21 @@ pub enum DBRecoveryMode {
     SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize,
 }
 
+/// Selects which types of I/O operations count against a rate limiter's limit.
+///
+/// The discriminant is passed to RocksDB as a C integer by
+/// [`Options::set_ratelimiter_with_mode`].
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
+#[repr(i32)]
+pub enum RateLimiterMode {
+    /// Only read operations count against the limit.
+    KReadsOnly = 0,
+    /// Only write operations count against the limit.
+    KWritesOnly = 1,
+    /// Both read and write operations count against the limit.
+    KAllIo = 2,
+}
+
 pub struct FifoCompactOptions {
     pub(crate) inner: *mut ffi::rocksdb_fifo_compaction_options_t,
 }
diff --git a/src/lib.rs b/src/lib.rs
index 1fa8d58..0dc9039 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -118,7 +118,7 @@ pub use crate::{
         CompactOptions, CuckooTableOptions, DBCompactionStyle, DBCompressionType, DBPath,
         DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions,
         IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options,
-        PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions,
+        PlainTableFactoryOptions, RateLimiterMode, ReadOptions, ReadTier, UniversalCompactOptions,
         UniversalCompactionStopStyle, WaitForCompactOptions, WriteBufferManager, WriteOptions,
     },
     db_pinnable_slice::DBPinnableSlice,
diff --git a/tests/test_db.rs b/tests/test_db.rs
index 67a5eda..0ab41ad 100644
--- a/tests/test_db.rs
+++ b/tests/test_db.rs
@@ -24,9 +24,9 @@ use rust_rocksdb::{
     perf::get_memory_usage_stats, BlockBasedOptions, BottommostLevelCompaction, Cache,
     ColumnFamilyDescriptor, CompactOptions, CuckooTableOptions, DBAccess, DBCompactionStyle,
     DBWithThreadMode, Env, Error, ErrorKind, FifoCompactOptions, IteratorMode, MultiThreaded,
-    Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot,
-    UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions, WriteBatch, DB,
-    DEFAULT_COLUMN_FAMILY_NAME,
+    Options, PerfContext, PerfMetric, RateLimiterMode, ReadOptions, SingleThreaded, SliceTransform,
+    Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions,
+    WriteBatch, DB, DEFAULT_COLUMN_FAMILY_NAME,
 };
 
 use util::{assert_iter, pair, DBPath};
@@ -1569,3 +1569,35 @@ fn test_atomic_flush_cfs() {
         );
     }
 }
+
+#[test]
+fn ratelimiter_with_mode_test() {
+    let path = DBPath::new("_rust_rocksdb_ratelimiter_with_mode_test");
+    {
+        let mut opts = Options::default();
+        opts.create_if_missing(true);
+        opts.create_missing_column_families(true);
+        opts.set_ratelimiter_with_mode(10000000, 100000, 10, RateLimiterMode::KAllIo, true);
+
+        let cfs = vec!["cf1"];
+        let db = DB::open_cf(&opts, &path, cfs).unwrap();
+        let cf1 = db.cf_handle("cf1").unwrap();
+        db.put_cf(&cf1, b"k1", b"v1").unwrap();
+        db.put_cf(&cf1, b"k2", b"v2").unwrap();
+        db.put_cf(&cf1, b"k3", b"v3").unwrap();
+        db.put_cf(&cf1, b"k4", b"v4").unwrap();
+        db.put_cf(&cf1, b"k5", b"v5").unwrap();
+
+        db.put(b"k1", b"v1").unwrap();
+        db.put(b"k2", b"v2").unwrap();
+        db.put(b"k3", b"v3").unwrap();
+        db.put(b"k4", b"v4").unwrap();
+        db.put(b"k5", b"v5").unwrap();
+
+        let mut wait_for_compact_opts: WaitForCompactOptions = WaitForCompactOptions::default();
+        wait_for_compact_opts.set_abort_on_pause(false);
+        wait_for_compact_opts.set_flush(true);
+
+        db.wait_for_compact(&wait_for_compact_opts).unwrap()
+    }
+}