Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accelerate hash table iterator with prefetching #1501

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 188 additions & 78 deletions src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,19 +294,31 @@ struct hashtable {
void *metadata[];
};

/* Per-slot state used by the iterator to walk one bucket chain while other
 * slots are being prefetched. The iterator keeps an array of these. */
struct bucketPrefetchInfo {
/* Bucket this slot is currently visiting; set to the top-level bucket when
 * the slot picks up a new index. */
bucket *curr_bucket;
/* Bucket index within 'table', copied from the iterator when the slot picks
 * up a bucket. */
long index;
/* Safe iterators only: value of used[table] recorded when the slot picked up
 * its bucket; compared later to decide whether the chain needs compaction. */
size_t last_seen_size; /* Safe iterator temporary storage for bucket chain compaction */
/* Table number the bucket belongs to, copied from the iterator. */
uint8_t table;
/* Next position within curr_bucket to examine. */
uint8_t pos;
/* Stage of this slot in the prefetch state machine. */
enum {
/* Slot is idle and will pick up the next bucket index from the iterator. */
BUCKET_INIT,
/* The bucket has been prefetched; its entries are prefetched next. */
BUCKET_PREFETCH_ENTRIES,
/* The bucket's entries have been prefetched and can be returned. */
BUCKET_ENTRIES_READY,
/* No more buckets are available for this slot. */
BUCKET_HASHTABLE_DONE,
} state;
};

typedef struct {
hashtable *hashtable;
bucket *bucket;
long index;
uint16_t pos_in_bucket;
uint8_t table;
uint8_t safe;
union {
/* Unsafe iterator fingerprint for misuse detection. */
uint64_t fingerprint;
/* Safe iterator temporary storage for bucket chain compaction. */
uint64_t last_seen_size;
};
uint8_t num_of_buckets; /* Number of parallel buckets being processed for batch iteration */
uint8_t cur_bucket_index; /* Index of the current bucket being processed in the round-robin cycle */
uint8_t num_of_active_buckets; /* Number of buckets that are still actively iterating */
uint64_t fingerprint; /* Unsafe iterator fingerprint for misuse detection */
/* Array of bucket prefetch info structures */
struct bucketPrefetchInfo buckets_prefetch_info[HASHTABLE_ITER_WIDTH];
} iter;

/* The opaque hashtableIterator is defined as a blob of bytes. */
Expand Down Expand Up @@ -934,6 +946,75 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin
return (incrementalFind *)(void *)state;
}

/* Binds a prefetch slot to the bucket the iterator currently points at.
 *
 * Copies the iterator's table and index into 'info' and resolves the address
 * of the corresponding top-level bucket. For safe iterators the current
 * number of used entries in that table is also recorded. 'pos' and 'state'
 * are left untouched; the caller manages them. */
static void initBucketPrefetchInfo(struct bucketPrefetchInfo *info, iter *iter) {
    hashtable *ht = iter->hashtable;
    const uint8_t table = iter->table;
    const long index = iter->index;

    info->table = table;
    info->index = index;
    info->curr_bucket = &ht->tables[table][index];
    if (iter->safe) info->last_seen_size = ht->used[table];
}

/* Puts an iterator into its pristine "not yet started" state.
 *
 * 'ht' is the table to iterate over and 'safe' selects safe (non-zero) or
 * unsafe (zero) iteration. The iterator starts at table 0, index -1, and all
 * HASHTABLE_ITER_WIDTH prefetch slots are marked active and reset to
 * BUCKET_INIT. */
static void initIteratorFields(iter *iter, hashtable *ht, uint8_t safe) {
    iter->hashtable = ht;
    iter->safe = safe;
    iter->table = 0;
    iter->index = -1;
    iter->cur_bucket_index = 0;
    iter->num_of_buckets = HASHTABLE_ITER_WIDTH;
    iter->num_of_active_buckets = iter->num_of_buckets;

    struct bucketPrefetchInfo *slot = iter->buckets_prefetch_info;
    struct bucketPrefetchInfo *const end = slot + iter->num_of_buckets;
    for (; slot != end; slot++) {
        slot->curr_bucket = NULL;
        slot->index = 0;
        slot->last_seen_size = 0;
        slot->table = 0;
        slot->pos = 0;
        slot->state = BUCKET_INIT;
    }
}

