diff --git a/src/core/stream_node.cc b/src/core/stream_node.cc index ceb9a2a559d8..890d3e76421b 100644 --- a/src/core/stream_node.cc +++ b/src/core/stream_node.cc @@ -3,7 +3,19 @@ #include "core/stream_node.h" +#include <zstd.h> + +#include <memory> +#include <string> +#include <vector> + +#include "absl/flags/flag.h" #include "base/logging.h" +#include "core/dict_builder.h" + +ABSL_FLAG(uint32_t, stream_node_zstd_dict_threshold, 0, + "Minimum stream node bytes accumulated before training ZSTD dictionary for stream node " + "compression. 0 disables compression."); extern "C" { #include "redis/listpack.h" @@ -12,20 +24,253 @@ extern "C" { namespace dfly { +namespace { + +constexpr size_t kMinCompressBytesThreshold = 512; + +// Per-thread ZSTD compression state. +struct ZstdCompressionCtx { + ZSTD_CDict* cdict = nullptr; + ZSTD_DDict* ddict = nullptr; + ZSTD_CCtx* cctx = nullptr; + ZSTD_DCtx* dctx = nullptr; + + // Accumulated samples and sizes used for dictionary training. + std::vector<uint8_t> training_data_bytes; + std::vector<uint32_t> training_sample_sizes; + size_t training_data_size = 0; + + // Temporary buffer used for compression/decompression. + uint8_t* scratch_buffer = nullptr; + size_t scratch_buffer_capacity = 0; + + // Compressed node whose decompressed form currently lives in scratch_buffer. + const uint8_t* last_compressed_node = nullptr; + + explicit ZstdCompressionCtx(uint32_t dict_threshold) { + training_data_bytes.reserve(dict_threshold); + training_sample_sizes.reserve(32); + } + + bool IsDictReady() const { + return cdict != nullptr; + } + + void ResetDict() { + if (cdict) { + ZSTD_freeCDict(cdict); + cdict = nullptr; + } + if (ddict) { + ZSTD_freeDDict(ddict); + ddict = nullptr; + } + if (cctx) { + ZSTD_freeCCtx(cctx); + cctx = nullptr; + } + if (dctx) { + ZSTD_freeDCtx(dctx); + dctx = nullptr; + } + } + + ~ZstdCompressionCtx() { + ResetDict(); + zfree(scratch_buffer); + } +}; + +thread_local std::unique_ptr<ZstdCompressionCtx> tl_zstd_ctx; + +bool TrainZstdDict(ZstdCompressionCtx& ctx) { + if (ctx.IsDictReady()) { + return true; + } + + std::vector<absl::Span<const uint8_t>> pieces; + pieces.reserve(ctx.training_sample_sizes.size()); + const uint8_t* cursor = ctx.training_data_bytes.data(); + for (uint32_t sz : ctx.training_sample_sizes) { + pieces.emplace_back(cursor, sz); + cursor += sz; + } + + // Ratio > 0.6 means the data is too random to compress well; skip training.
+ double ratio = EstimateCompressibility(absl::MakeSpan(pieces), 2); + if (ratio > 0.6) { + VLOG(2) << "StreamNodeObj data not compressible (ratio=" << ratio << ")"; + return false; + } + + std::string dict_raw = TrainDictionary(absl::MakeSpan(pieces), 4096, 64); + if (dict_raw.empty()) { + return false; + } + + ctx.cdict = ZSTD_createCDict(dict_raw.data(), dict_raw.size(), 1); + ctx.ddict = ZSTD_createDDict(dict_raw.data(), dict_raw.size()); + ctx.cctx = ZSTD_createCCtx(); + ctx.dctx = ZSTD_createDCtx(); + + if (!ctx.cdict || !ctx.ddict || !ctx.cctx || !ctx.dctx) { + ctx.ResetDict(); + return false; + } + + return true; +} + +} // namespace + uint8_t* StreamNodeObj::GetListpack() const { - DCHECK(IsRaw()); - return Ptr(); + if (IsRaw()) { + return Ptr(); + } + + DCHECK(IsCompressed()); + DCHECK(absl::GetFlag(FLAGS_stream_node_zstd_dict_threshold) > 0); + DCHECK(tl_zstd_ctx && tl_zstd_ctx->IsDictReady()); + + const uint8_t* buf = Ptr(); + + if (tl_zstd_ctx->last_compressed_node == buf) { + return tl_zstd_ctx->scratch_buffer; + } + + uint32_t uncompressed_size, csz; + memcpy(&uncompressed_size, buf, sizeof(uncompressed_size)); + memcpy(&csz, buf + sizeof(uncompressed_size), sizeof(csz)); + const uint8_t* compressed_data = buf + sizeof(uncompressed_size) + sizeof(csz); + + if (tl_zstd_ctx->scratch_buffer_capacity < uncompressed_size) { + tl_zstd_ctx->scratch_buffer = + static_cast<uint8_t*>(zrealloc(tl_zstd_ctx->scratch_buffer, uncompressed_size)); + tl_zstd_ctx->scratch_buffer_capacity = zmalloc_size(tl_zstd_ctx->scratch_buffer); + } + + size_t dsz = + ZSTD_decompress_usingDDict(tl_zstd_ctx->dctx, tl_zstd_ctx->scratch_buffer, uncompressed_size, + compressed_data, csz, tl_zstd_ctx->ddict); + if (ZSTD_isError(dsz)) { + LOG(DFATAL) << "ZSTD decompression error: " << ZSTD_getErrorName(dsz); + return nullptr; + } + + tl_zstd_ctx->last_compressed_node = buf; + + return tl_zstd_ctx->scratch_buffer; } uint32_t StreamNodeObj::UncompressedSize() const { + if (IsRaw()) { + return static_cast<uint32_t>(lpBytes(Ptr())); + } + DCHECK(IsCompressed()); + uint32_t sz; + memcpy(&sz, Ptr(), sizeof(sz)); + return sz; +} + +StreamNodeObj StreamNodeObj::TryCompress() const { DCHECK(IsRaw()); - return static_cast<uint32_t>(lpBytes(Ptr())); + static const uint32_t dict_threshold = absl::GetFlag(FLAGS_stream_node_zstd_dict_threshold); + DCHECK(dict_threshold > 0); + + if (!tl_zstd_ctx) { + tl_zstd_ctx = std::make_unique<ZstdCompressionCtx>(dict_threshold); + } + + uint8_t* lp = Ptr(); + const size_t lp_size = lpBytes(lp); + + if (lp_size < kMinCompressBytesThreshold) { + return *this; + } + + if (!tl_zstd_ctx->IsDictReady()) { + tl_zstd_ctx->training_data_bytes.insert(tl_zstd_ctx->training_data_bytes.end(), lp, + lp + lp_size); + tl_zstd_ctx->training_sample_sizes.push_back(static_cast<uint32_t>(lp_size)); + tl_zstd_ctx->training_data_size += lp_size; + if (tl_zstd_ctx->training_data_size < dict_threshold) { + return *this; + } + if (!TrainZstdDict(*tl_zstd_ctx)) { + tl_zstd_ctx->training_data_bytes.clear(); + tl_zstd_ctx->training_sample_sizes.clear(); + tl_zstd_ctx->training_data_size = 0; + return *this; + } + std::vector<uint8_t>().swap(tl_zstd_ctx->training_data_bytes); + std::vector<uint32_t>().swap(tl_zstd_ctx->training_sample_sizes); + tl_zstd_ctx->training_data_size = 0; + } + + const size_t bound = ZSTD_compressBound(lp_size); + + if (tl_zstd_ctx->scratch_buffer_capacity < bound) { + tl_zstd_ctx->scratch_buffer = + static_cast<uint8_t*>(zrealloc(tl_zstd_ctx->scratch_buffer, bound)); + tl_zstd_ctx->scratch_buffer_capacity = zmalloc_size(tl_zstd_ctx->scratch_buffer); + } + +
tl_zstd_ctx->last_compressed_node = nullptr; + const size_t csz = ZSTD_compress_usingCDict(tl_zstd_ctx->cctx, tl_zstd_ctx->scratch_buffer, bound, + lp, lp_size, tl_zstd_ctx->cdict); + + // Reject if compression failed or saved less than 30%. + if (ZSTD_isError(csz) || csz >= lp_size * 7 / 10) { + return *this; + } + + // Allocate the exact final size and copy header + compressed payload in one shot. + DCHECK_LE(csz, std::numeric_limits<uint32_t>::max()); + const uint32_t uncompressed_sz = static_cast<uint32_t>(lp_size); + const uint32_t compressed_sz = static_cast<uint32_t>(csz); + + // Compressed buffer layout: [4B uncompressed_sz][4B csz][compressed data] + uint8_t* buf = static_cast<uint8_t*>(zmalloc(sizeof(uint32_t) * 2 + csz)); + memcpy(buf, &uncompressed_sz, sizeof(uncompressed_sz)); + memcpy(buf + sizeof(uncompressed_sz), &compressed_sz, sizeof(compressed_sz)); + memcpy(buf + sizeof(uint32_t) * 2, tl_zstd_ctx->scratch_buffer, csz); + zfree(lp); + + // Create new node object and tag it as compressed + StreamNodeObj compressed_node_obj; + compressed_node_obj.ptr_ = reinterpret_cast<uintptr_t>(buf) | kCompressedBit; + return compressed_node_obj; +} + +uint8_t* StreamNodeObj::MaterializeListpack(uint8_t* lp) { + DCHECK(lp != nullptr); + DCHECK(tl_zstd_ctx && tl_zstd_ctx->IsDictReady()); + if (lp == tl_zstd_ctx->scratch_buffer) { + const uint32_t sz = static_cast<uint32_t>(lpBytes(lp)); + uint8_t* copy = static_cast<uint8_t*>(zmalloc(sz)); + memcpy(copy, lp, sz); + tl_zstd_ctx->scratch_buffer_capacity = zmalloc_size(lp); + return copy; + } + tl_zstd_ctx->scratch_buffer = nullptr; + tl_zstd_ctx->scratch_buffer_capacity = 0; + tl_zstd_ctx->last_compressed_node = nullptr; + return lp; } void StreamNodeObj::Free() const { zfree(Ptr()); } +void StreamNodeObj::InvalidateDecompressionState() { + DCHECK(tl_zstd_ctx && tl_zstd_ctx->IsDictReady()); + if (tl_zstd_ctx && tl_zstd_ctx->IsDictReady()) { + tl_zstd_ctx->scratch_buffer = nullptr; + tl_zstd_ctx->scratch_buffer_capacity = 0; + tl_zstd_ctx->last_compressed_node = nullptr; + } +} + size_t StreamNodeObj::MallocSize() const { return zmalloc_size(Ptr()); } diff --git a/src/core/stream_node.h b/src/core/stream_node.h index e011a31ee2c7..517c66e8f9f0 100644 --- a/src/core/stream_node.h +++ b/src/core/stream_node.h @@ -9,17 +9,32 @@ namespace dfly { -// StreamNodeObj represents a stream node stored in the rax tree. +// StreamNodeObj is a compact tagged-pointer representation of a stream node +// stored inside a radix tree entry. // -// Each node is: -// - Raw: a pointer to a listpack +// It encodes both the node payload pointer and its representation type +// within a single uintptr_t value using bit 52 as a state flag. // -// The representation is explicit and zero-copy. +// Supported representations: +// +// Raw (bit 52 = 0): +// ptr_ points directly to a listpack containing the stream entry data. +// +// Compressed (bit 52 = 1): +// ptr_ points to a ZSTD-compressed buffer with layout: +// [4B uncompressed size][4B compressed size][compressed payload] +// +// Important invariants: +// - Ptr() always returns a usable pointer with tag bits stripped. +// - Get() returns the raw encoded value and must not be dereferenced. +// - Ownership of the underlying memory depends on the representation: +// * Raw: listpack memory +// * Compressed: allocated compression buffer class StreamNodeObj { public: + static constexpr uintptr_t kCompressedBit = 1ULL << 52; static constexpr uintptr_t kTagMask = 1ULL << 52; - // Construct from a raw tagged value retrieved from the rax tree.
explicit StreamNodeObj(void* p = nullptr) : ptr_(reinterpret_cast<uintptr_t>(p)) { } @@ -40,6 +55,10 @@ class StreamNodeObj { return (ptr_ & kTagMask) == 0; } + bool IsCompressed() const { + return (ptr_ & kTagMask) == kCompressedBit; + } + // Raw pointer with tag bits stripped. uint8_t* Ptr() const { return std::bit_cast<uint8_t*>(ptr_ & ~kTagMask); } @@ -56,9 +75,28 @@ class StreamNodeObj { // Uncompressed listpack size in bytes. uint32_t UncompressedSize() const; - // Frees the node's underlying pointer + // Prerequisite: IsRaw() and flag stream_node_zstd_dict_threshold > 0. + // Attempts compression of the listpack using ZSTD with a trained dictionary. + // Compression is a no-op if: + // 1. the dictionary is not ready (still training or dictionary construction failed), + // 2. the raw size is less than 512 bytes, + // 3. compression returns an error, + // 4. the compressed result does not achieve ≥30% size reduction. + // Returns a compressed StreamNodeObj if compression is applied, otherwise *this. + StreamNodeObj TryCompress() const; + + // Frees the node's underlying pointer. void Free() const; + // Nullifies the thread-local decompression buffer pointer and resets its capacity. + void InvalidateDecompressionState(); + + // Materializes a decompressed listpack into stable, heap-owned memory. + // Must only be called on compressed nodes (tl_zstd_ctx must be ready). + // If `lp` points to the thread-local decompression buffer, allocates a new + // heap buffer and copies the contents. Otherwise returns `lp` unchanged. + static uint8_t* MaterializeListpack(uint8_t* lp); + // Total allocated bytes for this node. size_t MallocSize() const; diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 8859f32ddd80..f9cf133ead04 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -16,6 +16,7 @@ extern "C" { #include "redis/zmalloc.h" } +#include "base/flags.h" #include "base/logging.h" #include "facade/cmd_arg_parser.h" #include "server/acl/acl_commands_def.h" @@ -30,6 +31,8 @@ extern "C" { #include "server/namespaces.h" #include "server/transaction.h" +ABSL_DECLARE_FLAG(uint32_t, stream_node_zstd_dict_threshold); + namespace dfly { using namespace facade; @@ -180,15 +183,30 @@ void StreamIteratorRemoveEntry(streamIterator* si, streamID* current) { aux = LpGetInteger(p); if (aux == 1) { - node.Free(); + zfree(lp); + if (node.IsCompressed()) { + node.InvalidateDecompressionState(); + node.Free(); + } checkedRaxRemove(si->stream->rax, si->ri.key, si->ri.key_len, NULL); } else { lp = lpReplaceInteger(lp, &p, aux - 1); p = lpNext(lp, p); aux = LpGetInteger(p); lp = lpReplaceInteger(lp, &p, aux + 1); - if (si->lp != lp) + bool update_rax_lp = false; + if (node.IsCompressed()) { + lp = StreamNodeObj::MaterializeListpack(lp); + node.Free(); + update_rax_lp = true; + } else if (lp != node.Ptr()) { + update_rax_lp = true; + } + if (update_rax_lp) { raxInsert(si->stream->rax, si->ri.key, si->ri.key_len, lp, nullptr); + si->ri.data = lp; + } + si->lp = lp; CHECK_GT(lpBytes(lp), 0u); } @@ -467,7 +485,17 @@ int64_t StreamTrim(stream* s, streamAddTrimArgs* args) { p = lpNext(lp, p); int64_t marked_deleted = LpGetInteger(p); lp = lpReplaceInteger(lp, &p, marked_deleted + deleted_from_lp); - raxInsert(s->rax, ri.key, ri.key_len, lp, nullptr); + bool update_rax_lp = false; + if (node.IsCompressed()) { + lp = StreamNodeObj::MaterializeListpack(lp); + node.Free(); + update_rax_lp = true; + } else if (lp != node.Ptr()) { + update_rax_lp = true; + } + if (update_rax_lp) { + raxInsert(s->rax, 
ri.key, ri.key_len, lp, nullptr); + } CHECK_GT(lpBytes(lp), 0u); break; } @@ -916,6 +944,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad if (!raxEOF(&ri)) { /* Get a reference to the tail node listpack. */ + DCHECK(StreamNodeObj(ri.data).IsRaw()); current_node = StreamNodeObj(ri.data).Ptr(); lp = current_node; lp_bytes = lpBytes(current_node); @@ -978,9 +1007,12 @@ int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* ad if (ri.key_len != sizeof(rax_key) || memcmp(ri.key, rax_key, sizeof(rax_key)) != 0) { LOG(DFATAL) << "StreamAppendItem: Key mismatch"; } - if (lp != current_node) { - raxInsert(s->rax, rax_key, sizeof(rax_key), lp, nullptr); + /* Finalize and optionally compress the node. */ + StreamNodeObj finalized_node = StreamNodeObj::Raw(lp); + if (absl::GetFlag(FLAGS_stream_node_zstd_dict_threshold) > 0) { + finalized_node = finalized_node.TryCompress(); } + raxInsert(s->rax, rax_key, sizeof(rax_key), finalized_node.Get(), nullptr); current_node = nullptr; lp = NULL; } diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index 8c79a9f361e6..39dc1a9fe0ea 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -10,6 +10,8 @@ #include "facade/facade_test.h" #include "server/test_utils.h" +ABSL_DECLARE_FLAG(uint32_t, stream_node_zstd_dict_threshold); + using namespace testing; using namespace std; using namespace util; @@ -1601,4 +1603,96 @@ TEST_F(StreamFamilyTest, XAutoClaimEmptyConsumer) { EXPECT_THAT(resp, AnyOf(ErrArg(""), ArgType(RespExpr::ARRAY))); } +// Tests for stream node compression (stream_node_zstd_dict_threshold flag). +class StreamNodeCompressTest : public StreamFamilyTest { + protected: + void SetUp() override { + StreamFamilyTest::SetUp(); + absl::SetFlag(&FLAGS_stream_node_zstd_dict_threshold, 1u); + } + + void TearDown() override { + absl::SetFlag(&FLAGS_stream_node_zstd_dict_threshold, 0u); + StreamFamilyTest::TearDown(); + } + + // Adds `count` entries to `key` with IDs `<i>-0` and field "f" value `v<i>` + kValuePad. + // The padding ensures ZSTD achieves the required >30% compression ratio while keeping + // 100 entries well under the 4096-byte node size limit (count limit triggers the split). + void AddEntries(string_view key, int count) { + for (int i = 1; i <= count; i++) { + Run({"xadd", string(key), absl::StrCat(i, "-0"), "f", absl::StrCat("v", i, kValuePad)}); + } + } + + // 20 repeated chars give ~3600 bytes for 100 entries (< 4096) and compress >30% with ZSTD. + static constexpr std::string_view kValuePad = "xxxxxxxxxxxxxxxxxxxx"; // 20 'x' +}; + +// Verify that XRANGE returns all entries correctly after compression. +TEST_F(StreamNodeCompressTest, RoundTrip) { + // 110 entries: node 0 (100 entries, compressed) + node 1 (10 entries, raw) + AddEntries("s", 110); + + EXPECT_THAT(Run({"xlen", "s"}), IntArg(110)); + + auto resp = Run({"xrange", "s", "-", "+"}); + ASSERT_THAT(resp, ArrLen(110)); + + const auto& entries = resp.GetVec(); + EXPECT_EQ(entries[0].GetVec()[0], "1-0"); + EXPECT_THAT(entries[0].GetVec()[1].GetVec(), ElementsAre("f", absl::StrCat("v1", kValuePad))); + EXPECT_EQ(entries[109].GetVec()[0], "110-0"); + EXPECT_THAT(entries[109].GetVec()[1].GetVec(), ElementsAre("f", absl::StrCat("v110", kValuePad))); +} + +// Verify XDEL on an entry inside a compressed node. +TEST_F(StreamNodeCompressTest, XDelFromCompressedNode) { + // node 0 (entries 1-0..100-0, compressed), node 1 (entry 101-0, raw).
+ AddEntries("s", 101); + + // Delete an entry from the compressed node 0. + EXPECT_THAT(Run({"xdel", "s", "50-0"}), IntArg(1)); + EXPECT_THAT(Run({"xlen", "s"}), IntArg(100)); + + // The deleted entry must not appear in range. + EXPECT_THAT(Run({"xrange", "s", "50-0", "50-0"}), ArrLen(0)); + + // Neighbours must still be readable. + EXPECT_THAT(Run({"xrange", "s", "49-0", "51-0"}), ArrLen(2)); + + // Full range count must match. + EXPECT_THAT(Run({"xrange", "s", "-", "+"}), ArrLen(100)); +} + +// Verify XTRIM that partially removes entries from a compressed node. +TEST_F(StreamNodeCompressTest, XTrimPartialCompressedNode) { + // node 0 (entries 1-0..100-0, compressed), node 1 (entry 101-0, raw). + AddEntries("s", 101); + + // Trim to 60: removes 41 entries from the start of compressed node 0. + EXPECT_THAT(Run({"xtrim", "s", "maxlen", "=", "60"}), IntArg(41)); + EXPECT_THAT(Run({"xlen", "s"}), IntArg(60)); + + // The first surviving entry is 42-0 (entry 41 removed, 42 is the first kept). + auto resp = Run({"xrange", "s", "-", "+"}); + ASSERT_THAT(resp, ArrLen(60)); + EXPECT_EQ(resp.GetVec()[0].GetVec()[0], "42-0"); +} + +// Verify XTRIM that removes an entire compressed node. +TEST_F(StreamNodeCompressTest, XTrimRemoveCompressedNode) { + // node 0 (entries 1-0..100-0, compressed), node 1 (entry 101-0, raw). + AddEntries("s", 101); + + // Trim to 1: compressed node 0 is removed entirely, only raw node 1 survives. + EXPECT_THAT(Run({"xtrim", "s", "maxlen", "=", "1"}), IntArg(100)); + EXPECT_THAT(Run({"xlen", "s"}), IntArg(1)); + + // Single entry: xrange returns [id, fields] directly (2 elements). + auto resp = Run({"xrange", "s", "-", "+"}); + ASSERT_THAT(resp, ArrLen(2)); + EXPECT_EQ(resp.GetVec()[0], "101-0"); +} + } // namespace dfly diff --git a/tools/stream/README.md b/tools/stream/README.md new file mode 100644 index 000000000000..0cbe69edcb35 --- /dev/null +++ b/tools/stream/README.md @@ -0,0 +1,434 @@ +# Redis Stream Benchmarking Suite - Complete Guide + +> **Note:** This tool was written with [Claude Code](https://claude.com/claude-code) + +## Overview + +This benchmarking suite provides comprehensive performance testing for Redis stream commands in real-world scenarios. It measures throughput, latency, and memory usage across different workload patterns. + +## Features + +### Measured Metrics +- **Throughput**: Operations per second (ops/sec) +- **Latency**: Min, max, average, and percentiles (P50, P95, P99) +- **Memory**: Before/after memory usage and delta +- **Efficiency**: Throughput per unit of memory overhead +- **Memory Analysis**: Track memory consumption changes across benchmark runs + +### Covered Commands +- `XADD` - Adding entries to streams +- `XREAD` - Reading entries from streams +- `XRANGE` - Range queries on streams +- `XREADGROUP` + `XACK` - Consumer group operations + +### Payload +Each stream entry uses a generic variable-size payload with realistic fields: `seq`, `ts`, `user_id`, `action`, `status`, `region`, and `data` blob. All entries carry a random alphanumeric `data` field (8–64 bytes) to simulate traffic with variable message sizes. + +Use `--seed` for reproducible payload sequences across runs. Default: 0 (non-deterministic). + +### Workload Scenarios +1. **Producer** - XADD performance with configurable thread count +2. **Consumer** - XREAD performance +3. **Range Query** - XRANGE performance +4. **Consumer Groups** - Complete consumer group workflow (XREADGROUP + XACK) +5. 
**Mixed Workload** - Concurrent producers and consumers; reports XADD, XREAD, and XACK latency separately + +## Installation + +### Prerequisites +```bash +pip install redis pandas +``` + +### Files +- `stream_benchmark.py` - Main benchmarking script +- `stream_benchmark_analyzer.py` - Analysis and comparison tool +- This guide + +## Quick Start + +### 1. Run Full Benchmark Suite + +```bash +python3 stream_benchmark.py --full +``` + +This runs all benchmarks and saves results to: +- `redis_benchmark_results.csv` - Comma-separated values + +### 2. Run Specific Benchmarks + +```bash +# XADD only +python3 stream_benchmark.py --xadd + +# XREAD only +python3 stream_benchmark.py --xread + +# Consumer groups only +python3 stream_benchmark.py --consumer-group + +# Mixed workload +python3 stream_benchmark.py --mixed +``` + +### 3. Analyze Results + +```bash +# Comprehensive analysis +python3 stream_benchmark_analyzer.py redis_benchmark_results.csv --all + +# Compare two runs +python3 stream_benchmark_analyzer.py run_1.csv run_2.csv --all + +# Export comparison +python3 stream_benchmark_analyzer.py run_1.csv run_2.csv --export comparison.csv +``` + +## Configuration Options + +### Benchmark Script + +```bash +python3 stream_benchmark.py [OPTIONS] + +Connection Options: + --uri URI Redis connection URI (redis://[:password@]host[:port][/db]) + --hostname HOSTNAME Redis hostname (default: localhost) + --port PORT Redis port (default: 6379) + --db DB Redis database (default: 0) + --password PASSWORD Redis password + --output FILE Output CSV file (default: redis_benchmark_results.csv) + +Benchmark Selection: + --full Run full benchmark suite + --xadd Benchmark XADD only + --xread Benchmark XREAD only + --xrange Benchmark XRANGE only + --consumer-group Benchmark consumer groups + --mixed Benchmark mixed workload + +Benchmark Parameters (must be > 0): + --xadd-num-ops NUM Number of XADD operations (default: 10000) + --xread-num-ops NUM Number of XREAD operations (default: 5000) + --xread-num-entries NUM Pre-populate entries for XREAD (default: 5000) + --xrange-num-ops NUM Number of XRANGE operations (default: 5000) + --xrange-num-entries NUM Pre-populate entries for XRANGE (default: 5000) + --consumer-group-num-ops NUM Number of consumer group ops (default: 5000) + --mixed-duration-seconds NUM Duration of mixed workload (default: 30) + --threads NUM Threads for XADD, consumer group, and mixed workload (default: 4) + --seed NUM Random seed for reproducible payloads (default: 0, non-deterministic) +``` + +#### Connection Examples + +```bash +# Default (localhost:6379) +python3 stream_benchmark.py --full + +# Specific host and port +python3 stream_benchmark.py --hostname redis.example.com --port 6380 --full + +# With password +python3 stream_benchmark.py --hostname redis.example.com --password secret --full + +# Using URI +python3 stream_benchmark.py --uri "redis://:password@redis.example.com:6380/0" --full + +# URI with authentication +python3 stream_benchmark.py --uri "redis://:mypass@localhost:6379/1" --full +``` + +#### Custom Workload Parameters + +```bash +# Stress test XADD with more threads +python3 stream_benchmark.py --xadd --xadd-num-ops 50000 --threads 8 + +# Run XREAD with more pre-populated entries +python3 stream_benchmark.py --xread --xread-num-ops 10000 --xread-num-entries 10000 + +# Quick test with fewer operations +python3 stream_benchmark.py --full --xadd-num-ops 1000 --xread-num-ops 1000 --xrange-num-ops 1000 + +# Extended mixed workload test +python3 stream_benchmark.py --mixed 
--mixed-duration-seconds 120 --threads 8 + +# Reproducible run (fixed seed) +python3 stream_benchmark.py --full --seed 42 + +# Full suite with custom parameters +python3 stream_benchmark.py --full \ + --xadd-num-ops 20000 \ + --xread-num-ops 10000 \ + --xrange-num-ops 10000 \ + --consumer-group-num-ops 10000 \ + --mixed-duration-seconds 60 \ + --threads 8 \ + --seed 42 + +# Parameter validation (must be > 0) +python3 stream_benchmark.py --xadd --xadd-num-ops 0 # Error: must be greater than 0 +``` + +### Analyzer Script + +```bash +python3 stream_benchmark_analyzer.py FILE [FILE...] [OPTIONS] + +Options: + --throughput Compare throughput + --latency Compare latency + --memory Compare memory usage + --efficiency Analyze efficiency metrics + --bottlenecks Identify performance bottlenecks + --regression Detect performance regressions (includes memory analysis) + --percentiles Show latency percentile distribution + --scenarios Analyze by scenario (includes memory metrics) + --cross Cross-run comparison (includes memory changes) + --all Run all analyses + --export FILE Export comparison to CSV (includes memory delta changes) +``` + +## Real-World Scenarios + +### Scenario 1: High-Throughput Message Queue (E-commerce Orders) + +```bash +# Run with optimized settings for high throughput +python3 stream_benchmark.py --xadd --output orders_baseline.csv +python3 stream_benchmark_analyzer.py orders_baseline.csv --throughput --latency +``` + +### Scenario 2: Distributed Task Processing (Microservices) + +```bash +# Simulate multiple services processing tasks +python3 stream_benchmark.py --consumer-group --output tasks_baseline.csv +python3 stream_benchmark_analyzer.py tasks_baseline.csv --scenarios --efficiency +``` + +### Scenario 3: Analytics/Event Streaming (Multiple producers & consumers) + +```bash +# Run mixed workload to simulate real-world analytics +python3 stream_benchmark.py --mixed --output analytics_baseline.csv +python3 stream_benchmark_analyzer.py analytics_baseline.csv --all +``` + +### Scenario 4: Comparing Two Benchmark Runs + +**Run 1:** +```bash +python3 stream_benchmark.py --full --output run1.csv +``` + +**Run 2 (with different settings):** +```bash +python3 stream_benchmark.py --full --output run2.csv +``` + +**Compare results (includes memory changes):** +```bash +python3 stream_benchmark_analyzer.py run1.csv run2.csv --all --regression +``` + +## Understanding the Results + +### CSV Output Columns + +| Column | Description | +|--------|-------------| +| scenario | Test scenario (single_producer, consumer_group, etc.) | +| command | Redis command tested (XADD, XREAD, etc.) 
| num_operations | Total operations executed | | duration_seconds | Time taken (in seconds) | | throughput_ops_sec | Operations per second | | min_latency_ms | Minimum latency (ms) | | max_latency_ms | Maximum latency (ms) | | avg_latency_ms | Average latency (ms) | | p50_latency_ms | Median latency, 50th percentile (ms) | | p95_latency_ms | 95th percentile latency (ms) | | p99_latency_ms | 99th percentile latency (ms) | | memory_before_mb | Server `used_memory` before test (MB) | | memory_after_mb | Server `used_memory` after test (MB) | | memory_delta_mb | Server memory growth during test (MB) | | timestamp | Test timestamp | ## Advanced Usage ### Custom Workload Testing Modify `stream_benchmark.py` to add custom scenarios: ```python +def benchmark_custom_scenario(self, num_ops: int = 10000) -> BenchmarkResult: + """Your custom benchmark scenario""" + print(f"\nBenchmarking Custom Scenario...") + + mem_before = self.get_memory_usage() + latencies = [] + + start_time = time.perf_counter() + for i in range(num_ops): + start = time.perf_counter() + # Your custom Redis operations here + self.r.xadd(self.stream_key, {'data': f'custom_{i}'}) + elapsed = (time.perf_counter() - start) * 1000 + latencies.append(elapsed) + + duration = time.perf_counter() - start_time + mem_after = self.get_memory_usage() + + return self._create_result( + 'custom_scenario', + 'CUSTOM_COMMAND', + num_ops, + duration, + latencies, + mem_before, + mem_after + ) ``` ### Comparing Different Redis Instances ```bash # Benchmark against instance 1 python3 stream_benchmark.py --hostname redis1.example.com --output redis1.csv + # Benchmark against instance 2 python3 stream_benchmark.py --hostname redis2.example.com --output redis2.csv + # Compare with memory analysis python3 stream_benchmark_analyzer.py redis1.csv redis2.csv --all --regression ``` ### Remote Redis with Authentication ```bash # Benchmark remote secured instance python3 stream_benchmark.py --uri "redis://:mypass@prod-redis.internal:6379/0" --full --output prod_baseline.csv ``` ### Long-Running Stability Tests ```bash # Run multiple times and compare for memory leaks/performance degradation python3 stream_benchmark.py --mixed --output run1.csv python3 stream_benchmark.py --mixed --output run2.csv python3 stream_benchmark_analyzer.py run1.csv run2.csv --regression ``` ## Memory Analysis Features Memory metrics reflect **server-side** `used_memory` from `INFO memory`, not client process RSS. This accurately shows how much memory Dragonfly/Redis consumed storing stream data before and after each benchmark. The analyzer includes comprehensive memory tracking: ### Cross-Run Memory Comparison ```bash python3 stream_benchmark_analyzer.py baseline.csv optimized.csv --cross ``` Shows memory delta changes between runs for each scenario. ### Regression Detection with Memory ```bash python3 stream_benchmark_analyzer.py before.csv after.csv --regression ``` Reports: - Memory delta changes (baseline → current) - Identifies memory regressions or improvements ### Exported Comparison Data ```bash python3 stream_benchmark_analyzer.py before.csv after.csv --export comparison.csv ``` Includes: - `baseline_memory_delta` - Memory usage in baseline - `memory_delta` - Memory usage in current run - `memory_delta_change_MB` - Absolute memory change
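The same cross-run memory comparison can also be done ad hoc with pandas when the bundled analyzer is more than you need. A minimal sketch, assuming two results files produced by the benchmark script (the file names here are just examples):

```python
# Per-scenario memory-delta change between two runs, mirroring what
# --cross and --regression report. File names are hypothetical.
import pandas as pd

baseline = pd.read_csv("baseline.csv")
current = pd.read_csv("optimized.csv")

base_mem = baseline.groupby("scenario")["memory_delta_mb"].mean()
curr_mem = current.groupby("scenario")["memory_delta_mb"].mean()

# Positive values mean the current run grew server memory more than baseline.
change = (curr_mem - base_mem).dropna()
print(change.sort_values(ascending=False).to_string())
```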
## Best Practices ### 1. Baseline Before Optimization ```bash # Always create a baseline first python3 stream_benchmark.py --full --output baseline.csv ``` ### 2. Isolate Variables - Test one change at a time - Run multiple times to reduce noise - Ensure consistent Redis state ### 3. Realistic Data Sizes - Use realistic payload sizes (match your production data) - Test with production-like entry counts - Consider memory constraints ### 4. Monitor System Resources - Run benchmarks on isolated systems - Monitor CPU, memory, and disk usage - Use `redis-cli --stat` to observe Redis metrics ### 5. Statistical Significance - Always look at percentiles (P95, P99) not just averages - Run benchmarks multiple times - Use analyzer to detect regressions across runs ## Example Workflow ```bash # 1. Run first benchmark python3 stream_benchmark.py --full --output benchmark_1.csv + # 2. Run second benchmark with different settings python3 stream_benchmark.py --full --output benchmark_2.csv + # 3. Analyze results python3 stream_benchmark_analyzer.py benchmark_1.csv benchmark_2.csv --all + # 4. Check for regressions (including memory) python3 stream_benchmark_analyzer.py benchmark_1.csv benchmark_2.csv --regression + # 5. Export comparison python3 stream_benchmark_analyzer.py benchmark_1.csv benchmark_2.csv --export comparison.csv + # 6. Review CSV in spreadsheet application ``` ## Troubleshooting ### Connection Issues - Check Redis is running: `redis-cli ping` - Verify host/port using `--hostname` and `--port` options - Test URI format with `redis-cli -u "your-uri"` ### Memory Issues - Reduce operation counts using parameters: `--xadd-num-ops`, `--xread-num-ops`, etc. - Use shorter mixed workload: `--mixed-duration-seconds 10` instead of 30 - Clean up Redis with `redis-cli FLUSHDB` - Example: `python3 stream_benchmark.py --full --xadd-num-ops 1000 --xread-num-ops 1000` ### Inconsistent Results - Run benchmarks on isolated systems with minimal background load - Run multiple times to get consistent data ### Slow Benchmarks - Reduce operation counts: `--xadd-num-ops 1000 --xread-num-ops 1000` - Shorten mixed workload: `--mixed-duration-seconds 10` - Test individual commands instead of `--full` - Check network latency for remote Redis instances ## Support For issues or questions: 1. Check Redis is running: `redis-cli ping` 2. Verify network connectivity with `--hostname` and `--port` options 3. Test URI format with `redis-cli -u "your-uri"` 4. Review logs for specific error messages 5. Test with reduced workload size
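## Appendix: Loading Results in Python

The results files follow the schema in the "CSV Output Columns" table above, so they can also be inspected directly when the bundled analyzer is more than you need. A minimal sketch using pandas (the file name is the script's default output):

```python
# Load a results file and print the headline metrics per benchmark.
import pandas as pd

df = pd.read_csv("redis_benchmark_results.csv")
cols = ["scenario", "command", "throughput_ops_sec", "p95_latency_ms", "memory_delta_mb"]
print(df[cols].to_string(index=False))
```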
diff --git a/tools/stream/stream_benchmark.py b/tools/stream/stream_benchmark.py new file mode 100755 index 000000000000..63f77937aa47 --- /dev/null +++ b/tools/stream/stream_benchmark.py @@ -0,0 +1,602 @@ +#!/usr/bin/env python3 +""" +Comprehensive Redis Stream Benchmarking Suite +Benchmarks: XADD, XREAD, XRANGE, consumer groups +Measures: Throughput, latency, memory usage +Scenarios: ops (XADD, XREAD), consumer groups, mixed workloads + +Written with Claude Code (https://claude.com/claude-code) +""" + +import redis +import time +import csv +import statistics +import argparse +import random +import string +from datetime import datetime +from typing import List +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, asdict +from urllib.parse import urlparse + +_ACTIONS = ["read", "write", "update", "delete", "list", "create", "process"] +_STATUSES = ["ok", "ok", "ok", "error", "pending"] # weighted toward ok + + +def _make_payload(i: int) -> dict: + """Generic variable-size payload simulating a realistic stream entry.""" + payload = { + "seq": str(i), + "ts": str(time.time()), + "user_id": str(random.randint(1, 100_000)), + "action": random.choice(_ACTIONS), + "status": random.choice(_STATUSES), + "region": random.choice(["us-east", "us-west", "eu-west", "ap-south"]), + } + # Variable-size random data blob: 16 + Exp(mean 16), clamped to 8-64 bytes + size = int(16 + random.expovariate(1 / 16)) + size = min(max(size, 8), 64) + payload["data"] = "".join(random.choices(string.ascii_letters + string.digits, k=size)) + return payload + + +@dataclass +class BenchmarkResult: + """Stores benchmark metrics""" + + scenario: str + command: str + num_operations: int + duration_seconds: float + throughput_ops_sec: float + min_latency_ms: float + max_latency_ms: float + avg_latency_ms: float + p50_latency_ms: float + p95_latency_ms: float + p99_latency_ms: float + memory_before_mb: float + memory_after_mb: float + memory_delta_mb: float + timestamp: str + + +class RedisStreamBenchmark: + def __init__( + self, host="localhost", port=6379, db=0, password=None, uri=None, stream_key="bench_stream" + ): + if uri: + # Parse connection URI (redis://[:password@]host[:port][/db]) + parsed = urlparse(uri) + if parsed.scheme not in ("redis", "rediss"): + raise ValueError("URI must start with redis:// or rediss://") + + host = parsed.hostname or "localhost" + port = parsed.port or 6379 + password = parsed.password + if parsed.path: + try: + db = int(parsed.path.lstrip("/")) + except (ValueError, IndexError): + db = 0 + + self.r = redis.Redis(host=host, port=port, db=db, password=password, decode_responses=True) + self.stream_key = stream_key + self.results: List[BenchmarkResult] = [] + + # Verify connection + try: + self.r.ping() + print(f"Connected to Redis at {host}:{port}") + except Exception as e: + print(f"Failed to connect to Redis: {e}") + raise + + def cleanup_stream(self): + """Delete the stream to ensure clean state""" + self.r.delete(self.stream_key) + + def get_memory_usage(self) -> float: + """Get server used_memory in MB from Redis INFO""" + return self.r.info("memory")["used_memory"] / 1024 / 1024 + + def benchmark_xadd(self, num_ops: int = 10000, num_threads: int = 1) -> BenchmarkResult: + """Benchmark XADD command (adding entries to stream)""" + print(f"\nBenchmarking XADD ({num_ops} ops, {num_threads} threads)...") + +
self.cleanup_stream() + mem_before = self.get_memory_usage() + latencies = [] + + def add_entry(entry_num): + start = time.perf_counter() + self.r.xadd(self.stream_key, _make_payload(entry_num)) + elapsed = (time.perf_counter() - start) * 1000 # Convert to ms + return elapsed + + start_time = time.perf_counter() + + if num_threads == 1: + latencies = [add_entry(i) for i in range(num_ops)] + else: + with ThreadPoolExecutor(max_workers=num_threads) as executor: + latencies = list(executor.map(add_entry, range(num_ops))) + + duration = time.perf_counter() - start_time + mem_after = self.get_memory_usage() + + return self._create_result( + "producer", "XADD", num_ops, duration, latencies, mem_before, mem_after + ) + + def benchmark_xread(self, num_ops: int = 10000, num_entries: int = 5000) -> BenchmarkResult: + """Benchmark XREAD command (reading entries)""" + print(f"\nBenchmarking XREAD ({num_ops} ops on {num_entries} entries)...") + + self.cleanup_stream() + + # Pre-populate stream + print(f" Populating stream with {num_entries} entries...") + for i in range(num_entries): + self.r.xadd( + self.stream_key, + _make_payload(i), + ) + + mem_before = self.get_memory_usage() + latencies = [] + + # Repeatedly read the first 10 entries from the start of the stream + start_time = time.perf_counter() + for i in range(num_ops): + start = time.perf_counter() + self.r.xread({self.stream_key: "0"}, count=10) + elapsed = (time.perf_counter() - start) * 1000 + latencies.append(elapsed) + + duration = time.perf_counter() - start_time + mem_after = self.get_memory_usage() + + return self._create_result( + "consumer", "XREAD", num_ops, duration, latencies, mem_before, mem_after + ) + + def benchmark_xrange(self, num_ops: int = 10000, num_entries: int = 5000) -> BenchmarkResult: + """Benchmark XRANGE command (range queries)""" + print(f"\nBenchmarking XRANGE ({num_ops} ops on {num_entries} entries)...") + + self.cleanup_stream() + + # Pre-populate stream + print(f" Populating stream with {num_entries} entries...") + for i in range(num_entries): + self.r.xadd( + self.stream_key, + _make_payload(i), + ) + + mem_before = self.get_memory_usage() + latencies = [] + + start_time = time.perf_counter() + for i in range(num_ops): + start = time.perf_counter() + self.r.xrange(self.stream_key, "-", "+", count=100) + elapsed = (time.perf_counter() - start) * 1000 + latencies.append(elapsed) + + duration = time.perf_counter() - start_time + mem_after = self.get_memory_usage() + + return self._create_result( + "range_query", "XRANGE", num_ops, duration, latencies, mem_before, mem_after + ) + + def benchmark_consumer_group( + self, num_ops: int = 10000, num_consumers: int = 3, num_entries: int = 5000 + ) -> BenchmarkResult: + """Benchmark consumer group operations (create group, read, ack)""" + print(f"\nBenchmarking Consumer Groups ({num_ops} ops, {num_consumers} consumers)...") + + group_name = "bench_group" + + self.cleanup_stream() + + # Pre-populate stream + print(f" Populating stream with {num_entries} entries...") + for i in range(num_entries): + self.r.xadd( + self.stream_key, + _make_payload(i), + ) + + # Create consumer group + try: + self.r.xgroup_create(self.stream_key, group_name, id="0", mkstream=False) + except redis.ResponseError: + self.r.xgroup_destroy(self.stream_key, group_name) + self.r.xgroup_create(self.stream_key, group_name, id="0", mkstream=False) + + mem_before = self.get_memory_usage() + latencies = [] + + def process_messages(consumer_id): + consumer_latencies = [] + ops_per_consumer = num_ops // num_consumers + + for _ in range(ops_per_consumer): + # Read the next undelivered message for this consumer + start = time.perf_counter() + 
messages = self.r.xreadgroup( + group_name, consumer_id, {self.stream_key: ">"}, count=1 + ) + + if messages and messages[0][1]: + msg_id = messages[0][1][0][0] + self.r.xack(self.stream_key, group_name, msg_id) + + elapsed = (time.perf_counter() - start) * 1000 + consumer_latencies.append(elapsed) + + return consumer_latencies + + start_time = time.perf_counter() + + with ThreadPoolExecutor(max_workers=num_consumers) as executor: + futures = [ + executor.submit(process_messages, f"consumer_{i}") for i in range(num_consumers) + ] + for future in as_completed(futures): + latencies.extend(future.result()) + + duration = time.perf_counter() - start_time + mem_after = self.get_memory_usage() + + self.r.xgroup_destroy(self.stream_key, group_name) + + return self._create_result( + "consumer_group", "XREADGROUP+XACK", num_ops, duration, latencies, mem_before, mem_after + ) + + def benchmark_mixed_workload( + self, duration_seconds: int = 30, num_threads: int = 5 + ) -> List[BenchmarkResult]: + """Benchmark realistic mixed workload (producers + consumers)""" + print(f"\nBenchmarking Mixed Workload ({duration_seconds}s, {num_threads} threads)...") + + self.cleanup_stream() + group_name = "mixed_group" + + try: + self.r.xgroup_create(self.stream_key, group_name, id="0", mkstream=True) + except redis.ResponseError: + pass + + mem_before = self.get_memory_usage() + stop_flag = False + stats = {"producers": 0, "consumers": 0, "acks": 0} + latencies = {"xadd": [], "xread": [], "xack": []} + + def producer(): + count = 0 + while not stop_flag: + start = time.perf_counter() + self.r.xadd(self.stream_key, _make_payload(count)) + latencies["xadd"].append((time.perf_counter() - start) * 1000) + count += 1 + stats["producers"] = count + + def consumer(consumer_id): + count = 0 + while not stop_flag: + start = time.perf_counter() + messages = self.r.xreadgroup( + group_name, + f"consumer_{consumer_id}", + {self.stream_key: ">"}, + count=5, + block=100, + ) + latencies["xread"].append((time.perf_counter() - start) * 1000) + + if messages: + for msg_id in [m[0] for m in messages[0][1]]: + ack_start = time.perf_counter() + self.r.xack(self.stream_key, group_name, msg_id) + latencies["xack"].append((time.perf_counter() - ack_start) * 1000) + stats["acks"] += 1 + count += 1 + stats["consumers"] = count + + start_time = time.perf_counter() + threads = [] + + # Start producers and consumers + num_producers = max(1, num_threads // 3) + num_consumers = num_threads - num_producers + + with ThreadPoolExecutor(max_workers=num_threads) as executor: + for i in range(num_producers): + executor.submit(producer) + for i in range(num_consumers): + executor.submit(consumer, i) + + time.sleep(duration_seconds) + stop_flag = True + + actual_duration = time.perf_counter() - start_time + mem_after = self.get_memory_usage() + + results = [] + for cmd, lats in latencies.items(): + if lats: + results.append( + self._create_result( + "mixed_workload", + cmd.upper(), + len(lats), + actual_duration, + lats, + mem_before, + mem_after, + ) + ) + + self.r.xgroup_destroy(self.stream_key, group_name) + + return results + + def _create_result( + self, + scenario: str, + command: str, + num_ops: int, + duration: float, + latencies: List[float], + mem_before: float, + mem_after: float, + ) -> BenchmarkResult: + """Create a BenchmarkResult from metrics""" + sorted_latencies = sorted(latencies) + + return BenchmarkResult( + scenario=scenario, + command=command, + num_operations=num_ops, + duration_seconds=round(duration, 3), + 
throughput_ops_sec=round(num_ops / duration, 2), + min_latency_ms=round(min(latencies), 3), + max_latency_ms=round(max(latencies), 3), + avg_latency_ms=round(statistics.mean(latencies), 3), + p50_latency_ms=round(sorted_latencies[int((len(sorted_latencies) - 1) * 0.50)], 3), + p95_latency_ms=round(sorted_latencies[int((len(sorted_latencies) - 1) * 0.95)], 3), + p99_latency_ms=round(sorted_latencies[int((len(sorted_latencies) - 1) * 0.99)], 3), + memory_before_mb=round(mem_before, 2), + memory_after_mb=round(mem_after, 2), + memory_delta_mb=round(mem_after - mem_before, 2), + timestamp=datetime.now().isoformat(), + ) + + def save_results_csv(self, filename: str = "redis_benchmark_results.csv"): + """Save results to CSV""" + if not self.results: + print("No results to save") + return + + with open(filename, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=asdict(self.results[0]).keys()) + writer.writeheader() + for result in self.results: + writer.writerow(asdict(result)) + + print(f"\nResults saved to {filename}") + + def print_results_summary(self): + """Print summary of all results""" + print("\n" + "=" * 120) + print("BENCHMARK RESULTS SUMMARY") + print("=" * 120) + + for result in self.results: + print(f"\n{result.scenario.upper()} - {result.command}") + print(f" Operations: {result.num_operations:,}") + print(f" Duration: {result.duration_seconds:.3f}s") + print(f" Throughput: {result.throughput_ops_sec:,.0f} ops/sec") + print( + f" Latency - Min: {result.min_latency_ms:.3f}ms, " + f"Avg: {result.avg_latency_ms:.3f}ms, " + f"P95: {result.p95_latency_ms:.3f}ms, " + f"P99: {result.p99_latency_ms:.3f}ms, " + f"Max: {result.max_latency_ms:.3f}ms" + ) + print( + f" Memory - Before: {result.memory_before_mb:.2f}MB, " + f"After: {result.memory_after_mb:.2f}MB, " + f"Delta: {result.memory_delta_mb:+.2f}MB" + ) + + print("\n" + "=" * 120) + + def run_full_benchmark_suite( + self, + xadd_num_ops=10000, + xread_num_ops=5000, + xread_num_entries=5000, + xrange_num_ops=5000, + xrange_num_entries=5000, + consumer_group_num_ops=5000, + mixed_duration_seconds=30, + threads=4, + ): + """Run all benchmarks""" + print("\nStarting Redis Stream Benchmark Suite...") + print(f"Stream Key: {self.stream_key}") + + # Operation benchmarks + self.results.append(self.benchmark_xadd(num_ops=xadd_num_ops, num_threads=threads)) + self.results.append( + self.benchmark_xread(num_ops=xread_num_ops, num_entries=xread_num_entries) + ) + self.results.append( + self.benchmark_xrange(num_ops=xrange_num_ops, num_entries=xrange_num_entries) + ) + self.results.append( + self.benchmark_consumer_group(num_ops=consumer_group_num_ops, num_consumers=threads) + ) + + # Mixed workload + mixed_results = self.benchmark_mixed_workload( + duration_seconds=mixed_duration_seconds, num_threads=threads + ) + self.results.extend(mixed_results) + + self.print_results_summary() + self.cleanup_stream() + + +def positive_int(value): + """Validator for positive integers""" + num = int(value) + if num <= 0: + raise argparse.ArgumentTypeError(f"{value} must be greater than 0") + return num + + +def main(): + parser = argparse.ArgumentParser(description="Redis Stream Benchmark Suite") + parser.add_argument("--uri", help="Redis connection URI (redis://[:password@]host[:port][/db])") + parser.add_argument("--hostname", default="localhost", help="Redis hostname") + parser.add_argument("--port", type=int, default=6379, help="Redis port") + parser.add_argument("--db", type=int, default=0, help="Redis database number") + 
parser.add_argument("--password", help="Redis password") + parser.add_argument("--output", default="redis_benchmark_results.csv", help="Output CSV file") + parser.add_argument("--full", action="store_true", help="Run full benchmark suite") + parser.add_argument("--xadd", action="store_true", help="Benchmark XADD only") + parser.add_argument("--xread", action="store_true", help="Benchmark XREAD only") + parser.add_argument("--xrange", action="store_true", help="Benchmark XRANGE only") + parser.add_argument("--consumer-group", action="store_true", help="Benchmark consumer groups") + parser.add_argument("--mixed", action="store_true", help="Benchmark mixed workload") + + # Benchmark parameters + parser.add_argument( + "--xadd-num-ops", + type=positive_int, + default=10000, + help="Number of XADD operations (default: 10000)", + ) + parser.add_argument( + "--xread-num-ops", + type=positive_int, + default=5000, + help="Number of XREAD operations (default: 5000)", + ) + parser.add_argument( + "--xread-num-entries", + type=positive_int, + default=5000, + help="Number of entries to pre-populate for XREAD (default: 5000)", + ) + parser.add_argument( + "--xrange-num-ops", + type=positive_int, + default=5000, + help="Number of XRANGE operations (default: 5000)", + ) + parser.add_argument( + "--xrange-num-entries", + type=positive_int, + default=5000, + help="Number of entries to pre-populate for XRANGE (default: 5000)", + ) + parser.add_argument( + "--consumer-group-num-ops", + type=positive_int, + default=5000, + help="Number of consumer group operations (default: 5000)", + ) + parser.add_argument( + "--mixed-duration-seconds", + type=positive_int, + default=30, + help="Duration of mixed workload in seconds (default: 30)", + ) + parser.add_argument( + "--threads", + type=positive_int, + default=4, + help="Number of threads for XADD, consumer group, and mixed workload (default: 4)", + ) + parser.add_argument( + "--seed", + type=int, + default=0, + help="Random seed for reproducible payload generation (default: None, non-deterministic)", + ) + + args = parser.parse_args() + + random.seed(args.seed) + + bench = RedisStreamBenchmark( + host=args.hostname, + port=args.port, + db=args.db, + password=args.password, + uri=args.uri, + ) + + try: + if args.full or not any( + [args.xadd, args.xread, args.xrange, args.consumer_group, args.mixed] + ): + bench.run_full_benchmark_suite( + xadd_num_ops=args.xadd_num_ops, + xread_num_ops=args.xread_num_ops, + xread_num_entries=args.xread_num_entries, + xrange_num_ops=args.xrange_num_ops, + xrange_num_entries=args.xrange_num_entries, + consumer_group_num_ops=args.consumer_group_num_ops, + mixed_duration_seconds=args.mixed_duration_seconds, + threads=args.threads, + ) + else: + if args.xadd: + bench.results.append( + bench.benchmark_xadd(num_ops=args.xadd_num_ops, num_threads=args.threads) + ) + if args.xread: + bench.results.append( + bench.benchmark_xread( + num_ops=args.xread_num_ops, num_entries=args.xread_num_entries + ) + ) + if args.xrange: + bench.results.append( + bench.benchmark_xrange( + num_ops=args.xrange_num_ops, num_entries=args.xrange_num_entries + ) + ) + if args.consumer_group: + bench.results.append( + bench.benchmark_consumer_group( + num_ops=args.consumer_group_num_ops, num_consumers=args.threads + ) + ) + if args.mixed: + bench.results.extend( + bench.benchmark_mixed_workload( + duration_seconds=args.mixed_duration_seconds, num_threads=args.threads + ) + ) + + bench.print_results_summary() + + bench.save_results_csv(args.output) + + except 
KeyboardInterrupt: + print("\n\nBenchmark interrupted") + except Exception as e: + print(f"\nBenchmark failed: {e}") + raise + + +if __name__ == "__main__": + main() diff --git a/tools/stream/stream_benchmark_analyzer.py b/tools/stream/stream_benchmark_analyzer.py new file mode 100755 index 000000000000..496c8404cf6c --- /dev/null +++ b/tools/stream/stream_benchmark_analyzer.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +""" +Redis Stream Benchmark Analysis Tool +Compares results across runs, generates visualizations, and provides insights + +Written with Claude Code (https://claude.com/claude-code) +""" + +import pandas as pd +import argparse +from pathlib import Path +from typing import Dict, List +import statistics + + +class BenchmarkAnalyzer: + def __init__(self, results_files: List[str]): + """Load and parse benchmark results""" + self.data = [] + self.labels = [] + + for filepath in results_files: + label = Path(filepath).stem + self.data.append(pd.read_csv(filepath)) + self.labels.append(label) + + print(f"Loaded {len(self.data)} result file(s)") + + def compare_throughput(self): + """Compare throughput across runs""" + print("\n" + "=" * 100) + print("THROUGHPUT COMPARISON (ops/sec)") + print("=" * 100) + + for i, df in enumerate(self.data): + print(f"\n{self.labels[i]}:") + print( + df[["scenario", "command", "num_operations", "throughput_ops_sec"]].to_string( + index=False + ) + ) + + def compare_latency(self, percentile: str = "p95"): + """Compare latency at specific percentile""" + print(f"\n{'='*100}") + print(f"LATENCY COMPARISON (P{percentile[1:].upper()} - ms)") + print("=" * 100) + + col_name = f"{percentile}_latency_ms" + + for i, df in enumerate(self.data): + print(f"\n{self.labels[i]}:") + print(df[["scenario", "command", "num_operations", col_name]].to_string(index=False)) + + def compare_memory(self): + """Compare memory usage""" + print("\n" + "=" * 100) + print("MEMORY USAGE COMPARISON (MB)") + print("=" * 100) + + for i, df in enumerate(self.data): + print(f"\n{self.labels[i]}:") + cols = [ + "scenario", + "command", + "num_operations", + "memory_before_mb", + "memory_after_mb", + "memory_delta_mb", + ] + print(df[cols].to_string(index=False)) + + def efficiency_analysis(self): + """Analyze efficiency: throughput per MB""" + print("\n" + "=" * 100) + print("EFFICIENCY ANALYSIS (throughput / memory delta)") + print("=" * 100) + + for i, df in enumerate(self.data): + print(f"\n{self.labels[i]}:") + + # Calculate efficiency metric + df_copy = df.copy() + df_copy["efficiency"] = df_copy.apply( + lambda row: ( + row["throughput_ops_sec"] / abs(row["memory_delta_mb"]) + if row["memory_delta_mb"] != 0 + else row["throughput_ops_sec"] + ), + axis=1, + ) + + print( + df_copy[ + ["scenario", "command", "throughput_ops_sec", "memory_delta_mb", "efficiency"] + ].to_string(index=False) + ) + + def identify_bottlenecks(self): + """Identify performance bottlenecks""" + print("\n" + "=" * 100) + print("BOTTLENECK ANALYSIS") + print("=" * 100) + + for i, df in enumerate(self.data): + print(f"\n{self.labels[i]}:") + + # Highest latency scenarios + print("\n Highest P99 Latency:") + top_latency = df.nlargest(3, "p99_latency_ms")[ + ["scenario", "command", "p99_latency_ms"] + ] + print(top_latency.to_string(index=False)) + + # Lowest throughput scenarios + print("\n Lowest Throughput:") + low_throughput = df.nsmallest(3, "throughput_ops_sec")[ + ["scenario", "command", "throughput_ops_sec"] + ] + print(low_throughput.to_string(index=False)) + + # Highest memory overhead + print("\n 
Highest Memory Delta:") + high_memory = df.nlargest(3, "memory_delta_mb")[ + ["scenario", "command", "memory_delta_mb"] + ] + print(high_memory.to_string(index=False)) + + def cross_run_comparison(self): + """Compare same scenarios across different runs""" + if len(self.data) < 2: + print("\nNeed at least 2 result files for cross-run comparison") + return + + print("\n" + "=" * 100) + print("CROSS-RUN COMPARISON") + print("=" * 100) + + # Get common scenarios + scenarios = set(self.data[0]["scenario"].unique()) + for df in self.data[1:]: + scenarios &= set(df["scenario"].unique()) + + for scenario in sorted(scenarios): + print(f"\n{scenario.upper()}:") + + # Compare throughput + print("\n Throughput (ops/sec):") + for i, df in enumerate(self.data): + scenario_data = df[df["scenario"] == scenario] + avg_throughput = scenario_data["throughput_ops_sec"].mean() + print(f" {self.labels[i]}: {avg_throughput:,.0f}") + + # Compare latency + print("\n P95 Latency (ms):") + for i, df in enumerate(self.data): + scenario_data = df[df["scenario"] == scenario] + avg_latency = scenario_data["p95_latency_ms"].mean() + print(f" {self.labels[i]}: {avg_latency:.3f}") + + # Compare memory delta + print("\n Memory Delta (MB):") + for i, df in enumerate(self.data): + scenario_data = df[df["scenario"] == scenario] + avg_memory = scenario_data["memory_delta_mb"].mean() + print(f" {self.labels[i]}: {avg_memory:+.2f}") + + def regression_detection(self): + """Detect performance regressions between runs""" + if len(self.data) < 2: + print("\nNeed at least 2 result files for regression detection") + return + + print("\n" + "=" * 100) + print("PERFORMANCE REGRESSION DETECTION") + print("=" * 100) + + baseline = self.data[0] + + for i in range(1, len(self.data)): + print(f"\n{self.labels[0]} vs {self.labels[i]}:") + + current = self.data[i] + + for scenario in baseline["scenario"].unique(): + baseline_rows = baseline[baseline["scenario"] == scenario] + current_rows = current[current["scenario"] == scenario] + + if baseline_rows.empty or current_rows.empty: + continue + + baseline_throughput = baseline_rows["throughput_ops_sec"].mean() + current_throughput = current_rows["throughput_ops_sec"].mean() + + baseline_latency = baseline_rows["p95_latency_ms"].mean() + current_latency = current_rows["p95_latency_ms"].mean() + + baseline_memory = baseline_rows["memory_delta_mb"].mean() + current_memory = current_rows["memory_delta_mb"].mean() + + throughput_change = ( + (current_throughput - baseline_throughput) / baseline_throughput + ) * 100 + latency_change = ((current_latency - baseline_latency) / baseline_latency) * 100 + memory_change = current_memory - baseline_memory + + print(f"\n {scenario}:") + print( + f" Throughput: {baseline_throughput:,.0f} → {current_throughput:,.0f} " + f"({throughput_change:+.1f}%)" + ) + print( + f" P95 Latency: {baseline_latency:.3f}ms → {current_latency:.3f}ms " + f"({latency_change:+.1f}%)" + ) + print( + f" Memory Delta: {baseline_memory:+.2f}MB → {current_memory:+.2f}MB " + f"({memory_change:+.2f}MB)" + ) + + if throughput_change < -5: + print(f" REGRESSION: Throughput decreased significantly") + elif latency_change > 10: + print(f" REGRESSION: Latency increased significantly") + elif throughput_change > 10: + print(f" IMPROVEMENT: Throughput increased") + + def percentile_distribution(self): + """Show latency percentile distribution""" + print("\n" + "=" * 100) + print("LATENCY PERCENTILE DISTRIBUTION") + print("=" * 100) + + for i, df in enumerate(self.data): + 
print(f"\n{self.labels[i]}:") + print( + df[ + [ + "scenario", + "command", + "min_latency_ms", + "p50_latency_ms", + "p95_latency_ms", + "p99_latency_ms", + "max_latency_ms", + ] + ].to_string(index=False) + ) + + def scenario_analysis(self): + """Analyze performance by scenario""" + print("\n" + "=" * 100) + print("SCENARIO-BASED ANALYSIS") + print("=" * 100) + + for i, df in enumerate(self.data): + print(f"\n{self.labels[i]}:") + + for scenario in sorted(df["scenario"].unique()): + scenario_data = df[df["scenario"] == scenario] + + print(f"\n {scenario}:") + print( + f" Avg Throughput: {scenario_data['throughput_ops_sec'].mean():,.0f} ops/sec" + ) + print(f" Avg P95 Latency: {scenario_data['p95_latency_ms'].mean():.3f}ms") + print(f" Avg Memory Delta: {scenario_data['memory_delta_mb'].mean():+.2f}MB") + print(f" Commands: {', '.join(scenario_data['command'].unique())}") + + def export_comparison_csv(self, output_file: str = "comparison_results.csv"): + """Export comparison to CSV""" + if len(self.data) < 2: + print("Need at least 2 files for comparison export") + return + + baseline = self.data[0] + comparison_rows = [] + + for scenario in baseline["scenario"].unique(): + baseline_rows = baseline[baseline["scenario"] == scenario] + + for _, baseline_row in baseline_rows.iterrows(): + row = { + "scenario": baseline_row["scenario"], + "command": baseline_row["command"], + "baseline_throughput": baseline_row["throughput_ops_sec"], + "baseline_p95_latency": baseline_row["p95_latency_ms"], + "baseline_memory_delta": baseline_row["memory_delta_mb"], + } + + for i in range(1, len(self.data)): + current = self.data[i] + current_rows = current[ + (current["scenario"] == baseline_row["scenario"]) + & (current["command"] == baseline_row["command"]) + ] + + if not current_rows.empty: + current_row = current_rows.iloc[0] + prefix = self.labels[i] + + row[f"{prefix}_throughput"] = current_row["throughput_ops_sec"] + row[f"{prefix}_p95_latency"] = current_row["p95_latency_ms"] + row[f"{prefix}_memory_delta"] = current_row["memory_delta_mb"] + row[f"{prefix}_throughput_change_%"] = ( + (current_row["throughput_ops_sec"] - baseline_row["throughput_ops_sec"]) + / baseline_row["throughput_ops_sec"] + * 100 + ) + row[f"{prefix}_memory_delta_change_MB"] = ( + current_row["memory_delta_mb"] - baseline_row["memory_delta_mb"] + ) + + comparison_rows.append(row) + + comparison_df = pd.DataFrame(comparison_rows) + comparison_df.to_csv(output_file, index=False) + print(f"\nComparison exported to {output_file}") + + +def main(): + parser = argparse.ArgumentParser(description="Analyze Redis benchmark results") + parser.add_argument("files", nargs="+", help="Result CSV files to analyze") + parser.add_argument("--throughput", action="store_true", help="Show throughput comparison") + parser.add_argument("--latency", action="store_true", help="Show latency comparison") + parser.add_argument("--memory", action="store_true", help="Show memory comparison") + parser.add_argument("--efficiency", action="store_true", help="Show efficiency analysis") + parser.add_argument("--bottlenecks", action="store_true", help="Identify bottlenecks") + parser.add_argument("--regression", action="store_true", help="Detect regressions") + parser.add_argument("--percentiles", action="store_true", help="Show percentile distribution") + parser.add_argument("--scenarios", action="store_true", help="Analyze by scenario") + parser.add_argument("--cross", action="store_true", help="Cross-run comparison") + parser.add_argument("--all", 
action="store_true", help="Run all analyses") + parser.add_argument("--export", help="Export comparison CSV") + + args = parser.parse_args() + + analyzer = BenchmarkAnalyzer(args.files) + + if args.all or not any( + [ + args.throughput, + args.latency, + args.memory, + args.efficiency, + args.bottlenecks, + args.regression, + args.percentiles, + args.scenarios, + args.cross, + ] + ): + analyzer.compare_throughput() + analyzer.compare_latency() + analyzer.compare_memory() + analyzer.efficiency_analysis() + analyzer.identify_bottlenecks() + analyzer.percentile_distribution() + analyzer.scenario_analysis() + analyzer.cross_run_comparison() + analyzer.regression_detection() + else: + if args.throughput: + analyzer.compare_throughput() + if args.latency: + analyzer.compare_latency() + if args.memory: + analyzer.compare_memory() + if args.efficiency: + analyzer.efficiency_analysis() + if args.bottlenecks: + analyzer.identify_bottlenecks() + if args.percentiles: + analyzer.percentile_distribution() + if args.scenarios: + analyzer.scenario_analysis() + if args.cross: + analyzer.cross_run_comparison() + if args.regression: + analyzer.regression_detection() + + if args.export: + analyzer.export_comparison_csv(args.export) + + +if __name__ == "__main__": + main()