Skip to content

Commit d10a04f

Browse files
authored
iterator: fix error handling about valid (#411)
Signed-off-by: qupeng <[email protected]>
1 parent 6af348b commit d10a04f

12 files changed

+163
-156
lines changed

src/lib.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ pub use metadata::{ColumnFamilyMetaData, LevelMetaData, SstFileMetaData};
3535
pub use perf_context::{get_perf_level, set_perf_level, IOStatsContext, PerfContext, PerfLevel};
3636
pub use rocksdb::{
3737
load_latest_options, run_ldb_tool, set_external_sst_file_global_seq_no, BackupEngine, CFHandle,
38-
Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, Kv, MapProperty, MemoryAllocator, Range,
38+
Cache, DBIterator, DBVector, Env, ExternalSstFileInfo, MapProperty, MemoryAllocator, Range,
3939
SeekKey, SequentialFile, SstFileReader, SstFileWriter, Writable, WriteBatch, DB,
4040
};
4141
pub use rocksdb_options::{
@@ -54,6 +54,9 @@ pub use table_properties_collector::TablePropertiesCollector;
5454
pub use table_properties_collector_factory::TablePropertiesCollectorFactory;
5555
pub use titan::{TitanBlobIndex, TitanDBOptions};
5656

57+
#[allow(deprecated)]
58+
pub use rocksdb::Kv;
59+
5760
mod compaction_filter;
5861
pub mod comparator;
5962
mod event_listener;

src/perf_context.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -422,10 +422,8 @@ mod test {
422422
let mut ctx = PerfContext::get();
423423

424424
let mut iter = db.iter();
425-
assert!(iter.seek(SeekKey::Start));
426-
while iter.valid() {
427-
iter.next();
428-
}
425+
assert!(iter.seek(SeekKey::Start).unwrap());
426+
while iter.next().unwrap() {}
429427
assert_eq!(ctx.internal_key_skipped_count(), n);
430428
assert_eq!(ctx.internal_delete_skipped_count(), n / 2);
431429
assert_eq!(ctx.seek_internal_seek_time(), 0);
@@ -439,9 +437,9 @@ mod test {
439437
assert_eq!(get_perf_level(), PerfLevel::EnableTime);
440438

441439
let mut iter = db.iter();
442-
assert!(iter.seek(SeekKey::End));
443-
while iter.valid() {
444-
iter.prev();
440+
assert!(iter.seek(SeekKey::End).unwrap());
441+
while iter.valid().unwrap() {
442+
iter.prev().unwrap();
445443
}
446444
assert_eq!(ctx.internal_key_skipped_count(), n + n / 2);
447445
assert_eq!(ctx.internal_delete_skipped_count(), n / 2);

src/rocksdb.rs

+30-16
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ impl<D: Deref<Target = DB>> DBIterator<D> {
236236
}
237237

238238
impl<D> DBIterator<D> {
239-
pub fn seek(&mut self, key: SeekKey) -> bool {
239+
pub fn seek(&mut self, key: SeekKey) -> Result<bool, String> {
240240
unsafe {
241241
match key {
242242
SeekKey::Start => crocksdb_ffi::crocksdb_iter_seek_to_first(self.inner),
@@ -249,7 +249,7 @@ impl<D> DBIterator<D> {
249249
self.valid()
250250
}
251251

252-
pub fn seek_for_prev(&mut self, key: SeekKey) -> bool {
252+
pub fn seek_for_prev(&mut self, key: SeekKey) -> Result<bool, String> {
253253
unsafe {
254254
match key {
255255
SeekKey::Start => crocksdb_ffi::crocksdb_iter_seek_to_first(self.inner),
@@ -264,23 +264,24 @@ impl<D> DBIterator<D> {
264264
self.valid()
265265
}
266266

267-
pub fn prev(&mut self) -> bool {
267+
pub fn prev(&mut self) -> Result<bool, String> {
268268
unsafe {
269269
crocksdb_ffi::crocksdb_iter_prev(self.inner);
270270
}
271271
self.valid()
272272
}
273273

274274
#[allow(clippy::should_implement_trait)]
275-
pub fn next(&mut self) -> bool {
275+
pub fn next(&mut self) -> Result<bool, String> {
276276
unsafe {
277277
crocksdb_ffi::crocksdb_iter_next(self.inner);
278278
}
279279
self.valid()
280280
}
281281

282+
/// Get the key pointed by the iterator. Must be called when `self.valid() == Ok(true)`.
282283
pub fn key(&self) -> &[u8] {
283-
assert!(self.valid());
284+
debug_assert_eq!(self.valid(), Ok(true));
284285
let mut key_len: size_t = 0;
285286
let key_len_ptr: *mut size_t = &mut key_len;
286287
unsafe {
@@ -289,8 +290,9 @@ impl<D> DBIterator<D> {
289290
}
290291
}
291292

293+
/// Get the value pointed by the iterator. Must be called when `self.valid() == Ok(true)`.
292294
pub fn value(&self) -> &[u8] {
293-
assert!(self.valid());
295+
debug_assert_eq!(self.valid(), Ok(true));
294296
let mut val_len: size_t = 0;
295297
let val_len_ptr: *mut size_t = &mut val_len;
296298
unsafe {
@@ -299,37 +301,49 @@ impl<D> DBIterator<D> {
299301
}
300302
}
301303

304+
#[deprecated]
302305
pub fn kv(&self) -> Option<(Vec<u8>, Vec<u8>)> {
303-
if self.valid() {
306+
if self.valid().unwrap() {
304307
Some((self.key().to_vec(), self.value().to_vec()))
305308
} else {
306309
None
307310
}
308311
}
309312

310-
pub fn valid(&self) -> bool {
311-
unsafe { crocksdb_ffi::crocksdb_iter_valid(self.inner) }
313+
pub fn valid(&self) -> Result<bool, String> {
314+
let valid = unsafe { crocksdb_ffi::crocksdb_iter_valid(self.inner) };
315+
if !valid {
316+
self.status()?;
317+
}
318+
Ok(valid)
312319
}
313320

314-
pub fn status(&self) -> Result<(), String> {
321+
fn status(&self) -> Result<(), String> {
315322
unsafe {
316323
ffi_try!(crocksdb_iter_get_error(self.inner));
317324
}
318325
Ok(())
319326
}
320327
}
321328

329+
#[deprecated]
322330
pub type Kv = (Vec<u8>, Vec<u8>);
323331

332+
#[deprecated]
324333
impl<'b, D> Iterator for &'b mut DBIterator<D> {
334+
#[allow(deprecated)]
325335
type Item = Kv;
326336

327-
fn next(&mut self) -> Option<Kv> {
328-
let kv = self.kv();
329-
if kv.is_some() {
330-
DBIterator::next(self);
337+
fn next(&mut self) -> Option<Self::Item> {
338+
match self.valid() {
339+
Ok(true) => {}
340+
Ok(false) => return None,
341+
Err(e) => panic!("invalid iterator: {}", e),
331342
}
332-
kv
343+
let k = self.key().to_vec();
344+
let v = self.value().to_vec();
345+
let _ = DBIterator::next(self);
346+
Some((k, v))
333347
}
334348
}
335349

@@ -2778,7 +2792,7 @@ mod test {
27782792
db.put(b"k2", b"v2222").expect("");
27792793
db.put(b"k3", b"v3333").expect("");
27802794
let mut iter = db.iter();
2781-
iter.seek(SeekKey::Start);
2795+
iter.seek(SeekKey::Start).unwrap();
27822796
for (k, v) in &mut iter {
27832797
println!(
27842798
"Hello {}: {}",

tests/cases/test_delete_files_in_range.rs

+15-16
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ fn test_delete_files_in_range_with_iter() {
6666
db.delete_files_in_range(b"key2", b"key7", false).unwrap();
6767

6868
let mut count = 0;
69-
assert!(iter.seek(SeekKey::Start));
70-
while iter.valid() {
71-
iter.next();
69+
assert!(iter.seek(SeekKey::Start).unwrap());
70+
while iter.valid().unwrap() {
71+
iter.next().unwrap();
7272
count = count + 1;
7373
}
7474

@@ -89,11 +89,11 @@ fn test_delete_files_in_range_with_snap() {
8989
db.delete_files_in_range(b"key2", b"key7", false).unwrap();
9090

9191
let mut iter = snap.iter();
92-
assert!(iter.seek(SeekKey::Start));
92+
assert!(iter.seek(SeekKey::Start).unwrap());
9393

9494
let mut count = 0;
95-
while iter.valid() {
96-
iter.next();
95+
while iter.valid().unwrap() {
96+
iter.next().unwrap();
9797
count = count + 1;
9898
}
9999

@@ -159,12 +159,12 @@ fn test_delete_files_in_range_with_delete_range() {
159159
db.compact_range(None, None);
160160

161161
let mut it = db.iter();
162-
it.seek(SeekKey::Start);
163-
assert!(it.valid());
162+
it.seek(SeekKey::Start).unwrap();
163+
assert!(it.valid().unwrap());
164164
assert_eq!(it.key(), b"4");
165-
assert!(it.next());
165+
assert!(it.next().unwrap());
166166
assert_eq!(it.key(), b"5");
167-
assert!(!it.next());
167+
assert!(!it.next().unwrap());
168168
}
169169

170170
#[test]
@@ -186,19 +186,18 @@ fn test_delete_files_in_ranges() {
186186

187187
// Check that ["key0", "key5"] have been deleted, but ["key6", "key8"] still exist.
188188
let mut iter = db.iter();
189-
iter.seek(SeekKey::Start);
189+
iter.seek(SeekKey::Start).unwrap();
190190
for i in 6..9 {
191-
assert!(iter.valid());
191+
assert!(iter.valid().unwrap());
192192
let k = format!("key{}", i);
193193
assert_eq!(iter.key(), k.as_bytes());
194-
iter.next();
194+
iter.next().unwrap();
195195
}
196-
assert!(!iter.valid());
196+
assert!(!iter.valid().unwrap());
197197

198198
// Delete the last file.
199199
let ranges = vec![Range::new(b"key6", b"key8")];
200200
db.delete_files_in_ranges_cf(cf, &ranges, true).unwrap();
201201
let mut iter = db.iter();
202-
iter.seek(SeekKey::Start);
203-
assert!(!iter.valid());
202+
assert!(!iter.seek(SeekKey::Start).unwrap());
204203
}

tests/cases/test_delete_range.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -44,37 +44,37 @@ fn gen_sst_from_db(opt: ColumnFamilyOptions, cf: Option<&CFHandle>, path: &str,
4444
};
4545
writer.open(path).unwrap();
4646
let mut iter = db.iter();
47-
iter.seek(SeekKey::Start);
48-
while iter.valid() {
47+
iter.seek(SeekKey::Start).unwrap();
48+
while iter.valid().unwrap() {
4949
writer.put(iter.key(), iter.value()).unwrap();
50-
iter.next();
50+
iter.next().unwrap();
5151
}
5252
writer.finish().unwrap();
5353
}
5454

5555
fn gen_crc32_from_db(db: &DB) -> u32 {
5656
let mut digest = Digest::new(crc32::IEEE);
5757
let mut iter = db.iter();
58-
iter.seek(SeekKey::Start);
59-
while iter.valid() {
58+
iter.seek(SeekKey::Start).unwrap();
59+
while iter.valid().unwrap() {
6060
digest.write(iter.key());
6161
digest.write(iter.value());
62-
iter.next();
62+
iter.next().unwrap();
6363
}
6464
digest.sum32()
6565
}
6666

6767
fn gen_crc32_from_db_in_range(db: &DB, start_key: &[u8], end_key: &[u8]) -> u32 {
6868
let mut digest = Digest::new(crc32::IEEE);
6969
let mut iter = db.iter();
70-
iter.seek(SeekKey::Key(start_key));
71-
while iter.valid() {
70+
iter.seek(SeekKey::Key(start_key)).unwrap();
71+
while iter.valid().unwrap() {
7272
if iter.key() >= end_key {
7373
break;
7474
}
7575
digest.write(iter.key());
7676
digest.write(iter.value());
77-
iter.next();
77+
iter.next().unwrap();
7878
}
7979
digest.sum32()
8080
}

tests/cases/test_ingest_external_file.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -282,16 +282,16 @@ fn gen_sst_from_cf(opt: ColumnFamilyOptions, db: &DB, cf: &CFHandle, path: &str)
282282
let mut writer = SstFileWriter::new_cf(env_opt, opt, cf);
283283
writer.open(path).unwrap();
284284
let mut iter = db.iter_cf(cf);
285-
iter.seek(SeekKey::Start);
286-
while iter.valid() {
285+
iter.seek(SeekKey::Start).unwrap();
286+
while iter.valid().unwrap() {
287287
writer.put(iter.key(), iter.value()).unwrap();
288-
iter.next();
288+
iter.next().unwrap();
289289
}
290290
let info = writer.finish().unwrap();
291291
assert_eq!(info.file_path().to_str().unwrap(), path);
292-
iter.seek(SeekKey::Start);
292+
iter.seek(SeekKey::Start).unwrap();
293293
assert_eq!(info.smallest_key(), iter.key());
294-
iter.seek(SeekKey::End);
294+
iter.seek(SeekKey::End).unwrap();
295295
assert_eq!(info.largest_key(), iter.key());
296296
assert_eq!(info.sequence_number(), 0);
297297
assert!(info.file_size() > 0);
@@ -522,7 +522,7 @@ fn test_read_sst() {
522522
assert_eq!(props.num_entries(), 3);
523523
});
524524
let mut it = reader.iter();
525-
it.seek(SeekKey::Start);
525+
it.seek(SeekKey::Start).unwrap();
526526
assert_eq!(
527527
it.collect::<Vec<_>>(),
528528
vec![

0 commit comments

Comments
 (0)