Skip to content

Commit 8716881

Browse files
committed
Reject operations in column shards by probability (ydb-platform#26783)
1 parent 7780ff5 commit 8716881

File tree

10 files changed

+107
-4
lines changed

10 files changed

+107
-4
lines changed

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,5 +233,6 @@ message TFeatureFlags {
233233
optional bool EnableTinyDisks = 207 [default = false];
234234
optional bool EnableMetricsLevel = 208 [default = false];
235235
optional bool EnableSecureScriptExecutions = 209 [default = false];
236+
optional bool EnableOlapRejectProbability = 213 [default = false];
236237
optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = false];
237238
}

ydb/core/protos/memory_controller_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,6 @@ message TMemoryControllerConfig {
2828

2929
optional float CompactionLimitPercent = 130 [default = 10];
3030
optional uint64 CompactionLimitBytes = 131;
31+
32+
optional uint64 MaxTxInFly = 132 [default = 10000];
3133
}

ydb/core/tablet_flat/flat_executor.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <ydb/core/base/hive.h>
3232
#include <ydb/core/base/tablet_pipecache.h>
3333
#include <ydb/core/control/lib/immediate_control_board_impl.h>
34+
#include <ydb/core/protos/memory_controller_config.pb.h>
3435
#include <ydb/core/scheme/scheme_type_registry.h>
3536
#include <ydb/core/tablet/tablet_counters_aggregator.h>
3637
#include <ydb/library/actors/core/hfunc.h>
@@ -45,10 +46,10 @@
4546
#include <util/generic/xrange.h>
4647
#include <util/generic/ymath.h>
4748

