219 changes: 127 additions & 92 deletions docs/pub-sub.md

This document describes how Dragonfly implements the Publish-Subscribe (Pub/Sub) messaging
paradigm within its shared-nothing, multi-threaded architecture. It covers the global
subscription registry backed by a `ShardedHashMap` with fine-grained per-shard locking,
the RCU-style pointer swap and per-shard shared locking used for concurrent reads on the
publish path, the asynchronous message delivery pipeline, and the backpressure system
that protects the server from slow-subscriber OOM.

> **@augmentcode** (bot, Apr 29, 2026), `docs/pub-sub.md:6`: This mentions "lock-free reads on the publish path", but `FetchSubscribers` does take `read_mu_` in shared mode via `ShardedHashMap::FindIf`/`ForEachShared`. Consider rephrasing to avoid implying the publish path is fully lock-free (it's lock-free only for the `UpdatablePointer` load). *Severity: low.*
> **Copilot** (Apr 29, 2026), on lines +6 to +8: The intro says the RCU pointer swap enables "lock-free reads on the publish path", but `FetchSubscribers` takes a per-shard shared lock (`ShardedHashMap::FindIf`/`ForEachShared` acquire `read_mu_`). Consider rewording to reflect that reads are concurrent but not lock-free (shared `read_mu_` + atomic `UpdatablePointer::Get()`). Suggested change: "the RCU-style pointer swap and per-shard shared locking used for concurrent reads on the publish path, the asynchronous message delivery pipeline, and the backpressure system that protects the server from slow-subscriber OOM."

## Overview

unique challenge: subscriptions must be globally addressable across all threads, but a
global lock on every `PUBLISH` would create a severe bottleneck. A single popular channel
with thousands of subscribers could serialize all publish operations onto one shard thread.

Dragonfly solves this with a **`ChannelStore` backed by `ShardedHashMap`** — a hash map
split into 16 independently locked shards:

- **Reads (`PUBLISH` / `SPUBLISH`)** acquire a per-shard shared read lock (`read_mu_`) and
read the `SubscribeMap` pointer via an atomic acquire load (`UpdatablePointer::Get()`).
Multiple readers on the same shard proceed concurrently.
- **Writes (`SUBSCRIBE` / `UNSUBSCRIBE` / `PSUBSCRIBE` / `PUNSUBSCRIBE`)** acquire the
shard's exclusive write lock (`write_mu_`), copy the `SubscribeMap`, apply the mutation,
and atomically swap the pointer — all without blocking readers. Concurrent writes to
*different* shards are fully independent.

This design avoids contention on a single shard thread for heavy throughput on a single
channel and seamlessly scales across multiple threads even with a small number of channels.
Publish latency is low because no inter-thread hop is required to look up subscribers — any
thread can read the global `ChannelStore` directly.

Dragonfly supports three flavors of Pub/Sub:


| Type | Location | Role |
|------|----------|------|
| `ChannelStore` | `src/server/channel_store.h` | Global registry mapping channels/patterns to subscribers. Backed by two `ShardedHashMap` instances (channels and patterns). |
| `ChannelStoreUpdater` | `src/server/channel_store.h` | Batches subscribe/unsubscribe operations per shard and applies them via `ShardedHashMap::Mutate`. |
| `ChannelStore::Subscriber` | `src/server/channel_store.h` | Represents a subscribed client. Wraps `facade::ConnectionRef` plus a pattern string. |
| `ShardedHashMap` | `src/core/sharded_hash_map.h` | Generic thread-safe sharded hash map template with independently locked shards, each with its own `write_mu_` (Mutex) and `read_mu_` (SharedMutex); the shard count is a template parameter (default 32; `ChannelStore` uses 16). |
| `ChannelStore::ChannelMap` | `src/server/channel_store.h` | `ShardedHashMap<string, UpdatablePointer, 16, StringViewHash, std::equal_to<>>` — maps channel/pattern names to subscriber lists across 16 independently locked shards; the transparent hash/equality supports the `std::string_view` lookups used by `FindIf`. |
| `ChannelStore::SubscribeMap` | `src/server/channel_store.h` | `flat_hash_map<ConnectionContext*, ThreadId>` — maps subscriber contexts to their owning thread. |
| `ChannelStore::UpdatablePointer` | `src/server/channel_store.h` | Atomic wrapper around `SubscribeMap*`. Supports lock-free atomic loads (`acquire`) and RCU-style pointer swaps (`release`). |
| `ConnectionState::SubscribeInfo` | `src/server/conn_context.h` | Per-connection set of subscribed channels and patterns. Created lazily on first subscription. |

> **@augmentcode** (bot, Apr 29, 2026), `docs/pub-sub.md:48`: `ShardedHashMap` is templated on the shard count (default is 32 in `sharded_hash_map.h`), while `ChannelStore` uses 16. Consider wording this row as "N shards (ChannelStore uses 16)" to avoid implying the container is always 16-way sharded. *Severity: low.*

> **Copilot** (Apr 29, 2026), on lines +48 to +49: The `ShardedHashMap` type itself is generic (default `NUM_SHARDS=32`), so describing it as "split into 16 shards" is only true for the `ChannelStore::ChannelMap` instantiation. Also, `ChannelMap` is defined with a transparent hash/equality (`StringViewHash`, `std::equal_to<>`) to support the `std::string_view` lookups used by `FindIf`; consider reflecting that in the type/description to avoid implying the simpler 3-parameter template.

> **@augmentcode** (bot, Apr 29, 2026), `docs/pub-sub.md:49`: The `ChannelMap` alias in code includes `StringViewHash` + `std::equal_to<>` for heterogeneous lookup (e.g. `std::string_view` in `FindIf`). Consider either spelling out the full alias or noting the transparent hash/equality requirement so the later `FindIf(channel, ...)` examples stay accurate. *Severity: low.*

> **@augmentcode** (bot, Apr 29, 2026), `docs/pub-sub.md:51`: Similar to the intro, "lock-free reads" is a bit ambiguous here since readers still acquire `read_mu_` shared; only the `UpdatablePointer::Get()` atomic load itself is lock-free. Consider tightening the wording to reflect that distinction. *Severity: low.*
<img src="pubsub/pubsub_data_flow_overview.svg" alt="Pub/Sub Data Flow" width="1000"/>
</div>

## Subscription Management (Shard-Locked ChannelStore)

### Data Structure Layout

The `ChannelStore` holds two `ChannelMap` instances — one for exact-channel subscriptions and
one for pattern subscriptions. Each `ChannelMap` is a `ShardedHashMap<string, UpdatablePointer, 16>`
backed by 16 independently locked shards:

<div align="center">
<img src="pubsub/pubsub_data_structure_layout.svg" alt="Data Structure Layout" width="700"/>
</div>

Each shard carries two fiber-aware locks:

| Lock | Type | Purpose |
|------|------|---------|
| `write_mu_` | `util::fb2::Mutex` | Serializes writers within the shard. Readers never acquire it. |
| `read_mu_` | `util::fb2::SharedMutex` | Acquired shared by readers (`FindIf`, `ForEachShared`); acquired exclusively only for structural map changes (insert/erase) and for safe deletion of old `SubscribeMap` pointers (draining in-flight readers). |

`UpdatablePointer` wraps a `std::atomic<SubscribeMap*>` with `memory_order_acquire` on read
and `memory_order_release` on write. This ensures that when a thread reads the pointer, it
also sees the fully constructed `SubscribeMap` that the writer published.

### Two Granularities of Update

The `ChannelStoreUpdater` groups pending operations by shard index and processes each shard
in a single `Mutate()` call, minimizing lock acquisitions:

1. **RCU pointer swap (existing channel)** — when a subscriber is added/removed from a channel
that already exists in the map. The `SubscribeMap` is copied, the mutation is applied, and
the `UpdatablePointer` is atomically swapped via `Set()`. This happens under `write_mu_`
only — readers are NOT blocked. The old `SubscribeMap` is placed in a `freelist_` and
deleted after acquiring `read_mu_` exclusively (draining in-flight readers).

2. **Structural map change (new channel / last subscriber leaves)** — when a channel slot must
be inserted (first subscriber) or erased (last subscriber leaves). Inside the `Mutate()`
callback, the `AcquireReaderExclusiveLock` callable is invoked, which acquires `read_mu_`
exclusively, blocking all readers on that shard while the key is inserted or erased.
Writers on *other* shards are unaffected.

### Apply() Flow

<div align="center">
<img src="pubsub/pubsub_apply.svg" alt="Apply Flow" width="1000"/>
</div>

The `ChannelStoreUpdater::Apply()` method iterates over each shard that has pending
operations and calls `map.Mutate(ShardId{sid}, ...)` to acquire `write_mu_` once per shard.
Inside the callback:

1. **Phase 1 — RCU pointer swaps (under `write_mu_` only):** For each key where the channel
already exists, the `SubscribeMap` is copied, the subscriber is added/removed, and the
`UpdatablePointer` is atomically swapped. Old `SubscribeMap` pointers are saved in a
per-shard `freelist_`. Readers continue concurrently on the shared `read_mu_`.

2. **Phase 2 — Structural map changes (under `read_mu_` exclusive):** If any keys require
inserting a new slot or erasing the last subscriber's slot, `AcquireReaderExclusiveLock()`
is called. This acquires `read_mu_` exclusively, draining any in-flight readers on this
shard. The insert/erase is then performed on the mutable map reference.

3. **Phase 3 — Freelist cleanup:** After `Mutate()` returns (releasing `write_mu_`),
`WithReadExclusiveLock(ShardId{sid}, ...)` is called to acquire `read_mu_` exclusively
once more, ensuring all readers that may have loaded old `SubscribeMap` pointers have
completed. The old `SubscribeMap` pointers in the freelist are then safely deleted.

### Per-Key Mutation Logic

Inside the `Mutate()` callback, each key in the shard's pending operations is processed:

```
Phase 1 — RCU swaps (write_mu_ held, read_mu_ NOT held):

For each key in shard_keys:
it = m.find(key)

Case 1: Adding, key exists (existing channel)
→ old_sm = it->second.Get()
→ new_sm = new SubscribeMap{*old_sm}
→ new_sm->emplace(cntx_, thread_id_)
→ it->second.Set(new_sm) // atomic release-store
→ freelist_.push_back(old_sm) // defer deletion

Case 2: Removing, key exists, >1 subscriber
→ old_sm = it->second.Get()
→ new_sm = new SubscribeMap{*old_sm}
→ new_sm->erase(cntx_)
→ it->second.Set(new_sm) // atomic release-store
→ freelist_.push_back(old_sm) // defer deletion

Case 3: Adding, key NOT in map (new channel)
→ mark needs_map_change[i] = true // deferred to Phase 2

Case 4: Removing, last subscriber (channel disappears)
→ mark needs_map_change[i] = true // deferred to Phase 2

Phase 2 — Structural changes (read_mu_ acquired exclusive):

If any needs_map_change:
locked_map = AcquireReaderExclusiveLock()

Case 3: → locked_map.map.emplace(key, new SubscribeMap{{cntx_, thread_id_}})
Case 4: → delete it->second.Get()
→ locked_map.map.erase(it)
```

Old `SubscribeMap` pointers from Phase 1 are not immediately deleted because concurrent
readers on other threads may still hold references obtained via `UpdatablePointer::Get()`.
They are placed in a `freelist_` and deleted only after `WithReadExclusiveLock` completes —
at which point every reader on that shard has released `read_mu_` (shared) and no reader can
hold a reference to the old maps.

### Connection-Level Subscription State

When a client issues `PUBLISH channel message` (or `SPUBLISH`):
```
SendMessages(channel, messages, sharded)
1. subscribers = FetchSubscribers(channel)
→ exact match: channels_.FindIf(channel, ...) (shared read lock on shard)
→ pattern match: patterns_.ForEachShared(...)
for each (pat, subs): if GlobMatcher{pat}.Matches(channel): Fill(subs, pat, &result)
→ sort result by thread_id (enables efficient per-thread dispatch)

2. If subscribers empty → return 0
    ...
```

string allocations.

```
FetchSubscribers(channel)
1. Exact match: channels_.FindIf(channel, ...)
→ acquires read_mu_ shared on the channel's shard
→ if found, Fill() creates Subscriber entries from the SubscribeMap

2. Pattern match: patterns_.ForEachShared(...)
→ iterates ALL pattern shards (each read_mu_ shared independently)
→ for each (pat, subs): GlobMatcher{pat, case_sensitive=true}.Matches(channel)
→ matching subscribers are added with their pattern string

3. Sort by Subscriber::ByThread (thread_id ordering)
→ enables O(log n) per-thread lookup during dispatch
```

Note: `FetchSubscribers` is not a global snapshot — each shard is locked independently, so
the result may not reflect a fully consistent state. This trade-off is acceptable for pub/sub
use cases.

The `Fill` helper reads the `SubscribeMap` (via `UpdatablePointer::Get()` — acquire load)
and creates `Subscriber` structs that hold a `ConnectionRef` (weak reference) obtained via
`conn->Borrow()`.
After a cluster slot migration removes slots from this node, `UnsubscribeAfterClusterSlotMigration`
is called:

```
UnsubscribeAfterClusterSlotMigration(deleted_slots)
1. Collect: ForEachShared over channels_, collect subscribers for channels
whose slot is in deleted_slots
2. Remove: RemoveAllSubscribers(channel) for each matched channel
→ uses Mutate() to erase under exclusive read_mu_
3. Notify: AwaitFiberOnAll dispatches force-unsubscribe messages
to affected connections on their owning threads
```

`RemoveAllSubscribers` calls `map.Mutate(channel, ...)` which acquires `write_mu_` and then
`read_mu_` exclusively to erase the channel entry and delete its `SubscribeMap`.

On each thread, `UnsubscribeConnectionsFromDeletedSlots` sends `PubMessage`s with
`force_unsubscribe=true`, which triggers `sunsubscribe` push messages to affected clients.

## Keyspace Event Notifications

When enabled:
3. At the end of `DeleteExpiredStep`, batched events are published:

```cpp
channel_store->SendMessages(
absl::StrCat("__keyevent@", cntx.db_index, "__:expired"),
events, false);
events.clear();
```

Notable flags:
| Purpose | File Path |
|---------|-----------|
| ChannelStore & ChannelStoreUpdater | `src/server/channel_store.h`, `src/server/channel_store.cc` |
| ShardedHashMap (shard-locked backing store) | `src/core/sharded_hash_map.h` |
| Pub/Sub command handlers | `src/server/main_service.cc` (`Publish`, `Subscribe`, `Unsubscribe`, `PSubscribe`, `PUnsubscribe`, `Pubsub`) |
| Connection-level subscription state | `src/server/conn_context.h`, `src/server/conn_context.cc` (`ChangeSubscriptions`, `UnsubscribeAll`, `PUnsubscribeAll`) |
| PubMessage, AsyncFiber, backpressure | `src/facade/dragonfly_connection.h`, `src/facade/dragonfly_connection.cc` |
| ConnectionRef (weak subscriber refs) | `src/facade/connection_ref.h` |
| Global `channel_store` pointer | `src/server/channel_store.h` (extern), `src/server/main_service.cc` (lifecycle) |
| Keyspace event integration | `src/server/db_slice.cc` (`DeleteExpiredStep`) |
| Cluster slot migration unsub | `src/server/channel_store.cc` (`UnsubscribeAfterClusterSlotMigration`, `RemoveAllSubscribers`) |
| GlobMatcher for pattern matching | `src/core/glob_matcher.h` |