Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,12 @@ message TImmediateControlsConfig {
MinValue: 8,
MaxValue: 4096,
DefaultValue: 256 }];

// Immediate control limiting the tablet tx queue size (see Description below).
// Accepted range is 0..1000000; the default is 10000.
optional uint64 MaxTxInFly = 2 [(ControlOptions) = {
Description: "Maximum tx queue size for all tablets",
MinValue: 0,
MaxValue: 1000000,
DefaultValue: 10000 }];
}

message TDSProxyControls {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,6 @@ message TFeatureFlags {
optional bool EnableTinyDisks = 207 [default = false];
optional bool EnableMetricsLevel = 208 [default = false];
optional bool EnableSecureScriptExecutions = 209 [default = false];
optional bool EnableOlapRejectProbability = 213 [default = false];
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
}
11 changes: 6 additions & 5 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <ydb/core/base/hive.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/control/lib/immediate_control_board_impl.h>
#include <ydb/core/protos/memory_controller_config.pb.h>
#include <ydb/core/scheme/scheme_type_registry.h>
#include <ydb/core/tablet/tablet_counters_aggregator.h>
#include <ydb/library/actors/core/hfunc.h>
Expand All @@ -45,10 +46,10 @@
#include <util/generic/xrange.h>
#include <util/generic/ymath.h>


namespace NKikimr {
namespace NTabletFlatExecutor {

static constexpr ui64 MaxTxInFly = 10000;

LWTRACE_USING(TABLET_FLAT_PROVIDER)

Expand Down Expand Up @@ -137,10 +138,10 @@ TExecutor::TExecutor(
, Stats(new TExecutorStatsImpl())
, LogFlushDelayOverrideUsec(-1, -1, 60*1000*1000)
, MaxCommitRedoMB(256, 1, 4096)
, MaxTxInFly(10000, 0, 1000000)
{}

TExecutor::~TExecutor() {

}

bool TExecutor::OnUnhandledException(const std::exception& e) {
Expand Down Expand Up @@ -180,6 +181,7 @@ void TExecutor::Registered(TActorSystem *sys, const TActorId&)
TString myTabletType = TTabletTypes::TypeToStr(Owner->TabletType());
AppData()->Icb->RegisterSharedControl(LogFlushDelayOverrideUsec, myTabletType + "_LogFlushDelayOverrideUsec");
AppData()->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
AppData()->Icb->RegisterSharedControl(MaxTxInFly, "TabletControls.MaxTxInFly");

// instantiate alert counters so even never reported alerts are created
GetServiceCounters(AppData()->Counters, "tablets")->GetCounter("alerts_pending_nodata", true);
Expand Down Expand Up @@ -4072,8 +4074,7 @@ void TExecutor::ForceSendCounters() {

float TExecutor::GetRejectProbability() const {
// Limit number of in-flight TXs
// TODO: make configurable
if (Stats->TxInFly > MaxTxInFly) {
if (Stats->TxInFly > ui64(MaxTxInFly)) {
HadRejectProbabilityByTxInFly = true;
return 1.0;
}
Expand Down Expand Up @@ -4109,7 +4110,7 @@ float TExecutor::GetRejectProbability() const {
}

void TExecutor::MaybeRelaxRejectProbability() {
if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= MaxTxInFly ||
if (HadRejectProbabilityByTxInFly && Stats->TxInFly <= ui64(MaxTxInFly) ||
HadRejectProbabilityByOverload)
{
HadRejectProbabilityByTxInFly = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ class TExecutor

TControlWrapper LogFlushDelayOverrideUsec;
TControlWrapper MaxCommitRedoMB;
TControlWrapper MaxTxInFly;

TActorId BackupWriter;

Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/columnshard/columnshard__overload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const TInte
"limit", txLimit);
return EOverloadStatus::ShardTxInFly;
}

// Probabilistic back-pressure, enabled by the EnableOlapRejectProbability feature flag:
// ask the tablet executor for its current reject probability and reject the write
// when a random sample falls below it.
if (AppData()->FeatureFlags.GetEnableOlapRejectProbability()) {
    const float rejectProbability = Executor()->GetRejectProbability();
    if (rejectProbability > 0) {
        // Keep the sample at double precision: narrowing it to float can round a value
        // just below 1.0 up to exactly 1.0f, which would let a write slip through even
        // when the executor reports a reject probability of 1.0.
        // NOTE(review): assumes GenRandReal2() returns a double in [0, 1) - confirm.
        const double rnd = TAppData::RandomProvider->GenRandReal2();
        if (rnd < rejectProbability) {
            AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "shard_overload")("reason", "reject_probability")("RP", rejectProbability);
            return EOverloadStatus::RejectProbability;
        }
    }
}

return EOverloadStatus::None;
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
case EOverloadStatus::ShardWritesSizeInFly:
Counters.OnWriteOverloadShardWritesSize(writeSize);
break;
case EOverloadStatus::RejectProbability:
Counters.OnWriteOverloadRejectProbability(writeSize);
break;
case EOverloadStatus::None:
Y_ABORT("invalid function usage");
}
Expand Down Expand Up @@ -345,7 +348,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
const auto source = ev->Sender;
const auto cookie = ev->Cookie;


std::optional<TDuration> writeTimeout;
if (record.HasTimeoutSeconds()) {
writeTimeout = TDuration::Seconds(record.GetTimeoutSeconds());
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/columnshard/counters/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ TCSCounters::TCSCounters()
OverloadShardWritesCount = TBase::GetDeriviative("Overload/Shard/Writes/Count");
OverloadShardWritesSizeBytes = TBase::GetDeriviative("Overload/Shard/WritesSize/Bytes");
OverloadShardWritesSizeCount = TBase::GetDeriviative("Overload/Shard/WritesSize/Count");
OverloadRejectProbabilityBytes = TBase::GetDeriviative("Overload/RejectProbability/Bytes");
OverloadRejectProbabilityCount = TBase::GetDeriviative("Overload/RejectProbability/Count");

InternalCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("InternalCompaction/Bytes");
InternalCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("InternalCompaction/PortionsCount");
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/tx/columnshard/counters/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ enum class EOverloadStatus {
OverloadMetadata /* "overload_metadata" */,
Disk /* "disk" */,
None /* "none" */,
OverloadCompaction /* "overload_compaction" */
OverloadCompaction /* "overload_compaction" */,
RejectProbability,
};

enum class EWriteFailReason {
Expand Down Expand Up @@ -106,6 +107,8 @@ class TCSCounters: public TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesCount;
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeBytes;
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeCount;
NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityBytes;
NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityCount;

std::shared_ptr<TValueAggregationClient> InternalCompactionGranuleBytes;
std::shared_ptr<TValueAggregationClient> InternalCompactionGranulePortionsCount;
Expand Down Expand Up @@ -222,6 +225,11 @@ class TCSCounters: public TCommonCountersOwner {
OverloadShardWritesSizeCount->Add(1);
}

// Accounts for one write rejected with the RejectProbability overload status:
// the rejection counter grows by one and the bytes counter by `size`.
void OnWriteOverloadRejectProbability(const ui64 size) const {
    OverloadRejectProbabilityCount->Add(1);
    OverloadRejectProbabilityBytes->Add(size);
}

void SkipIndexationInputDueToSplitCompaction(const ui64 size) const {
SkipIndexationInputDueToSplitCompactionBytes->Add(size);
SkipIndexationInputDueToSplitCompactionCount->Add(1);
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/counters/counters_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ class TCountersManager {
CSCounters.OnWriteOverloadShardWritesSize(size);
}

// Reports a write of `size` bytes rejected by the reject-probability check:
// forwards the size to the column shard counters and bumps the tablet-level
// COUNTER_WRITE_OVERLOAD counter.
void OnWriteOverloadRejectProbability(const ui64 size) const {
    CSCounters.OnWriteOverloadRejectProbability(size);
    TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD);
}

void FillTableStats(TInternalPathId pathId, ::NKikimrTableStats::TTableStats& tableStats) {
ColumnTablesCounters->GetPathIdCounter(pathId)->FillStats(tableStats);
BackgroundControllerCounters->FillStats(pathId, tableStats);
Expand Down
3 changes: 3 additions & 0 deletions ydb/tests/olap/common/ydb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ def wait_connection(self, timeout=5):
def query(self, statement):
    """Execute ``statement`` through the session pool's ``execute_with_retries`` and return its result."""
    pool = self.session_pool
    return pool.execute_with_retries(statement)

def query_async(self, statement, request_settings=None):
    """Submit ``statement`` via the session pool's ``execute_with_retries_async``.

    ``request_settings`` is forwarded as the ``settings`` keyword argument.
    Returns whatever ``execute_with_retries_async`` returns.
    """
    pool = self.session_pool
    return pool.execute_with_retries_async(
        query=statement,
        settings=request_settings,
    )

def bulk_upsert(self, table_path, column_types: ydb.BulkUpsertColumns, data_slice):
self.driver.table_client.bulk_upsert(
table_path,
Expand Down
77 changes: 77 additions & 0 deletions ydb/tests/olap/test_overloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import logging
import yatest.common
import ydb
import random
import requests

from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
from ydb.tests.library.harness.kikimr_runner import KiKiMR
Expand Down Expand Up @@ -103,6 +105,20 @@ def _setup_ydb(cls, writing_in_flight_requests_count_limit, writing_in_flight_re
cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{config.domain_name}")
cls.ydb_client.wait_connection(timeout=60)

@classmethod
def _setup_ydb_rp(cls):
    """Start a cluster with the ``enable_olap_reject_probability`` feature flag on.

    Stores on the class: the started cluster, a ``YdbClient`` connected to
    ``cluster.nodes[1]``, and that node's monitoring base URL (``mon_url``).
    """
    driver_binary = os.environ.get("YDB_DRIVER_BINARY")
    ydb_path = yatest.common.build_path(driver_binary)

    # Log the output of running the binary with "-V".
    version_run = yatest.common.execute([ydb_path, "-V"], wait=True)
    logger.info(version_run.stdout.decode("utf-8"))

    feature_flags = {"enable_olap_reject_probability": True}
    config = KikimrConfigGenerator(extra_feature_flags=feature_flags)

    cls.cluster = KiKiMR(config)
    cls.cluster.start()

    node = cls.cluster.nodes[1]
    endpoint = f"grpc://{node.host}:{node.port}"
    database = f"/{config.domain_name}"
    cls.ydb_client = YdbClient(endpoint=endpoint, database=database)
    cls.ydb_client.wait_connection(timeout=60)
    cls.mon_url = f"http://{node.host}:{node.mon_port}"

def get_row_count(self) -> int:
    """Return the ``count(*)`` of ``self.table_name`` as reported by a query through ``self.ydb_client``."""
    count_statement = f"select count(*) as Rows from `{self.table_name}`"
    result_sets = self.ydb_client.query(count_statement)
    first_row = result_sets[0].rows[0]
    return first_row["Rows"]

Expand Down Expand Up @@ -156,6 +172,7 @@ def test_overloads_bulk_upsert(self, writing_in_flight_requests_count_limit, wri
@pytest.mark.parametrize('writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit', [(1, 10000), (2, 10000), (1000, 1), (1000, 2), (1, 1), (2, 2)])
def test_overloads_workload(self, writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit):
self._setup_ydb(writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit)

wait_time: int = 60
self.table_name: str = f"log_{writing_in_flight_requests_count_limit}_{writing_in_flight_request_bytes_limit}"

Expand Down Expand Up @@ -191,3 +208,63 @@ def test_overloads_workload(self, writing_in_flight_requests_count_limit, writin

logging.info(f"Count rows after insert {self.get_row_count()}")
assert self.get_row_count() != 0

def tune_icb(self, timeout=60):
    """Set ``TabletControls.MaxTxInFly`` to 0 through the node's ``/actors/icb`` monitoring page.

    Args:
        timeout: seconds to wait for the monitoring endpoint before giving up,
            so an unresponsive node fails the test instead of hanging it.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        requests.Timeout: if the endpoint does not answer within ``timeout``.
    """
    response = requests.post(
        self.mon_url + "/actors/icb",
        data="TabletControls.MaxTxInFly=0",
        timeout=timeout,
    )
    response.raise_for_status()

def test_overloads_reject_probability(self):
    """Check that writes are rejected by probability once ``MaxTxInFly`` is set to 0.

    Steps: start a cluster with the reject-probability feature flag, tune the
    ICB control, create a single-partition column table, bulk-upsert 1000 rows,
    issue 19 asynchronous UPDATE statements with random lower bounds, and
    assert that the ``Overload/RejectProbability/Count`` counter is positive.
    """
    self._setup_ydb_rp()
    self.tune_icb()

    table_path = f"{self.ydb_client.database}/table_for_test_overloads_reject_probability"
    # The statement text is kept exactly as in the original test.
    create_table_statement = f"""
CREATE TABLE `{table_path}` (
id Uint64 NOT NULL,
v1 Int64,
v2 Int64,
PRIMARY KEY(id),
)
WITH (
STORE = COLUMN,
PARTITION_COUNT = 1,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
)
"""
    self.ydb_client.query(create_table_statement)

    column_types = ydb.BulkUpsertColumns()
    column_spec = (
        ("id", ydb.PrimitiveType.Uint64),
        ("v1", ydb.PrimitiveType.Int64),
        ("v2", ydb.PrimitiveType.Int64),
    )
    for column_name, column_type in column_spec:
        column_types.add_column(column_name, column_type)

    rows_count = 1000
    data = [{"id": row_id, "v1": 1, "v2": -1} for row_id in range(rows_count)]
    self.ydb_client.bulk_upsert(table_path, column_types, data)

    # Start all updates before waiting on any of them.
    pending_updates = []
    for _ in range(19):
        lower_bound = random.randint(0, rows_count)
        statement = f"UPDATE `{table_path}` SET v1 = v1 + 1, v2 = v2 - 1 WHERE id > {lower_bound};"
        pending_updates.append(self.ydb_client.query_async(statement))

    for pending in pending_updates:
        pending.result()

    monitor = self.cluster.monitors[0].fetch()
    matches = monitor.get_by_name('Deriviative/Overload/RejectProbability/Count')
    _, reject_probability_count = matches[0]

    assert reject_probability_count > 0
Loading