Improving iterator using prefetch
Signed-off-by: NadavGigi <[email protected]>
NadavGigi committed Jan 5, 2025
1 parent 33b8241 commit 05d93e2
Showing 2 changed files with 193 additions and 79 deletions.
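The speedup in this commit comes from software prefetching: while the iterator consumes entries from one bucket, it issues prefetches for the buckets and entries it will visit next, so memory latency overlaps with useful work. A minimal sketch of the pattern, independent of this patch (the wrapper name and look-ahead distance below are illustrative; in the diff the wrapper is valkey_prefetch and the width is HASHTABLE_ITER_WIDTH):

#include <stddef.h>

/* Illustrative prefetch wrapper; GCC/Clang expose __builtin_prefetch(addr, rw, locality). */
#define example_prefetch(p) __builtin_prefetch((p), 0, 3)

/* Sum values reached through an array of pointers, prefetching a few elements
 * ahead so the pointed-to memory is likely in cache when it is dereferenced. */
static long sum_with_prefetch(long *const *items, size_t n) {
    const size_t ahead = 6; /* comparable to the patch's HASHTABLE_ITER_WIDTH */
    long total = 0;
    for (size_t i = 0; i < n; i++) {
        if (i + ahead < n) example_prefetch(items[i + ahead]);
        total += *items[i];
    }
    return total;
}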
269 changes: 191 additions & 78 deletions src/hashtable.c
@@ -294,19 +294,31 @@ struct hashtable {
void *metadata[];
};

struct bucketPrefetchInfo {
bucket *curr_bucket;
long index;
size_t last_seen_size; /* Safe iterator temporary storage for bucket chain compaction */
uint8_t table;
uint8_t pos;
enum {
BUCKET_INIT,
BUCKET_PREFETCH_ENTRIES,
BUCKET_ENTRIES_READY,
BUCKET_HASHTABLE_DONE,
} state;
};

typedef struct {
hashtable *hashtable;
bucket *bucket;
long index;
uint16_t pos_in_bucket;
uint8_t table;
uint8_t safe;
union {
/* Unsafe iterator fingerprint for misuse detection. */
uint64_t fingerprint;
/* Safe iterator temporary storage for bucket chain compaction. */
uint64_t last_seen_size;
};
uint8_t num_of_buckets; /* Number of buckets being processed in parallel */
uint8_t cur_bucket_index; /* Index of the current bucket being processed in the round-robin cycle */
uint8_t num_of_active_buckets; /* Number of buckets that are still actively iterating */
uint64_t fingerprint; /* Unsafe iterator fingerprint for misuse detection */
/* Array of bucket prefetch info structures */
struct bucketPrefetchInfo buckets_prefetch_info[HASHTABLE_ITER_WIDTH];
} iter;

/* The opaque hashtableIterator is defined as a blob of bytes. */
@@ -934,6 +946,75 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin
return (incrementalFind *)(void *)state;
}

static void initBucketPrefetchInfo(struct bucketPrefetchInfo *info, iter *iter) {
if (iter->safe) {
info->last_seen_size = iter->hashtable->used[iter->table];
}
info->curr_bucket = &iter->hashtable->tables[iter->table][iter->index];
info->index = iter->index;
info->table = iter->table;
}

static void initIteratorFields(iter *iter, hashtable *ht, uint8_t safe) {
iter->hashtable = ht;
iter->table = 0;
iter->index = -1;
iter->safe = safe;
iter->num_of_buckets = HASHTABLE_ITER_WIDTH;
iter->num_of_active_buckets = iter->num_of_buckets;
iter->cur_bucket_index = 0;
for (int i = 0; i < iter->num_of_buckets; i++) {
iter->buckets_prefetch_info[i].state = BUCKET_INIT;
iter->buckets_prefetch_info[i].curr_bucket = NULL;
iter->buckets_prefetch_info[i].pos = 0;
iter->buckets_prefetch_info[i].index = 0;
iter->buckets_prefetch_info[i].table = 0;
iter->buckets_prefetch_info[i].last_seen_size = 0;
}
}

static void prefetchBucketEntries(bucket *b) {
for (int pos = 0; pos < numBucketPositions(b); pos++) {
if (isPositionFilled(b, pos)) {
valkey_prefetch(b->entries[pos]);
}
}
if (b->chained) {
valkey_prefetch(bucketNext(b));
}
}

static int getNextEntryFromBucket(struct bucketPrefetchInfo *info, void **elemptr) {
bucket *b = info->curr_bucket;
int found = 0;
while ((int)info->pos < numBucketPositions(b) && !found) {
if (isPositionFilled(b, info->pos)) {
*elemptr = b->entries[info->pos];
found = 1;
}
info->pos++;
}
return found;
}

static void compactBucketChainIfNeeded(iter *iter, struct bucketPrefetchInfo *info) {
/* If entries in current bucket chain have been deleted,
* they've left empty spaces in the buckets. The chain is
* not automatically compacted when rehashing is paused. If
* this iterator is the only reason for pausing rehashing,
* we can do the compaction now when we're done with a
* bucket chain, before we move on to the next index. */
if (iter->safe &&
iter->hashtable->pause_rehash == 1 &&
iter->hashtable->used[iter->table] < info->last_seen_size) {
compactBucketChain(iter->hashtable, info->index, info->table);
}
}

static int isFirstCallToNext(const iter *iter) {
return (iter->index == -1 && iter->table == 0);
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
@@ -1754,12 +1835,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
* hashtableResetIterator when you are done. See also
* hashtableInitSafeIterator. */
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
iter *iter;
iter = iteratorFromOpaque(iterator);
iter->hashtable = ht;
iter->table = 0;
iter->index = -1;
iter->safe = 0;
initIteratorFields(iteratorFromOpaque(iterator), ht, 0);
}

/* Initialize a safe iterator, which is allowed to modify the hash table while
@@ -1786,9 +1862,7 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
* by the iterator.
*/
void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) {
hashtableInitIterator(iterator, ht);
iter *iter = iteratorFromOpaque(iterator);
iter->safe = 1;
initIteratorFields(iteratorFromOpaque(iterator), ht, 1);
}

/* Resets a stack-allocated iterator. */
@@ -1828,77 +1902,116 @@ void hashtableReleaseIterator(hashtableIterator *iterator) {
zfree(iter);
}

/* Points elemptr to the next entry and returns 1 if there is a next entry.
* Returns 0 if there are no more entries. */
int hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter *iter = iteratorFromOpaque(iterator);
/* Manages the state machine for a single bucket prefetch info in the hashtable iterator.
* Processes the current state, transitions to the next appropriate state,
* and returns 1 if an element is found, 0 otherwise. */
static int handleBucketState(iter *iter, struct bucketPrefetchInfo *info, void **elemptr) {
int found = 0;
while (1) {
if (iter->index == -1 && iter->table == 0) {
/* It's the first call to next. */
if (iter->safe) {
hashtablePauseRehashing(iter->hashtable);
iter->last_seen_size = iter->hashtable->used[iter->table];
} else {
iter->fingerprint = hashtableFingerprint(iter->hashtable);
switch (info->state) {
case BUCKET_HASHTABLE_DONE:
break;
case BUCKET_PREFETCH_ENTRIES:
if (info->curr_bucket->presence) {
prefetchBucketEntries(info->curr_bucket);
info->state = BUCKET_ENTRIES_READY;
break;
}
if (iter->hashtable->tables[0] == NULL) {
/* Empty hashtable. We're done. */
info->state = BUCKET_INIT;
continue;
case BUCKET_ENTRIES_READY:
if (getNextEntryFromBucket(info, elemptr)) {
found = 1;
break;
}
iter->index = 0;
/* Skip already rehashed buckets. */
if (hashtableIsRehashing(iter->hashtable)) {
iter->index = iter->hashtable->rehash_idx;
if (info->curr_bucket->chained) {
info->curr_bucket = bucketNext(info->curr_bucket);
info->state = BUCKET_PREFETCH_ENTRIES;
} else {
info->state = BUCKET_INIT;
}
iter->bucket = &iter->hashtable->tables[iter->table][iter->index];
iter->pos_in_bucket = 0;
} else {
/* Advance to the next position within the bucket, or to the next
* child bucket in a chain, or to the next bucket index, or to the
* next table. */
iter->pos_in_bucket++;
if (iter->bucket->chained && iter->pos_in_bucket >= ENTRIES_PER_BUCKET - 1) {
iter->pos_in_bucket = 0;
iter->bucket = bucketNext(iter->bucket);
} else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
/* Bucket index done. */
if (iter->safe) {
/* If entries in this bucket chain have been deleted,
* they've left empty spaces in the buckets. The chain is
* not automatically compacted when rehashing is paused. If
* this iterator is the only reason for pausing rehashing,
* we can do the compaction now when we're done with a
* bucket chain, before we move on to the next index. */
if (iter->hashtable->pause_rehash == 1 &&
iter->hashtable->used[iter->table] < iter->last_seen_size) {
compactBucketChain(iter->hashtable, iter->index, iter->table);
}
iter->last_seen_size = iter->hashtable->used[iter->table];
}
iter->pos_in_bucket = 0;
iter->index++;
if ((size_t)iter->index >= numBuckets(iter->hashtable->bucket_exp[iter->table])) {
if (hashtableIsRehashing(iter->hashtable) && iter->table == 0) {
iter->index = 0;
iter->table++;
} else {
/* Done. */
break;
}
info->pos = 0;
continue;
case BUCKET_INIT:
compactBucketChainIfNeeded(iter, info);
iter->index++;
if ((size_t)iter->index >= numBuckets(iter->hashtable->bucket_exp[iter->table])) {
if (!hashtableIsRehashing(iter->hashtable) || iter->table > 0) {
info->state = BUCKET_HASHTABLE_DONE;
iter->num_of_active_buckets--;
break;
}
iter->bucket = &iter->hashtable->tables[iter->table][iter->index];
iter->index = 0;
iter->table++;
}
initBucketPrefetchInfo(info, iter);
valkey_prefetch(info->curr_bucket);
info->state = BUCKET_PREFETCH_ENTRIES;
break;
default:
iter->num_of_active_buckets--;
assert(0);
}
bucket *b = iter->bucket;
if (!isPositionFilled(b, iter->pos_in_bucket)) {
/* No entry here. */
continue;
return found;
}
}

/* Points elemptr to the next entry and returns 1 if there is a next entry.
* Returns 0 if there are no more entries.
*
* This iterator processes multiple buckets in parallel to improve performance:
* - It initializes a fixed number (num_of_buckets) of bucket prefetch infos.
* - Each hashtableNext call transitions the states of these bucket prefetch infos in a round-robin manner.
 * - The iterator continues until an entry is found or there are no more entries in the hashtable.
*
* Key optimization: When a bucket prefetch info reaches the ENTRIES_READY state,
* the entries of the corresponding bucket are already in the cache, minimizing memory access latency.
*
* State machine diagram for each bucket prefetch info:
*
* (empty bucket)
* +-------------------------+
* | | (all entries in
* v (new bucket found) | bucket prefetched)
* +--------+ +------------------+ +-----------------+
* | INIT | -------------->| PREFETCH_ENTRIES | -------------> | ENTRIES_READY |
* +--------+ +------------------+ +-----------------+
* | ^ ^ |
* | | | |
* | | | (chained |
* | | | bucket) |
* | | | |
* | +-------------------------+------------------------------------+
* | (find next
* | bucket in table)
* |
* v
* +------+
* | DONE |
* +------+
* (no more buckets)
*/
int hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter *iter = iteratorFromOpaque(iterator);
struct bucketPrefetchInfo *info;
if (isFirstCallToNext(iter)) {
if (iter->safe) {
hashtablePauseRehashing(iter->hashtable);
} else {
iter->fingerprint = hashtableFingerprint(iter->hashtable);
}
/* Return the entry at this position. */
if (elemptr) {
*elemptr = b->entries[iter->pos_in_bucket];
if (iter->hashtable->tables[0] == NULL) return 0; /* Empty hashtable */
if (hashtableIsRehashing(iter->hashtable)) {
iter->index = iter->hashtable->rehash_idx - 1;
}
return 1;
}
/* When a bucket prefetch info attempts to pick up a new bucket
* and the iterator index exceeds the hashtable size,
* that bucket prefetch info finishes, decrementing num_of_active_buckets. */
while (iter->num_of_active_buckets) {
info = &iter->buckets_prefetch_info[iter->cur_bucket_index];
if (handleBucketState(iter, info, elemptr)) return 1;
iter->cur_bucket_index = (iter->cur_bucket_index + 1) % iter->num_of_buckets;
}
return 0;
}
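
The caller-facing loop is unchanged by this commit; only the internals walk several buckets round-robin and prefetch ahead. For reference, a typical use of this API looks roughly like the sketch below (entry handling elided; the reset call is assumed to take the same iterator pointer, per the "Resets a stack-allocated iterator" comment above):

/* Sketch: count all entries using the opaque, stack-allocated iterator. */
static size_t countEntriesExample(hashtable *ht) {
    hashtableIterator it;
    void *entry;
    size_t count = 0;
    hashtableInitIterator(&it, ht); /* or hashtableInitSafeIterator() to allow modification while iterating */
    while (hashtableNext(&it, &entry)) {
        count++; /* process 'entry' here */
    }
    hashtableResetIterator(&it);
    return count;
}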
3 changes: 2 additions & 1 deletion src/hashtable.h
@@ -38,7 +38,7 @@ typedef struct hashtable hashtable;
typedef struct hashtableStats hashtableStats;

/* Types that can be stack allocated. */
typedef uint64_t hashtableIterator[5];
typedef uint64_t hashtableIterator[30];
typedef uint64_t hashtablePosition[2];
typedef uint64_t hashtableIncrementalFindState[5];

@@ -87,6 +87,7 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry);

/* Constants */
#define HASHTABLE_BUCKET_SIZE 64 /* bytes, the most common cache line size */
#define HASHTABLE_ITER_WIDTH 6 /* Number of parallel "threads" for batch iteration */

/* Scan flags */
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)
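
The opaque hashtableIterator blob grows from 5 to 30 uint64_t words (240 bytes) because the internal iter struct now embeds HASHTABLE_ITER_WIDTH (6) bucketPrefetchInfo slots, each roughly 32 bytes on a typical 64-bit build, in addition to the existing header fields. A guard of the following shape (hypothetical, not part of this diff) would catch a mismatch between the public blob and the private struct at compile time:

/* Hypothetical compile-time check, to sit in hashtable.c next to the internal
 * 'iter' definition; not part of this commit. */
_Static_assert(sizeof(iter) <= sizeof(hashtableIterator),
               "hashtableIterator blob must be large enough for struct iter");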
