Skip to content

Commit

Permalink
twom: use pointers to a pair of records rather than copying
Browse files Browse the repository at this point in the history
  • Loading branch information
brong committed Dec 10, 2024
1 parent 816e822 commit 2a4c0a9
Showing 1 changed file with 40 additions and 38 deletions.
78 changes: 40 additions & 38 deletions lib/cyrusdb_twom.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ struct dbengine {

struct db_header header;
struct skiploc loc;
struct skiprecord recs[2];

/* tracking info */
size_t end;
Expand Down Expand Up @@ -2267,8 +2268,8 @@ static int consistent(struct dbengine *db)
/* perform some basic consistency checks */
static int myconsistent(struct dbengine *db, struct txn *tid)
{
struct skiprecord prevrecord;
struct skiprecord record;
struct skiprecord *prevp = &db->recs[0];
struct skiprecord *curp = &db->recs[1];
size_t fwd[MAXLEVEL];
size_t num_records = 0;
size_t dirty_size = 0;
Expand All @@ -2279,81 +2280,82 @@ static int myconsistent(struct dbengine *db, struct txn *tid)
assert(db->current_txn == tid); /* could both be null */

/* read in the dummy */
r = read_onerecord(db, DUMMY_OFFSET, &prevrecord);
r = read_onerecord(db, DUMMY_OFFSET, prevp);
if (r) return r;

/* set up the location pointers */
for (i = 0; i < MAXLEVEL; i++)
fwd[i] = _getloc(db, &prevrecord, i);
fwd[i] = _getloc(db, prevp, i);

while (fwd[0]) {
r = read_onerecord(db, fwd[0], &record);
r = read_onerecord(db, fwd[0], curp);
if (r) {
xsyslog(LOG_ERR, "DBERROR: failed to read record for consistent",
"fname=<%s> offset=<%08llX>",
FNAME(db), (LLU)fwd[0]);
return r;
}

size_t ancestor = record.ancestor;
cmp = bsearch_ncompare_raw(KEY(db, curp), curp->keylen,
KEY(db, prevp), prevp->keylen);
if (cmp <= 0) {
xsyslog(LOG_ERR, "DBERROR: twom out of order",
"fname=<%s> key=<%.*s> offset=<%08llX>"
" prevkey=<%.*s> prevoffset=<%08llX)",
FNAME(db), (int)curp->keylen, KEY(db, curp),
(LLU)curp->offset,
(int)prevp->keylen, KEY(db, prevp),
(LLU)prevp->offset);
return CYRUSDB_INTERNAL;
}

size_t ancestor = curp->ancestor;
while (ancestor) {
struct skiprecord parent;
r = read_onerecord(db, ancestor, &parent);
r = read_onerecord(db, ancestor, prevp);
if (r) {
xsyslog(LOG_ERR, "DBERROR: failed to read ancestor for consistent",
"fname=<%s> key=<%.*s> offset=<%08llX>",
FNAME(db), (int)record.keylen, KEY(db, &record),
FNAME(db), (int)curp->keylen, KEY(db, curp),
(LLU)ancestor);
return r;
}
cmp = bsearch_ncompare_raw(KEY(db, &record), record.keylen,
KEY(db, &parent), parent.keylen);
cmp = bsearch_ncompare_raw(KEY(db, curp), curp->keylen,
KEY(db, prevp), prevp->keylen);
if (cmp) {
xsyslog(LOG_ERR, "DBERROR: twom mismatched ancestor",
"fname=<%s> key=<%.*s> offset=<%08llX>"
" parentkey=<%.*s> parentoffset=<%08llX)",
FNAME(db), (int)record.keylen, KEY(db, &record),
(LLU)record.offset,
(int)parent.keylen, KEY(db, &parent),
(LLU)parent.offset);
FNAME(db), (int)curp->keylen, KEY(db, curp),
(LLU)curp->offset,
(int)prevp->keylen, KEY(db, prevp),
(LLU)prevp->offset);
return CYRUSDB_INTERNAL;
}
ancestor = parent.ancestor;
dirty_size += parent.len;
ancestor = prevp->ancestor;
dirty_size += prevp->len;
}

cmp = bsearch_ncompare_raw(KEY(db, &record), record.keylen,
KEY(db, &prevrecord), prevrecord.keylen);
if (cmp <= 0) {
xsyslog(LOG_ERR, "DBERROR: twom out of order",
"fname=<%s> key=<%.*s> offset=<%08llX>"
" prevkey=<%.*s> prevoffset=<%08llX)",
FNAME(db), (int)record.keylen, KEY(db, &record),
(LLU)record.offset,
(int)prevrecord.keylen, KEY(db, &prevrecord),
(LLU)prevrecord.offset);
return CYRUSDB_INTERNAL;
}

for (i = 0; i < record.level; i++) {
for (i = 0; i < curp->level; i++) {
/* check the old pointer was to here */
if (fwd[i] != record.offset) {
if (fwd[i] != curp->offset) {
xsyslog(LOG_ERR, "DBERROR: twom broken linkage",
"filename=<%s> offset=<%08llX> level=<%d>"
" expected=<%08llX>",
FNAME(db), (LLU)record.offset, i, (LLU)fwd[i]);
FNAME(db), (LLU)curp->offset, i, (LLU)fwd[i]);
return CYRUSDB_INTERNAL;
}
/* and advance to the new pointer */
fwd[i] = _getloc(db, &record, i);
fwd[i] = _getloc(db, curp, i);
}

/* keep a copy for comparison purposes */
prevrecord = record;

// count if record or tombstone
if (record.type == DELETE) dirty_size += record.len;
if (curp->type == DELETE) dirty_size += curp->len;
else num_records++;

/* switch pointers for next time */
struct skiprecord *temp = prevp;
prevp = curp;
curp = temp;
}

for (i = 0; i < MAXLEVEL; i++) {
Expand Down

0 comments on commit 2a4c0a9

Please sign in to comment.