/* Issues prefetches for everything reachable from bucket 'b': each entry
 * stored in a filled position and, if the bucket is chained, the next bucket
 * in the chain. */
static void prefetchBucketEntries(bucket *b) {
    const int num_positions = numBucketPositions(b);
    int pos = 0;
    while (pos < num_positions) {
        if (isPositionFilled(b, pos)) valkey_prefetch(b->entries[pos]);
        pos++;
    }
    if (!b->chained) return;
    valkey_prefetch(bucketNext(b));
}

/* Advances 'info->pos' through the current bucket until a filled position is
 * found or the bucket is exhausted.
 *
 * On success the entry is stored in '*elemptr' (if 'elemptr' is non-NULL),
 * 'info->pos' is left pointing just past the returned position, and 1 is
 * returned. Returns 0 when no filled position remains in this bucket.
 *
 * 'elemptr' may be NULL, in which case the iterator still advances but the
 * entry is not reported. */
static int getNextEntryFromBucket(struct bucketPrefetchInfo *info, void **elemptr) {
    bucket *b = info->curr_bucket;
    while ((int)info->pos < numBucketPositions(b)) {
        int pos = info->pos++;
        if (!isPositionFilled(b, pos)) continue;
        /* Tolerate a NULL output pointer so that callers which only want to
         * step through the table are not forced to supply one. */
        if (elemptr) *elemptr = b->entries[pos];
        return 1;
    }
    return 0;
}

/* Compacts the bucket chain that slot 'info' was bound to, if needed.
 *
 * If entries in the chain have been deleted, they've left empty spaces in the
 * buckets. The chain is not automatically compacted when rehashing is paused.
 * If this iterator is the only reason for pausing rehashing, we can do the
 * compaction now when we're done with a bucket chain, before the slot moves
 * on to the next index.
 *
 * Only safe iterators compact. The size comparison uses the table the slot
 * was bound to ('info->table'), because that is the table 'last_seen_size'
 * was sampled from; the iterator's own 'table' may already have advanced to
 * the other table while this slot was still in flight. */
static void compactBucketChainIfNeeded(iter *iter, struct bucketPrefetchInfo *info) {
    if (!iter->safe) return;
    hashtable *ht = iter->hashtable;
    if (ht->pause_rehash != 1) return;
    if (ht->used[info->table] < info->last_seen_size) {
        compactBucketChain(ht, info->index, info->table);
    }
}

/* Returns 1 if the iterator is still positioned at table 0, index -1, and
 * 0 otherwise. */
static int isFirstCallToNext(const iter *iter) {
    if (iter->table != 0) return 0;
    return iter->index == -1;
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
Expand Down Expand Up @@ -1754,12 +1835,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
* hashtableResetIterator when you are done. See also
* hashtableInitSafeIterator. */
void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
iter *iter;
iter = iteratorFromOpaque(iterator);
iter->hashtable = ht;
iter->table = 0;
iter->index = -1;
iter->safe = 0;
initIteratorFields(iteratorFromOpaque(iterator), ht, 0);
}

/* Initialize a safe iterator, which is allowed to modify the hash table while
Expand All @@ -1786,9 +1862,7 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) {
* by the iterator.
*/
void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) {
hashtableInitIterator(iterator, ht);
iter *iter = iteratorFromOpaque(iterator);
iter->safe = 1;
initIteratorFields(iteratorFromOpaque(iterator), ht, 1);
}

/* Resets a stack-allocated iterator. */
Expand Down Expand Up @@ -1828,77 +1902,113 @@ void hashtableReleaseIterator(hashtableIterator *iterator) {
zfree(iter);
}

