
docs: update pub-sub.md for parallel hashmap architecture (PR #7174)#7184

Closed
Copilot wants to merge 46 commits into main from copilot/update-docs-pub-sub-architecture

Conversation

Contributor

Copilot AI commented Apr 19, 2026

  • Rebase on top of main (clean history based on current main)
  • Update pub-sub.md to match the merged ShardedHashMap implementation:
    • Overview: ShardedHashMap with 16 shards, two-lock RCU per shard
    • Data Structures: remove ControlBlock, add ShardedHashMap, update ChannelMap typedef
    • Subscription Management: ChannelStoreUpdater batches by shard, Mutate + RCU pointer swap + freelist
    • FetchSubscribers: FindIf/ForEachShared API, non-atomic note
    • SendMessages: update pseudocode for new API
    • Cluster slot migration: RemoveAllSubscribers + ForEachShared + AwaitFiberOnAll
    • Keyspace events: global channel_store-> pointer (no more ServerState)
    • Key Files Reference: add ShardedHashMap, remove ServerState, update migration refs

Copilot AI requested a review from mkaruza April 19, 2026 14:01
@mkaruza mkaruza marked this pull request as ready for review April 19, 2026 14:03
Copilot AI review requested due to automatic review settings April 19, 2026 14:03

augmentcode Bot commented Apr 19, 2026

🤖 Augment PR Summary

Summary: Updates docs/pub-sub.md to describe the post-#7174 Pub/Sub subscription registry design.

Changes:

  • Rewrites the overview to replace the prior RCU snapshot narrative with a parallel-hashmap-backed global ChannelStore and per-submap locking
  • Updates the primary data structures table (removing RCU-specific components and documenting the new map types / global instance)
  • Replaces the old RCU subscription-management sections with Add() (try_emplace_l) and Remove() (erase_if) flows
  • Updates the publish routing description to the new if_contains/for_each APIs and lock semantics
  • Simplifies the cluster slot migration unsubscribe flow to a single scan plus AwaitFiberOnAll dispatch
  • Adjusts keyspace-event and file references to use the new channel_store access pattern
Technical Notes: The updated doc assumes fine-grained contention via per-submap locks rather than a single serialized update path.
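The lock-aware accessors named above can be illustrated with a minimal stand-in. This sketch mimics the semantics of phmap's `try_emplace_l` / `erase_if` / `if_contains` using a plain `std::unordered_map` behind one mutex; the real `parallel_flat_hash_map` applies the same idea per submap. The `LockedMap` wrapper here is purely illustrative, not Dragonfly or phmap code.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <unordered_map>

// Illustrative stand-in for the phmap submap accessors named in the summary:
// try_emplace_l runs a lambda on an existing entry (under the lock) or
// inserts; erase_if removes only when the predicate holds; if_contains reads
// under the lock without exposing an iterator.
template <typename K, typename V>
class LockedMap {
 public:
  template <typename F>
  bool try_emplace_l(const K& key, F&& on_existing, V value) {
    std::lock_guard<std::mutex> lk(mu_);
    auto [it, inserted] = map_.try_emplace(key, std::move(value));
    if (!inserted) on_existing(*it);  // modify the existing entry while locked
    return inserted;
  }

  template <typename Pred>
  size_t erase_if(const K& key, Pred&& pred) {
    std::lock_guard<std::mutex> lk(mu_);
    auto it = map_.find(key);
    if (it == map_.end() || !pred(*it)) return 0;
    map_.erase(it);
    return 1;
  }

  template <typename F>
  bool if_contains(const K& key, F&& f) const {
    std::lock_guard<std::mutex> lk(mu_);
    auto it = map_.find(key);
    if (it == map_.end()) return false;
    f(*it);  // read-only access while locked
    return true;
  }

 private:
  mutable std::mutex mu_;
  std::unordered_map<K, V> map_;
};
```

The key property is that the lambda runs while the entry is locked, so check-then-act updates stay atomic without handing out iterators.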



@augmentcode augmentcode Bot left a comment


Review completed. 1 suggestion posted.



Comment thread docs/pub-sub.md Outdated

```diff
- Dragonfly solves this by using a **centralized `ChannelStore` updated via RCU
- (Read-Copy-Update)**:
+ Dragonfly solves this with a **single global `ChannelStore`** backed by
```

@augmentcode augmentcode Bot Apr 19, 2026


The doc now describes a phmap::parallel_flat_hash_map-backed global channel_store with Add()/Remove() and per-submap locking, but the current code in src/server/channel_store.h/.cc is still RCU-based with ChannelStoreUpdater and ServerState::tlocal()->channel_store(). Can we ensure this doc update lands only once the corresponding code change (PR #7174) is actually present in this branch? Otherwise readers will be misled.

Severity: medium

Other Locations
  • docs/pub-sub.md:45
  • docs/pub-sub.md:49
  • docs/pub-sub.md:82
  • docs/pub-sub.md:105
  • docs/pub-sub.md:155
  • docs/pub-sub.md:200
  • docs/pub-sub.md:420
  • docs/pub-sub.md:446



Contributor

Copilot AI left a comment


Pull request overview

Updates the Pub/Sub architecture documentation to describe a new subscription registry design that uses a single global ChannelStore backed by a sharded concurrent hashmap, replacing the prior RCU-based description.

Changes:

  • Replace RCU-based ChannelStore description with a phmap::parallel_flat_hash_map design (per-submap locking).
  • Update subscription management and publish-path sections to reference new map APIs (try_emplace_l, erase_if, if_contains, for_each) and simplified cluster slot migration flow.
  • Update examples/references to use a global channel_store instead of ServerState::tlocal()->channel_store().

Comment thread docs/pub-sub.md Outdated
Comment thread docs/pub-sub.md
```diff
- ChannelStore* store = ServerState::tlocal()->channel_store();
- store->SendMessages(
+ channel_store->SendMessages(
```

Copilot AI Apr 19, 2026


This snippet uses channel_store->SendMessages(...) as if there is a global channel_store, but the code currently calls ServerState::tlocal()->channel_store() (e.g. DbSlice::DeleteExpiredStep). Please align the example with the actual API used in the codebase.

Suggested change

```diff
- channel_store->SendMessages(
+ ServerState::tlocal()->channel_store()->SendMessages(
```

kostasrim and others added 18 commits April 20, 2026 11:17
Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
* add info acl subcommand
* add memory metrics for acl registry

Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
…mprovements (#7012)

* chore(facade): rename predicates, add IoBuf& params, add ParseFromBuffer

Rename parsed-command predicates for clarity:
- HasDispatchedCommands() -> HasInFlightCommands()
- HeadReadyToDispatch() -> HasCommandToExecute()

Generalize parse methods to accept an explicit IoBuf& parameter
(ParseRedis, ParseMCBatch, ParseRedisBatch, CheckIoBufCapacity),
decoupling them from io_buf_ so they can operate on any buffer.
All callers pass io_buf_ so behavior is unchanged.

Add ParseFromBuffer(IoBuf&): protocol-agnostic helper that dispatches
to ParseMCBatch or ParseRedisBatch based on the current protocol.

Finally, move to the clearer and simpler ReadBufTracker instead of the UpdateIoBufCapacity monstrosity. I can criticize it - I added it.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: comments

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
…T.SEARCH/FT.AGGREGATE (#7181)

* feat(search): add WITHSCORES, SCORER, ADDSCORES command support for FT.SEARCH/FT.AGGREGATE

* fix: review comments

* fix: review comments
Fix typos in transaction.md

Corrected typos and improved clarity in transaction documentation.

Signed-off-by: Roy Touw <168636339+RoyTouw77@users.noreply.github.com>
* fix: OOB in BITCOUNT and some refactoring
* chore: add logs to detect OOM during replication restart process
…#7109)

* server: add chunk budgeting which is no-op (unlimited budget)
* server: add SBF partial filter loading and append support
* server: avoid splitting stream after final listpack

Skip PushToConsumerIfNeeded after the last listpack in SaveStreamObject.
The stream metadata tail (length, IDs, consumer groups) must stay
bundled with the last listpack chunk so the loader can read it in the
same chunk.
* feat: add Apply method to CmdArgParser
…bort (#7203)

* fix(server): reject negative numkeys in EVAL/EVALSHA to avoid CHECK abort

* fix: review comments
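The numkeys guard above can be sketched as validating the user-supplied count before it reaches any internal CHECK (which would abort the process). `ValidateNumKeys` is a hypothetical helper; the error strings mirror Redis conventions but are illustrative here, not the actual Dragonfly fix.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Reject invalid numkeys with a user-visible error instead of letting a
// negative or oversized value trip a CHECK deeper in the command path.
// Returns std::nullopt when the value is acceptable.
std::optional<std::string> ValidateNumKeys(long numkeys, size_t available_args) {
  if (numkeys < 0)
    return "ERR Number of keys can't be negative";
  if (static_cast<size_t>(numkeys) > available_args)
    return "ERR Number of keys can't be greater than number of args";
  return std::nullopt;  // valid: 0 <= numkeys <= available_args
}
```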
* feat(search): add TFIDF and TFIDF.DOCNORM scorers

* fix: review comments

* fix: review comments
* server: Add LOADCHUNK command

The loadchunk command is the counterpart to the previously added
scandump command and loads a SBF from chunks produced by the scandump
output.

* server(bloom): Guard commands on a bloom filter which has no filters yet

Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
…contrib/charts/dragonfly (#7175)

chore(deps): bump github.com/moby/spdystream

Bumps [github.com/moby/spdystream](https://github.com/moby/spdystream) from 0.5.0 to 0.5.1.
- [Release notes](https://github.com/moby/spdystream/releases)
- [Commits](moby/spdystream@v0.5.0...v0.5.1)

---
updated-dependencies:
- dependency-name: github.com/moby/spdystream
  dependency-version: 0.5.1
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* fix: bullmq update

* fix: update workflow
…trib/charts/dragonfly (#7207)

* chore(deps): bump github.com/jackc/pgx/v5 in /contrib/charts/dragonfly

Bumps [github.com/jackc/pgx/v5](https://github.com/jackc/pgx) from 5.7.6 to 5.9.2.
- [Changelog](https://github.com/jackc/pgx/blob/master/CHANGELOG.md)
- [Commits](jackc/pgx@v5.7.6...v5.9.2)

---
updated-dependencies:
- dependency-name: github.com/jackc/pgx/v5
  dependency-version: 5.9.2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

* chore: bump go.work to go 1.25.0 to match pgx v5.9.2 requirement

The pgx v5.9.2 bump in contrib/charts/dragonfly requires go 1.25.0,
which conflicts with the workspace-level go 1.24.0. Update go.work
and run go work sync to align all modules.

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Abhijat Malviya <abhijat@dragonflydb.io>
dependabot Bot and others added 20 commits April 24, 2026 12:13
…trib/charts/dragonfly (#6678)

build(deps): bump filippo.io/edwards25519 in /contrib/charts/dragonfly

Bumps [filippo.io/edwards25519](https://github.com/FiloSottile/edwards25519) from 1.1.0 to 1.1.1.
- [Commits](FiloSottile/edwards25519@v1.1.0...v1.1.1)

---
updated-dependencies:
- dependency-name: filippo.io/edwards25519
  dependency-version: 1.1.1
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Abhijat Malviya <abhijat@dragonflydb.io>
to avoid security issue

https://github.com/dragonflydb/dragonfly/security/dependabot/64

Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
* feat: new methods in CmdArgParser
* feat: add stricter cluster config validation
* feat: DEBUG TRAFFIC per-listener recording and memcache replay

* fix: review comments

* fix: review comments

* fix: review comments
…7217)

feat: DEBUG TRAFFIC REPLICA — record replication stream on replicas
Introduce ShardedHashMap<K, V, NUM_SHARDS, HASH, EQ>, a general-purpose
thread-safe hash map sharded across independent shards.

Each shard holds an absl::flat_hash_map guarded by two fiber-aware locks:
- write_mu_ (fb2::Mutex): serializes writers within a shard.
- read_mu_ (fb2::SharedMutex): taken shared for reads, exclusively only
  for structural map changes.

The two-lock design keeps readers unblocked while a writer prepares its
mutation; read_mu_ is acquired exclusively only at the moment of commit,
minimizing reader stall time. Each shard is aligned to a CPU cache line
(alignas(64)) to prevent false sharing under concurrent access.

Add sharded_hash_map_test with 15 tests covering single-threaded
correctness and concurrent reader/writer fiber scenarios.

Signed-off-by: mkaruza <mario@dragonflydb.io>
Signed-off-by: Kostas Kyrimis <kostas@dragonflydb.io>
Co-authored-by: Borys <borys@dragonflydb.io>
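The two-lock shard layout described in this commit can be sketched in a single file, with `std::mutex`/`std::shared_mutex` standing in for `fb2::Mutex`/`fb2::SharedMutex`. In the real design the writer prepares its mutation under `write_mu_` before briefly taking `read_mu_` exclusively; in this sketch the whole write is the commit, so it only shows the lock layout and shard routing, not Dragonfly's actual `ShardedHashMap`.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// One shard of the two-lock design: write_mu serializes writers, read_mu is
// taken shared for reads and exclusively only for the structural change.
template <typename K, typename V, size_t NUM_SHARDS = 16>
class ShardedHashMap {
  struct alignas(64) Shard {            // cache-line aligned: no false sharing
    std::mutex write_mu;                // serializes writers in this shard
    mutable std::shared_mutex read_mu;  // shared for reads, exclusive at commit
    std::unordered_map<K, V> map;
  };

 public:
  void Upsert(const K& key, V value) {
    Shard& s = ShardFor(key);
    std::lock_guard<std::mutex> wlk(s.write_mu);  // one writer per shard
    std::unique_lock<std::shared_mutex> rlk(s.read_mu);  // brief exclusive commit
    s.map.insert_or_assign(key, std::move(value));
  }

  bool Find(const K& key, V* out) const {
    const Shard& s = ShardFor(key);
    std::shared_lock<std::shared_mutex> rlk(s.read_mu);  // readers don't block readers
    auto it = s.map.find(key);
    if (it == s.map.end()) return false;
    *out = it->second;
    return true;
  }

 private:
  Shard& ShardFor(const K& key) {
    return shards_[std::hash<K>{}(key) % NUM_SHARDS];
  }
  const Shard& ShardFor(const K& key) const {
    return shards_[std::hash<K>{}(key) % NUM_SHARDS];
  }
  std::array<Shard, NUM_SHARDS> shards_;
};
```

Writes to keys hashing into different shards take disjoint locks and proceed fully in parallel, which is the contention win the commit describes.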
* test(connection): add reproduction for FIN_WAIT_2 leak with BLPOP

Closing a client socket while the connection's reader fiber is parked on
a blocking command (BLPOP) does not always wake the fiber to observe EOF.

Under sustained concurrent open+close churn, a fraction of FDs linger on
the server in CLOSE_WAIT (client in FIN_WAIT_2) until the command times
out or the client is CLIENT KILL'd, matching the reaper-driven leak
reported in production. valkey closes its half immediately
on FIN even mid-BLPOP.

The bug is fixed in helio by romange/helio#580

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: small fixes

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
In collect monitor, wait for monitor process to set up and confirm at
least one command is registered before proceeding with the test body.

Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
server: Add mem buf controller

The mem buf controller wraps the buffer which will be written to by the
serializer, and hides the state transition of the buffer from the
serializer. It also maintains states necessary for transition.

Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
perf(facade): defer V2 flush to reduce syscalls

The Problem:
Under heavy pipeline saturation (50 clients, pipeline=500), V2 was
issuing ~92,600 sendmsg syscalls vs V1's ~7,500 for the same
workload — over 12x more kernel transitions for identical traffic.

Root Cause:
ParseLoop() processes pipelines in chunks bounded by
max_busy_cycles. The old ReplyBatch() called Flush() after
every chunk unconditionally. Since the parser outruns the socket,
this triggered a kernel sendmsg on every parse iteration instead
of coalescing replies across the full pipeline.

The Fix:
Remove the unconditional flush from ReplyBatch() for V2 and
defer it to four explicit points in IoLoopV2:

- Idle await (no input): flush before sleeping — handles single
  commands and end-of-pipeline.
- Backpressure await: flush so the client can drain its recv
  buffer and relieve memory pressure.
- Half-close (io_ec_): flush before returning the read error —
  ensures pipelined replies reach the client before teardown.
- Migration: flush before cross-thread handoff to avoid data
  corruption or crash on the new thread.

SinkReplyBuilder::Flush() now returns immediately when nothing
is buffered and no resize is requested, avoiding a redundant
state-reset on no-op flushes.

Memory Safety:
No OOM risk is introduced because:
- SinkReplyBuilder auto-flushes its iovec when full (~64KB).
- The backpressure guard in IoLoopV2 forces a flush when the
  pipeline queue grows too large.

Results (throughput mode, pipeline=500, 5-run median, local bench):
  V1 baseline : 583,118 RPS | 41.76ms lat | 7,506 syscalls
  V2 before : 513,640 RPS | 47.88ms lat | 92,696 syscalls
  V2 after : 589,165 RPS | 43.05ms lat | 5,721 syscalls
                                             (~94% syscall reduction)

V2 now slightly outperforms V1 at pipeline=500. The primary goal
of this patch is CPU efficiency via syscall reduction, not raw RPS.

Architectural Note on Fragmentation:
The over-flushing also affected fragmented workloads. At
pipeline=500 fragmentation mode, the old V2 issued up to ~27x
more syscalls than V1 (e.g. 1,847 vs 68 in a single run;
median 1,848 vs 72). This patch reduces that to ~5.7x
(427 vs 75, 5-run median).

The remaining gap is inherent to the single-fiber io_uring
design: V1 uses a 50ms busy-spin (max_busy_read_usec) to
artificially batch fragments, while V2 flushes as soon as the
kernel delivers data. V2 will never reach V1 flush parity in
fragmented workloads, and that is an acceptable tradeoff.

Signed-off-by: Gil Levkovich <69595609+glevkovich@users.noreply.github.com>
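The flush-point change above can be sketched with a toy buffer where a counter stands in for `sendmsg`. `ReplyBuffer` and `CountSyscalls` are illustrative names, not facade code; the sketch only demonstrates why removing the per-chunk flush and keeping an early-return no-op `Flush()` collapses many syscalls into one.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Replies accumulate in a buffer; a "syscall" (here just a counter) happens
// only at an explicit flush point, not after every parsed chunk.
class ReplyBuffer {
 public:
  void Append(const std::string& reply) { buf_ += reply; }

  // Mirrors the SinkReplyBuilder::Flush() change: return immediately when
  // nothing is buffered, so no-op flushes cost nothing.
  void Flush() {
    if (buf_.empty()) return;
    ++syscalls_;  // stands in for one sendmsg()
    sent_ += buf_;
    buf_.clear();
  }

  int syscalls() const { return syscalls_; }
  const std::string& sent() const { return sent_; }

 private:
  std::string buf_, sent_;
  int syscalls_ = 0;
};

// Old behavior: flush after every parsed chunk. New behavior: coalesce and
// flush once at the idle-await point.
int CountSyscalls(const std::vector<std::string>& chunks, bool flush_per_chunk) {
  ReplyBuffer rb;
  for (const auto& c : chunks) {
    rb.Append(c);
    if (flush_per_chunk) rb.Flush();  // old unconditional ReplyBatch() flush
  }
  rb.Flush();  // idle-await flush point
  return rb.syscalls();
}
```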
Replace the global RCU mechanism (per-thread channel_store_ pointers and
a centralized control block) with a ShardedHashMap that provides fine-grained
locking per shard. Writers only block readers for the duration of a pointer
swap, and concurrent writes to different shards are fully independent, reducing
contention under high subscribe/unsubscribe rates.

The old design required broadcasting a new ChannelStore instance to every
thread on each structural change, serializing all updates through a single
control block. The new design eliminates the per-thread pointers and the
broadcast entirely — channels and patterns are now global shard-locked
structures that any thread can read or update directly.

Closes #7056

Signed-off-by: mkaruza <mario@dragonflydb.io>
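The prepare/commit split described above ("writers only block readers for the duration of a pointer swap") can be sketched as copy-on-write on a per-channel subscriber list: the writer builds a new list while readers keep running, then takes the read lock exclusively just long enough to swap the pointer in. All types and names here are illustrative stand-ins, not the merged implementation.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <vector>

using SubscriberList = std::vector<std::string>;
using ChannelMap = std::unordered_map<std::string, std::shared_ptr<SubscriberList>>;

struct Shard {
  std::mutex write_mu;        // one writer at a time per shard
  std::shared_mutex read_mu;  // shared for reads, exclusive only for the swap
  ChannelMap channels;
};

void AddSubscriber(Shard& s, const std::string& channel, const std::string& conn) {
  std::lock_guard<std::mutex> wlk(s.write_mu);
  // Prepare: copy this channel's list while readers keep running.
  std::shared_ptr<SubscriberList> next;
  {
    std::shared_lock<std::shared_mutex> rlk(s.read_mu);
    auto it = s.channels.find(channel);
    next = (it != s.channels.end()) ? std::make_shared<SubscriberList>(*it->second)
                                    : std::make_shared<SubscriberList>();
  }
  next->push_back(conn);
  // Commit: readers are blocked only for this pointer swap.
  std::unique_lock<std::shared_mutex> rlk(s.read_mu);
  s.channels[channel] = std::move(next);
}

size_t CountSubscribers(Shard& s, const std::string& channel) {
  std::shared_lock<std::shared_mutex> rlk(s.read_mu);
  auto it = s.channels.find(channel);
  return it == s.channels.end() ? 0 : it->second->size();
}
```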
…/tests/dragonfly (#7228)

build(deps): update meta-memcache requirement in /tests/dragonfly

Updates the requirements on [meta-memcache](https://github.com/RevenueCat/meta-memcache-py) to permit the latest version.
- [Release notes](https://github.com/RevenueCat/meta-memcache-py/releases)
- [Changelog](https://github.com/RevenueCat/meta-memcache-py/blob/main/CHANGELOG.md)
- [Commits](RevenueCat/meta-memcache-py@v2.0.0...v2.2.0)

---
updated-dependencies:
- dependency-name: meta-memcache
  dependency-version: 2.2.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…s/dragonfly (#7231)

build(deps): update pyyaml requirement in /tests/dragonfly

Updates the requirements on [pyyaml](https://github.com/yaml/pyyaml) to permit the latest version.
- [Release notes](https://github.com/yaml/pyyaml/releases)
- [Changelog](https://github.com/yaml/pyyaml/blob/6.0.3/CHANGES)
- [Commits](yaml/pyyaml@6.0...6.0.3)

---
updated-dependencies:
- dependency-name: pyyaml
  dependency-version: 6.0.3
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…/tests/dragonfly (#7230)

build(deps): update pytest-icdiff requirement in /tests/dragonfly

Updates the requirements on [pytest-icdiff](https://github.com/hjwp/pytest-icdiff) to permit the latest version.
- [Changelog](https://github.com/hjwp/pytest-icdiff/blob/main/HISTORY.rst)
- [Commits](https://github.com/hjwp/pytest-icdiff/commits)

---
updated-dependencies:
- dependency-name: pytest-icdiff
  dependency-version: '0.9'
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
…ests/dragonfly (#7229)

build(deps): update packaging requirement in /tests/dragonfly

Updates the requirements on [packaging](https://github.com/pypa/packaging) to permit the latest version.
- [Release notes](https://github.com/pypa/packaging/releases)
- [Changelog](https://github.com/pypa/packaging/blob/main/CHANGELOG.rst)
- [Commits](pypa/packaging@23.1...26.2)

---
updated-dependencies:
- dependency-name: packaging
  dependency-version: '26.2'
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Copilot AI and others added 2 commits April 29, 2026 06:19
…itecture' into copilot/update-docs-pub-sub-architecture

Co-authored-by: mkaruza <3676457+mkaruza@users.noreply.github.com>
Copilot AI and others added 2 commits April 29, 2026 07:51
Co-authored-by: mkaruza <3676457+mkaruza@users.noreply.github.com>
Co-authored-by: mkaruza <3676457+mkaruza@users.noreply.github.com>
Contributor

mkaruza commented Apr 29, 2026

Closing. New PR is generated in #7239

@mkaruza mkaruza closed this Apr 29, 2026
