Skip to content

Commit b9789d8

Browse files
committed
optimize Get operations by removing CopyString and using PinnableSlice for zero-copy reads
1 parent ac4d563 commit b9789d8

File tree

3 files changed

+286
-25
lines changed

3 files changed

+286
-25
lines changed

db/c.cc

Lines changed: 137 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -624,10 +624,11 @@ static bool SaveError(char** errptr, const Status& s) {
624624
return true;
625625
}
626626

627-
// Copies str to a new malloc()-ed buffer. The buffer is not NUL terminated.
628-
static char* CopyString(const std::string& str) {
629-
char* result = reinterpret_cast<char*>(malloc(sizeof(char) * str.size()));
630-
memcpy(result, str.data(), sizeof(char) * str.size());
627+
// Helper function to copy string data to a malloc'd buffer
628+
// Works with std::string, Slice, and PinnableSlice through implicit conversion
629+
static inline char* CopyString(const Slice& slice) {
630+
char* result = reinterpret_cast<char*>(malloc(slice.size()));
631+
memcpy(result, slice.data(), slice.size());
631632
return result;
632633
}
633634

@@ -1393,11 +1394,14 @@ char* rocksdb_get(rocksdb_t* db, const rocksdb_readoptions_t* options,
13931394
const char* key, size_t keylen, size_t* vallen,
13941395
char** errptr) {
13951396
char* result = nullptr;
1396-
std::string tmp;
1397-
Status s = db->rep->Get(options->rep, Slice(key, keylen), &tmp);
1397+
// Use PinnableSlice to avoid unnecessary copy
1398+
PinnableSlice pinnable_val;
1399+
Status s = db->rep->Get(options->rep, db->rep->DefaultColumnFamily(),
1400+
Slice(key, keylen), &pinnable_val);
13981401
if (s.ok()) {
1399-
*vallen = tmp.size();
1400-
result = CopyString(tmp);
1402+
*vallen = pinnable_val.size();
1403+
// Only one copy: from PinnableSlice to malloc'd buffer
1404+
result = CopyString(pinnable_val);
14011405
} else {
14021406
*vallen = 0;
14031407
if (!s.IsNotFound()) {
@@ -1412,12 +1416,14 @@ char* rocksdb_get_cf(rocksdb_t* db, const rocksdb_readoptions_t* options,
14121416
const char* key, size_t keylen, size_t* vallen,
14131417
char** errptr) {
14141418
char* result = nullptr;
1415-
std::string tmp;
1416-
Status s =
1417-
db->rep->Get(options->rep, column_family->rep, Slice(key, keylen), &tmp);
1419+
// Use PinnableSlice to avoid unnecessary copy
1420+
PinnableSlice pinnable_val;
1421+
Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
1422+
&pinnable_val);
14181423
if (s.ok()) {
1419-
*vallen = tmp.size();
1420-
result = CopyString(tmp);
1424+
*vallen = pinnable_val.size();
1425+
// Only one copy: from PinnableSlice to malloc'd buffer
1426+
result = CopyString(pinnable_val);
14211427
} else {
14221428
*vallen = 0;
14231429
if (!s.IsNotFound()) {
@@ -1498,8 +1504,8 @@ void rocksdb_multi_get(rocksdb_t* db, const rocksdb_readoptions_t* options,
14981504
std::vector<Status> statuses = db->rep->MultiGet(options->rep, keys, &values);
14991505
for (size_t i = 0; i < num_keys; i++) {
15001506
if (statuses[i].ok()) {
1501-
values_list[i] = CopyString(values[i]);
15021507
values_list_sizes[i] = values[i].size();
1508+
values_list[i] = CopyString(values[i]);
15031509
errs[i] = nullptr;
15041510
} else {
15051511
values_list[i] = nullptr;
@@ -1530,10 +1536,10 @@ void rocksdb_multi_get_with_ts(rocksdb_t* db,
15301536
db->rep->MultiGet(options->rep, keys, &values, &timestamps);
15311537
for (size_t i = 0; i < num_keys; i++) {
15321538
if (statuses[i].ok()) {
1533-
values_list[i] = CopyString(values[i]);
15341539
values_list_sizes[i] = values[i].size();
1535-
timestamp_list[i] = CopyString(timestamps[i]);
1540+
values_list[i] = CopyString(values[i]);
15361541
timestamp_list_sizes[i] = timestamps[i].size();
1542+
timestamp_list[i] = CopyString(timestamps[i]);
15371543
errs[i] = nullptr;
15381544
} else {
15391545
values_list[i] = nullptr;
@@ -1566,8 +1572,8 @@ void rocksdb_multi_get_cf(
15661572
db->rep->MultiGet(options->rep, cfs, keys, &values);
15671573
for (size_t i = 0; i < num_keys; i++) {
15681574
if (statuses[i].ok()) {
1569-
values_list[i] = CopyString(values[i]);
15701575
values_list_sizes[i] = values[i].size();
1576+
values_list[i] = CopyString(values[i]);
15711577
errs[i] = nullptr;
15721578
} else {
15731579
values_list[i] = nullptr;
@@ -1600,10 +1606,10 @@ void rocksdb_multi_get_cf_with_ts(
16001606
db->rep->MultiGet(options->rep, cfs, keys, &values, &timestamps);
16011607
for (size_t i = 0; i < num_keys; i++) {
16021608
if (statuses[i].ok()) {
1603-
values_list[i] = CopyString(values[i]);
16041609
values_list_sizes[i] = values[i].size();
1605-
timestamps_list[i] = CopyString(timestamps[i]);
1610+
values_list[i] = CopyString(values[i]);
16061611
timestamps_list_sizes[i] = timestamps[i].size();
1612+
timestamps_list[i] = CopyString(timestamps[i]);
16071613
errs[i] = nullptr;
16081614
} else {
16091615
values_list[i] = nullptr;
@@ -6888,8 +6894,8 @@ void rocksdb_transaction_multi_get(rocksdb_transaction_t* txn,
68886894
txn->rep->MultiGet(options->rep, keys, &values);
68896895
for (size_t i = 0; i < num_keys; i++) {
68906896
if (statuses[i].ok()) {
6891-
values_list[i] = CopyString(values[i]);
68926897
values_list_sizes[i] = values[i].size();
6898+
values_list[i] = CopyString(values[i]);
68936899
errs[i] = nullptr;
68946900
} else {
68956901
values_list[i] = nullptr;
@@ -6917,8 +6923,8 @@ void rocksdb_transaction_multi_get_for_update(
69176923
txn->rep->MultiGetForUpdate(options->rep, keys, &values);
69186924
for (size_t i = 0; i < num_keys; i++) {
69196925
if (statuses[i].ok()) {
6920-
values_list[i] = CopyString(values[i]);
69216926
values_list_sizes[i] = values[i].size();
6927+
values_list[i] = CopyString(values[i]);
69226928
errs[i] = nullptr;
69236929
} else {
69246930
values_list[i] = nullptr;
@@ -6949,8 +6955,8 @@ void rocksdb_transaction_multi_get_cf(
69496955
txn->rep->MultiGet(options->rep, cfs, keys, &values);
69506956
for (size_t i = 0; i < num_keys; i++) {
69516957
if (statuses[i].ok()) {
6952-
values_list[i] = CopyString(values[i]);
69536958
values_list_sizes[i] = values[i].size();
6959+
values_list[i] = CopyString(values[i]);
69546960
errs[i] = nullptr;
69556961
} else {
69566962
values_list[i] = nullptr;
@@ -6981,8 +6987,8 @@ void rocksdb_transaction_multi_get_for_update_cf(
69816987
txn->rep->MultiGetForUpdate(options->rep, cfs, keys, &values);
69826988
for (size_t i = 0; i < num_keys; i++) {
69836989
if (statuses[i].ok()) {
6984-
values_list[i] = CopyString(values[i]);
69856990
values_list_sizes[i] = values[i].size();
6991+
values_list[i] = CopyString(values[i]);
69866992
errs[i] = nullptr;
69876993
} else {
69886994
values_list[i] = nullptr;
@@ -7085,8 +7091,8 @@ void rocksdb_transactiondb_multi_get(rocksdb_transactiondb_t* txn_db,
70857091
txn_db->rep->MultiGet(options->rep, keys, &values);
70867092
for (size_t i = 0; i < num_keys; i++) {
70877093
if (statuses[i].ok()) {
7088-
values_list[i] = CopyString(values[i]);
70897094
values_list_sizes[i] = values[i].size();
7095+
values_list[i] = CopyString(values[i]);
70907096
errs[i] = nullptr;
70917097
} else {
70927098
values_list[i] = nullptr;
@@ -7117,8 +7123,8 @@ void rocksdb_transactiondb_multi_get_cf(
71177123
txn_db->rep->MultiGet(options->rep, cfs, keys, &values);
71187124
for (size_t i = 0; i < num_keys; i++) {
71197125
if (statuses[i].ok()) {
7120-
values_list[i] = CopyString(values[i]);
71217126
values_list_sizes[i] = values[i].size();
7127+
values_list[i] = CopyString(values[i]);
71227128
errs[i] = nullptr;
71237129
} else {
71247130
values_list[i] = nullptr;
@@ -7707,4 +7713,110 @@ uint64_t rocksdb_wait_for_compact_options_get_timeout(
77077713
return opt->rep.timeout.count();
77087714
}
77097715

7716+
/* High-performance zero-copy Get implementations */
7717+
7718+
struct rocksdb_pinnable_handle_t {
7719+
PinnableSlice rep;
7720+
};
7721+
7722+
rocksdb_pinnable_handle_t* rocksdb_get_pinned_v2(
7723+
rocksdb_t* db, const rocksdb_readoptions_t* options, const char* key,
7724+
size_t keylen, char** errptr) {
7725+
rocksdb_pinnable_handle_t* handle = new rocksdb_pinnable_handle_t;
7726+
Status s = db->rep->Get(options->rep, db->rep->DefaultColumnFamily(),
7727+
Slice(key, keylen), &handle->rep);
7728+
if (!s.ok()) {
7729+
delete handle;
7730+
if (!s.IsNotFound()) {
7731+
SaveError(errptr, s);
7732+
}
7733+
return nullptr;
7734+
}
7735+
return handle;
7736+
}
7737+
7738+
rocksdb_pinnable_handle_t* rocksdb_get_pinned_cf_v2(
7739+
rocksdb_t* db, const rocksdb_readoptions_t* options,
7740+
rocksdb_column_family_handle_t* column_family, const char* key,
7741+
size_t keylen, char** errptr) {
7742+
rocksdb_pinnable_handle_t* handle = new rocksdb_pinnable_handle_t;
7743+
Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
7744+
&handle->rep);
7745+
if (!s.ok()) {
7746+
delete handle;
7747+
if (!s.IsNotFound()) {
7748+
SaveError(errptr, s);
7749+
}
7750+
return nullptr;
7751+
}
7752+
return handle;
7753+
}
7754+
7755+
const char* rocksdb_pinnable_handle_get_value(
7756+
const rocksdb_pinnable_handle_t* handle, size_t* vallen) {
7757+
if (!handle) {
7758+
*vallen = 0;
7759+
return nullptr;
7760+
}
7761+
*vallen = handle->rep.size();
7762+
return handle->rep.data();
7763+
}
7764+
7765+
void rocksdb_pinnable_handle_destroy(rocksdb_pinnable_handle_t* handle) {
7766+
delete handle;
7767+
}
7768+
7769+
unsigned char rocksdb_get_into_buffer(rocksdb_t* db,
7770+
const rocksdb_readoptions_t* options,
7771+
const char* key, size_t keylen,
7772+
char* buffer, size_t buffer_size,
7773+
size_t* vallen, unsigned char* found,
7774+
char** errptr) {
7775+
PinnableSlice pinnable_val;
7776+
Status s = db->rep->Get(options->rep, db->rep->DefaultColumnFamily(),
7777+
Slice(key, keylen), &pinnable_val);
7778+
if (s.ok()) {
7779+
*found = 1;
7780+
*vallen = pinnable_val.size();
7781+
if (buffer_size >= pinnable_val.size()) {
7782+
memcpy(buffer, pinnable_val.data(), pinnable_val.size());
7783+
return 1; // Success - data copied
7784+
}
7785+
return 0; // Buffer too small
7786+
} else {
7787+
*found = 0;
7788+
*vallen = 0;
7789+
if (!s.IsNotFound()) {
7790+
SaveError(errptr, s);
7791+
}
7792+
return 0;
7793+
}
7794+
}
7795+
7796+
unsigned char rocksdb_get_into_buffer_cf(
7797+
rocksdb_t* db, const rocksdb_readoptions_t* options,
7798+
rocksdb_column_family_handle_t* column_family, const char* key,
7799+
size_t keylen, char* buffer, size_t buffer_size, size_t* vallen,
7800+
unsigned char* found, char** errptr) {
7801+
PinnableSlice pinnable_val;
7802+
Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
7803+
&pinnable_val);
7804+
if (s.ok()) {
7805+
*found = 1;
7806+
*vallen = pinnable_val.size();
7807+
if (buffer_size >= pinnable_val.size()) {
7808+
memcpy(buffer, pinnable_val.data(), pinnable_val.size());
7809+
return 1; // Success - data copied
7810+
}
7811+
return 0; // Buffer too small
7812+
} else {
7813+
*found = 0;
7814+
*vallen = 0;
7815+
if (!s.IsNotFound()) {
7816+
SaveError(errptr, s);
7817+
}
7818+
return 0;
7819+
}
7820+
}
7821+
77107822
} // end extern "C"

db/c_test.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,6 +1433,53 @@ int main(int argc, char** argv) {
14331433
CheckMultiGetValues(3, vals, vals_sizes, errs, expected);
14341434
}
14351435

1436+
StartPhase("zero_copy_get_pinned_v2");
1437+
{
1438+
// Test new zero-copy get functions
1439+
1440+
// Test rocksdb_get_pinned_v2
1441+
rocksdb_pinnable_handle_t* handle =
1442+
rocksdb_get_pinned_v2(db, roptions, "foo", 3, &err);
1443+
CheckNoError(err);
1444+
CheckCondition(handle != NULL);
1445+
size_t val_len;
1446+
const char* val = rocksdb_pinnable_handle_get_value(handle, &val_len);
1447+
CheckEqual("hello", val, val_len);
1448+
rocksdb_pinnable_handle_destroy(handle);
1449+
1450+
// Test with non-existent key
1451+
handle = rocksdb_get_pinned_v2(db, roptions, "notfound", 8, &err);
1452+
CheckNoError(err);
1453+
CheckCondition(handle == NULL);
1454+
1455+
// Test rocksdb_get_into_buffer
1456+
char buffer[100];
1457+
unsigned char found;
1458+
unsigned char success = rocksdb_get_into_buffer(
1459+
db, roptions, "foo", 3, buffer, sizeof(buffer), &val_len, &found, &err);
1460+
CheckNoError(err);
1461+
CheckCondition(success == 1);
1462+
CheckCondition(found == 1);
1463+
CheckCondition(val_len == 5);
1464+
CheckCondition(memcmp(buffer, "hello", 5) == 0);
1465+
1466+
// Test with buffer too small
1467+
success = rocksdb_get_into_buffer(db, roptions, "foo", 3, buffer,
1468+
2, // Buffer too small
1469+
&val_len, &found, &err);
1470+
CheckNoError(err);
1471+
CheckCondition(success == 0); // Should fail due to small buffer
1472+
CheckCondition(found == 1);
1473+
CheckCondition(val_len == 5); // Should still report actual size
1474+
1475+
// Test with non-existent key
1476+
success = rocksdb_get_into_buffer(db, roptions, "notfound", 8, buffer,
1477+
sizeof(buffer), &val_len, &found, &err);
1478+
CheckNoError(err);
1479+
CheckCondition(success == 0);
1480+
CheckCondition(found == 0);
1481+
}
1482+
14361483
StartPhase("pin_get");
14371484
{
14381485
CheckPinGet(db, roptions, "box", "c");
@@ -1850,6 +1897,55 @@ int main(int argc, char** argv) {
18501897
rocksdb_flush_wal(db, 1, &err);
18511898
CheckNoError(err);
18521899

1900+
// Test column family handle get name
1901+
{
1902+
size_t name_len;
1903+
char* cf_name =
1904+
rocksdb_column_family_handle_get_name(handles[1], &name_len);
1905+
CheckCondition(name_len == 3);
1906+
CheckCondition(memcmp(cf_name, "cf1", 3) == 0);
1907+
rocksdb_free(cf_name);
1908+
}
1909+
1910+
// Test zero-copy get with column families
1911+
{
1912+
rocksdb_pinnable_handle_t* handle =
1913+
rocksdb_get_pinned_cf_v2(db, roptions, handles[1], "box", 3, &err);
1914+
CheckNoError(err);
1915+
CheckCondition(handle != NULL);
1916+
size_t val_len;
1917+
const char* val = rocksdb_pinnable_handle_get_value(handle, &val_len);
1918+
CheckEqual("c", val, val_len);
1919+
rocksdb_pinnable_handle_destroy(handle);
1920+
1921+
// Test with non-existent key
1922+
handle = rocksdb_get_pinned_cf_v2(db, roptions, handles[1], "notfound", 8,
1923+
&err);
1924+
CheckNoError(err);
1925+
CheckCondition(handle == NULL);
1926+
1927+
// Test rocksdb_get_into_buffer_cf
1928+
char buffer[100];
1929+
unsigned char found;
1930+
unsigned char success = rocksdb_get_into_buffer_cf(
1931+
db, roptions, handles[1], "buff", 4, buffer, sizeof(buffer), &val_len,
1932+
&found, &err);
1933+
CheckNoError(err);
1934+
CheckCondition(success == 1);
1935+
CheckCondition(found == 1);
1936+
CheckCondition(val_len == 7);
1937+
CheckCondition(memcmp(buffer, "rocksdb", 7) == 0);
1938+
1939+
// Test with buffer too small
1940+
success = rocksdb_get_into_buffer_cf(db, roptions, handles[1], "buff", 4,
1941+
buffer, 3, // Buffer too small
1942+
&val_len, &found, &err);
1943+
CheckNoError(err);
1944+
CheckCondition(success == 0); // Should fail due to small buffer
1945+
CheckCondition(found == 1);
1946+
CheckCondition(val_len == 7); // Should still report actual size
1947+
}
1948+
18531949
// Test WriteBatchWithIndex iteration with Column Family
18541950
rocksdb_writebatch_wi_t* wbwi = rocksdb_writebatch_wi_create(0, true);
18551951
rocksdb_writebatch_wi_put_cf(wbwi, handles[1], "boat", 4, "row",
@@ -3397,6 +3493,17 @@ int main(int argc, char** argv) {
33973493
rocksdb_transaction_put(txn, "foo", 3, "hello", 5, &err);
33983494
CheckNoError(err);
33993495

3496+
// test transaction get/set name (before commit)
3497+
{
3498+
rocksdb_transaction_set_name(txn, "test_txn", 8, &err);
3499+
CheckNoError(err);
3500+
size_t name_len;
3501+
char* txn_name = rocksdb_transaction_get_name(txn, &name_len);
3502+
CheckCondition(name_len == 8);
3503+
CheckCondition(memcmp(txn_name, "test_txn", 8) == 0);
3504+
rocksdb_free(txn_name);
3505+
}
3506+
34003507
// read from outside transaction, before commit
34013508
CheckTxnDBGet(txn_db, roptions, "foo", NULL);
34023509
CheckTxnDBPinGet(txn_db, roptions, "foo", NULL);

0 commit comments

Comments
 (0)