feat(core): Add ZSTD dictionary compression for finalized stream nodes#7108
Conversation
🤖 Augment PR Summary: Adds optional ZSTD compression for finalized Redis stream listpack nodes to reduce memory usage while preserving stream semantics.
Technical Notes: Compression is disabled by default (threshold=0), uses trained dictionaries to improve ratios, and keeps raw nodes when data is small or incompressible.
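The "keep raw when small or incompressible" behavior described in the notes above can be sketched in a few lines. This is an illustrative Python sketch, not Dragonfly's C++ implementation: zlib stands in for ZSTD, and the 512-byte / 30% thresholds follow the PR description.

```python
import zlib  # stdlib stand-in for ZSTD in this sketch

MIN_NODE_SIZE = 512     # nodes smaller than this stay raw
MIN_REDUCTION = 0.30    # require at least a 30% size reduction to keep compressed form

def maybe_compress(raw: bytes) -> tuple[bool, bytes]:
    """Return (is_compressed, payload); fall back to raw bytes when not worthwhile."""
    if len(raw) < MIN_NODE_SIZE:
        return False, raw
    packed = zlib.compress(raw, 6)
    if len(packed) > len(raw) * (1.0 - MIN_REDUCTION):
        return False, raw   # incompressible data keeps its raw listpack
    return True, packed
```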
Synthetic use-case running dfly_bench. Results:
I was running this artificial benchmark (in the PR): stream_benchmark.py with/without compression on a single key, with the following arguments.
Result without compression: Result with compression:
Interesting. I would expect to see a much higher compression ratio with a good dictionary. Did you check if compressing without a dictionary makes compression worse, i.e. whether it moves the needle at all?
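One quick way to probe that question outside the server: compress a batch of small, similar payloads with and without a preset dictionary and compare totals. The sketch below uses stdlib zlib's `zdict` as a stand-in for ZSTD's trained dictionaries; the payload shape and dictionary bytes are made up for illustration.

```python
import zlib

# Small entries whose field names repeat across records -- the case where a
# shared dictionary should help, since each record alone has little redundancy.
entries = [f'{{"user_id": {i}, "event": "click", "ts": 17000000{i:02d}}}'.encode()
           for i in range(50)]
# Hand-built dictionary from the shared substrings (a trained one would do better).
zdict = b'{"user_id": , "event": "click", "ts": 17000000'

def size_plain() -> int:
    return sum(len(zlib.compress(e, 6)) for e in entries)

def size_with_dict() -> int:
    total = 0
    for e in entries:
        c = zlib.compressobj(level=6, zdict=zdict)
        total += len(c.compress(e) + c.flush())
    return total

print(size_plain(), size_with_dict())  # per-entry dictionary savings add up
```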
Could be that the random part of the payload contributes to it not being highly compressible. Asked for a quick analysis.
I have run with level 0 compression instead of a dictionary; results: there is no visible difference.
Tested with celery from
Results: No compression: With compression: With direct level 0 compression:
Pull request overview
This PR introduces optional ZSTD dictionary compression for finalized Redis stream nodes to reduce memory usage for compressible stream data, and adds a Python-based stream benchmark suite plus an analyzer to compare benchmark runs.
Changes:
- Add StreamNodeObj support for storing stream nodes as either raw listpacks or ZSTD-compressed buffers, with transparent decompression via a thread-local reuse buffer.
- Compress finalized stream nodes during XADD node finalization (gated by --stream_node_zstd_dict_threshold) and materialize listpacks before mutating paths (XDEL/XTRIM).
- Add StreamNodeCompressTest coverage and introduce tools/stream benchmark + analyzer scripts with documentation.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| src/core/stream_node.h | Defines raw vs compressed stream node representation and new APIs (compress/materialize). |
| src/core/stream_node.cc | Implements per-thread ZSTD dictionary training, compression, and transparent decompression. |
| src/server/stream_family.cc | Integrates node compression on finalization and materialization on mutation paths. |
| src/server/stream_family_test.cc | Adds tests validating XRANGE/XDEL/XTRIM behavior with compressed nodes enabled. |
| tools/stream/stream_benchmark.py | Adds a stream benchmark runner measuring throughput/latency/memory across scenarios. |
| tools/stream/stream_benchmark_analyzer.py | Adds a CLI tool to compare multiple benchmark result CSVs. |
| tools/stream/README.md | Documents running benchmarks and analyzing results. |
Review context from tools/stream/stream_benchmark.py:

```python
def benchmark_xread(self, num_ops: int = 10000, num_entries: int = 5000) -> BenchmarkResult:
    """Benchmark XREAD command (reading entries)"""
    print(f"\nBenchmarking XREAD ({num_ops} ops on {num_entries} entries)...")

    # Pre-populate stream
    print(f"  Populating stream with {num_entries} entries...")
    for i in range(num_entries):
        self.r.xadd(
            self.stream_key,
            _make_payload(i),
        )
```
Review context from src/core/stream_node.cc (two non-contiguous fragments):

```cpp
  return lp;
}

void StreamNodeObj::Free() const {
```

```cpp
  static const uint32_t dict_threshold = absl::GetFlag(FLAGS_stream_node_zstd_dict_threshold);
  DCHECK(dict_threshold > 0);

  if (!tl_zstd_ctx) {
    tl_zstd_ctx = std::make_unique<ZstdCompressionCtx>(dict_threshold);
```
Introduce per-thread ZSTD dictionary compression for stream node listpacks, reducing memory for compressible stream data.
- Add ZstdCompressionCtx: accumulates listpack samples until the configured threshold, trains a ZSTD dictionary, and holds CCtx/DCtx/CDict/DDict state.
- StreamNodeObj gains TryCompress() (compress on node finalization in StreamAppendItem) and MaterializeListpack() (copy out of the decompression buffer before in-place mutation in XDEL/XTRIM paths).
- GetListpack() decompresses transparently into a thread-local reuse buffer.
- Gated by --stream_node_zstd_dict_threshold (0 = disabled); only nodes >= 512 bytes that achieve >= 30% size reduction are compressed.
- Add StreamNodeCompressTest covering XRANGE round-trip, XDEL, and XTRIM.
- Synthetic tools: stream_benchmark.py, stream_benchmark_analyzer.py.

Signed-off-by: mkaruza <mario@dragonflydb.io>
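The node lifecycle described above (compress on finalization, transparent reads, materialize before mutation) can be modeled compactly. This Python sketch mirrors the names TryCompress/GetListpack/MaterializeListpack but is illustrative only; zlib stands in for the dictionary-based ZSTD path, and the size gates are simplified.

```python
import zlib  # stand-in for ZSTD in this model

class StreamNode:
    """Toy model of a stream node held as either a raw or a compressed listpack."""

    def __init__(self, listpack: bytes):
        self._data = listpack
        self._compressed = False

    def try_compress(self) -> None:
        # Called once the node is finalized (no more appends land in it).
        packed = zlib.compress(self._data, 6)
        if len(packed) < len(self._data):  # real gate: >=512 bytes and >=30% saving
            self._data, self._compressed = packed, True

    def get_listpack(self) -> bytes:
        # Read path: decompress transparently (real code reuses a thread-local buffer).
        return zlib.decompress(self._data) if self._compressed else self._data

    def materialize_listpack(self) -> None:
        # Must run before in-place mutation (XDEL/XTRIM paths) so edits hit a raw copy.
        if self._compressed:
            self._data, self._compressed = self.get_listpack(), False
```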
Add stream benchmark suite that includes performance metrics for XADD, XREAD, XRANGE, and consumer groups with throughput, latency, and memory tracking.
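For context, throughput and latency figures like these are typically derived from raw per-operation timings roughly as follows; the exact aggregation in stream_benchmark.py may differ, and this function is an illustrative sketch.

```python
import statistics

def summarize(latencies_sec: list[float]) -> dict:
    """Aggregate per-op latencies into throughput/latency figures."""
    ordered = sorted(latencies_sec)
    total = sum(ordered)
    p99 = ordered[min(len(ordered) - 1, int(len(ordered) * 0.99))]
    return {
        "ops": len(ordered),
        "throughput_ops_sec": len(ordered) / total if total else 0.0,
        "avg_ms": statistics.mean(ordered) * 1000.0,
        "p99_ms": p99 * 1000.0,
    }
```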