Skip to content

Commit c6f531a

Browse files
authored
Filled database name in Scheme Cache P.3 (#27693)
1 parent 9311533 commit c6f531a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+251
-129
lines changed

ydb/core/grpc_services/rpc_load_rows.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
159159
: TBase(std::make_shared<TVector<std::pair<TSerializedCellVec, TString>>>(), GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded,
160160
NWilson::TSpan(TWilsonKqp::BulkUpsertActor, request->GetWilsonTraceId(), name))
161161
, Request(request)
162+
, Database(Request->GetDatabaseName().GetOrElse(""))
162163
{
163164
}
164165

@@ -182,11 +183,11 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
182183
NKikimr::NGRpcService::AuditContextAppend(Request.get(), *GetProtoRequest(Request.get()));
183184
}
184185

185-
TString GetDatabase() override {
186-
return Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()));
186+
const TString& GetDatabase() const override {
187+
return Database;
187188
}
188189

189-
const TString& GetTable() override {
190+
const TString& GetTable() const override {
190191
return GetProtoRequest(Request.get())->table();
191192
}
192193

@@ -305,6 +306,7 @@ class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::T
305306

306307
private:
307308
std::unique_ptr<IRequestOpCtx> Request;
309+
const TString Database;
308310
};
309311

310312
class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
@@ -313,6 +315,7 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
313315
explicit TUploadColumnsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded)
314316
: TBase(std::make_shared<TVector<std::pair<TSerializedCellVec, TString>>>(), GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded)
315317
, Request(request)
318+
, Database(Request->GetDatabaseName().GetOrElse(""))
316319
{
317320
}
318321

@@ -347,11 +350,11 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
347350
NKikimr::NGRpcService::AuditContextAppend(Request.get(), *GetProtoRequest(Request.get()));
348351
}
349352

350-
TString GetDatabase() override {
351-
return Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()));
353+
const TString& GetDatabase() const override {
354+
return Database;
352355
}
353356