/* Points elemptr to the next entry and returns 1 if there is a next entry.
* Returns 0 if there are no more entries. */
int hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter *iter = iteratorFromOpaque(iterator);
/* Manages the state machine for a single bucket prefetch info in the hashtable iterator.
* Processes the current state, transitions to the next appropriate state,
* and returns 1 if an element is found, 0 otherwise. */
static int handleBucketState(iter *iter, struct bucketPrefetchInfo *info, void **elemptr) {
int found = 0;
while (1) {
if (iter->index == -1 && iter->table == 0) {
/* It's the first call to next. */
if (iter->safe) {
hashtablePauseRehashing(iter->hashtable);
iter->last_seen_size = iter->hashtable->used[iter->table];
} else {
iter->fingerprint = hashtableFingerprint(iter->hashtable);
switch (info->state) {
case BUCKET_HASHTABLE_DONE:
break;
case BUCKET_PREFETCH_ENTRIES:
if (info->curr_bucket->presence) {
prefetchBucketEntries(info->curr_bucket);
info->state = BUCKET_ENTRIES_READY;
break;
}
if (iter->hashtable->tables[0] == NULL) {
/* Empty hashtable. We're done. */
info->state = BUCKET_INIT;
continue;
case BUCKET_ENTRIES_READY:
if (getNextEntryFromBucket(info, elemptr)) {
found = 1;
break;
}
iter->index = 0;
/* Skip already rehashed buckets. */
if (hashtableIsRehashing(iter->hashtable)) {
iter->index = iter->hashtable->rehash_idx;
if (info->curr_bucket->chained) {
info->curr_bucket = bucketNext(info->curr_bucket);
info->state = BUCKET_PREFETCH_ENTRIES;
} else {
info->state = BUCKET_INIT;
}
iter->bucket = &iter->hashtable->tables[iter->table][iter->index];
iter->pos_in_bucket = 0;
} else {
/* Advance to the next position within the bucket, or to the next
* child bucket in a chain, or to the next bucket index, or to the
* next table. */
iter->pos_in_bucket++;
if (iter->bucket->chained && iter->pos_in_bucket >= ENTRIES_PER_BUCKET - 1) {
iter->pos_in_bucket = 0;
iter->bucket = bucketNext(iter->bucket);
} else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
/* Bucket index done. */
if (iter->safe) {
/* If entries in this bucket chain have been deleted,
* they've left empty spaces in the buckets. The chain is
* not automatically compacted when rehashing is paused. If
* this iterator is the only reason for pausing rehashing,
* we can do the compaction now when we're done with a
* bucket chain, before we move on to the next index. */
if (iter->hashtable->pause_rehash == 1 &&
iter->hashtable->used[iter->table] < iter->last_seen_size) {
compactBucketChain(iter->hashtable, iter->index, iter->table);
}
iter->last_seen_size = iter->hashtable->used[iter->table];
}
iter->pos_in_bucket = 0;
iter->index++;
if ((size_t)iter->index >= numBuckets(iter->hashtable->bucket_exp[iter->table])) {
if (hashtableIsRehashing(iter->hashtable) && iter->table == 0) {
iter->index = 0;
iter->table++;
} else {
/* Done. */
break;
}
info->pos = 0;
continue;
case BUCKET_INIT:
compactBucketChainIfNeeded(iter, info);
iter->index++;
if ((size_t)iter->index >= numBuckets(iter->hashtable->bucket_exp[iter->table])) {
if (!hashtableIsRehashing(iter->hashtable) || iter->table > 0) {
info->state = BUCKET_HASHTABLE_DONE;
iter->num_of_active_buckets--;
break;
}
iter->bucket = &iter->hashtable->tables[iter->table][iter->index];
iter->index = 0;
iter->table++;
}
initBucketPrefetchInfo(info, iter);
valkey_prefetch(info->curr_bucket);
info->state = BUCKET_PREFETCH_ENTRIES;
break;
default:
assert(0);
}
bucket *b = iter->bucket;
if (!isPositionFilled(b, iter->pos_in_bucket)) {
/* No entry here. */
continue;
return found;
}
}

/* Points elemptr to the next entry and returns 1 if there is a next entry.
* Returns 0 if there are no more entries.
* The iterator processes multiple buckets in parallel to improve performance:
* - It initializes a fixed number (num_of_buckets) of bucket prefetch infos.
* - Each hashtableNext call transitions the states of these bucket prefetch infos in a round-robin manner.
* - The iterator continues until an entry is found or until all entries in the hashtable have been returned.
* Key optimization: When a bucket prefetch info reaches the ENTRIES_READY state,
* the entries of the corresponding bucket are already in the cache, minimizing memory access latency.
*
* State machine diagram for each bucket prefetch info:
*
* (empty bucket)
* +-------------------------+
* | | (all entries in
* v (new bucket found) | bucket prefetched)
* +--------+ +------------------+ +-----------------+
* | INIT | -------------->| PREFETCH_ENTRIES | -------------> | ENTRIES_READY |
* +--------+ +------------------+ +-----------------+
* | ^ ^ |
* | | | |
* | | | (chained |
* | | | bucket) |
* | | | |
* | +-------------------------+------------------------------------+
* | (find next
* | bucket in table)
* |
* v
* +------+
* | DONE |
* +------+
* (no more buckets)
*/
int hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter *iter = iteratorFromOpaque(iterator);
struct bucketPrefetchInfo *info;
if (isFirstCallToNext(iter)) {
if (iter->safe) {
hashtablePauseRehashing(iter->hashtable);
} else {
iter->fingerprint = hashtableFingerprint(iter->hashtable);
}
/* Return the entry at this position. */
if (elemptr) {
*elemptr = b->entries[iter->pos_in_bucket];
if (iter->hashtable->tables[0] == NULL) return 0; /* Empty hashtable */
if (hashtableIsRehashing(iter->hashtable)) {
iter->index = iter->hashtable->rehash_idx - 1;
}
return 1;
}
/* When a bucket prefetch info attempts to pick up a new bucket
* and the iterator index exceeds the hashtable size,
* that bucket prefetch info finishes, decrementing num_of_active_buckets. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding

iter->num_of_active_buckets = min(iter->num_of_active_buckets, iter->num_of_buckets);

to avoid repeatedly calling handleBucketState for the same bucket when iter->num_of_buckets is < iter->num_of_active_buckets.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a possible scenario. iter->num_of_active_buckets gets the value of iter->num_of_buckets during iterator init and is then only ever decremented, while iter->num_of_buckets never changes.

while (iter->num_of_active_buckets) {
info = &iter->buckets_prefetch_info[iter->cur_bucket_index];
if (handleBucketState(iter, info, elemptr)) return 1;
iter->cur_bucket_index = (iter->cur_bucket_index + 1) % iter->num_of_buckets;
}
return 0;
}
Expand Down
3 changes: 2 additions & 1 deletion src/hashtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ typedef struct hashtable hashtable;
typedef struct hashtableStats hashtableStats;

/* Types that can be stack allocated. */
typedef uint64_t hashtableIterator[5];
typedef uint64_t hashtableIterator[30];
typedef uint64_t hashtablePosition[2];
typedef uint64_t hashtableIncrementalFindState[5];

Expand Down Expand Up @@ -87,6 +87,7 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry);

/* Constants */
#define HASHTABLE_BUCKET_SIZE 64 /* bytes, the most common cache line size */
#define HASHTABLE_ITER_WIDTH 6 /* Number of parallel buckets being processed for batch iteration */

/* Scan flags */
#define HASHTABLE_SCAN_EMIT_REF (1 << 0)
Expand Down
Loading