diff --git a/docs/pub-sub.md b/docs/pub-sub.md index ab300ce8989f..4086a424881a 100644 --- a/docs/pub-sub.md +++ b/docs/pub-sub.md @@ -2,9 +2,10 @@ This document describes how Dragonfly implements the Publish-Subscribe (Pub/Sub) messaging paradigm within its shared-nothing, multi-threaded architecture. It covers the global -subscription registry, the Read-Copy-Update (RCU) mechanism used to prevent lock contention -on the publish path, the asynchronous message delivery pipeline, and the backpressure system -that protects the server from slow-subscriber OOM. +subscription registry backed by a `ShardedHashMap` with fine-grained per-shard locking, +the RCU-style pointer swap used for lock-free reads on the publish path, the asynchronous +message delivery pipeline, and the backpressure system that protects the server from +slow-subscriber OOM. ## Overview @@ -13,19 +14,21 @@ unique challenge: subscriptions must be globally addressable across all threads, global lock on every `PUBLISH` would create a severe bottleneck. A single popular channel with thousands of subscribers could serialize all publish operations onto one shard thread. -Dragonfly solves this by using a **centralized `ChannelStore` updated via RCU -(Read-Copy-Update)**: +Dragonfly solves this with a **`ChannelStore` backed by `ShardedHashMap`** — a hash map +split into 16 independently-locked shards: -- **Reads (`PUBLISH` / `SPUBLISH`)** are lock-free and use a thread-local pointer to the - most recent `ChannelStore` snapshot. -- **Writes (`SUBSCRIBE` / `UNSUBSCRIBE` / `PSUBSCRIBE` / `PUNSUBSCRIBE`)** are serialized - by a single mutex, performed by copying the necessary routing maps, applying the mutation, - and atomically swapping the global pointer. +- **Reads (`PUBLISH` / `SPUBLISH`)** acquire a per-shard shared read lock (`read_mu_`) and + read the `SubscribeMap` pointer via an atomic acquire load (`UpdatablePointer::Get()`). + Multiple readers on the same shard proceed concurrently. 
+- **Writes (`SUBSCRIBE` / `UNSUBSCRIBE` / `PSUBSCRIBE` / `PUNSUBSCRIBE`)** acquire the + shard's exclusive write lock (`write_mu_`), copy the `SubscribeMap`, apply the mutation, + and atomically swap the pointer — all without blocking readers. Concurrent writes to + *different* shards are fully independent. This design prevents heavy traffic on a single channel from serializing onto one shard thread, and it scales across threads even when the number of channels is small. -Publish latency is lower than a shard-routed design because no inter-thread hop is required -to look up subscribers — the caller reads its local copy directly. +Publish latency is low because no inter-thread hop is required to look up subscribers — any +thread can read the global `ChannelStore` directly. Dragonfly supports three flavors of Pub/Sub: @@ -39,11 +42,11 @@ Dragonfly supports three flavors of Pub/Sub: | Type | Location | Role | |------|----------|------| -| `ChannelStore` | `src/server/channel_store.h` | Centralized registry mapping channels/patterns to subscribers. Updated via RCU. | -| `ChannelStoreUpdater` | `src/server/channel_store.h` | Orchestrates RCU mutations (add/remove) to the `ChannelStore`. | +| `ChannelStore` | `src/server/channel_store.h` | Global registry mapping channels/patterns to subscribers. Backed by two `ShardedHashMap` instances (channels and patterns). | +| `ChannelStoreUpdater` | `src/server/channel_store.h` | Batches subscribe/unsubscribe operations per shard and applies them via `ShardedHashMap::Mutate`. | | `ChannelStore::Subscriber` | `src/server/channel_store.h` | Represents a subscribed client. Wraps `facade::ConnectionRef` plus a pattern string. | -| `ChannelStore::ControlBlock` | `src/server/channel_store.h` | Holds the `most_recent` atomic pointer and `update_mu` mutex. Prevents overlapping structural updates. | -| `ChannelStore::ChannelMap` | `src/server/channel_store.h` | `flat_hash_map` — maps channel/pattern names to subscriber lists.
| +| `ShardedHashMap` | `src/core/sharded_hash_map.h` | Thread-safe hash map split into 16 shards, each with independent `write_mu_` (Mutex) and `read_mu_` (SharedMutex). | +| `ChannelStore::ChannelMap` | `src/server/channel_store.h` | `ShardedHashMap` — maps channel/pattern names to subscriber lists across 16 independently-locked shards. | | `ChannelStore::SubscribeMap` | `src/server/channel_store.h` | `flat_hash_map` — maps subscriber contexts to their owning thread. | | `ChannelStore::UpdatablePointer` | `src/server/channel_store.h` | Atomic wrapper around `SubscribeMap*`. Supports lock-free reads (`acquire`) and RCU-style swaps (`release`). | | `ConnectionState::SubscribeInfo` | `src/server/conn_context.h` | Per-connection set of subscribed channels and patterns. Created lazily on first subscription. | @@ -58,51 +61,45 @@ Dragonfly supports three flavors of Pub/Sub: Pub/Sub Data Flow -## Subscription Management (RCU) +## Subscription Management (Shard-Locked ChannelStore) ### Data Structure Layout -Each `ChannelStore` instance holds two `ChannelMap` pointers: +The `ChannelStore` holds two `ChannelMap` instances — one for exact-channel subscriptions and +one for pattern subscriptions. Each `ChannelMap` is a `ShardedHashMap` +backed by 16 independently-locked shards:
Data Structure Layout
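The shard layout described here can be sketched with standard-library primitives. This is a hypothetical, simplified model, not the real implementation: `std::mutex` / `std::shared_mutex` stand in for the fiber-aware `util::fb2` locks, `SubscribeMap` is reduced to a plain map from a connection id to its owning thread, and the `Subscribe` / `shard` names are illustrative:

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical, simplified model of the shard layout described above.
// SubscribeMap is reduced to {connection id -> owning thread}.
using SubscribeMap = std::map<int, unsigned>;

// UpdatablePointer: lock-free reads (acquire) and RCU-style swaps (release).
struct UpdatablePointer {
  std::atomic<SubscribeMap*> ptr{nullptr};
  SubscribeMap* Get() const { return ptr.load(std::memory_order_acquire); }
  void Set(SubscribeMap* sm) { ptr.store(sm, std::memory_order_release); }
};

constexpr size_t kNumShards = 16;

struct Shard {
  std::mutex write_mu;        // serializes writers within this shard
  std::shared_mutex read_mu;  // shared by readers; exclusive for map changes
  std::unordered_map<std::string, UpdatablePointer> map;
};

class ShardedHashMap {
 public:
  // Read path: shared lock on the key's shard only; other shards untouched.
  bool FindIf(const std::string& key,
              const std::function<void(const SubscribeMap&)>& f) {
    Shard& s = shard(key);
    std::shared_lock rl(s.read_mu);
    auto it = s.map.find(key);
    if (it == s.map.end())
      return false;
    f(*it->second.Get());
    return true;
  }

  // Write path: write_mu serializes writers; an RCU pointer swap handles an
  // existing key, while a structural insert takes read_mu exclusively.
  void Subscribe(const std::string& key, int cntx_id, unsigned thread_id) {
    Shard& s = shard(key);
    std::lock_guard wl(s.write_mu);
    auto it = s.map.find(key);
    if (it != s.map.end()) {
      auto* repl = new SubscribeMap(*it->second.Get());  // copy
      repl->emplace(cntx_id, thread_id);                 // mutate the copy
      SubscribeMap* old = it->second.Get();
      it->second.Set(repl);               // release-store; readers not blocked
      std::unique_lock drain(s.read_mu);  // drain in-flight readers...
      delete old;                         // ...then reclaim the old map
    } else {
      std::unique_lock excl(s.read_mu);   // structural change blocks readers
      s.map[key].Set(new SubscribeMap{{cntx_id, thread_id}});
    }
  }
  // (Cleanup of the remaining SubscribeMap allocations is omitted for brevity.)

 private:
  Shard& shard(const std::string& key) {
    return shards_[std::hash<std::string>{}(key) % kNumShards];
  }
  Shard shards_[kNumShards];
};
```

In this toy version the old `SubscribeMap` is reclaimed inline by taking `read_mu` exclusively right after the swap; the real updater defers reclamation through a per-shard `freelist_`, as described below.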
+Each shard carries two fiber-aware locks: + +| Lock | Type | Purpose | +|------|------|---------| +| `write_mu_` | `util::fb2::Mutex` | Serializes writers within the shard. Readers never acquire it. | +| `read_mu_` | `util::fb2::SharedMutex` | Acquired shared by readers (`FindIf`, `ForEachShared`); acquired exclusively only for structural map changes (insert/erase) and for safe deletion of old `SubscribeMap` pointers (draining in-flight readers). | + `UpdatablePointer` wraps a `std::atomic<SubscribeMap*>` with `memory_order_acquire` on read and `memory_order_release` on write. This ensures that when a thread reads the pointer, it also sees the fully constructed `SubscribeMap` that the writer published. -### Two Levels of RCU - -The `ChannelStoreUpdater` implements two granularities of copy-on-write: +### Two Granularities of Update -1. **ChannelMap-level copy** — triggered when a channel slot must be added (first subscriber) - or removed (last subscriber leaves). The entire `ChannelMap` is shallow-copied, the slot is - added/removed on the copy, a new `ChannelStore` is allocated pointing to the new map, and - the global `control_block.most_recent` is swapped. +The `ChannelStoreUpdater` groups pending operations by shard index and processes each shard +in a single `Mutate()` call, minimizing lock acquisitions: -2. **SubscribeMap-level RCU** — triggered when adding/removing a subscriber to an existing - channel (the map slot already exists). Only the `SubscribeMap` for that channel is copied, - the mutation is applied, and the `UpdatablePointer` is atomically swapped. No new - `ChannelStore` or `ChannelMap` is needed. +1. **RCU pointer swap (existing channel)** — when a subscriber is added/removed from a channel + that already exists in the map. The `SubscribeMap` is copied, the mutation is applied, and + the `UpdatablePointer` is atomically swapped via `Set()`. This happens under `write_mu_` + only — readers are NOT blocked.
The old `SubscribeMap` is placed in a `freelist_` and + deleted after acquiring `read_mu_` exclusively (draining in-flight readers). -This two-level scheme is implemented in `ChannelStoreUpdater::GetTargetMap()`: - -```cpp -pair ChannelStoreUpdater::GetTargetMap(ChannelStore* store) { - auto* target = pattern_ ? store->patterns_ : store->channels_; - - for (auto key : ops_) { - auto it = target->find(key); - DCHECK(it != target->end() || to_add_); - // We need to make a copy, if we are going to add or delete a new map slot. - if ((to_add_ && it == target->end()) || (!to_add_ && it->second->size() == 1)) - return {new ChannelStore::ChannelMap{*target}, true}; - } - - return {target, false}; -} -``` +2. **Structural map change (new channel / last subscriber leaves)** — when a channel slot must + be inserted (first subscriber) or erased (last subscriber leaves). Inside the `Mutate()` + callback, the `AcquireReaderExclusiveLock` callable is invoked, which acquires `read_mu_` + exclusively, blocking all readers on that shard while the key is inserted or erased. + Writers on *other* shards are unaffected. ### Apply() Flow @@ -110,37 +107,70 @@ pair ChannelStoreUpdater::GetTargetMap(ChannelS Apply Flow -Step 8 uses `AwaitBrief` (non-preempting dispatch) to update each thread's local pointer. -The `seq_cst` load in the callback ensures the thread reads the latest pointer value _and_ -the memory published behind it. +The `ChannelStoreUpdater::Apply()` method iterates over each shard that has pending +operations and calls `map.Mutate(ShardId{sid}, ...)` to acquire `write_mu_` once per shard. +Inside the callback: + +1. **Phase 1 — RCU pointer swaps (under `write_mu_` only):** For each key where the channel + already exists, the `SubscribeMap` is copied, the subscriber is added/removed, and the + `UpdatablePointer` is atomically swapped. Old `SubscribeMap` pointers are saved in a + per-shard `freelist_`. Readers continue concurrently on the shared `read_mu_`. + +2. 
**Phase 2 — Structural map changes (under `read_mu_` exclusive):** If any keys require + inserting a new slot or erasing the last subscriber's slot, `AcquireReaderExclusiveLock()` + is called. This acquires `read_mu_` exclusively, draining any in-flight readers on this + shard. The insert/erase is then performed on the mutable map reference. + +3. **Phase 3 — Freelist cleanup:** After `Mutate()` returns (releasing `write_mu_`), + `WithReadExclusiveLock(ShardId{sid}, ...)` is called to acquire `read_mu_` exclusively + once more, ensuring all readers that may have loaded old `SubscribeMap` pointers have + completed. The old `SubscribeMap` pointers in the freelist are then safely deleted. -### Modify() — Per-Key Mutation +### Per-Key Mutation Logic -For each key in the pending operations: +Inside the `Mutate()` callback, each key in the shard's pending operations is processed: ``` -Modify(target, key) - it = target->find(key) - - Case 1: Adding, key not in map (new channel) - → target->emplace(key, new SubscribeMap{{cntx_, thread_id_}}) - - Case 2: Removing, last subscriber (channel disappears) - → freelist_.push_back(it->second.Get()) // defer deletion - → target->erase(it) - - Case 3: Existing channel, add/remove subscriber (RCU on SubscribeMap) - → replacement = new SubscribeMap{*it->second} - → if to_add_: replacement->emplace(cntx_, thread_id_) - else: replacement->erase(cntx_) - → freelist_.push_back(it->second.Get()) // old map, defer deletion - → it->second.Set(replacement) // atomic release-store +Phase 1 — RCU swaps (write_mu_ held, read_mu_ NOT held): + + For each key in shard_keys: + it = m.find(key) + + Case 1: Adding, key exists (existing channel) + → old_sm = it->second.Get() + → new_sm = new SubscribeMap{*old_sm} + → new_sm->emplace(cntx_, thread_id_) + → it->second.Set(new_sm) // atomic release-store + → freelist_.push_back(old_sm) // defer deletion + + Case 2: Removing, key exists, >1 subscriber + → old_sm = it->second.Get() + → new_sm = new 
SubscribeMap{*old_sm} + → new_sm->erase(cntx_) + → it->second.Set(new_sm) // atomic release-store + → freelist_.push_back(old_sm) // defer deletion + + Case 3: Adding, key NOT in map (new channel) + → mark needs_map_change[i] = true // deferred to Phase 2 + + Case 4: Removing, last subscriber (channel disappears) + → mark needs_map_change[i] = true // deferred to Phase 2 + +Phase 2 — Structural changes (read_mu_ acquired exclusive): + + If any needs_map_change: + locked_map = AcquireReaderExclusiveLock() + + Case 3: → locked_map.map.emplace(key, new SubscribeMap{{cntx_, thread_id_}}) + Case 4: → delete it->second.Get() + → locked_map.map.erase(it) ``` -Old `SubscribeMap` pointers are not immediately deleted because concurrent `PUBLISH` -operations on other threads may still be reading them. They are placed in a `freelist_` and -deleted only after `AwaitBrief` completes — at which point every thread has acknowledged the -new state and no reader can hold a reference to the old maps. +Old `SubscribeMap` pointers from Phase 1 are not immediately deleted because concurrent +readers on other threads may still hold references obtained via `UpdatablePointer::Get()`. +They are placed in a `freelist_` and deleted only after `WithReadExclusiveLock` completes — +at which point every reader on that shard has released `read_mu_` (shared) and no reader can +hold a reference to the old maps. ### Connection-Level Subscription State @@ -176,9 +206,9 @@ When a client issues `PUBLISH channel message` (or `SPUBLISH`): ``` SendMessages(channel, messages, sharded) 1. subscribers = FetchSubscribers(channel) - → exact match: channels_->find(channel) - → pattern match: for each (pat, subs) in *patterns_: - if GlobMatcher{pat}.Matches(channel): Fill(subs, pat, &result) + → exact match: channels_.FindIf(channel, ...) (shared read lock on shard) + → pattern match: patterns_.ForEachShared(...) 
+ for each (pat, subs): if GlobMatcher{pat}.Matches(channel): Fill(subs, pat, &result) → sort result by thread_id (enables efficient per-thread dispatch) 2. If subscribers empty → return 0 @@ -221,10 +251,12 @@ string allocations. ``` FetchSubscribers(channel) - 1. Exact match: channels_->find(channel) + 1. Exact match: channels_.FindIf(channel, ...) + → acquires read_mu_ shared on the channel's shard → if found, Fill() creates Subscriber entries from the SubscribeMap - 2. Pattern match: iterate ALL patterns + 2. Pattern match: patterns_.ForEachShared(...) + → iterates ALL pattern shards (each read_mu_ shared independently) → for each (pat, subs): GlobMatcher{pat, case_sensitive=true}.Matches(channel) → matching subscribers are added with their pattern string @@ -232,6 +264,10 @@ FetchSubscribers(channel) → enables O(log n) per-thread lookup during dispatch ``` +Note: `FetchSubscribers` is not a global snapshot — each shard is locked independently, so +the result may not reflect a fully consistent state. This trade-off is acceptable for pub/sub +use cases. + The `Fill` helper reads the `SubscribeMap` (via `UpdatablePointer::Get()` — acquire load) and creates `Subscriber` structs that hold a `ConnectionRef` (weak reference) obtained via `conn->Borrow()`. @@ -411,20 +447,19 @@ is called: ``` UnsubscribeAfterClusterSlotMigration(deleted_slots) - for each (channel, _) in *channels_: - if deleted_slots.Contains(KeySlot(channel)): - csu.Record(channel) - csu.ApplyAndUnsubscribe() + 1. Collect: ForEachShared over channels_, collect subscribers for channels + whose slot is in deleted_slots + 2. Remove: RemoveAllSubscribers(channel) for each matched channel + → uses Mutate() to erase under exclusive read_mu_ + 3. Notify: AwaitFiberOnAll dispatches force-unsubscribe messages + to affected connections on their owning threads ``` -`ApplyAndUnsubscribe()` differs from `Apply()`: -1. It deep-copies the `ChannelMap` and removes the migrated channels. -2. 
It calls `FetchSubscribers` for each removed channel _before_ updating the store - (since `FetchSubscribers` reads from the current active store). -3. It uses `AwaitFiberOnAll` (fiber-based, may preempt) instead of `AwaitBrief` to dispatch - both the store update and unsubscription messages. -4. On each thread, `UnsubscribeConnectionsFromDeletedSlots` sends `PubMessage`s with - `force_unsubscribe=true`, which triggers `sunsubscribe` push messages to affected clients. +`RemoveAllSubscribers` calls `map.Mutate(channel, ...)` which acquires `write_mu_` and then +`read_mu_` exclusively to erase the channel entry and delete its `SubscribeMap`. + +On each thread, `UnsubscribeConnectionsFromDeletedSlots` sends `PubMessage`s with +`force_unsubscribe=true`, which triggers `sunsubscribe` push messages to affected clients. ## Keyspace Event Notifications @@ -439,8 +474,7 @@ When enabled: 3. At the end of `DeleteExpiredStep`, batched events are published: ```cpp -ChannelStore* store = ServerState::tlocal()->channel_store(); -store->SendMessages( +channel_store->SendMessages( absl::StrCat("__keyevent@", cntx.db_index, "__:expired"), events, false); events.clear(); @@ -476,11 +510,12 @@ Notable flags: | Purpose | File Path | |---------|-----------| | ChannelStore & ChannelStoreUpdater | `src/server/channel_store.h`, `src/server/channel_store.cc` | +| ShardedHashMap (shard-locked backing store) | `src/core/sharded_hash_map.h` | | Pub/Sub command handlers | `src/server/main_service.cc` (`Publish`, `Subscribe`, `Unsubscribe`, `PSubscribe`, `PUnsubscribe`, `Pubsub`) | | Connection-level subscription state | `src/server/conn_context.h`, `src/server/conn_context.cc` (`ChangeSubscriptions`, `UnsubscribeAll`, `PUnsubscribeAll`) | | PubMessage, AsyncFiber, backpressure | `src/facade/dragonfly_connection.h`, `src/facade/dragonfly_connection.cc` | | ConnectionRef (weak subscriber refs) | `src/facade/connection_ref.h` | -| ServerState channel_store_ pointer | `src/server/server_state.h`, 
`src/server/server_state.cc` | +| Global `channel_store` pointer | `src/server/channel_store.h` (extern), `src/server/main_service.cc` (lifecycle) | | Keyspace event integration | `src/server/db_slice.cc` (`DeleteExpiredStep`) | -| Cluster slot migration unsub | `src/server/channel_store.cc` (`UnsubscribeAfterClusterSlotMigration`, `ApplyAndUnsubscribe`) | +| Cluster slot migration unsub | `src/server/channel_store.cc` (`UnsubscribeAfterClusterSlotMigration`, `RemoveAllSubscribers`) | | GlobMatcher for pattern matching | `src/core/glob_matcher.h` |
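As a closing illustration of the pattern-match path (`GlobMatcher{pat}.Matches(channel)` inside `FetchSubscribers`), the fan-out can be sketched with a minimal matcher. This is a hypothetical toy supporting only `*` and `?`; the real `GlobMatcher` implements full glob syntax with case-sensitivity control:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical miniature of the pattern-match path in FetchSubscribers: every
// registered pattern is tested against the published channel. This toy matcher
// supports only '*' and '?'; the real GlobMatcher handles full glob syntax.
bool Matches(const char* p, const char* s) {
  for (; *p; ++p, ++s) {
    if (*p == '*') {
      ++p;
      if (!*p)
        return true;  // trailing '*' matches any remainder
      for (; *s; ++s)
        if (Matches(p, s))
          return true;
      return Matches(p, s);  // '*' may also match the empty tail
    }
    if (!*s || (*p != '?' && *p != *s))
      return false;
  }
  return !*s;  // pattern exhausted: match only if the channel is too
}

// Collect the patterns that would fan a message out to `channel`
// (the real store walks patterns_ shard by shard via ForEachShared).
std::vector<std::string> MatchingPatterns(
    const std::vector<std::string>& patterns, const std::string& channel) {
  std::vector<std::string> out;
  for (const auto& pat : patterns)
    if (Matches(pat.c_str(), channel.c_str()))
      out.push_back(pat);
  return out;
}
```

Because every pattern must be tested against every published channel, pattern subscriptions cost O(patterns) per `PUBLISH`, which is why exact-channel lookups take the separate `FindIf` fast path.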