diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 000000000..6706980f5 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,3 @@ +[env] +CXX="g++" +CXXFLAGS="-std=c++20 -DCXXASYNC_HAVE_COROUTINE_HEADER" diff --git a/.gitmodules b/.gitmodules index 1dce46cb8..be4195355 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,9 +1,9 @@ [submodule "rocksdb"] path = librocksdb_sys/rocksdb - url = https://github.com/tikv/rocksdb.git - branch = 6.29.tikv + url = https://github.com/subains/rocksdb.git + branch = coroutines+iouring [submodule "titan"] path = librocksdb_sys/libtitan_sys/titan - url = https://github.com/tikv/titan.git - branch = master + url = https://github.com/subains/titan.git + branch = coroutine+iouring diff --git a/Cargo.toml b/Cargo.toml index ef55b35a0..115a982a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ valgrind = [] [dependencies] libc = "0.2.11" librocksdb_sys = { path = "librocksdb_sys" } +#cxxrocksdb = { path = "cxxrocksdb" } [dev-dependencies] crc = "1.8" diff --git a/cxxrocksdb/Cargo.toml b/cxxrocksdb/Cargo.toml new file mode 100644 index 000000000..56e15a5ac --- /dev/null +++ b/cxxrocksdb/Cargo.toml @@ -0,0 +1,36 @@ +[package] +edition = "2018" +version = "0.1.0" +name = "cxxrocksdb" +authors = ["Sunny Bains "] + +[lib] +name = "cxxrocksdb" +path = "src/lib.rs" +crate-type = ["staticlib"] + +[dependencies] +once_cell = "1" +async-recursion = "0.3" +tokio = { version = "1", features = ["full"] } +rocksdb = { path = ".." } +#iou = "0.3.3" + +[dependencies.cxx] +version = "1" +cxx-build = "1" +features = ["c++20"] + +[dependencies.futures] +version = "0.3" +features = ["thread-pool"] + +[build-dependencies] +cc = "1.0.3" +cmake = "0.1" +cxx-build = "1" +pkg-config = "0.3" + +[dependencies.snappy-sys] +git = "https://github.com/busyjay/rust-snappy.git" +branch = "static-link" diff --git a/cxxrocksdb/build.rs b/cxxrocksdb/build.rs new file mode 100644 index 000000000..252d25d06 --- /dev/null +++ b/cxxrocksdb/build.rs @@ -0,0 +1,23 @@ +fn main() { + println!("cargo:rerun-if-changed=build.rs"); + println!("cargo:rerun-if-changed=include/cxxrocksdb.h"); + println!("cargo:rerun-if-changed=src/cxxrocksdb.cc"); + println!("cargo:rustc-link-arg=-lcxxbridge1"); + + // FIXME: Use linklib() from top level build.rs + println!("cargo:rustc-link-arg=-lstdc++"); + println!("cargo:rerun-if-changed=src/cxxrocksdb.cc"); + + println!("cargo:rustc-link-lib=static=cxxrocksdb"); + + cxx_build::bridge("src/lib.rs") + .file("src/cxxrocksdb.cc") + .flag("-DCXXASYNC_HAVE_COROUTINE_HEADER") + .flag("-fcoroutines") + .flag("-std=c++20") + .flag_if_supported("-Wall") + .include("include") + .include("../librocksdb_sys/rocksdb/include") + .include("../librocksdb_sys/crocksdb") + .compile("cxxrocksdb"); +} diff --git a/cxxrocksdb/include/cxxrocksdb.h b/cxxrocksdb/include/cxxrocksdb.h new file mode 100644 index 000000000..92e494f2b --- /dev/null +++ b/cxxrocksdb/include/cxxrocksdb.h @@ -0,0 +1,90 @@ +#pragma once + +#include "rocksdb/db.h" +#include "crocksdb/c.h" +#include "rust/cxx.h" +#include "rocksdb/async_future.h" + +#include +#include +#include +#include + +#include + +struct CRocksDB; +struct RustStatus; +struct Async_result; + +using ROCKSDB_NAMESPACE::Async_future; +using ROCKSDB_NAMESPACE::ReadOptions; +using ROCKSDB_NAMESPACE::PinnableSlice; +using ReadTier = ROCKSDB_NAMESPACE::ReadTier; +using Submit_queue = Async_future::Submit_queue; +using Return_type = Async_future::Promise_type::Return_type; + +struct Async_reader { + Async_reader() = delete; + Async_reader(Async_reader&&) = delete; + Async_reader(const Async_reader&) = delete; + Async_reader& operator=(Async_reader&&) = delete; + Async_reader& operator=(const Async_reader&) = delete; + + Async_reader(rocksdb::DB *db, size_t io_uring_size); + + ~Async_reader() noexcept; + + /** Reap entries from the io_uring completion queue (CQ). + @return Number of processed CQEs */ + uint32_t io_uring_reap_cq() const; + + /** Peek and check if there are any CQEs to process. + @return true if there are CQEs in the CQ. */ + bool io_uring_peek_cq() const; + + Async_result get(const ReadOptions *ropts, rust::String k) const; + + void setup_io_uring_sq_handler(ReadOptions *ropts) const; + + uint32_t pending_io_uring_sqe_count() const { + return m_n_pending_sqe.load(); + } + + static RustStatus get_result(Async_result async_result, rust::String &v); + + private: + using Promise = Async_future::promise_type; + + static void schedule_task(Promise* promise) noexcept; + +private: + struct IO_key { + bool operator==(const IO_key& rhs) const { + return m_fd == rhs.m_fd && m_off == rhs.m_off; + } + + int m_fd{-1}; + off_t m_off{}; + }; + + struct IO_key_hash { + size_t operator()(const IO_key &io_key) const noexcept { + return io_key.m_fd ^ io_key.m_off; + } + }; + + using IO_value = std::unordered_set; + + /** All data members are mutable so that we can use const functions. + This allows us to use std::shared_ptr from Rust with an immutable + reference. */ + mutable rocksdb::DB *m_db{}; + mutable std::atomic m_n_pending_sqe{}; + mutable std::shared_ptr m_io_uring{}; + mutable std::shared_ptr m_submit_queue{}; + mutable std::unordered_map m_pending_io{}; +}; + +std::shared_ptr new_async_reader(CRocksDB* db, uint32_t io_uring_size); + +RustStatus get_async_result(Async_result async_result, rust::String &v); diff --git a/cxxrocksdb/src/cxxrocksdb.cc b/cxxrocksdb/src/cxxrocksdb.cc new file mode 100644 index 000000000..3649ce4b5 --- /dev/null +++ b/cxxrocksdb/src/cxxrocksdb.cc @@ -0,0 +1,230 @@ +#include "cxxrocksdb.h" +#include "cxxrocksdb/src/lib.rs.h" + +#include + +using ROCKSDB_NAMESPACE::Async_future; +using ROCKSDB_NAMESPACE::ReadOptions; + +using Submit_queue = rocksdb::Async_future::Submit_queue; + +Async_reader::~Async_reader() noexcept { + io_uring_queue_exit(m_io_uring.get()); +} + +Async_reader::Async_reader(rocksdb::DB* db, size_t io_uring_size) + : m_db(db), + m_io_uring(new io_uring) { + auto ret = io_uring_queue_init(io_uring_size, m_io_uring.get(), 0); + + if (ret < 0) { + throw "io_uring_queue_init failed"; + } + + m_submit_queue = std::make_shared( + [this](Async_future::IO_ctx *ctx, int fd, off_t off, Submit_queue::Ops op) -> Async_future { + using Status = ROCKSDB_NAMESPACE::Status; + using SubCode = Status::SubCode; + using IOStatus = ROCKSDB_NAMESPACE::IOStatus; + + ctx->m_fd = fd; + ctx->m_off = off; + + assert(op == Submit_queue::Ops::Read); + + IO_key io_key{fd, off}; + auto &iovs{ctx->m_iov}; + std::vector new_iov{}; + auto it{m_pending_io.find(io_key)}; + + if (it == m_pending_io.end()) { + IO_value len_to_read{}; + + for (const auto &io : iovs) { + len_to_read.insert(io.iov_len); + } + + m_pending_io.insert(it, {io_key, len_to_read}); + new_iov = std::move(iovs); + } else { + IO_value new_io_len{}; + auto &pending_io_len{it->second}; + + new_iov.resize(iovs.size()); + + for (const auto &iov : iovs) { + for (auto len : pending_io_len) { + if (iov.iov_len != len) { + new_iov.push_back(iov); + new_io_len.insert(iov.iov_len); + } + } + } + + pending_io_len.insert(new_io_len.begin(), new_io_len.end()); + } + + auto io_uring{m_io_uring.get()}; + auto sqe{io_uring_get_sqe(io_uring)}; + + if (sqe == nullptr) { + co_return IOStatus::IOError(SubCode::kIOUringSqeFull); + } else { + if (!new_iov.empty()) { + io_uring_prep_readv(sqe, fd, new_iov.data(), new_iov.size(), off); + io_uring_sqe_set_data(sqe, ctx); + } else { + // FIXME: Get rid of the nop. We should not be doing any + // io_uring operation at all for duplicate requests. + io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, reinterpret_cast(((uintptr_t) ctx) | 1)); + } + + const auto ret = io_uring_submit(io_uring); + + if (ret < 0) { + // FIXME: Error handling. + auto msg{strerror(-ret)}; + std::cout << "error: " << msg << std::endl; + co_return IOStatus::IOError(SubCode::kIOUringSubmitError, msg); + } else { + m_n_pending_sqe.fetch_add(1, std::memory_order_seq_cst); + co_await Async_future(true, ctx); + co_return IOStatus::OK(); + } + } + }); +} + +void Async_reader::setup_io_uring_sq_handler(ReadOptions *ropts) const { + ropts->submit_queue = m_submit_queue; + + // FIXME: Hack it for now. + ropts->verify_checksums = true; + ropts->read_tier = ReadTier::kPersistedTier; +} + +void Async_reader::schedule_task(Promise* promise) noexcept { + auto h{std::coroutine_handle::from_promise(*promise)}; + h.resume(); +} + +bool Async_reader::io_uring_peek_cq() const { + io_uring_cqe* cqe{}; + auto io_uring{m_io_uring.get()}; + + return io_uring_peek_cqe(io_uring, &cqe) == 0; +} + +uint32_t Async_reader::io_uring_reap_cq() const { + io_uring_cqe* cqe{}; + uint32_t n_processed{}; + auto io_uring{m_io_uring.get()}; + + while (io_uring_peek_cqe(io_uring, &cqe) == 0) { + // FIXME: Error handling, short reads etc. + if (cqe->res >= 0) { + Async_future::IO_ctx *ctx{}; + auto c = (uintptr_t) io_uring_cqe_get_data(cqe); + + // To catch reuse of the CQE. + cqe->user_data = 0xdeadbeef; + + if (c & 1) { + ctx = reinterpret_cast(c & ~1); + + const auto &iov{ctx->m_iov}; + IO_key io_key{ctx->m_fd, ctx->m_off}; + auto it{m_pending_io.find(io_key)}; + + if (it != m_pending_io.end()) { + auto &pending_lens{it->second}; + + for (const auto &io : iov) { + // FIXME: This is very inefficient, we need to do this in-situ + std::vector to_erase{}; + + for (const auto &pending_len : pending_lens) { + // FIXME: Check is too simple + if (pending_len == io.iov_len) { + to_erase.push_back(pending_len); + } + } + for (auto len : to_erase) { + pending_lens.erase(len); + } + } + if (pending_lens.empty()) { + m_pending_io.erase(it); + } + } + } else { + ctx = reinterpret_cast(c); + } + + io_uring_cqe_seen(io_uring, cqe); + + auto promise = ctx->m_promise; + + delete ctx; + + if (promise != nullptr) { + schedule_task(promise); + } + + ++n_processed; + } else { + assert(false); + } + + auto r = m_n_pending_sqe.fetch_sub(1, std::memory_order_seq_cst); + assert(r >= 1); + } + + return n_processed; +} + +Async_result Async_reader::get(const ReadOptions *ropts, rust::String k) const { + + std::string key{k}; + + Async_result async_result{}; + + async_result.m_pinnable = new(std::nothrow) PinnableSlice(); + assert(async_result.m_pinnable != nullptr); + + async_result.m_async_reader = this; + async_result.m_async_future = new Async_future(); + + *async_result.m_async_future = m_db->AsyncGet( + *ropts, m_db->DefaultColumnFamily(), key, async_result.m_pinnable, nullptr); + + return async_result; + +} + +RustStatus Async_reader::get_result(Async_result async_result, rust::String &v) { + v = async_result.m_pinnable->ToString(); + + const auto status = async_result.m_async_future->status(); + + delete async_result.m_pinnable; + delete async_result.m_async_future; + + return RustStatus{ + (StatusCode) status.code(), + (StatusSubCode) status.subcode(), + (StatusSeverity) status.severity() + }; +} + +std::shared_ptr new_async_reader(CRocksDB* rust_db, uint32_t io_uring_size) { + auto crocksdb = reinterpret_cast(rust_db); + auto db = reinterpret_cast(crocksdb_get_instance(crocksdb)); + return std::make_shared(db, io_uring_size); +} + +RustStatus get_async_result(Async_result async_result, rust::String &v) { + return Async_reader::get_result(async_result, v); +} + diff --git a/cxxrocksdb/src/lib.rs b/cxxrocksdb/src/lib.rs new file mode 100644 index 000000000..f51906c50 --- /dev/null +++ b/cxxrocksdb/src/lib.rs @@ -0,0 +1,309 @@ +use cxx::SharedPtr; +use futures::executor::block_on; +use rocksdb::crocksdb_ffi::DBReadOptions; +use rocksdb::rocksdb::*; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[cxx::bridge] +mod ffi { + /// Enums must match the enums in include/rocksdb/status.h and + /// include/rocksdb/io_status.h + #[repr(u8)] + #[derive(Copy, Clone, Debug)] + enum StatusCode { + kOk = 0, + kNotFound = 1, + kCorruption = 2, + kNotSupported = 3, + kInvalidArgument = 4, + kIOError = 5, + kMergeInProgress = 6, + kIncomplete = 7, + kShutdownInProgress = 8, + kTimedOut = 9, + kAborted = 10, + kBusy = 11, + kExpired = 12, + kTryAgain = 13, + kCompactionTooLarge = 14, + kColumnFamilyDropped = 15, + kMaxCode, + } + + #[repr(u8)] + #[derive(Copy, Clone, Debug)] + enum StatusSubCode { + kNone = 0, + kMutexTimeout = 1, + kLockTimeout = 2, + kLockLimit = 3, + kNoSpace = 4, + kDeadlock = 5, + kStaleFile = 6, + kMemoryLimit = 7, + kSpaceLimit = 8, + kPathNotFound = 9, + KMergeOperandsInsufficientCapacity = 10, + kManualCompactionPaused = 11, + kOverwritten = 12, + kTxnNotPrepared = 13, + kIOFenced = 14, + kIOUringSqeFull = 15, + kIOUringSubmitError = 16, + kMaxSubCode, + } + + #[repr(u8)] + #[derive(Copy, Clone, Debug)] + enum StatusSeverity { + kNoError = 0, + kSoftError = 1, + kHardError = 2, + kFatalError = 3, + kUnrecoverableError = 4, + kMaxSeverity, + } + + #[derive(Copy, Clone, Debug)] + struct RustStatus { + code: StatusCode, + sub_code: StatusSubCode, + severity: StatusSeverity, + } + + #[repr(u8)] + #[derive(Copy, Clone, Debug)] + enum IOErrorScope { + kIOErrorScopeFileSystem, + kIOErrorScopeFile, + kIOErrorScopeRange, + kIOErrorScopeMax, + } + + #[derive(Copy, Clone, Debug)] + struct RustIOStatus { + status: RustStatus, + retryable: bool, + data_loss: bool, + scope: IOErrorScope, + } + + #[derive(Copy, Clone, Debug)] + struct Async_result { + /// These are opaque values that are dereferened by the + /// C++ code. + m_pinnable: *mut PinnableSlice, + m_async_future: *mut Async_future, + m_async_reader: *const Async_reader, + } + + unsafe extern "C++" { + include!("rocksdb/db.h"); + include!("crocksdb/c.h"); + include!("cxxrocksdb.h"); + + type CRocksDB; + + #[namespace = "rocksdb"] + type ReadOptions; + + #[namespace = "rocksdb"] + type PinnableSlice; + + #[namespace = "rocksdb"] + type Async_future; + + type Async_reader; + + unsafe fn new_async_reader( + db: *mut CRocksDB, + io_uring_size: u32, + ) -> SharedPtr; + + unsafe fn setup_io_uring_sq_handler(self: &Async_reader, ropts: *mut ReadOptions); + + unsafe fn get( + self: &Async_reader, + ropts: *const ReadOptions, + arg: String,) -> Async_result; + + unsafe fn get_async_result(ar: Async_result, v: &mut String) -> RustStatus; + + fn io_uring_peek_cq(self: &Async_reader) -> bool; + fn io_uring_reap_cq(self: &Async_reader) -> u32; + fn pending_io_uring_sqe_count(self: &Async_reader) -> u32; + } +} + +impl ffi::RustStatus { + pub fn new() -> Self { + Self { + code: ffi::StatusCode::kOk, + sub_code: ffi::StatusSubCode::kNone, + severity: ffi::StatusSeverity::kNoError, + } + } + + pub fn new_error(code: ffi::StatusCode) -> Self { + Self { + code: code, + sub_code: ffi::StatusSubCode::kNone, + severity: ffi::StatusSeverity::kNoError, + } + } + + pub fn ok(&self) -> bool { + self.code == ffi::StatusCode::kOk + } + + pub fn err(&self) -> (ffi::StatusCode, ffi::StatusSubCode, ffi::StatusSeverity) { + (self.code, self.sub_code, self.severity) + } +} + +impl ffi::RustIOStatus { + pub fn new() -> Self { + let status = ffi::RustStatus::new(); + Self { + status, + retryable: false, + data_loss: false, + scope: ffi::IOErrorScope::kIOErrorScopeFile, + } + } + + pub fn new_error() -> Self { + Self { + status: ffi::RustStatus::new_error(ffi::StatusCode::kIOError), + retryable: false, + data_loss: false, + scope: ffi::IOErrorScope::kIOErrorScopeFile, + } + } + + pub fn ok(&self) -> bool { + self.status.ok() + } + + pub fn err(&self) -> Self { + Self { + status: self.status, + retryable: self.retryable, + data_loss: self.data_loss, + scope: self.scope, + } + } +} + +impl std::fmt::Debug for ffi::Async_reader { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:p}", self) + } +} + +#[derive(Debug)] +struct RocksDB { + async_reader: SharedPtr, +} + +impl RocksDB { + pub fn new(db: &mut rocksdb::DB) -> Self { + let db: *mut ffi::CRocksDB = db.get_inner() as *mut _ as *mut ffi::CRocksDB; + let async_reader = unsafe { ffi::new_async_reader(db, 2) }; + + Self { + async_reader: async_reader, + } + } + + fn setup_io_uring_sq_handler(&self, ropts: *mut DBReadOptions) { + let ropts: *mut ffi::ReadOptions = ropts as *mut _ as *mut ffi::ReadOptions; + unsafe { + self.async_reader.setup_io_uring_sq_handler(ropts); + } + } + + fn get( + &self, + ropts: *const DBReadOptions, + k: String, + ) -> impl Future { + let ropts: *const ffi::ReadOptions = ropts as *const _ as *const ffi::ReadOptions; + + unsafe { self.async_reader.get(ropts, k) } + } + + fn get_result(ar: ffi::Async_result) -> (String, ffi::RustStatus) { + let mut v: String = Default::default(); + let s = unsafe { ffi::get_async_result(ar, &mut v) }; + + (v, s) + } +} + +impl Future for ffi::Async_result { + type Output = (String, ffi::RustStatus); + + fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let async_reader = unsafe { (*self).m_async_reader.as_ref().unwrap() }; + + if async_reader.pending_io_uring_sqe_count() == 0 { + return Poll::Ready( unsafe { RocksDB::get_result(*self) }); + } else if async_reader.io_uring_peek_cq() { + let n_reaped = async_reader.io_uring_reap_cq(); + assert!(n_reaped > 0); + println!("n_reaped: {}", n_reaped); + return Poll::Ready( unsafe { RocksDB::get_result(*self) }); + } else { + println!("pending"); + return Poll::Pending; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn populate() { + let mut db = rocksdb::rocksdb::DB::open_default("/tmp/rocksdb/storage").unwrap(); + for i in 1..1001 { + if i == 3 { + continue; + } + let k = format!("k{}", i); + let v = format!("v{}", i); + + db.put(k.as_bytes(), v.as_bytes()); + } + } + + #[test] + fn async_get_key_test() { + populate(); + + let mut db = rocksdb::rocksdb::DB::open_default("/tmp/rocksdb/storage").unwrap(); + let ropts = unsafe { rocksdb::crocksdb_ffi::crocksdb_readoptions_create() }; + + let db = RocksDB::new(&mut db); + + db.setup_io_uring_sq_handler(ropts); + + let mut futures = vec![]; + + for i in 1..1001 { + let k = format!("k{}", i); + futures.push(db.get(ropts, k)); + } + + let f = async { futures::future::join_all(futures).await }; + let results = block_on(f); + + for r in results { + // println!("v: {:?}", r); + } + } +} diff --git a/cxxrocksdb/src/mod.rs b/cxxrocksdb/src/mod.rs new file mode 100644 index 000000000..1950f7242 --- /dev/null +++ b/cxxrocksdb/src/mod.rs @@ -0,0 +1,2 @@ +extern crate rocksdb; +extern crate Async_reader; diff --git a/librocksdb_sys/Cargo.toml b/librocksdb_sys/Cargo.toml index f7596e03c..de19e24d1 100644 --- a/librocksdb_sys/Cargo.toml +++ b/librocksdb_sys/Cargo.toml @@ -30,6 +30,8 @@ static_libcpp = [] [build-dependencies] cc = "1.0.3" cmake = "0.1" +cxx-build = "1" +#pkg-config = "0.3" bindgen = { version = "0.57", default-features = false, features = ["runtime"] } [dependencies.tikv-jemalloc-sys] diff --git a/librocksdb_sys/build.rs b/librocksdb_sys/build.rs index 22e9e4030..9f0ada77c 100644 --- a/librocksdb_sys/build.rs +++ b/librocksdb_sys/build.rs @@ -15,7 +15,7 @@ extern crate bindgen; extern crate cc; extern crate cmake; -use cc::Build; +use cc::{Build, Tool}; use cmake::Config; use std::path::{Path, PathBuf}; use std::{env, str}; @@ -76,27 +76,31 @@ fn main() { build.cpp(true).file("crocksdb/c.cc"); if !cfg!(target_os = "windows") { - build.flag("-std=c++11"); build.flag("-fno-rtti"); } - link_cpp(&mut build); - build.warnings(false).compile("libcrocksdb.a"); -} -fn link_cpp(build: &mut Build) { let tool = build.get_compiler(); - let stdlib = if tool.is_like_gnu() { - "libstdc++.a" + + if tool.is_like_gnu() { + link_lib("libstdc++.a", &tool); + build.cpp_link_stdlib(None); } else if tool.is_like_clang() { - "libc++.a" + link_lib("libc++.a", &tool); + build.cpp_link_stdlib(None); } else { // Don't link to c++ statically on windows. - return; }; + + link_lib("liburing.a", &tool); + + build.warnings(false).compile("libcrocksdb.a"); +} + +fn link_lib(lib: &str, tool: &Tool) { let output = tool .to_command() .arg("--print-file-name") - .arg(stdlib) + .arg(lib) .output() .unwrap(); if !output.status.success() || output.stdout.is_empty() { @@ -111,7 +115,7 @@ fn link_cpp(build: &mut Build) { return; } // remove lib prefix and .a postfix. - let libname = &stdlib[3..stdlib.len() - 2]; + let libname = &lib[3..lib.len() - 2]; // optional static linking if cfg!(feature = "static_libcpp") { println!("cargo:rustc-link-lib=static={}", &libname); @@ -122,7 +126,6 @@ fn link_cpp(build: &mut Build) { "cargo:rustc-link-search=native={}", path.parent().unwrap().display() ); - build.cpp_link_stdlib(None); } fn build_rocksdb() -> Build { diff --git a/librocksdb_sys/crocksdb/c.cc b/librocksdb_sys/crocksdb/c.cc index 095d0d1c9..af4726b18 100644 --- a/librocksdb_sys/crocksdb/c.cc +++ b/librocksdb_sys/crocksdb/c.cc @@ -409,6 +409,10 @@ struct crocksdb_map_property_t { std::map rep; }; +void *crocksdb_get_instance(crocksdb_t *crocksdb) { + return crocksdb->rep; +} + struct crocksdb_compactionfilter_t : public CompactionFilter { void* state_; void (*destructor_)(void*); diff --git a/librocksdb_sys/crocksdb/crocksdb/c.h b/librocksdb_sys/crocksdb/crocksdb/c.h index 6cfa97e3c..1a7d4b4d0 100644 --- a/librocksdb_sys/crocksdb/crocksdb/c.h +++ b/librocksdb_sys/crocksdb/crocksdb/c.h @@ -230,6 +230,8 @@ typedef struct crocksdb_file_system_inspector_t /* DB operations */ +extern C_ROCKSDB_LIBRARY_API void* crocksdb_get_instance(crocksdb_t *ptr); + extern C_ROCKSDB_LIBRARY_API crocksdb_t* crocksdb_open( const crocksdb_options_t* options, const char* name, char** errptr); diff --git a/librocksdb_sys/libtitan_sys/titan b/librocksdb_sys/libtitan_sys/titan index 87d3bd577..853b2e454 160000 --- a/librocksdb_sys/libtitan_sys/titan +++ b/librocksdb_sys/libtitan_sys/titan @@ -1 +1 @@ -Subproject commit 87d3bd5776ac53db936e3559fa26b083be5dc2a8 +Subproject commit 853b2e454ca51fded7e44f6f1adf092242cbac96 diff --git a/librocksdb_sys/rocksdb b/librocksdb_sys/rocksdb index 8899a3606..6cf09165e 160000 --- a/librocksdb_sys/rocksdb +++ b/librocksdb_sys/rocksdb @@ -1 +1 @@ -Subproject commit 8899a360623988adeec1d865bef9d24568ddd32c +Subproject commit 6cf09165e74af84437fe6a14368f791239c5365f diff --git a/src/rocksdb.rs b/src/rocksdb.rs index 1c299932e..b53297ed7 100644 --- a/src/rocksdb.rs +++ b/src/rocksdb.rs @@ -162,6 +162,12 @@ impl Debug for DB { } } +impl DB { + pub fn get_inner(&mut self) -> *mut DBInstance { + self.inner + } +} + unsafe impl Send for DB {} unsafe impl Sync for DB {}