From 87168811d39fffd49c97b48e3b8bfdc69caa425d Mon Sep 17 00:00:00 2001 From: Stanislav Yablonskiy Date: Wed, 22 Oct 2025 11:46:14 +0300 Subject: [PATCH 1/3] Reject operations in column shards by probability (#26783) --- ydb/core/protos/feature_flags.proto | 1 + .../protos/memory_controller_config.proto | 2 + ydb/core/tablet_flat/flat_executor.cpp | 5 +- ydb/core/tablet_flat/flat_executor.h | 2 + .../tx/columnshard/columnshard__overload.cpp | 12 ++++ .../tx/columnshard/columnshard__write.cpp | 4 +- .../tx/columnshard/counters/columnshard.cpp | 2 + .../tx/columnshard/counters/columnshard.h | 10 ++- .../columnshard/counters/counters_manager.h | 5 ++ ydb/tests/olap/test_overloads.py | 68 +++++++++++++++++++ 10 files changed, 107 insertions(+), 4 deletions(-) diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 48f3e7f3b21a..ceeadfd7bcad 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -233,5 +233,6 @@ message TFeatureFlags { optional bool EnableTinyDisks = 207 [default = false]; optional bool EnableMetricsLevel = 208 [default = false]; optional bool EnableSecureScriptExecutions = 209 [default = false]; + optional bool EnableOlapRejectProbability = 213 [default = false]; optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false]; } diff --git a/ydb/core/protos/memory_controller_config.proto b/ydb/core/protos/memory_controller_config.proto index a4040970250f..00776138ddce 100644 --- a/ydb/core/protos/memory_controller_config.proto +++ b/ydb/core/protos/memory_controller_config.proto @@ -28,4 +28,6 @@ message TMemoryControllerConfig { optional float CompactionLimitPercent = 130 [default = 10]; optional uint64 CompactionLimitBytes = 131; + + optional uint64 MaxTxInFly = 132 [default = 10000]; } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 97451b91a001..7e2f9e3e2865 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -45,10 +46,10 @@ #include #include + namespace NKikimr { namespace NTabletFlatExecutor { -static constexpr ui64 MaxTxInFly = 10000; LWTRACE_USING(TABLET_FLAT_PROVIDER) @@ -132,6 +133,7 @@ TExecutor::TExecutor( , Time(TAppData::TimeProvider) , Owner(owner) , OwnerActorId(ownerActorId) + , MaxTxInFly(AppData()->MemoryControllerConfig.GetMaxTxInFly()) , Emitter(new TIdEmitter) , CounterEventsInFlight(new TEvTabletCounters::TInFlightCookie) , Stats(new TExecutorStatsImpl()) @@ -140,7 +142,6 @@ TExecutor::TExecutor( {} TExecutor::~TExecutor() { - } bool TExecutor::OnUnhandledException(const std::exception& e) { diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 90ee8b44ac56..643fe7ffab04 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -385,6 +385,8 @@ class TExecutor // Counts the number of times LeaseDuration was increased size_t LeaseDurationIncreases = 0; + const ui64 MaxTxInFly; + struct TLeaseCommit : public TIntrusiveListItem { using TByEndMap = std::multimap; diff --git a/ydb/core/tx/columnshard/columnshard__overload.cpp b/ydb/core/tx/columnshard/columnshard__overload.cpp index ab3eabb02dca..8ca464e7c441 100644 --- a/ydb/core/tx/columnshard/columnshard__overload.cpp +++ b/ydb/core/tx/columnshard/columnshard__overload.cpp @@ -24,6 +24,18 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const TInte "limit", txLimit); return EOverloadStatus::ShardTxInFly; } + + if (AppData()->FeatureFlags.GetEnableOlapRejectProbability()) { + const float rejectProbabilty = Executor()->GetRejectProbability(); + if (rejectProbabilty > 0) { + const float rnd = TAppData::RandomProvider->GenRandReal2(); + if (rnd < rejectProbabilty) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "shard_overload")("reason", "reject_probality")("RP", rejectProbabilty); + return EOverloadStatus::RejectProbability; + } + } + } + return EOverloadStatus::None; } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 02d95f471abb..57486ffeb183 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -46,6 +46,9 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const case EOverloadStatus::ShardWritesSizeInFly: Counters.OnWriteOverloadShardWritesSize(writeSize); break; + case EOverloadStatus::RejectProbability: + Counters.OnWriteOverloadRejectProbability(writeSize); + break; case EOverloadStatus::None: Y_ABORT("invalid function usage"); } @@ -345,7 +348,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor const auto source = ev->Sender; const auto cookie = ev->Cookie; - std::optional writeTimeout; if (record.HasTimeoutSeconds()) { writeTimeout = TDuration::Seconds(record.GetTimeoutSeconds()); diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp index e2a2905ae72f..236a72f4dd33 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.cpp +++ b/ydb/core/tx/columnshard/counters/columnshard.cpp @@ -43,6 +43,8 @@ TCSCounters::TCSCounters() OverloadShardWritesCount = TBase::GetDeriviative("Overload/Shard/Writes/Count"); OverloadShardWritesSizeBytes = TBase::GetDeriviative("Overload/Shard/WritesSize/Bytes"); OverloadShardWritesSizeCount = TBase::GetDeriviative("Overload/Shard/WritesSize/Count"); + OverloadRejectProbabilityBytes = TBase::GetDeriviative("Overload/RejectProbability/Bytes"); + OverloadRejectProbabilityCount = TBase::GetDeriviative("Overload/RejectProbability/Count"); InternalCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("InternalCompaction/Bytes"); InternalCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("InternalCompaction/PortionsCount"); diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h index bf1af7e763d1..4dfcce2380a7 100644 --- a/ydb/core/tx/columnshard/counters/columnshard.h +++ b/ydb/core/tx/columnshard/counters/columnshard.h @@ -18,7 +18,8 @@ enum class EOverloadStatus { OverloadMetadata /* "overload_metadata" */, Disk /* "disk" */, None /* "none" */, - OverloadCompaction /* "overload_compaction" */ + OverloadCompaction /* "overload_compaction" */, + RejectProbability, }; enum class EWriteFailReason { @@ -106,6 +107,8 @@ class TCSCounters: public TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesCount; NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeBytes; NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeCount; + NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityBytes; + NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityCount; std::shared_ptr InternalCompactionGranuleBytes; std::shared_ptr InternalCompactionGranulePortionsCount; @@ -222,6 +225,11 @@ class TCSCounters: public TCommonCountersOwner { OverloadShardWritesSizeCount->Add(1); } + void OnWriteOverloadRejectProbability(const ui64 size) const { + OverloadRejectProbabilityBytes->Add(size); + OverloadRejectProbabilityCount->Add(1); + } + void SkipIndexationInputDueToSplitCompaction(const ui64 size) const { SkipIndexationInputDueToSplitCompactionBytes->Add(size); SkipIndexationInputDueToSplitCompactionCount->Add(1); diff --git a/ydb/core/tx/columnshard/counters/counters_manager.h b/ydb/core/tx/columnshard/counters/counters_manager.h index 29647de128e9..88bbb526bcb9 100644 --- a/ydb/core/tx/columnshard/counters/counters_manager.h +++ b/ydb/core/tx/columnshard/counters/counters_manager.h @@ -79,6 +79,11 @@ class TCountersManager { CSCounters.OnWriteOverloadShardWritesSize(size); } + void OnWriteOverloadRejectProbability(const ui64 size) const { + TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD); + CSCounters.OnWriteOverloadRejectProbability(size); + } + void FillTableStats(TInternalPathId pathId, ::NKikimrTableStats::TTableStats& tableStats) { ColumnTablesCounters->GetPathIdCounter(pathId)->FillStats(tableStats); BackgroundControllerCounters->FillStats(pathId, tableStats); diff --git a/ydb/tests/olap/test_overloads.py b/ydb/tests/olap/test_overloads.py index 84abb1865511..a93b8aa15ac5 100644 --- a/ydb/tests/olap/test_overloads.py +++ b/ydb/tests/olap/test_overloads.py @@ -4,6 +4,7 @@ import logging import yatest.common import ydb +import random from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.harness.kikimr_runner import KiKiMR @@ -103,6 +104,20 @@ def _setup_ydb(cls, writing_in_flight_requests_count_limit, writing_in_flight_re cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{config.domain_name}") cls.ydb_client.wait_connection(timeout=60) + @classmethod + def _setup_ydb_rp(cls): + ydb_path = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY")) + logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8")) + config = KikimrConfigGenerator( + extra_feature_flags={"enable_olap_reject_probability": True}, + memory_controller_config={"max_tx_in_fly": 0}, + ) + cls.cluster = KiKiMR(config) + cls.cluster.start() + node = cls.cluster.nodes[1] + cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{config.domain_name}") + cls.ydb_client.wait_connection(timeout=60) + def get_row_count(self) -> int: return self.ydb_client.query(f"select count(*) as Rows from `{self.table_name}`")[0].rows[0]["Rows"] @@ -156,6 +171,7 @@ def test_overloads_bulk_upsert(self, writing_in_flight_requests_count_limit, wri @pytest.mark.parametrize('writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit', [(1, 10000), (2, 10000), (1000, 1), (1000, 2), (1, 1), (2, 2)]) def test_overloads_workload(self, writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit): self._setup_ydb(writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit) + wait_time: int = 60 self.table_name: str = f"log_{writing_in_flight_requests_count_limit}_{writing_in_flight_request_bytes_limit}" @@ -191,3 +207,55 @@ def test_overloads_workload(self, writing_in_flight_requests_count_limit, writin logging.info(f"Count rows after insert {self.get_row_count()}") assert self.get_row_count() != 0 + + def test_overloads_reject_probability(self): + self._setup_ydb_rp() + + table_path = f"{self.ydb_client.database}/table_for_test_overloads_reject_probability" + self.ydb_client.query( + f""" + CREATE TABLE `{table_path}` ( + id Uint64 NOT NULL, + v1 Int64, + v2 Int64, + PRIMARY KEY(id), + ) + WITH ( + STORE = COLUMN, + PARTITION_COUNT = 1, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1 + ) + """ + ) + + column_types = ydb.BulkUpsertColumns() + column_types.add_column("id", ydb.PrimitiveType.Uint64) + column_types.add_column("v1", ydb.PrimitiveType.Int64) + column_types.add_column("v2", ydb.PrimitiveType.Int64) + + rows_count = 1000 + + data = [ + { + "id": i, + "v1": 1, + "v2": -1, + } + for i in range(rows_count) + ] + + self.ydb_client.bulk_upsert(table_path, column_types, data) + + futures = [] + + for _ in range(19): + lb = random.randint(0, rows_count) + futures.append(self.ydb_client.query_async(f"UPDATE `{table_path}` SET v1 = v1 + 1, v2 = v2 - 1 WHERE id > {lb};")) + + for future in futures: + future.result() + + monitor = self.cluster.monitors[0].fetch() + _, rejectProbabilityCount = monitor.get_by_name('Deriviative/Overload/RejectProbability/Count')[0] + + assert rejectProbabilityCount > 0 From b0cb7a35b8065ff2df74b7ce280c76ee2c775b6a Mon Sep 17 00:00:00 2001 From: Stanislav Yablonskiy Date: Thu, 30 Oct 2025 14:49:56 +0300 Subject: [PATCH 2/3] Move MaxTxInFly to ICB (#27635) --- ydb/core/protos/config.proto | 6 ++++++ ydb/core/protos/memory_controller_config.proto | 2 -- ydb/core/tablet_flat/flat_executor.cpp | 8 ++++---- ydb/core/tablet_flat/flat_executor.h | 3 +-- ydb/tests/olap/test_overloads.py | 11 ++++++++++- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c94ad28ba8fe..fd4bf7f03397 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1629,6 +1629,12 @@ message TImmediateControlsConfig { MinValue: 8, MaxValue: 4096, DefaultValue: 256 }]; + + optional uint64 MaxTxInFly = 2 [(ControlOptions) = { + Description: "Maximum tx queue size for all tablets", + MinValue: 0, + MaxValue: 1000000, + DefaultValue: 10000 }]; } message TDSProxyControls { diff --git a/ydb/core/protos/memory_controller_config.proto b/ydb/core/protos/memory_controller_config.proto index 00776138ddce..a4040970250f 100644 --- a/ydb/core/protos/memory_controller_config.proto +++ b/ydb/core/protos/memory_controller_config.proto @@ -28,6 +28,4 @@ message TMemoryControllerConfig { optional float CompactionLimitPercent = 130 [default = 10]; optional uint64 CompactionLimitBytes = 131; - - optional uint64 MaxTxInFly = 132 [default = 10000]; } diff --git a/ydb/core/tablet_flat/flat_executor.cpp b/ydb/core/tablet_flat/flat_executor.cpp index 7e2f9e3e2865..55a5080f7a7c 100644 --- a/ydb/core/tablet_flat/flat_executor.cpp +++ b/ydb/core/tablet_flat/flat_executor.cpp @@ -133,12 +133,12 @@ TExecutor::TExecutor( , Time(TAppData::TimeProvider) , Owner(owner) , OwnerActorId(ownerActorId) - , MaxTxInFly(AppData()->MemoryControllerConfig.GetMaxTxInFly()) , Emitter(new TIdEmitter) , CounterEventsInFlight(new TEvTabletCounters::TInFlightCookie) , Stats(new TExecutorStatsImpl()) , LogFlushDelayOverrideUsec(-1, -1, 60*1000*1000) , MaxCommitRedoMB(256, 1, 4096) + , MaxTxInFly(10000, 0, 1000000) {} TExecutor::~TExecutor() { @@ -181,6 +181,7 @@ void TExecutor::Registered(TActorSystem *sys, const TActorId&) TString myTabletType = TTabletTypes::TypeToStr(Owner->TabletType()); AppData()->Icb->RegisterSharedControl(LogFlushDelayOverrideUsec, myTabletType + "_LogFlushDelayOverrideUsec"); AppData()->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB"); + AppData()->Icb->RegisterSharedControl(MaxTxInFly, "TabletControls.MaxTxInFly"); // instantiate alert counters so even never reported alerts are created GetServiceCounters(AppData()->Counters, "tablets")->GetCounter("alerts_pending_nodata", true); @@ -4073,8 +4074,7 @@ void TExecutor::ForceSendCounters() { float TExecutor::GetRejectProbability() const { // Limit number of in-flight TXs - // TODO: make configurable - if (Stats->TxInFly > MaxTxInFly) { + if (Stats->TxInFly > ui64(MaxTxInFly)) { HadRejectProbabilityByTxInFly = true; return 1.0; } @@ -4110,7 +4110,7 @@ float TExecutor::GetRejectProbability() const { } void TExecutor::MaybeRelaxRejectProbability() { - if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= MaxTxInFly || + if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= ui64(MaxTxInFly) || HadRejectProbabilityByOverload) { HadRejectProbabilityByTxInFly = false; diff --git a/ydb/core/tablet_flat/flat_executor.h b/ydb/core/tablet_flat/flat_executor.h index 643fe7ffab04..1e515de3f45e 100644 --- a/ydb/core/tablet_flat/flat_executor.h +++ b/ydb/core/tablet_flat/flat_executor.h @@ -385,8 +385,6 @@ class TExecutor // Counts the number of times LeaseDuration was increased size_t LeaseDurationIncreases = 0; - const ui64 MaxTxInFly; - struct TLeaseCommit : public TIntrusiveListItem { using TByEndMap = std::multimap; @@ -501,6 +499,7 @@ class TExecutor TControlWrapper LogFlushDelayOverrideUsec; TControlWrapper MaxCommitRedoMB; + TControlWrapper MaxTxInFly; TActorId BackupWriter; diff --git a/ydb/tests/olap/test_overloads.py b/ydb/tests/olap/test_overloads.py index a93b8aa15ac5..709f32d50c9b 100644 --- a/ydb/tests/olap/test_overloads.py +++ b/ydb/tests/olap/test_overloads.py @@ -5,6 +5,7 @@ import yatest.common import ydb import random +import requests from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.harness.kikimr_runner import KiKiMR @@ -110,13 +111,13 @@ def _setup_ydb_rp(cls): logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8")) config = KikimrConfigGenerator( extra_feature_flags={"enable_olap_reject_probability": True}, - memory_controller_config={"max_tx_in_fly": 0}, ) cls.cluster = KiKiMR(config) cls.cluster.start() node = cls.cluster.nodes[1] cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{config.domain_name}") cls.ydb_client.wait_connection(timeout=60) + cls.mon_url = f"http://{node.host}:{node.mon_port}" def get_row_count(self) -> int: return self.ydb_client.query(f"select count(*) as Rows from `{self.table_name}`")[0].rows[0]["Rows"] @@ -208,8 +209,16 @@ def test_overloads_workload(self, writing_in_flight_requests_count_limit, writin logging.info(f"Count rows after insert {self.get_row_count()}") assert self.get_row_count() != 0 + def tune_icb(self): + response = requests.post( + self.mon_url + "/actors/icb", + data="TabletControls.MaxTxInFly=0" + ) + response.raise_for_status() + def test_overloads_reject_probability(self): self._setup_ydb_rp() + self.tune_icb() table_path = f"{self.ydb_client.database}/table_for_test_overloads_reject_probability" self.ydb_client.query( From b732233427ae08e52f6c7aad6547f09d09ccfdc4 Mon Sep 17 00:00:00 2001 From: Stanislav Yablonskiy Date: Sat, 1 Nov 2025 15:01:42 +0000 Subject: [PATCH 3/3] Missed test method added --- ydb/tests/olap/common/ydb_client.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/tests/olap/common/ydb_client.py b/ydb/tests/olap/common/ydb_client.py index a43b52f8eafe..9e48d21b92f1 100644 --- a/ydb/tests/olap/common/ydb_client.py +++ b/ydb/tests/olap/common/ydb_client.py @@ -23,6 +23,9 @@ def wait_connection(self, timeout=5): def query(self, statement): return self.session_pool.execute_with_retries(statement) + def query_async(self, statement, request_settings=None): + return self.session_pool.execute_with_retries_async(query=statement, settings=request_settings) + def bulk_upsert(self, table_path, column_types: ydb.BulkUpsertColumns, data_slice): self.driver.table_client.bulk_upsert( table_path,