Skip to content

Commit

Permalink
twom: use different locations for foreach
Browse files Browse the repository at this point in the history
  • Loading branch information
brong committed Dec 10, 2024
1 parent af25800 commit 74d9e42
Showing 1 changed file with 39 additions and 40 deletions.
79 changes: 39 additions & 40 deletions lib/cyrusdb_twom.c
Original file line number Diff line number Diff line change
Expand Up @@ -790,12 +790,11 @@ static void _setloc(struct dbengine *db, struct skiprecord *record,
/* finds a record, either an exact match or the record
* immediately before */
#ifdef HAVE_DECLARE_OPTIMIZE
static int relocate(struct dbengine *db)
static int relocate(struct dbengine *db, struct skiploc *loc)
__attribute__((optimize("-O3")));
#endif
static int relocate(struct dbengine *db)
static int relocate(struct dbengine *db, struct skiploc *loc)
{
struct skiploc *loc = &db->loc;
struct skiprecord *rec = &db->recs[0];
struct skiprecord *next = &db->recs[1];
size_t offset;
Expand Down Expand Up @@ -883,9 +882,8 @@ static int relocate(struct dbengine *db)

/* helper function to find a location, either by using the existing
* location if it's close enough, or using the full relocate above */
static int find_loc(struct dbengine *db, const char *key, size_t keylen)
static int find_loc(struct dbengine *db, struct skiploc *loc, const char *key, size_t keylen)
{
struct skiploc *loc = &db->loc;
struct skiprecord *rec = &db->recs[0];
int cmp, i, r;

Expand Down Expand Up @@ -945,20 +943,19 @@ static int find_loc(struct dbengine *db, const char *key, size_t keylen)
/* if we fell out here, it's not a "local" record, just search */
}

return relocate(db);
return relocate(db, loc);
}

/* helper function to advance to the "next" record. Used by foreach,
* fetchnext, and internal functions */
static int advance_loc(struct dbengine *db)
static int advance_loc(struct dbengine *db, struct skiploc *loc)
{
struct skiploc *loc = &db->loc;
uint8_t i;
int r;

/* has another session made changes? Need to re-find the location */
if (loc->end != db->end || loc->generation != db->header.generation) {
r = relocate(db);
r = relocate(db, loc);
if (r) return r;
}

Expand All @@ -973,7 +970,7 @@ static int advance_loc(struct dbengine *db)
/* reached the end? */
if (!loc->record.offset) {
buf_reset(&loc->keybuf);
return relocate(db);
return relocate(db, loc);
}

/* update forward pointers */
Expand All @@ -995,9 +992,8 @@ static int advance_loc(struct dbengine *db)
* after appending a new record, either create or delete. The
* caller must set forwardloc[] correctly for each level it has
* changed */
static int stitch(struct dbengine *db, uint8_t maxlevel, size_t newoffset)
static int stitch(struct dbengine *db, struct skiploc *loc, uint8_t maxlevel, size_t newoffset)
{
struct skiploc *loc = &db->loc;
struct skiprecord *rec = &db->recs[0];
uint8_t i;
int r;
Expand Down Expand Up @@ -1078,7 +1074,7 @@ static int store_here(struct dbengine *db, const char *val, size_t vallen)
loc->forwardloc[i] = newrecord.offset;

/* update all backpointers */
r = stitch(db, newrecord.level, newrecord.offset);
r = stitch(db, loc, newrecord.level, newrecord.offset);
if (r) return r;

/* update header to know details of new record */
Expand Down Expand Up @@ -1526,6 +1522,7 @@ static int myfetch(struct dbengine *db,
{
int r = 0;
struct txn *localtid = NULL;
struct skiploc *loc = &db->loc;

assert(db);
if (datalen) assert(data);
Expand Down Expand Up @@ -1560,16 +1557,16 @@ static int myfetch(struct dbengine *db,

(*tidptr)->counter++;

r = find_loc(db, key, keylen);
r = find_loc(db, loc, key, keylen);
if (r) goto done;

if (fetchnext) {
r = advance_loc(db);
r = advance_loc(db, loc);
if (r) goto done;
}

if (foundkey) *foundkey = db->loc.keybuf.s;
if (foundkeylen) *foundkeylen = db->loc.keybuf.len;
if (foundkey) *foundkey = loc->keybuf.s;
if (foundkeylen) *foundkeylen = loc->keybuf.len;

// if there's no match, this key never existed
if (!db->loc.is_exactmatch) {
Expand All @@ -1578,7 +1575,7 @@ static int myfetch(struct dbengine *db,
goto done;
}

struct skiprecord *rec = &db->loc.record;
struct skiprecord *rec = &loc->record;
while (rec->offset >= (*tidptr)->end) {
// no ancestor, it's not found
if (!HASANCESTOR(rec->type)) {
Expand Down Expand Up @@ -1628,6 +1625,7 @@ static int myforeach(struct dbengine *db,
const char *val;
size_t vallen;
struct buf keybuf = BUF_INITIALIZER;
struct skiploc myloc;

assert(db);
assert(cb);
Expand Down Expand Up @@ -1657,20 +1655,23 @@ static int myforeach(struct dbengine *db,
if (r) return r;
}

r = find_loc(db, prefix, prefixlen);
memset(&myloc, 0, sizeof(struct skiploc));
struct skiploc *loc = &myloc;

r = find_loc(db, loc, prefix, prefixlen);
if (r) goto done;

if (!db->loc.is_exactmatch) {
if (!loc->is_exactmatch) {
/* advance to the first match */
r = advance_loc(db);
r = advance_loc(db, loc);
if (r) goto done;
}

while (db->loc.is_exactmatch) {
while (loc->is_exactmatch) {
/* does it match prefix? */
if (prefixlen) {
if (db->loc.record.keylen < prefixlen) break;
if (bsearch_ncompare_raw(KEY(db, &db->loc.record), prefixlen, prefix, prefixlen)) break;
if (loc->record.keylen < prefixlen) break;
if (bsearch_ncompare_raw(KEY(db, &loc->record), prefixlen, prefix, prefixlen)) break;
}

// release locks every N records
Expand All @@ -1684,13 +1685,13 @@ static int myforeach(struct dbengine *db,
(*tidptr)->counter = 0;

/* should be cheap if we're already here */
r = relocate(db);
r = relocate(db, loc);
if (r) goto done;
}

(*tidptr)->counter++;

struct skiprecord *rec = &db->loc.record;
struct skiprecord *rec = &loc->record;
while (rec->offset >= (*tidptr)->end) {
if (!HASANCESTOR(rec->type)) goto next;
r = read_onerecord(db, rec->ancestor, &db->recs[0]);
Expand All @@ -1703,18 +1704,15 @@ static int myforeach(struct dbengine *db,
val = VAL(db, rec);
vallen = rec->vallen;

if ((!goodp || goodp(rock, db->loc.keybuf.s, db->loc.keybuf.len,
if ((!goodp || goodp(rock, loc->keybuf.s, loc->keybuf.len,
val, vallen))) {
/* take a copy of they key - just in case cb does actions on this database
* and clobbers loc */
buf_copy(&keybuf, &db->loc.keybuf);
if (localtid) {
r = mycommit(db, localtid);
localtid = NULL;
if (r) goto done;

/* make callback */
cb_r = cb(rock, db->loc.keybuf.s, db->loc.keybuf.len,
cb_r = cb(rock, loc->keybuf.s, loc->keybuf.len,
val, vallen);
if (cb_r) break;

Expand All @@ -1723,25 +1721,25 @@ static int myforeach(struct dbengine *db,
}
else {
/* just make the callback */
cb_r = cb(rock, db->loc.keybuf.s, db->loc.keybuf.len,
cb_r = cb(rock, loc->keybuf.s, loc->keybuf.len,
val, vallen);
if (cb_r) break;
}

/* should be cheap if we're already here */
r = find_loc(db, keybuf.s, keybuf.len);
r = find_loc(db, loc, loc->keybuf.s, loc->keybuf.len);
if (r) goto done;
}

next:
/* move to the next one */
r = advance_loc(db);
r = advance_loc(db, loc);
if (r) goto done;
}

done:

buf_free(&keybuf);
buf_free(&myloc.keybuf);

if (localtid) {
/* release read lock */
Expand Down Expand Up @@ -1798,17 +1796,18 @@ static int skipwrite(struct dbengine *db,
const char *data, size_t datalen,
int force)
{
int r = find_loc(db, key, keylen);
struct skiploc *loc = &db->loc;
int r = find_loc(db, loc, key, keylen);
if (r) return r;

/* could be a delete or a replace */
if (db->loc.is_exactmatch && HASVALUE(db->loc.record.type)) {
if (loc->is_exactmatch && HASVALUE(loc->record.type)) {
if (!data) return store_here(db, NULL, 0);
if (!force) return CYRUSDB_EXISTS;
/* unchanged? Save the IO */
if (!bsearch_ncompare_raw(data, datalen,
VAL(db, &db->loc.record),
db->loc.record.vallen))
VAL(db, &loc->record),
loc->record.vallen))
return 0;
return store_here(db, data, datalen);
}
Expand Down Expand Up @@ -2134,7 +2133,7 @@ static int mycheckpoint(struct dbengine *db)
assert(cr.db->header.generation == 1);

// set up the pointers so copy_cb logic can work
relocate(cr.db);
relocate(cr.db, &cr.db->loc);

// mvcc process all the existing records
r = myforeach(db, NULL, 0, NULL, copy_cb, &cr, &txn);
Expand Down

0 comments on commit 74d9e42

Please sign in to comment.