49+
4850
namespace NKikimr {
4951
namespace NTabletFlatExecutor {
5052

51-
static constexpr ui64 MaxTxInFly = 10000;
5253

5354
LWTRACE_USING(TABLET_FLAT_PROVIDER)
5455

@@ -132,6 +133,7 @@ TExecutor::TExecutor(
132133
, Time(TAppData::TimeProvider)
133134
, Owner(owner)
134135
, OwnerActorId(ownerActorId)
136+
, MaxTxInFly(AppData()->MemoryControllerConfig.GetMaxTxInFly())
135137
, Emitter(new TIdEmitter)
136138
, CounterEventsInFlight(new TEvTabletCounters::TInFlightCookie)
137139
, Stats(new TExecutorStatsImpl())
@@ -140,7 +142,6 @@ TExecutor::TExecutor(
140142
{}
141143

142144
// The destructor performs no explicit teardown of its own: the original body
// was empty, so only the implicit destruction of members and bases runs.
// It stays defined out of line in this translation unit.
TExecutor::~TExecutor() = default;
145146

146147
bool TExecutor::OnUnhandledException(const std::exception& e) {

ydb/core/tablet_flat/flat_executor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,8 @@ class TExecutor
385385
// Counts the number of times LeaseDuration was increased
386386
size_t LeaseDurationIncreases = 0;
387387

388+
const ui64 MaxTxInFly;
389+
388390
struct TLeaseCommit : public TIntrusiveListItem<TLeaseCommit> {
389391
using TByEndMap = std::multimap<TMonotonic, TLeaseCommit*>;
390392

ydb/core/tx/columnshard/columnshard__overload.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloadedImmediate(const TInte
2424
"limit", txLimit);
2525
return EOverloadStatus::ShardTxInFly;
2626
}
27+
28+
if (AppData()->FeatureFlags.GetEnableOlapRejectProbability()) {
29+
const float rejectProbabilty = Executor()->GetRejectProbability();
30+
if (rejectProbabilty > 0) {
31+
const float rnd = TAppData::RandomProvider->GenRandReal2();
32+
if (rnd < rejectProbabilty) {
33+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "shard_overload")("reason", "reject_probality")("RP", rejectProbabilty);
34+
return EOverloadStatus::RejectProbability;
35+
}
36+
}
37+
}
38+
2739
return EOverloadStatus::None;
2840
}
2941

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
4646
case EOverloadStatus::ShardWritesSizeInFly:
4747
Counters.OnWriteOverloadShardWritesSize(writeSize);
4848
break;
49+
case EOverloadStatus::RejectProbability:
50+
Counters.OnWriteOverloadRejectProbability(writeSize);
51+
break;
4952
case EOverloadStatus::None:
5053
Y_ABORT("invalid function usage");
5154
}
@@ -345,7 +348,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
345348
const auto source = ev->Sender;
346349
const auto cookie = ev->Cookie;
347350

348-
349351
std::optional<TDuration> writeTimeout;
350352
if (record.HasTimeoutSeconds()) {
351353
writeTimeout = TDuration::Seconds(record.GetTimeoutSeconds());

ydb/core/tx/columnshard/counters/columnshard.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ TCSCounters::TCSCounters()
4343
OverloadShardWritesCount = TBase::GetDeriviative("Overload/Shard/Writes/Count");
4444
OverloadShardWritesSizeBytes = TBase::GetDeriviative("Overload/Shard/WritesSize/Bytes");
4545
OverloadShardWritesSizeCount = TBase::GetDeriviative("Overload/Shard/WritesSize/Count");
46+
OverloadRejectProbabilityBytes = TBase::GetDeriviative("Overload/RejectProbability/Bytes");
47+
OverloadRejectProbabilityCount = TBase::GetDeriviative("Overload/RejectProbability/Count");
4648

4749
InternalCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("InternalCompaction/Bytes");
4850
InternalCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("InternalCompaction/PortionsCount");

ydb/core/tx/columnshard/counters/columnshard.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ enum class EOverloadStatus {
1818
OverloadMetadata /* "overload_metadata" */,
1919
Disk /* "disk" */,
2020
None /* "none" */,
21-
OverloadCompaction /* "overload_compaction" */
21+
OverloadCompaction /* "overload_compaction" */,
22+
RejectProbability,
2223
};
2324

2425
enum class EWriteFailReason {
@@ -106,6 +107,8 @@ class TCSCounters: public TCommonCountersOwner {
106107
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesCount;
107108
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeBytes;
108109
NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeCount;
110+
NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityBytes;
111+
NMonitoring::TDynamicCounters::TCounterPtr OverloadRejectProbabilityCount;
109112

110113
std::shared_ptr<TValueAggregationClient> InternalCompactionGranuleBytes;
111114
std::shared_ptr<TValueAggregationClient> InternalCompactionGranulePortionsCount;
@@ -222,6 +225,11 @@ class TCSCounters: public TCommonCountersOwner {
222225
OverloadShardWritesSizeCount->Add(1);
223226
}
224227

228+
/// Accounts for one write that was turned away with the "reject probability"
/// overload status.
///
/// @param size  value added to the rejected-bytes counter; the rejected-count
///              counter is advanced by exactly one per call.
void OnWriteOverloadRejectProbability(const ui64 size) const {
    // Bytes first, then the event count - same order as the sibling
    // OnWriteOverload* helpers of this class.
    OverloadRejectProbabilityBytes->Add(size);
    OverloadRejectProbabilityCount->Add(1);
}
232+
225233
void SkipIndexationInputDueToSplitCompaction(const ui64 size) const {
226234
SkipIndexationInputDueToSplitCompactionBytes->Add(size);
227235
SkipIndexationInputDueToSplitCompactionCount->Add(1);

ydb/core/tx/columnshard/counters/counters_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ class TCountersManager {
7979
CSCounters.OnWriteOverloadShardWritesSize(size);
8080
}
8181

82+
/// Registers a write rejected by the reject-probability check in both
/// counter families owned by the manager.
///
/// @param size  forwarded unchanged to the column-shard counters.
void OnWriteOverloadRejectProbability(const ui64 size) const {
    // Tablet-level aggregate: every overload reason shares COUNTER_WRITE_OVERLOAD.
    TabletCounters->IncCounter(COUNTER_WRITE_OVERLOAD);

    // Reason-specific bytes/count breakdown.
    CSCounters.OnWriteOverloadRejectProbability(size);
}
86+
8287
void FillTableStats(TInternalPathId pathId, ::NKikimrTableStats::TTableStats& tableStats) {
8388
ColumnTablesCounters->GetPathIdCounter(pathId)->FillStats(tableStats);
8489
BackgroundControllerCounters->FillStats(pathId, tableStats);

ydb/tests/olap/test_overloads.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import yatest.common
66
import ydb
7+
import random
78

89
from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator
910
from ydb.tests.library.harness.kikimr_runner import KiKiMR
@@ -103,6 +104,20 @@ def _setup_ydb(cls, writing_in_flight_requests_count_limit, writing_in_flight_re
103104
cls.ydb_client = YdbClient(endpoint=f"grpc://{node.host}:{node.port}", database=f"/{config.domain_name}")
104105
cls.ydb_client.wait_connection(timeout=60)
105106

107+
@classmethod
def _setup_ydb_rp(cls):
    """Start a cluster for the reject-probability test and connect a client.

    The cluster is generated with the `enable_olap_reject_probability`
    feature flag switched on and the memory controller's `max_tx_in_fly`
    set to 0. The running cluster is stored in `cls.cluster` and a connected
    client in `cls.ydb_client`.
    """
    # Log the driver version so test output records which binary was used.
    driver_binary = yatest.common.build_path(os.environ.get("YDB_DRIVER_BINARY"))
    version_run = yatest.common.execute([driver_binary, "-V"], wait=True)
    logger.info(version_run.stdout.decode("utf-8"))

    feature_flags = {"enable_olap_reject_probability": True}
    memory_controller = {"max_tx_in_fly": 0}
    config = KikimrConfigGenerator(
        extra_feature_flags=feature_flags,
        memory_controller_config=memory_controller,
    )

    cls.cluster = KiKiMR(config)
    cls.cluster.start()

    target_node = cls.cluster.nodes[1]
    endpoint = f"grpc://{target_node.host}:{target_node.port}"
    database = f"/{config.domain_name}"
    cls.ydb_client = YdbClient(endpoint=endpoint, database=database)
    cls.ydb_client.wait_connection(timeout=60)
120+
106121
def get_row_count(self) -> int:
    """Return the value of `count(*)` over the table named by `self.table_name`."""
    count_query = f"select count(*) as Rows from `{self.table_name}`"
    result_sets = self.ydb_client.query(count_query)
    # The query yields a single result set with a single row holding the count.
    only_row = result_sets[0].rows[0]
    return only_row["Rows"]
108123

@@ -156,6 +171,7 @@ def test_overloads_bulk_upsert(self, writing_in_flight_requests_count_limit, wri
156171
@pytest.mark.parametrize('writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit', [(1, 10000), (2, 10000), (1000, 1), (1000, 2), (1, 1), (2, 2)])
157172
def test_overloads_workload(self, writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit):
158173
self._setup_ydb(writing_in_flight_requests_count_limit, writing_in_flight_request_bytes_limit)
174+
159175
wait_time: int = 60
160176
self.table_name: str = f"log_{writing_in_flight_requests_count_limit}_{writing_in_flight_request_bytes_limit}"
161177

@@ -191,3 +207,55 @@ def test_overloads_workload(self, writing_in_flight_requests_count_limit, writin
191207

192208
logging.info(f"Count rows after insert {self.get_row_count()}")
193209
assert self.get_row_count() != 0
210+
211+
def test_overloads_reject_probability(self):
    """Check that the reject-probability overload counter grows under concurrent updates.

    The cluster is started by `_setup_ydb_rp`. A single-partition column table
    is filled with `rows_count` rows via bulk upsert, then several UPDATE
    statements are issued concurrently. The test passes when the
    `Deriviative/Overload/RejectProbability/Count` counter read from the
    first monitor is greater than zero.
    """
    self._setup_ydb_rp()

    table_path = f"{self.ydb_client.database}/table_for_test_overloads_reject_probability"
    self.ydb_client.query(
        f"""
        CREATE TABLE `{table_path}` (
            id Uint64 NOT NULL,
            v1 Int64,
            v2 Int64,
            PRIMARY KEY(id),
        )
        WITH (
            STORE = COLUMN,
            PARTITION_COUNT = 1,
            AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1
        )
        """
    )

    column_types = ydb.BulkUpsertColumns()
    column_types.add_column("id", ydb.PrimitiveType.Uint64)
    column_types.add_column("v1", ydb.PrimitiveType.Int64)
    column_types.add_column("v2", ydb.PrimitiveType.Int64)

    rows_count = 1000
    update_queries_count = 19

    data = [{"id": row_id, "v1": 1, "v2": -1} for row_id in range(rows_count)]
    self.ydb_client.bulk_upsert(table_path, column_types, data)

    futures = []
    for _ in range(update_queries_count):
        # Ids span 0..rows_count-1 and the predicate is a strict `id > lb`,
        # so lb must stay below rows_count-1 for the UPDATE to touch at least
        # one row. `randint(0, rows_count)` is inclusive on both ends and could
        # produce statements that match nothing and generate no write load.
        lower_bound = random.randrange(rows_count - 1)
        futures.append(
            self.ydb_client.query_async(
                f"UPDATE `{table_path}` SET v1 = v1 + 1, v2 = v2 - 1 WHERE id > {lower_bound};"
            )
        )

    # Wait for every UPDATE to finish before reading the counters.
    for future in futures:
        future.result()

    counter_name = 'Deriviative/Overload/RejectProbability/Count'
    monitor = self.cluster.monitors[0].fetch()
    samples = monitor.get_by_name(counter_name)
    # Fail with an explicit message instead of a bare IndexError when the
    # counter has not been published at all.
    assert samples, f"counter {counter_name} was not found in monitoring output"
    _, reject_probability_count = samples[0]

    assert reject_probability_count > 0

0 commit comments

Comments
 (0)