Skip to content

Commit 504ff4e

Browse files
Sujit Maharjanfacebook-github-bot
authored andcommitted
Auto skip Compression (facebook#13674)
Summary: **Context:** RocksDB's current compression approach rejects blocks if the compressed size exceeds a predefined threshold. To optimize performance, we aim to develop an algorithm that dynamically stops and resumes block compression attempts based on past rejection data. **Summary:** The goal of this milestone is to design, implement, and evaluate an algorithm that intelligently skips and resumes block compression attempts in RocksDB. The algorithm tracks whether randomly selected blocks were rejected, compressed, or bypassed, and uses the data from a fixed-size window to determine the current rejection rate. The calculated rejection rate is used to decide whether to pause or resume compression attempts. We measure the effectiveness of skipping and resuming compression using db_bench and identify any concerning regressions in correctness and performance. Pull Request resolved: facebook#13674 Test Plan: 1. Test case to check that compression automatically starts on a compression-friendly workload and automatically stops on a non-compression-friendly workload (auto_skip_compresor_test.cc) 2. 
Regression analysis to show that there is no significant performance regression ```bash SUFFIX=`tty | sed 's|/|_|g'`; for ARGS in "-compression_parallel_threads=1 -compression_type=zstd -compression_manager=none" "-compression_parallel_threads=4 -compression_type=zstd -compression_manager=none" "-compression_parallel_threads=1 -compression_type=zstd -compression_manager=autoskip" "-compression_parallel_threads=4 -compression_type=zstd -compression_manager=autoskip" ; do echo $ARGS; (for I in `seq 1 20`; do ./db_bench -db=/dev/shm/dbbench$SUFFIX --benchmarks=fillseq -num=10000000 -compaction_style=2 -fifo_compaction_max_table_files_size_mb=1000 -fifo_compaction_allow_compaction=0 -disable_wal -write_buffer_size=12000000 $ARGS 2>&1 | grep micros/op; done) | awk '{n++; sum += $5;} END { print int(sum / n); }'; done ``` Measurement experiment | throughput (% change from main branch) | |---------------|--------------------------------| compression manager = none (main branch) | 1106890.35 ops/s compression manager = none (auto skip) | 1097574.55 ops/s (-0.84%) compression manager = auto skip (auto skip branch) | 1133432.9 ops/s (+2.4%) Reviewed By: hx235 Differential Revision: D76220795 Pulled By: shubhajeet fbshipit-source-id: 0f46ab34da1b451f8907306afba221503e6e22a5
1 parent e3a91ec commit 504ff4e

11 files changed

Lines changed: 484 additions & 35 deletions

File tree

BUCK

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
249249
"trace_replay/trace_record_result.cc",
250250
"trace_replay/trace_replay.cc",
251251
"util/async_file_reader.cc",
252+
"util/auto_skip_compressor.cc",
252253
"util/build_version.cc",
253254
"util/cleanable.cc",
254255
"util/coding.cc",
@@ -4710,6 +4711,12 @@ cpp_unittest_wrapper(name="compressed_secondary_cache_test",
47104711
extra_compiler_flags=[])
47114712

47124713

4714+
cpp_unittest_wrapper(name="compression_test",
4715+
srcs=["util/compression_test.cc"],
4716+
deps=[":rocksdb_test_lib"],
4717+
extra_compiler_flags=[])
4718+
4719+
47134720
cpp_unittest_wrapper(name="configurable_test",
47144721
srcs=["options/configurable_test.cc"],
47154722
deps=[":rocksdb_test_lib"],

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,7 @@ set(SOURCES
880880
util/comparator.cc
881881
util/compression.cc
882882
util/simple_mixed_compressor.cc
883+
util/auto_skip_compressor.cc
883884
util/compression_context_cache.cc
884885
util/concurrent_task_limiter_impl.cc
885886
util/crc32c.cc
@@ -1446,6 +1447,7 @@ if(WITH_TESTS)
14461447
table/table_test.cc
14471448
table/block_fetcher_test.cc
14481449
test_util/testutil_test.cc
1450+
util/compression_test.cc
14491451
trace_replay/block_cache_tracer_test.cc
14501452
trace_replay/io_tracer_test.cc
14511453
tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc

Makefile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,6 +1491,9 @@ db_test: $(OBJ_DIR)/db/db_test.o $(TEST_LIBRARY) $(LIBRARY)
14911491
db_test2: $(OBJ_DIR)/db/db_test2.o $(TEST_LIBRARY) $(LIBRARY)
14921492
$(AM_LINK)
14931493

1494+
compression_test: $(OBJ_DIR)/util/compression_test.o $(TEST_LIBRARY) $(LIBRARY)
1495+
$(AM_LINK)
1496+
14941497
db_logical_block_size_cache_test: $(OBJ_DIR)/db/db_logical_block_size_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
14951498
$(AM_LINK)
14961499

db_stress_tool/db_stress_test_base.cc

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3412,21 +3412,27 @@ void StressTest::Open(SharedState* shared, bool reopen) {
34123412
InitializeOptionsFromFlags(cache_, filter_policy_, options_);
34133413
}
34143414
InitializeOptionsGeneral(cache_, filter_policy_, sqfc_factory_, options_);
3415-
if (!strcasecmp(FLAGS_compression_manager.c_str(), "mixed")) {
3416-
// Currently limited to ZSTD compression. Table property compression_name
3417-
// needs to set to zstd for now even when there can be more than one
3418-
// algorithm in the table under your compressor.
3419-
options_.compression = kZSTD;
3420-
options_.bottommost_compression = kZSTD;
3421-
if (!ZSTD_Supported()) {
3422-
fprintf(stderr,
3423-
"ZSTD compression not supported thus mixed compression cannot be "
3424-
"used\n");
3425-
exit(1);
3415+
if (strcasecmp(FLAGS_compression_manager.c_str(), "none")) {
3416+
if (!strcasecmp(FLAGS_compression_manager.c_str(), "mixed")) {
3417+
// Currently limited to ZSTD compression. Table property compression_name
3418+
// needs to set to zstd for now even when there can be more than one
3419+
// algorithm in the table under your compressor.
3420+
if (!ZSTD_Supported()) {
3421+
fprintf(
3422+
stderr,
3423+
"ZSTD compression not supported thus mixed compression cannot be "
3424+
"used\n");
3425+
exit(1);
3426+
}
3427+
auto mgr = std::make_shared<RoundRobinManager>(
3428+
GetDefaultBuiltinCompressionManager());
3429+
options_.compression_manager = mgr;
3430+
options_.compression = kZSTD;
3431+
options_.bottommost_compression = kZSTD;
3432+
} else if (!strcasecmp(FLAGS_compression_manager.c_str(), "autoskip")) {
3433+
options_.compression_manager = CreateAutoSkipCompressionManager(
3434+
GetDefaultBuiltinCompressionManager());
34263435
}
3427-
auto mgr = std::make_shared<RoundRobinManager>(
3428-
GetDefaultBuiltinCompressionManager());
3429-
options_.compression_manager = mgr;
34303436
} else if (!strcasecmp(FLAGS_compression_manager.c_str(), "none")) {
34313437
// Nothing to do using default compression manager
34323438
} else {

include/rocksdb/advanced_compression.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,5 +506,10 @@ class CompressionManagerWrapper : public CompressionManager {
506506
// compression_manager=nullptr with this
507507
const std::shared_ptr<CompressionManager>&
508508
GetDefaultBuiltinCompressionManager();
509-
509+
// Gets CompressionManager designed for the automated compression strategy.
510+
// This may include deciding to compress or not.
511+
// In future should be able to select compression algorithm based on the CPU
512+
// utilization and IO constraints.
513+
std::shared_ptr<CompressionManagerWrapper> CreateAutoSkipCompressionManager(
514+
std::shared_ptr<CompressionManager> wrapped);
510515
} // namespace ROCKSDB_NAMESPACE

src.mk

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ LIB_SOURCES = \
243243
util/compaction_job_stats_impl.cc \
244244
util/comparator.cc \
245245
util/compression.cc \
246+
util/auto_skip_compressor.cc \
246247
util/compression_context_cache.cc \
247248
util/concurrent_task_limiter_impl.cc \
248249
util/crc32c.cc \
@@ -592,6 +593,7 @@ TEST_MAIN_SOURCES = \
592593
table/table_test.cc \
593594
table/block_fetcher_test.cc \
594595
test_util/testutil_test.cc \
596+
util/compression_test.cc \
595597
tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc \
596598
tools/io_tracer_parser_test.cc \
597599
tools/ldb_cmd_test.cc \

tools/db_bench_tool.cc

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2903,14 +2903,13 @@ class Benchmark {
29032903
// mixed compression manager expects compression type to be explicitly
29042904
// configured through Options to be zstd
29052905
auto compression = std::string("zstd");
2906-
if (!strcasecmp(FLAGS_compression_manager.c_str(), "mixed")) {
2907-
fprintf(stdout, "Compression manager: mixed\n");
2908-
fprintf(stdout, "Compression: zstd\n");
2909-
} else {
2910-
fprintf(stdout, "Compression manager: none\n");
2906+
if (!strcasecmp(FLAGS_compression_manager.c_str(), "none")) {
29112907
compression = CompressionTypeToString(FLAGS_compression_type_e);
2912-
fprintf(stdout, "Compression: %s\n", compression.c_str());
2908+
} else {
2909+
fprintf(stdout, "Compression manager: %s\n",
2910+
FLAGS_compression_manager.c_str());
29132911
}
2912+
fprintf(stdout, "Compression: %s\n", compression.c_str());
29142913
fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
29152914
FLAGS_sample_for_compression);
29162915
if (options.memtable_factory != nullptr) {
@@ -4634,19 +4633,37 @@ class Benchmark {
46344633
FLAGS_level0_file_num_compaction_trigger;
46354634
options.level0_slowdown_writes_trigger =
46364635
FLAGS_level0_slowdown_writes_trigger;
4637-
if (!strcasecmp(FLAGS_compression_manager.c_str(), "mixed")) {
4638-
// Need to list zstd in the compression_name table property if it's
4639-
// potentially used by being in the mix (i.e., potentially at least one
4640-
// data block in the table is compressed by zstd). This ensures proper
4641-
// context and dictionary handling, and prevents crashes in older RocksDB
4642-
// versions.
4643-
options.compression = kZSTD;
4644-
options.bottommost_compression = kZSTD;
4645-
auto mgr = std::make_shared<RoundRobinManager>(
4646-
GetDefaultBuiltinCompressionManager());
4647-
options.compression_manager = mgr;
4648-
} else {
4636+
if (!strcasecmp(FLAGS_compression_manager.c_str(), "none")) {
46494637
options.compression = FLAGS_compression_type_e;
4638+
} else {
4639+
std::shared_ptr<CompressionManagerWrapper> mgr;
4640+
if (!strcasecmp(FLAGS_compression_manager.c_str(), "mixed")) {
4641+
// Need to list zstd in the compression_name table property if it's
4642+
// potentially used by being in the mix (i.e., potentially at least one
4643+
// data block in the table is compressed by zstd). This ensures proper
4644+
// context and dictionary handling, and prevents crashes in older
4645+
// RocksDB versions.
4646+
options.compression = kZSTD;
4647+
options.bottommost_compression = kZSTD;
4648+
4649+
mgr = std::make_shared<RoundRobinManager>(
4650+
GetDefaultBuiltinCompressionManager());
4651+
} else if (!strcasecmp(FLAGS_compression_manager.c_str(), "autoskip")) {
4652+
options.compression = FLAGS_compression_type_e;
4653+
if (FLAGS_compression_type_e == kNoCompression) {
4654+
fprintf(stderr,
4655+
"Compression type must not be no Compression when using "
4656+
"autoskip");
4657+
ErrorExit();
4658+
}
4659+
mgr = CreateAutoSkipCompressionManager(
4660+
GetDefaultBuiltinCompressionManager());
4661+
} else {
4662+
// not defined -> exit with error
4663+
fprintf(stderr, "Requested compression manager not supported");
4664+
ErrorExit();
4665+
}
4666+
options.compression_manager = mgr;
46504667
}
46514668

46524669
if (FLAGS_simulate_hybrid_fs_file != "") {

tools/db_crashtest.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -348,8 +348,8 @@
348348
"memtable_op_scan_flush_trigger": lambda: random.choice([0, 10, 100, 1000]),
349349
"memtable_avg_op_scan_flush_trigger": lambda: random.choice([0, 2, 20, 200]),
350350
"ingest_wbwi_one_in": lambda: random.choice([0, 0, 100, 500]),
351-
"compression_manager": lambda: random.choice(["mixed", "none"]),
352351
"universal_reduce_file_locking": lambda: random.randint(0, 1),
352+
"compression_manager": lambda: random.choice(["mixed", "none", "autoskip"]),
353353
}
354354

355355
_TEST_DIR_ENV_VAR = "TEST_TMPDIR"
@@ -1004,7 +1004,20 @@ def finalize_and_sanitize(src_params):
10041004
if dest_params.get("block_align") == 1:
10051005
dest_params["block_align"] = 0
10061006
dest_params["compression_type"] = "zstd"
1007-
dest_params["bottommost_compression_type"] = "none"
1007+
dest_params["bottommost_compression_type"] = "zstd"
1008+
elif dest_params.get("compression_manager") == "autoskip":
1009+
# disable compression parallel threads when the autoskip manager is used, as its predictor is not thread safe
1010+
dest_params["compression_parallel_threads"] = 1
1011+
# ensure that compression is actually being used
1012+
if dest_params.get("compression_type") == "none":
1013+
dest_params["compression_type"] = random.choice(
1014+
["snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]
1015+
)
1016+
if dest_params.get("bottommost_compression_type") == "none":
1017+
dest_params["bottommost_compression_type"] = random.choice(
1018+
["snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]
1019+
)
1020+
dest_params["block_align"] = 0
10081021
else:
10091022
# Enabling block_align with compression is not supported
10101023
if dest_params.get("block_align") == 1:

util/auto_skip_compressor.cc

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// Copyright (c) Meta Platforms, Inc. and affiliates.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
//
6+
7+
#include "util/auto_skip_compressor.h"
8+
9+
#include "options/options_helper.h"
10+
#include "rocksdb/advanced_compression.h"
11+
#include "util/random.h"
12+
namespace ROCKSDB_NAMESPACE {
13+
14+
int CompressionRejectionProbabilityPredictor::Predict() const {
15+
return pred_rejection_prob_percentage_;
16+
}
17+
18+
size_t CompressionRejectionProbabilityPredictor::attempted_compression_count()
19+
const {
20+
return rejected_count_ + compressed_count_;
21+
}
22+
23+
// Classifies one compression attempt as rejected or accepted. Once a full
// window of attempts has been observed, refreshes the prediction and starts a
// new window.
//
// An attempt counts as rejected when the compressed output is larger than
// opts.max_compressed_bytes_per_kb bytes per 1024 bytes of uncompressed
// input. Always returns true.
bool CompressionRejectionProbabilityPredictor::Record(
    Slice uncompressed_block_data, std::string* compressed_output,
    const CompressionOptions& opts) {
  // Largest compressed size that is still treated as a successful
  // compression (the shift by 10 divides by 1024).
  const uint64_t size_limit =
      (static_cast<uint64_t>(opts.max_compressed_bytes_per_kb) *
       uncompressed_block_data.size()) >>
      10;
  const bool was_rejected = compressed_output->size() > size_limit;
  if (was_rejected) {
    ++rejected_count_;
  } else {
    ++compressed_count_;
  }

  // At least one attempt has been counted above, so the division below can
  // never divide by zero.
  const size_t attempts = attempted_compression_count();
  if (attempts >= window_size_) {
    pred_rejection_prob_percentage_ =
        static_cast<int>(rejected_count_ * 100 / attempts);
    rejected_count_ = 0;
    compressed_count_ = 0;
    assert(attempted_compression_count() == 0);
  }
  return true;
}
43+
// Wraps `compressor` and attaches a fresh rejection predictor that uses a
// window of 10 compression attempts.
AutoSkipCompressorWrapper::AutoSkipCompressorWrapper(
    std::unique_ptr<Compressor> compressor, const CompressionOptions& opts,
    const CompressionType type)
    : CompressorWrapper(std::move(compressor)),
      opts_(opts),
      type_(type),
      predictor_(std::make_shared<CompressionRejectionProbabilityPredictor>(
          /*window_size=*/10)) {
  // NOTE(review): presumably here to silence unused-member warnings; confirm
  // before removing.
  static_cast<void>(type_);
  static_cast<void>(opts_);
}
54+
55+
// Either compresses the block through the wrapped compressor or bypasses
// compression entirely, based on the predicted rejection rate.
//
// A random kExplorationPercentage percent of calls always attempt compression
// ("exploration") so that the predictor keeps receiving fresh samples. All
// other calls attempt compression only while the predicted rejection
// percentage is at most kProbabilityCutOff.
//
// When compression is bypassed, *out_compression_type is set to
// kNoCompression, compressed_output is left untouched, and OK is returned.
// Otherwise the status of the wrapped compressor is returned.
Status AutoSkipCompressorWrapper::CompressBlock(
    Slice uncompressed_data, std::string* compressed_output,
    CompressionType* out_compression_type, ManagedWorkingArea* wa) {
  bool exploration =
      Random::GetTLSInstance()->PercentTrue(kExplorationPercentage);
  // Lets tests override the explore/exploit decision.
  TEST_SYNC_POINT_CALLBACK(
      "AutoSkipCompressorWrapper::CompressBlock::exploitOrExplore",
      &exploration);

  const bool attempt_compression =
      exploration || predictor_->Predict() <= kProbabilityCutOff;
  if (attempt_compression) {
    return CompressBlockAndRecord(uncompressed_data, compressed_output,
                                  out_compression_type, wa);
  }

  // Decided to bypass compression.
  *out_compression_type = kNoCompression;
  return Status::OK();
}
80+
81+
// Compresses the block with the wrapped compressor and feeds the outcome
// (rejected vs. compressed) to the predictor. Returns the wrapped compressor's
// status unchanged.
Status AutoSkipCompressorWrapper::CompressBlockAndRecord(
    Slice uncompressed_data, std::string* compressed_output,
    CompressionType* out_compression_type, ManagedWorkingArea* wa) {
  Status status = wrapped_->CompressBlock(uncompressed_data, compressed_output,
                                          out_compression_type, wa);
  // Only a successful call says anything about compressibility; after a
  // failure the contents of compressed_output cannot be relied on, so the
  // sample is not recorded.
  if (status.ok()) {
    // determine if it was rejected or compressed
    predictor_->Record(uncompressed_data, compressed_output, opts_);
  }
  return status;
}
90+
91+
const char* AutoSkipCompressorManager::Name() const {
92+
// should have returned "AutoSkipCompressorManager" but we currently have an
93+
// error so for now returning name of the wrapped container
94+
return wrapped_->Name();
95+
}
96+
97+
std::unique_ptr<Compressor> AutoSkipCompressorManager::GetCompressorForSST(
98+
const FilterBuildingContext& context, const CompressionOptions& opts,
99+
CompressionType preferred) {
100+
assert(GetSupportedCompressions().size() > 1);
101+
assert(preferred != kNoCompression);
102+
return std::make_unique<AutoSkipCompressorWrapper>(
103+
wrapped_->GetCompressorForSST(context, opts, preferred), opts, preferred);
104+
}
105+
106+
std::shared_ptr<CompressionManagerWrapper> CreateAutoSkipCompressionManager(
107+
std::shared_ptr<CompressionManager> wrapped) {
108+
return std::make_shared<AutoSkipCompressorManager>(
109+
wrapped == nullptr ? GetDefaultBuiltinCompressionManager() : wrapped);
110+
}
111+
} // namespace ROCKSDB_NAMESPACE

util/auto_skip_compressor.h

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright (c) Meta Platforms, Inc. and affiliates.
2+
// This source code is licensed under both the GPLv2 (found in the
3+
// COPYING file in the root directory) and Apache 2.0 License
4+
// (found in the LICENSE.Apache file in the root directory).
5+
//
6+
// Creates auto skip compressor wrapper which intelligently decides bypassing
7+
// compression based on past data
8+
9+
#pragma once
10+
#include <memory>
11+
12+
#include "rocksdb/advanced_compression.h"
13+
#include "util/compression.h"
14+
15+
namespace ROCKSDB_NAMESPACE {
16+
// Predict rejection probability using a moving window approach
17+
// This class is not thread safe
18+
class CompressionRejectionProbabilityPredictor {
19+
public:
20+
CompressionRejectionProbabilityPredictor(int window_size)
21+
: pred_rejection_prob_percentage_(0),
22+
rejected_count_(0),
23+
compressed_count_(0),
24+
window_size_(window_size) {}
25+
int Predict() const;
26+
bool Record(Slice uncompressed_block_data, std::string* compressed_output,
27+
const CompressionOptions& opts);
28+
size_t attempted_compression_count() const;
29+
30+
protected:
31+
int pred_rejection_prob_percentage_;
32+
size_t rejected_count_;
33+
size_t compressed_count_;
34+
size_t window_size_;
35+
};
36+
37+
class AutoSkipCompressorWrapper : public CompressorWrapper {
38+
public:
39+
explicit AutoSkipCompressorWrapper(std::unique_ptr<Compressor> compressor,
40+
const CompressionOptions& opts,
41+
const CompressionType type);
42+
43+
Status CompressBlock(Slice uncompressed_data, std::string* compressed_output,
44+
CompressionType* out_compression_type,
45+
ManagedWorkingArea* wa) override;
46+
47+
private:
48+
Status CompressBlockAndRecord(Slice uncompressed_data,
49+
std::string* compressed_output,
50+
CompressionType* out_compression_type,
51+
ManagedWorkingArea* wa);
52+
static constexpr int kExplorationPercentage = 10;
53+
static constexpr int kProbabilityCutOff = 50;
54+
const CompressionOptions& opts_;
55+
const CompressionType type_;
56+
std::shared_ptr<CompressionRejectionProbabilityPredictor> predictor_;
57+
};
58+
59+
class AutoSkipCompressorManager : public CompressionManagerWrapper {
60+
using CompressionManagerWrapper::CompressionManagerWrapper;
61+
const char* Name() const override;
62+
std::unique_ptr<Compressor> GetCompressorForSST(
63+
const FilterBuildingContext& context, const CompressionOptions& opts,
64+
CompressionType preferred) override;
65+
};
66+
67+
} // namespace ROCKSDB_NAMESPACE

0 commit comments

Comments
 (0)