Skip to content

Commit 5041a0f

Browse files
authored
Stable-25-3 Reject Probability in Column Shards (#28077)
2 parents 2da3d30 + b732233 commit 5041a0f

File tree

11 files changed

+125
-7
lines changed

11 files changed

+125
-7
lines changed

ydb/core/protos/config.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1629,6 +1629,12 @@ message TImmediateControlsConfig {
16291629
MinValue: 8,
16301630
MaxValue: 4096,
16311631
DefaultValue: 256 }];
1632+
1633+
optional uint64 MaxTxInFly = 2 [(ControlOptions) = {
1634+
Description: "Maximum tx queue size for all tablets",
1635+
MinValue: 0,
1636+
MaxValue: 1000000,
1637+
DefaultValue: 10000 }];
16321638
}
16331639

16341640
message TDSProxyControls {

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,6 @@ message TFeatureFlags {
233233
optional bool EnableTinyDisks = 207 [default = false];
234234
optional bool EnableMetricsLevel = 208 [default = false];
235235
optional bool EnableSecureScriptExecutions = 209 [default = false];
236+
optional bool EnableOlapRejectProbability = 213 [default = false];
236237
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
237238
}

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <ydb/core/base/hive.h>
3232
#include <ydb/core/base/tablet_pipecache.h>
3333
#include <ydb/core/control/lib/immediate_control_board_impl.h>
34+
#include <ydb/core/protos/memory_controller_config.pb.h>
3435
#include <ydb/core/scheme/scheme_type_registry.h>
3536
#include <ydb/core/tablet/tablet_counters_aggregator.h>
3637
#include <ydb/library/actors/core/hfunc.h>
@@ -45,10 +46,10 @@
4546
#include <util/generic/xrange.h>
4647
#include <util/generic/ymath.h>
4748

49+
4850
namespace NKikimr {
4951
namespace NTabletFlatExecutor {
5052

51-
static constexpr ui64 MaxTxInFly = 10000;
5253

5354
LWTRACE_USING(TABLET_FLAT_PROVIDER)
5455

@@ -137,10 +138,10 @@ TExecutor::TExecutor(
137138
, Stats(new TExecutorStatsImpl())
138139
, LogFlushDelayOverrideUsec(-1, -1, 60*1000*1000)
139140
, MaxCommitRedoMB(256, 1, 4096)
141+
, MaxTxInFly(10000, 0, 1000000)
140142
{}
141143

142144
TExecutor::~TExecutor() {
143-
144145
}
145146

146147
bool TExecutor::OnUnhandledException(const std::exception& e) {
@@ -180,6 +181,7 @@ void TExecutor::Registered(TActorSystem *sys, const TActorId&)
180181
TString myTabletType = TTabletTypes::TypeToStr(Owner->TabletType());
181182
AppData()->Icb->RegisterSharedControl(LogFlushDelayOverrideUsec, myTabletType + "_LogFlushDelayOverrideUsec");
182183
AppData()->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
184+
AppData()->Icb->RegisterSharedControl(MaxTxInFly, "TabletControls.MaxTxInFly");
183185

184186
// instantiate alert counters so even never reported alerts are created
185187
GetServiceCounters(AppData()->Counters, "tablets")->GetCounter("alerts_pending_nodata", true);
@@ -4072,8 +4074,7 @@ void TExecutor::ForceSendCounters() {
40724074

40734075
float TExecutor::GetRejectProbability() const {
40744076
// Limit number of in-flight TXs
4075-
// TODO: make configurable
4076-
if (Stats->TxInFly > MaxTxInFly) {
4077+
if (Stats->TxInFly > ui64(MaxTxInFly)) {
40774078
HadRejectProbabilityByTxInFly = true;
40784079
return 1.0;
40794080
}
@@ -4109,7 +4110,7 @@ float TExecutor::GetRejectProbability() const {
41094110
}
41104111

41114112
void TExecutor::MaybeRelaxRejectProbability() {
4112-
if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= MaxTxInFly ||
4113+
if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= ui64(MaxTxInFly) ||
41134114
HadRejectProbabilityByOverload)
41144115
{
41154116
HadRejectProbabilityByTxInFly = false;

ydb/core/tablet_flat/flat_executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ class TExecutor
499499

500500
TControlWrapper LogFlushDelayOverrideUsec;
501501
TControlWrapper MaxCommitRedoMB;
502+
TControlWrapper MaxTxInFly;
502503

503504
TActorId BackupWriter;
504505

ydb/core/tx/columnshard/columnshard__overload.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const TInte
2424
"limit", txLimit);
2525
return EOverloadStatus::ShardTxInFly;
2626
}
27+
28+
if (AppData()->FeatureFlags.GetEnableOlapRejectProbability()) {
29+
const float rejectProbabilty = Executor()->GetRejectProbability();
30+
if (rejectProbabilty > 0) {
31+
const float rnd = TAppData::RandomProvider->GenRandReal2();
32+
if (rnd < rejectProbabilty) {
33+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "shard_overload")("reason", "reject_probality")("RP", rejectProbabilty);
34+
return EOverloadStatus::RejectProbability;
35+
}
36+
}
37+
}
38+
2739
return EOverloadStatus::None;
2840
}
2941

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
4646
case EOverloadStatus::ShardWritesSizeInFly:
4747
Counters.OnWriteOverloadShardWritesSize(writeSize);
4848
break;
49+
case EOverloadStatus::RejectProbability:
50+
Counters.OnWriteOverloadRejectProbability(writeSize);
51+
break;
4952
case EOverloadStatus::None:
5053
Y_ABORT("invalid function usage");
5154
}
@@ -345,7 +348,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
345348
const auto source = ev->Sender;
346349
const auto cookie = ev->Cookie;
347350

348-
349351
std::optional<TDuration> writeTimeout;
350352
if (record.HasTimeoutSeconds()) {
351353
writeTimeout = TDuration::Seconds(record.GetTimeoutSeconds());

ydb/core/tx/columnshard/counters/columnshard.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ TCSCounters::TCSCounters()
4343
OverloadShardWritesCount = TBase::GetDeriviative("Overload/Shard/Writes/Count");
4444
OverloadShardWritesSizeBytes = TBase::GetDeriviative("Overload/Shard/WritesSize/Bytes");
4545
OverloadShardWritesSizeCount = TBase::GetDeriviative("Overload/Shard/WritesSize/Count");
46+
OverloadRejectProbabilityBytes = TBase::GetDeriviative("Overload/RejectProbability/Bytes");
47+
OverloadRejectProbabilityCount = TBase::GetDeriviative("Overload/RejectProbability/Count");
4648

4749
InternalCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("InternalCompaction/Bytes");
4850
InternalCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("InternalCompaction/PortionsCount");

ydb/core/tx/columnshard/counters/columnshard.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ enum class EOverloadStatus {
1818
OverloadMetadata /* "overload_metadata" */,
1919
Disk /* "disk" */,
2020
None /* "none" */,
21-
OverloadCompaction /* "overload_compaction" */
21+
OverloadCompaction /* "overload_compaction" */,
22+
RejectProbability,
2223
};
2324

2425
enum class EWriteFailReason {
@@ -106,6 +107,8 @@ class TCSCounters: public TCommonCountersOwner {
106107
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesCount;
107108
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeBytes;
108109
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeCount;
110+
NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityBytes;
111+
NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityCount;
109112

110113
std::shared_ptr<TValueAggregationClient> InternalCompactionGranuleBytes;
111114
std::shared_ptr<TValueAggregationClient> InternalCompactionGranulePortionsCount;
@@ -222,6 +225,11 @@ class TCSCounters: public TCommonCountersOwner {
222225
OverloadShardWritesSizeCount->Add(1);
223226
}
224227

228+
void OnWriteOverloadRejectProbability(const ui64 size) const {
229+
OverloadRejectProbabilityBytes->Add(size);
230+
OverloadRejectProbabilityCount->Add(1);
231+
}
232+
225233
void SkipIndexationInputDueToSplitCompaction(const ui64 size) const {
226234
SkipIndexationInputDueToSplitCompactionBytes->Add(size);
227235
SkipIndexationInputDueToSplitCompactionCount->Add(1);

ydb/core/tx/columnshard/counters/counters_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ class TCountersManager {
7979
CSCounters.OnWriteOverloadShardWritesSize(size);
8080
}
8181

82+
void OnWriteOverloadRejectProbability(const ui64 size) const {
83+
TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD);
84+
CSCounters.OnWriteOverloadRejectProbability(size);
85+
}
86+
8287
void FillTableStats(TInternalPathId pathId, ::NKikimrTableStats::TTableStats& tableStats) {
8388
ColumnTablesCounters->GetPathIdCounter(pathId)->FillStats(tableStats);
8489
BackgroundControllerCounters->FillStats(pathId, tableStats);

ydb/tests/olap/common/ydb_client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ def wait_connection(self, timeout=5):
2323
def query(self, statement):
2424
return self.session_pool.execute_with_retries(statement)
2525

26+
def query_async(self, statement, request_settings=None):
27+
return self.session_pool.execute_with_retries_async(query=statement, settings=request_settings)
28+
2629
def bulk_upsert(self, table_path, column_types: ydb.BulkUpsertColumns, data_slice):
2730
self.driver.table_client.bulk_upsert(
2831
table_path,

0 commit comments

Comments
 (0)