diff --git a/src/hashtable.c b/src/hashtable.c index 11ba360800..2f174dbefd 100644 --- a/src/hashtable.c +++ b/src/hashtable.c @@ -294,19 +294,31 @@ struct hashtable { void *metadata[]; }; +struct bucketPrefetchInfo { + bucket *curr_bucket; + long index; + size_t last_seen_size; /* Safe iterator temporary storage for bucket chain compaction */ + uint8_t table; + uint8_t pos; + enum { + BUCKET_INIT, + BUCKET_PREFETCH_ENTRIES, + BUCKET_ENTRIES_READY, + BUCKET_HASHTABLE_DONE, + } state; +}; + typedef struct { hashtable *hashtable; - bucket *bucket; long index; - uint16_t pos_in_bucket; uint8_t table; uint8_t safe; - union { - /* Unsafe iterator fingerprint for misuse detection. */ - uint64_t fingerprint; - /* Safe iterator temporary storage for bucket chain compaction. */ - uint64_t last_seen_size; - }; + uint8_t num_of_buckets; /* Number of buckets being processed in parallel */ + uint8_t cur_bucket_index; /* Index of the current bucket being processed in the round-robin cycle */ + uint8_t num_of_active_buckets; /* Number of buckets that are still actively iterating */ + uint64_t fingerprint; /* Unsafe iterator fingerprint for misuse detection */ + /* Array of bucket prefetch info structures */ + struct bucketPrefetchInfo buckets_prefetch_info[HASHTABLE_ITER_WIDTH]; } iter; /* The opaque hashtableIterator is defined as a blob of bytes. */ @@ -934,6 +946,75 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin return (incrementalFind *)(void *)state; } +static void initBucketPrefetchInfo(struct bucketPrefetchInfo *info, iter *iter) { + if (iter->safe) { + info->last_seen_size = iter->hashtable->used[iter->table]; + } + info->curr_bucket = &iter->hashtable->tables[iter->table][iter->index]; + info->index = iter->index; + info->table = iter->table; +} + +static void initIteratorFields(iter *iter, hashtable *ht, uint8_t safe) { + iter->hashtable = ht; + iter->table = 0; + iter->index = -1; + iter->safe = safe; + iter->num_of_buckets = HASHTABLE_ITER_WIDTH; + iter->num_of_active_buckets = iter->num_of_buckets; + iter->cur_bucket_index = 0; + for (int i = 0; i < iter->num_of_buckets; i++) { + iter->buckets_prefetch_info[i].state = BUCKET_INIT; + iter->buckets_prefetch_info[i].curr_bucket = NULL; + iter->buckets_prefetch_info[i].pos = 0; + iter->buckets_prefetch_info[i].index = 0; + iter->buckets_prefetch_info[i].table = 0; + iter->buckets_prefetch_info[i].last_seen_size = 0; + } +} + +static void prefetchBucketEntries(bucket *b) { + for (int pos = 0; pos < numBucketPositions(b); pos++) { + if (isPositionFilled(b, pos)) { + valkey_prefetch(b->entries[pos]); + } + } + if (b->chained) { + valkey_prefetch(bucketNext(b)); + } +} + +static int getNextEntryFromBucket(struct bucketPrefetchInfo *info, void **elemptr) { + bucket *b = info->curr_bucket; + int found = 0; + while ((int)info->pos < numBucketPositions(b) && !found) { + if (isPositionFilled(b, info->pos)) { + *elemptr = b->entries[info->pos]; + found = 1; + } + info->pos++; + } + return found; +} + +static void compactBucketChainIfNeeded(iter *iter, struct bucketPrefetchInfo *info) { + /* If entries in current bucket chain have been deleted, + * they've left empty spaces in the buckets. The chain is + * not automatically compacted when rehashing is paused. If + * this iterator is the only reason for pausing rehashing, + * we can do the compaction now when we're done with a + * bucket chain, before we move on to the next index. 
*/ + if (iter->safe && + iter->hashtable->pause_rehash == 1 && + iter->hashtable->used[iter->table] < info->last_seen_size) { + compactBucketChain(iter->hashtable, info->index, info->table); + } +} + +static int isFirstCallToNext(const iter *iter) { + return (iter->index == -1 && iter->table == 0); +} + /* --- API functions --- */ /* Allocates and initializes a new hashtable specified by the given type. */ @@ -1754,12 +1835,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f * hashtableResetIterator when you are done. See also * hashtableInitSafeIterator. */ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) { - iter *iter; - iter = iteratorFromOpaque(iterator); - iter->hashtable = ht; - iter->table = 0; - iter->index = -1; - iter->safe = 0; + initIteratorFields(iteratorFromOpaque(iterator), ht, 0); } /* Initialize a safe iterator, which is allowed to modify the hash table while @@ -1786,9 +1862,7 @@ void hashtableInitIterator(hashtableIterator *iterator, hashtable *ht) { * by the iterator. */ void hashtableInitSafeIterator(hashtableIterator *iterator, hashtable *ht) { - hashtableInitIterator(iterator, ht); - iter *iter = iteratorFromOpaque(iterator); - iter->safe = 1; + initIteratorFields(iteratorFromOpaque(iterator), ht, 1); } /* Resets a stack-allocated iterator. */ @@ -1828,77 +1902,116 @@ void hashtableReleaseIterator(hashtableIterator *iterator) { zfree(iter); } -/* Points elemptr to the next entry and returns 1 if there is a next entry. - * Returns 0 if there are no more entries. */ -int hashtableNext(hashtableIterator *iterator, void **elemptr) { - iter *iter = iteratorFromOpaque(iterator); +/* Manages the state machine for a single bucket prefetch info in the hashtable iterator. + * Processes the current state, transitions to the next appropriate state, + * and returns 1 if an element is found, 0 otherwise. */ +static int handleBucketState(iter *iter, struct bucketPrefetchInfo *info, void **elemptr) { + int found = 0; while (1) { - if (iter->index == -1 && iter->table == 0) { - /* It's the first call to next. */ - if (iter->safe) { - hashtablePauseRehashing(iter->hashtable); - iter->last_seen_size = iter->hashtable->used[iter->table]; - } else { - iter->fingerprint = hashtableFingerprint(iter->hashtable); + switch (info->state) { + case BUCKET_HASHTABLE_DONE: + break; + case BUCKET_PREFETCH_ENTRIES: + if (info->curr_bucket->presence) { + prefetchBucketEntries(info->curr_bucket); + info->state = BUCKET_ENTRIES_READY; + break; } - if (iter->hashtable->tables[0] == NULL) { - /* Empty hashtable. We're done. */ + info->state = BUCKET_INIT; + continue; + case BUCKET_ENTRIES_READY: + if (getNextEntryFromBucket(info, elemptr)) { + found = 1; break; } - iter->index = 0; - /* Skip already rehashed buckets. */ - if (hashtableIsRehashing(iter->hashtable)) { - iter->index = iter->hashtable->rehash_idx; + if (info->curr_bucket->chained) { + info->curr_bucket = bucketNext(info->curr_bucket); + info->state = BUCKET_PREFETCH_ENTRIES; + } else { + info->state = BUCKET_INIT; } - iter->bucket = &iter->hashtable->tables[iter->table][iter->index]; - iter->pos_in_bucket = 0; - } else { - /* Advance to the next position within the bucket, or to the next - * child bucket in a chain, or to the next bucket index, or to the - * next table. 
*/
-            iter->pos_in_bucket++;
-            if (iter->bucket->chained && iter->pos_in_bucket >= ENTRIES_PER_BUCKET - 1) {
-                iter->pos_in_bucket = 0;
-                iter->bucket = bucketNext(iter->bucket);
-            } else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
-                /* Bucket index done. */
-                if (iter->safe) {
-                    /* If entries in this bucket chain have been deleted,
-                     * they've left empty spaces in the buckets. The chain is
-                     * not automatically compacted when rehashing is paused. If
-                     * this iterator is the only reason for pausing rehashing,
-                     * we can do the compaction now when we're done with a
-                     * bucket chain, before we move on to the next index. */
-                    if (iter->hashtable->pause_rehash == 1 &&
-                        iter->hashtable->used[iter->table] < iter->last_seen_size) {
-                        compactBucketChain(iter->hashtable, iter->index, iter->table);
-                    }
-                    iter->last_seen_size = iter->hashtable->used[iter->table];
-                }
-                iter->pos_in_bucket = 0;
-                iter->index++;
-                if ((size_t)iter->index >= numBuckets(iter->hashtable->bucket_exp[iter->table])) {
-                    if (hashtableIsRehashing(iter->hashtable) && iter->table == 0) {
-                        iter->index = 0;
-                        iter->table++;
-                    } else {
-                        /* Done. */
-                        break;
-                    }
+            info->pos = 0;
+            continue;
+        case BUCKET_INIT:
+            compactBucketChainIfNeeded(iter, info);
+            iter->index++;
+            if ((size_t)iter->index >= numBuckets(iter->hashtable->bucket_exp[iter->table])) {
+                if (!hashtableIsRehashing(iter->hashtable) || iter->table > 0) {
+                    info->state = BUCKET_HASHTABLE_DONE;
+                    iter->num_of_active_buckets--;
+                    break;
                 }
-                iter->bucket = &iter->hashtable->tables[iter->table][iter->index];
+                iter->index = 0;
+                iter->table++;
             }
+            initBucketPrefetchInfo(info, iter);
+            valkey_prefetch(info->curr_bucket);
+            info->state = BUCKET_PREFETCH_ENTRIES;
+            break;
+        default:
+            iter->num_of_active_buckets--;
+            assert(0);
         }
-        bucket *b = iter->bucket;
-        if (!isPositionFilled(b, iter->pos_in_bucket)) {
-            /* No entry here. */
-            continue;
+        return found;
+    }
+}
+
+/* Points elemptr to the next entry and returns 1 if there is a next entry.
+ * Returns 0 if there are no more entries.
+ *
+ * This iterator processes multiple buckets in parallel to improve performance:
+ * - It initializes a fixed number (num_of_buckets) of bucket prefetch infos.
+ * - Each hashtableNext call transitions the states of these bucket prefetch infos in a round-robin manner.
+ * - The iterator continues until an entry is found or there are no more entries in the hashtable.
+ *
+ * Key optimization: When a bucket prefetch info reaches the ENTRIES_READY state,
+ * the entries of the corresponding bucket are already in the cache, minimizing memory access latency.
+ * + * State machine diagram for each bucket prefetch info: + * + * (empty bucket) + * +-------------------------+ + * | | (all entries in + * v (new bucket found) | bucket prefetched) + * +--------+ +------------------+ +-----------------+ + * | INIT | -------------->| PREFETCH_ENTRIES | -------------> | ENTRIES_READY | + * +--------+ +------------------+ +-----------------+ + * | ^ ^ | + * | | | | + * | | | (chained | + * | | | bucket) | + * | | | | + * | +-------------------------+------------------------------------+ + * | (find next + * | bucket in table) + * | + * v + * +------+ + * | DONE | + * +------+ + * (no more buckets) + */ +int hashtableNext(hashtableIterator *iterator, void **elemptr) { + iter *iter = iteratorFromOpaque(iterator); + struct bucketPrefetchInfo *info; + if (isFirstCallToNext(iter)) { + if (iter->safe) { + hashtablePauseRehashing(iter->hashtable); + } else { + iter->fingerprint = hashtableFingerprint(iter->hashtable); } - /* Return the entry at this position. */ - if (elemptr) { - *elemptr = b->entries[iter->pos_in_bucket]; + if (iter->hashtable->tables[0] == NULL) return 0; /* Empty hashtable */ + if (hashtableIsRehashing(iter->hashtable)) { + iter->index = iter->hashtable->rehash_idx - 1; } - return 1; + } + /* When a bucket prefetch info attempts to pick up a new bucket + * and the iterator index exceeds the hashtable size, + * that bucket prefetch info finishes, decrementing num_of_active_buckets. */ + while (iter->num_of_active_buckets) { + info = &iter->buckets_prefetch_info[iter->cur_bucket_index]; + if (handleBucketState(iter, info, elemptr)) return 1; + iter->cur_bucket_index = (iter->cur_bucket_index + 1) % iter->num_of_buckets; } return 0; } diff --git a/src/hashtable.h b/src/hashtable.h index 4291cf5a5d..ca8deb7acc 100644 --- a/src/hashtable.h +++ b/src/hashtable.h @@ -38,7 +38,7 @@ typedef struct hashtable hashtable; typedef struct hashtableStats hashtableStats; /* Can types that can be stack allocated. */ -typedef uint64_t hashtableIterator[5]; +typedef uint64_t hashtableIterator[30]; typedef uint64_t hashtablePosition[2]; typedef uint64_t hashtableIncrementalFindState[5]; @@ -87,6 +87,7 @@ typedef void (*hashtableScanFunction)(void *privdata, void *entry); /* Constants */ #define HASHTABLE_BUCKET_SIZE 64 /* bytes, the most common cache line size */ +#define HASHTABLE_ITER_WIDTH 6 /* Number of parallel "threads" for batch iteration */ /* Scan flags */ #define HASHTABLE_SCAN_EMIT_REF (1 << 0)
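For context, here is a minimal sketch of how a caller drives the iterator API touched by this patch; the countEntries() helper and the ht argument are illustrative assumptions, not something the patch adds:

    /* Usage sketch, not part of the patch: count entries using the batched iterator.
     * Assumes an existing hashtable *ht. */
    #include <stddef.h>
    #include "hashtable.h"

    static size_t countEntries(hashtable *ht) {
        hashtableIterator it; /* opaque stack storage; 30 * sizeof(uint64_t) bytes after this change */
        void *entry;
        size_t count = 0;
        hashtableInitIterator(&it, ht); /* or hashtableInitSafeIterator() if ht may be modified */
        /* Each hashtableNext call advances the round-robin of HASHTABLE_ITER_WIDTH
         * per-bucket state machines; a bucket's entries are prefetched one step
         * before they are returned, so they should already be in cache here. */
        while (hashtableNext(&it, &entry)) count++;
        hashtableResetIterator(&it);
        return count;
    }

Note the accompanying header change: the opaque hashtableIterator blob grows from 5 to 30 uint64_t words to hold the HASHTABLE_ITER_WIDTH bucket prefetch states, so stack-allocated iterators become correspondingly larger.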