Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1747,6 +1747,12 @@ message TImmediateControlsConfig {
MinValue: 8,
MaxValue: 4096,
DefaultValue: 256 }];

optional uint64 MaxTxInFly = 2 [(ControlOptions) = {
Description: "Maximum tx queue size for all tablets",
MinValue: 0,
MaxValue: 1000000,
DefaultValue: 10000 }];
}

message TDSProxyControls {
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/protos/memory_controller_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,4 @@ message TMemoryControllerConfig {

optional float CompactionLimitPercent = 130 [default = 10];
optional uint64 CompactionLimitBytes = 131;

optional uint64 MaxTxInFly = 132 [default = 10000];
}
8 changes: 4 additions & 4 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,12 @@ TExecutor::TExecutor(
, Time(TAppData::TimeProvider)
, Owner(owner)
, OwnerActorId(ownerActorId)
, MaxTxInFly(AppData()->MemoryControllerConfig.GetMaxTxInFly())
, Emitter(new TIdEmitter)
, CounterEventsInFlight(new TEvTabletCounters::TInFlightCookie)
, Stats(new TExecutorStatsImpl())
, LogFlushDelayOverrideUsec(-1, -1, 60*1000*1000)
, MaxCommitRedoMB(256, 1, 4096)
, MaxTxInFly(10000, 0, 1000000)
{}

TExecutor::~TExecutor() {
Expand Down Expand Up @@ -183,6 +183,7 @@ void TExecutor::Registered(TActorSystem *sys, const TActorId&)
TControlBoard::RegisterSharedControl(LogFlushDelayOverrideUsec, icb.LogFlushDelayOverrideUsec[static_cast<size_t>(Owner->TabletType())]);
}
TControlBoard::RegisterSharedControl(MaxCommitRedoMB, icb.TabletControls.MaxCommitRedoMB);
TControlBoard::RegisterSharedControl(MaxTxInFly, icb.TabletControls.MaxTxInFly);

// instantiate alert counters so even never reported alerts are created
GetServiceCounters(AppData()->Counters, "tablets")->GetCounter("alerts_pending_nodata", true);
Expand Down Expand Up @@ -4076,8 +4077,7 @@ void TExecutor::ForceSendCounters() {

float TExecutor::GetRejectProbability() const {
// Limit number of in-flight TXs
// TODO: make configurable
if (Stats->TxInFly > MaxTxInFly) {
if (Stats->TxInFly > ui64(MaxTxInFly)) {
HadRejectProbabilityByTxInFly = true;
return 1.0;
}
Expand Down Expand Up @@ -4113,7 +4113,7 @@ float TExecutor::GetRejectProbability() const {
}

void TExecutor::MaybeRelaxRejectProbability() {
if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= MaxTxInFly ||
if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= ui64(MaxTxInFly) ||
HadRejectProbabilityByOverload)
{
HadRejectProbabilityByTxInFly = false;
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,6 @@ class TExecutor
// Counts the number of times LeaseDuration was increased
size_t LeaseDurationIncreases = 0;

const ui64 MaxTxInFly;

struct TLeaseCommit : public TIntrusiveListItem<TLeaseCommit> {
using TByEndMap = std::multimap<TMonotonic, TLeaseCommit*>;

Expand Down Expand Up @@ -502,6 +500,7 @@ class TExecutor

TControlWrapper LogFlushDelayOverrideUsec;
TControlWrapper MaxCommitRedoMB;
TControlWrapper MaxTxInFly;

ui64 Stamp() const noexcept;
void Registered(TActorSystem*, const TActorId&) override;
Expand Down
11 changes: 10 additions & 1 deletion ydb/tests/olap/test_overloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import yatest.common
import ydb
import random
import requests

from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.harness.kikimr_runner import KiKiMR
Expand Down Expand Up @@ -110,13 +111,13 @@ def _setup_ydb_rp(cls):
logger.info(yatest.common.execute([ydb_path, "-V"], wait=True).stdout.decode("utf-8"))
config = KikimrConfigGenerator(
extra_feature_flags={"enable_olap_reject_probability": True},
memory_controller_config={"max_tx_in_fly": 0},
)
cls.cluster = KiKiMR(config)
cls.cluster.start()
node = cls.cluster.nodes[1]
cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{config.domain_name}")
cls.ydb_client.wait_connection(timeout=60)
cls.mon_url = f"http://{node.host}:{node.mon_port}"

def get_row_count(self) -> int:
return self.ydb_client.query(f"select count(*) as Rows from `{self.table_name}`")[0].rows[0]["Rows"]
Expand Down Expand Up @@ -208,8 +209,16 @@ def test_overloads_workload(self, writing_in_flight_requests_count_limit, writin
logging.info(f"Count rows after insert {self.get_row_count()}")
assert self.get_row_count() != 0

def tune_icb(self):
response = requests.post(
self.mon_url + "/actors/icb",
data="TabletControls.MaxTxInFly=0"
)
response.raise_for_status()

def test_overloads_reject_probability(self):
self._setup_ydb_rp()
self.tune_icb()

table_path = f"{self.ydb_client.database}/table_for_test_overloads_reject_probability"
self.ydb_client.query(
Expand Down
Loading