Skip to content

Commit

Permalink
Expose rate limiter with mode feature
Browse files Browse the repository at this point in the history
  • Loading branch information
zaidoon1 committed Mar 20, 2024
1 parent 4b8491b commit 9880e74
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 5 deletions.
59 changes: 58 additions & 1 deletion src/db_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ impl BlockBasedOptions {
/// See full [list](https://github.com/facebook/rocksdb/blob/v8.6.7/include/rocksdb/table.h#L493-L521)
/// of the supported versions.
///
/// Default: 5.
/// Default: 6.
pub fn set_format_version(&mut self, version: i32) {
unsafe {
ffi::rocksdb_block_based_options_set_format_version(self.inner, version);
Expand Down Expand Up @@ -3154,6 +3154,55 @@ impl Options {
}
}

/// Create a RateLimiter object, which can be shared among RocksDB instances to
/// control write rate of flush and compaction.
///
/// rate_bytes_per_sec: this is the only parameter you want to set most of the
/// time. It controls the total write rate of compaction and flush in bytes per
/// second. Currently, RocksDB does not enforce rate limit for anything other
/// than flush and compaction, e.g. write to WAL.
///
/// refill_period_us: this controls how often tokens are refilled. For example,
/// when rate_bytes_per_sec is set to 10MB/s and refill_period_us is set to
/// 100ms, then 1MB is refilled every 100ms internally. Larger value can lead to
/// burstier writes while smaller value introduces more CPU overhead.
/// The default should work for most cases.
///
/// fairness: RateLimiter accepts high-pri requests and low-pri requests.
/// A low-pri request is usually blocked in favor of hi-pri request. Currently,
/// RocksDB assigns low-pri to request from compaction and high-pri to request
/// from flush. Low-pri requests can get blocked if flush requests come in
/// continuously. This fairness parameter grants low-pri requests permission by
/// 1/fairness chance even though high-pri requests exist to avoid starvation.
/// You should be good by leaving it at default 10.
///
/// mode: Mode indicates which types of operations count against the limit.
///
/// auto_tuned: Enables dynamic adjustment of rate limit within the range
/// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
/// the recent demand for background I/O.
pub fn set_ratelimiter_with_mode(
&mut self,
rate_bytes_per_sec: i64,
refill_period_us: i64,
fairness: i32,
mode: RateLimiterMode,
auto_tuned: bool,
) {
unsafe {
let ratelimiter = ffi::rocksdb_ratelimiter_create_with_mode(
rate_bytes_per_sec,
refill_period_us,
fairness,
mode as c_int,
auto_tuned,
);
// Since limiter is wrapped in shared_ptr, we don't need to
// call rocksdb_ratelimiter_destroy explicitly.
ffi::rocksdb_options_set_ratelimiter(self.inner, ratelimiter);
}
}

/// Sets the maximal size of the info log file.
///
/// If the log file is larger than `max_log_file_size`, a new info log file
Expand Down Expand Up @@ -4036,6 +4085,14 @@ pub enum DBRecoveryMode {
SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize,
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[repr(i32)]
pub enum RateLimiterMode {
KReadsOnly = 0,
KWritesOnly = 1,
KAllIo = 2,
}

pub struct FifoCompactOptions {
pub(crate) inner: *mut ffi::rocksdb_fifo_compaction_options_t,
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub use crate::{
CompactOptions, CuckooTableOptions, DBCompactionStyle, DBCompressionType, DBPath,
DBRecoveryMode, DataBlockIndexType, FifoCompactOptions, FlushOptions,
IngestExternalFileOptions, KeyEncodingType, LogLevel, MemtableFactory, Options,
PlainTableFactoryOptions, ReadOptions, ReadTier, UniversalCompactOptions,
PlainTableFactoryOptions, RateLimiterMode, ReadOptions, ReadTier, UniversalCompactOptions,
UniversalCompactionStopStyle, WaitForCompactOptions, WriteBufferManager, WriteOptions,
},
db_pinnable_slice::DBPinnableSlice,
Expand Down
38 changes: 35 additions & 3 deletions tests/test_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use rust_rocksdb::{
perf::get_memory_usage_stats, BlockBasedOptions, BottommostLevelCompaction, Cache,
ColumnFamilyDescriptor, CompactOptions, CuckooTableOptions, DBAccess, DBCompactionStyle,
DBWithThreadMode, Env, Error, ErrorKind, FifoCompactOptions, IteratorMode, MultiThreaded,
Options, PerfContext, PerfMetric, ReadOptions, SingleThreaded, SliceTransform, Snapshot,
UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions, WriteBatch, DB,
DEFAULT_COLUMN_FAMILY_NAME,
Options, PerfContext, PerfMetric, RateLimiterMode, ReadOptions, SingleThreaded, SliceTransform,
Snapshot, UniversalCompactOptions, UniversalCompactionStopStyle, WaitForCompactOptions,
WriteBatch, DB, DEFAULT_COLUMN_FAMILY_NAME,
};
use util::{assert_iter, pair, DBPath};

Expand Down Expand Up @@ -1569,3 +1569,35 @@ fn test_atomic_flush_cfs() {
);
}
}

#[test]
fn ratelimiter_with_mode_test() {
let path = DBPath::new("_rust_rocksdb_ratelimiter_with_mode_test");
{
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.set_ratelimiter_with_mode(10000000, 100000, 10, RateLimiterMode::KAllIo, true);

let cfs = vec!["cf1"];
let db = DB::open_cf(&opts, &path, cfs).unwrap();
let cf1 = db.cf_handle("cf1").unwrap();
db.put_cf(&cf1, b"k1", b"v1").unwrap();
db.put_cf(&cf1, b"k2", b"v2").unwrap();
db.put_cf(&cf1, b"k3", b"v3").unwrap();
db.put_cf(&cf1, b"k4", b"v4").unwrap();
db.put_cf(&cf1, b"k5", b"v5").unwrap();

db.put(b"k1", b"v1").unwrap();
db.put(b"k2", b"v2").unwrap();
db.put(b"k3", b"v3").unwrap();
db.put(b"k4", b"v4").unwrap();
db.put(b"k5", b"v5").unwrap();

let mut wait_for_compact_opts: WaitForCompactOptions = WaitForCompactOptions::default();
wait_for_compact_opts.set_abort_on_pause(false);
wait_for_compact_opts.set_flush(true);

db.wait_for_compact(&wait_for_compact_opts).unwrap()
}
}

0 comments on commit 9880e74

Please sign in to comment.