354-
const TString& GetTable() override {
357+
const TString& GetTable() const override {
355358
return GetProtoRequest(Request.get())->table();
356359
}
357360

@@ -510,6 +513,7 @@ class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices
510513

511514
private:
512515
std::unique_ptr<IRequestOpCtx> Request;
516+
const TString Database;
513517

514518
const Ydb::Formats::CsvSettings& GetCsvSettings() const {
515519
return GetProtoRequest(Request.get())->csv_settings();

ydb/core/kqp/ut/data/kqp_read_null_ut.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ using TRowTypes = TVector<std::pair<TString, Ydb::Type>>;
3030
static void DoStartUploadTestRows(
3131
const Tests::TServer::TPtr& server,
3232
const TActorId& sender,
33+
const TString& database,
3334
const TString& tableName,
3435
Ydb::Type::PrimitiveTypeId typeId,
3536
bool uploadNull)
@@ -53,7 +54,7 @@ static void DoStartUploadTestRows(
5354
rows->emplace_back(serializedKey, serializedValue);
5455
}
5556

56-
auto actor = NTxProxy::CreateUploadRowsInternal(sender, tableName, types, rows);
57+
auto actor = NTxProxy::CreateUploadRowsInternal(sender, database, tableName, types, rows);
5758
runtime.Register(actor);
5859
}
5960

@@ -69,10 +70,11 @@ static void DoWaitUploadTestRows(
6970
}
7071

7172
static void DoUploadTestRows(Tests::TServer::TPtr server, const TActorId& sender,
72-
const TString& tableName, Ydb::Type::PrimitiveTypeId typeId,
73+
const TString& database, const TString& tableName,
74+
Ydb::Type::PrimitiveTypeId typeId,
7375
Ydb::StatusIds::StatusCode expected, bool uploadNull)
7476
{
75-
DoStartUploadTestRows(server, sender, tableName, typeId, uploadNull);
77+
DoStartUploadTestRows(server, sender, database, tableName, typeId, uploadNull);
7678
DoWaitUploadTestRows(server, sender, expected);
7779
}
7880

@@ -101,8 +103,8 @@ Y_UNIT_TEST_SUITE(KqpUserConstraint) {
101103

102104
CreateShardedTable(server, sender, "/Root", "table-1", std::move(opts));
103105

104-
DoUploadTestRows(server, sender, "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::SUCCESS, UploadNull);
105-
106+
DoUploadTestRows(server, sender, "/Root", "/Root/table-1", Ydb::Type::UINT32, Ydb::StatusIds::SUCCESS, UploadNull);
107+
106108
auto request = MakeSQLRequest("SELECT * FROM `/Root/table-1`", true);
107109
runtime.Send(new IEventHandle(NKqp::MakeKqpProxyID(runtime.GetNodeId()), sender, request.Release()));
108110
auto ev = runtime.GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(sender);

ydb/core/protos/tx_datashard.proto

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1445,6 +1445,7 @@ message TEvConditionalEraseRowsRequest {
14451445
optional uint64 SchemaVersion = 3;
14461446
repeated TIndexDescription Indexes = 4;
14471447
optional TLimits Limits = 5;
1448+
optional string DatabaseName = 6;
14481449

14491450
oneof Condition {
14501451
TExpirationCondition Expiration = 2;
@@ -1501,6 +1502,8 @@ message TEvBuildIndexCreateRequest {
15011502
optional NKikimrIndexBuilder.TColumnBuildSettings ColumnBuildSettings = 16;
15021503

15031504
optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 17;
1505+
1506+
optional string DatabaseName = 18;
15041507
}
15051508

15061509
message TEvBuildIndexProgressResponse {
@@ -1611,6 +1614,8 @@ message TEvLocalKMeansRequest {
16111614
repeated string DataColumns = 20;
16121615

16131616
optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 22;
1617+
1618+
optional string DatabaseName = 23;
16141619
}
16151620

16161621
message TEvLocalKMeansResponse {
@@ -1659,6 +1664,8 @@ message TEvReshuffleKMeansRequest {
16591664
repeated string DataColumns = 15;
16601665

16611666
optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 16;
1667+
1668+
optional string DatabaseName = 17;
16621669
}
16631670

16641671
message TEvReshuffleKMeansResponse {
@@ -1725,7 +1732,7 @@ message TEvIncrementalRestoreResponse {
17251732
RETRY = 1;
17261733
ERROR = 2;
17271734
}
1728-
1735+
17291736
optional uint64 TxId = 1;
17301737
optional uint64 TableId = 2;
17311738
optional uint64 OperationId = 3;
@@ -1770,6 +1777,8 @@ message TEvPrefixKMeansRequest {
17701777
optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 18;
17711778

17721779
repeated string SourcePrimaryKeyColumns = 19;
1780+
1781+
optional string DatabaseName = 20;
17731782
}
17741783

17751784
message TEvPrefixKMeansResponse {
@@ -1806,6 +1815,8 @@ message TEvBuildFulltextIndexRequest {
18061815
repeated string DataColumns = 10;
18071816

18081817
optional NKikimrIndexBuilder.TIndexBuildScanSettings ScanSettings = 11;
1818+
1819+
optional string DatabaseName = 12;
18091820
}
18101821

18111822
message TEvBuildFulltextIndexResponse {

ydb/core/transfer/column_table.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ class TColumnTableState : public ITableKindState {
99
public:
1010
TColumnTableState(
1111
const TActorId& selfId,
12+
const TString& database,
1213
TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result
1314
)
14-
: ITableKindState(selfId, result)
15+
: ITableKindState(selfId, database, result)
1516
{
1617
Path = JoinPath(result->ResultSet.front().Path);
1718
}
@@ -33,7 +34,7 @@ class TColumnTableState : public ITableKindState {
3334
}
3435

3536
UploaderActorId = TActivationContext::AsActorContext().RegisterWithSameMailbox(
36-
new TTableUploader(SelfId, GetScheme(), std::move(tableData))
37+
new TTableUploader(SelfId, Database, GetScheme(), std::move(tableData))
3738
);
3839

3940
Batchers.clear();
@@ -45,13 +46,16 @@ class TColumnTableState : public ITableKindState {
4546
TString Path;
4647
};
4748

48-
std::unique_ptr<ITableKindState> CreateColumnTableState(const TActorId& selfId, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result) {
49-
return std::make_unique<TColumnTableState>(selfId, result);
49+
std::unique_ptr<ITableKindState> CreateColumnTableState(const TActorId& selfId, const TString& database, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result) {
50+
return std::make_unique<TColumnTableState>(selfId, database, result);
5051
}
5152

5253
template<>
53-
IActor* TTableUploader<arrow::RecordBatch>::CreateUploaderInternal(const TString& tablePath, const std::shared_ptr<arrow::RecordBatch>& data, ui64 cookie) {
54-
return NTxProxy::CreateUploadColumnsInternal(SelfId(), tablePath, Scheme->Types, data, cookie);
54+
IActor* TTableUploader<arrow::RecordBatch>::CreateUploaderInternal(
55+
const TString& database, const TString& tablePath,
56+
const std::shared_ptr<arrow::RecordBatch>& data, ui64 cookie)
57+
{
58+
return NTxProxy::CreateUploadColumnsInternal(SelfId(), database, tablePath, Scheme->Types, data, cookie);
5559
}
5660

5761
}

ydb/core/transfer/row_table.cpp

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ class TRowTableState : public ITableKindState {
1313
public:
1414
TRowTableState(
1515
const TActorId& selfId,
16+
const TString& database,
1617
TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result
1718
)
18-
: ITableKindState(selfId, result)
19+
: ITableKindState(selfId, database, result)
1920
{
2021
Path = JoinPath(result->ResultSet.front().Path);
2122
}
@@ -61,7 +62,7 @@ class TRowTableState : public ITableKindState {
6162
}
6263

6364
UploaderActorId = TActivationContext::AsActorContext().RegisterWithSameMailbox(
64-
new TTableUploader(SelfId, GetScheme(), std::move(tableData))
65+
new TTableUploader(SelfId, Database, GetScheme(), std::move(tableData))
6566
);
6667

6768
Batchers.clear();
@@ -73,8 +74,8 @@ class TRowTableState : public ITableKindState {
7374
TString Path;
7475
};
7576

76-
std::unique_ptr<ITableKindState> CreateRowTableState(const TActorId& selfId, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result) {
77-
return std::make_unique<TRowTableState>(selfId, result);
77+
std::unique_ptr<ITableKindState> CreateRowTableState(const TActorId& selfId, const TString& database, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result) {
78+
return std::make_unique<TRowTableState>(selfId, database, result);
7879
}
7980

8081
namespace {
@@ -83,8 +84,11 @@ const TBackoff DefaultBackoff = TBackoff(TDuration::Seconds(1), TDuration::Secon
8384

8485
}
8586
template<>
86-
IActor* TTableUploader<TData>::CreateUploaderInternal(const TString& tablePath, const std::shared_ptr<TData>& data, ui64 cookie) {
87-
return NTxProxy::CreateUploadRowsInternal(SelfId(), tablePath, Scheme->Types, data, NTxProxy::EUploadRowsMode::Normal, false, false, cookie, DefaultBackoff);
87+
IActor* TTableUploader<TData>::CreateUploaderInternal(
88+
const TString& database, const TString& tablePath,
89+
const std::shared_ptr<TData>& data, ui64 cookie)
90+
{
91+
return NTxProxy::CreateUploadRowsInternal(SelfId(), database, tablePath, Scheme->Types, data, NTxProxy::EUploadRowsMode::Normal, false, false, cookie, DefaultBackoff);
8892
}
8993

9094
}

ydb/core/transfer/table_kind_state.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ class ITableKindState {
1010
public:
1111
using TPtr = std::unique_ptr<ITableKindState>;
1212

13-
ITableKindState(const TActorId& selfId, const TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result)
13+
ITableKindState(const TActorId& selfId, const TString& database, const TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result)
1414
: SelfId(selfId)
15+
, Database(database)
1516
, Scheme(BuildScheme(result))
1617
{}
1718

@@ -55,14 +56,15 @@ class ITableKindState {
5556

5657
protected:
5758
const TActorId SelfId;
59+
const TString Database;
5860
const TScheme::TPtr Scheme;
5961

6062
std::map<TString, NKqp::IDataBatcherPtr> Batchers;
6163
TActorId UploaderActorId;
6264
};
6365

6466

65-
std::unique_ptr<ITableKindState> CreateColumnTableState(const TActorId& selfId, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result);
66-
std::unique_ptr<ITableKindState> CreateRowTableState(const TActorId& selfId, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result);
67+
std::unique_ptr<ITableKindState> CreateColumnTableState(const TActorId& selfId, const TString& database, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result);
68+
std::unique_ptr<ITableKindState> CreateRowTableState(const TActorId& selfId, const TString& database, TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& result);
6769

6870
}

ydb/core/transfer/transfer_writer.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TTransferWriter
5151
Become(&TThis::StateGetTableScheme);
5252

5353
auto request = MakeHolder<TNavigate>();
54+
request->DatabaseName = Database;
5455
request->ResultSet.emplace_back(MakeNavigateEntry(TablePathId, TNavigate::OpTable));
5556
Send(MakeSchemeCacheID(), new TEvNavigate(request.Release()));
5657
}
@@ -137,9 +138,9 @@ class TTransferWriter
137138
DefaultTablePath = JoinPath(entry.Path);
138139

139140
if (entry.Kind == TNavigate::KindColumnTable) {
140-
TableState = CreateColumnTableState(SelfId(), result);
141+
TableState = CreateColumnTableState(SelfId(), Database, result);
141142
} else {
142-
TableState = CreateRowTableState(SelfId(), result);
143+
TableState = CreateRowTableState(SelfId(), Database, result);
143144
}
144145

145146
CompileTransferLambda();

ydb/core/transfer/uploader.h

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@ class TTableUploader : public TActorBootstrapped<TTableUploader<TData>> {
1717
static constexpr size_t MaxSchemeRetries = 3;
1818

1919
public:
20-
TTableUploader(const TActorId& parentActor, const TScheme::TPtr& scheme, std::unordered_map<TString, std::shared_ptr<TData>>&& data)
20+
TTableUploader(const TActorId& parentActor,
21+
const TString& database, const TScheme::TPtr& scheme,
22+
std::unordered_map<TString, std::shared_ptr<TData>>&& data
23+
)
2124
: ParentActor(parentActor)
25+
, Database(database)
2226
, Scheme(scheme)
2327
, Data(std::move(data))
2428
{
@@ -36,13 +40,13 @@ class TTableUploader : public TActorBootstrapped<TTableUploader<TData>> {
3640
}
3741
}
3842

39-
IActor* CreateUploaderInternal(const TString& tablePath, const std::shared_ptr<TData>& data, ui64 cookie);
43+
IActor* CreateUploaderInternal(const TString& database, const TString& tablePath, const std::shared_ptr<TData>& data, ui64 cookie);
4044

4145
void DoUpload(const TString& tablePath, const std::shared_ptr<TData>& data) {
4246
auto cookie = ++Cookie;
4347

4448
auto actorId = TActivationContext::AsActorContext().RegisterWithSameMailbox(
45-
CreateUploaderInternal(tablePath, data, cookie)
49+
CreateUploaderInternal(Database, tablePath, data, cookie)
4650
);
4751
CookieMapping[cookie] = {tablePath, actorId};
4852
}
@@ -97,7 +101,7 @@ class TTableUploader : public TActorBootstrapped<TTableUploader<TData>> {
97101

98102
void Handle(NTransferPrivate::TEvRetryTable::TPtr& ev) {
99103
auto& tablePath = ev->Get()->TablePath;
100-
104+
101105
auto it = Data.find(tablePath);
102106
if (it == Data.end()) {
103107
return ReplyErrorAndDie(TStringBuilder() << "Unexpected retry for table '" << tablePath << "'");
@@ -144,6 +148,7 @@ class TTableUploader : public TActorBootstrapped<TTableUploader<TData>> {
144148

145149
private:
146150
const TActorId ParentActor;
151+
const TString Database;
147152
const TScheme::TPtr Scheme;
148153
// Table path -> Data
149154
std::unordered_map<TString, std::shared_ptr<TData>> Data;

ydb/core/tx/datashard/build_index/common_helper.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ class TBatchRowsUploader
2929
};
3030

3131
public:
32-
TBatchRowsUploader(const TIndexBuildScanSettings& scanSettings)
33-
: ScanSettings(scanSettings)
32+
TBatchRowsUploader(const TString& database, const TIndexBuildScanSettings& scanSettings)
33+
: Database(database)
34+
, ScanSettings(scanSettings)
3435
{}
3536

3637
TBufferData* AddDestination(TString table, std::shared_ptr<NTxProxy::TUploadTypes> types) {
@@ -208,7 +209,7 @@ class TBatchRowsUploader
208209
Y_ENSURE(!UploaderId);
209210
Y_ENSURE(Owner);
210211
auto actor = NTxProxy::CreateUploadRowsInternal(
211-
Owner, Uploading.Table, Uploading.Types, Uploading.Buffer.GetRowsData(),
212+
Owner, Database, Uploading.Table, Uploading.Types, Uploading.Buffer.GetRowsData(),
212213
NTxProxy::EUploadRowsMode::WriteToTableShadow,
213214
true /*writeToPrivateTable*/,
214215
true /*writeToIndexImplTable*/);
@@ -217,6 +218,7 @@ class TBatchRowsUploader
217218
}
218219

219220
private:
221+
const TString Database;
220222
const TIndexBuildScanSettings ScanSettings;
221223
TActorId Owner;
222224

0 commit comments

Comments
 (0)