Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_build_index__progress.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2604,6 +2604,23 @@ struct TSchemeShard::TIndexBuilder::TTxReplyModify: public TSchemeShard::TIndexB
case TIndexBuildInfo::EState::CreateBuild:
case TIndexBuildInfo::EState::LockBuild:
case TIndexBuildInfo::EState::AlterSequence:
{
Y_ENSURE(txId == buildInfo.ApplyTxId);

if (record.GetStatus() != NKikimrScheme::StatusAccepted &&
record.GetStatus() != NKikimrScheme::StatusAlreadyExists) {
// Otherwise we won't cancel the index build correctly
buildInfo.ApplyTxId = {};
buildInfo.ApplyTxStatus = NKikimrScheme::StatusSuccess;
buildInfo.ApplyTxDone = false;
} else {
buildInfo.ApplyTxStatus = record.GetStatus();
}
Self->PersistBuildIndexApplyTxStatus(db, buildInfo);

ifErrorMoveTo(TIndexBuildInfo::EState::Rejection_Applying);
break;
}
case TIndexBuildInfo::EState::Applying:
case TIndexBuildInfo::EState::Rejection_Applying:
{
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2842,7 +2842,7 @@ namespace NSchemeShardUT_Private {
return WaitNextValResult(runtime, sender, expectedStatus);
}

NKikimrMiniKQL::TResult ReadTable(TTestActorRuntime& runtime, ui64 tabletId,
NKikimrMiniKQL::TResult ReadSystemTable(TTestActorRuntime& runtime, ui64 tabletId,
const TString& table, const TVector<TString>& pk, const TVector<TString>& columns,
const TString& rangeFlags)
{
Expand All @@ -2857,7 +2857,7 @@ namespace NSchemeShardUT_Private {
NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"((
(let range '(%s%s))
(let columns '(%s))
(let result (SelectRange '__user__%s range columns '()))
(let result (SelectRange '%s range columns '()))
(return (AsList (SetResult 'Result result) ))
))", rangeFlags.data(), keyFmt.data(), columnsFmt.data(), table.data()), result, error);
UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
Expand All @@ -2866,6 +2866,13 @@ namespace NSchemeShardUT_Private {
return result;
}

NKikimrMiniKQL::TResult ReadTable(TTestActorRuntime& runtime, ui64 tabletId,
const TString& table, const TVector<TString>& pk, const TVector<TString>& columns,
const TString& rangeFlags)
{
return ReadSystemTable(runtime, tabletId, "__user__"+table, pk, columns, rangeFlags);
}

ui32 CountRows(TTestActorRuntime& runtime, ui64 schemeshardId, const TString& table) {
auto tableDesc = DescribePath(runtime, schemeshardId, table, true, false, true);
const auto& pathDesc = tableDesc.GetPathDescription();
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ namespace NSchemeShardUT_Private {
TTestActorRuntime& runtime, const TString& path,
Ydb::StatusIds::StatusCode expectedStatus = Ydb::StatusIds::SUCCESS);

NKikimrMiniKQL::TResult ReadSystemTable(TTestActorRuntime& runtime, ui64 tabletId,
const TString& table, const TVector<TString>& pk, const TVector<TString>& columns, const TString& rangeFlags = "");
NKikimrMiniKQL::TResult ReadTable(TTestActorRuntime& runtime, ui64 tabletId,
const TString& table, const TVector<TString>& pk, const TVector<TString>& columns, const TString& rangeFlags = "");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <ydb/public/lib/deprecated/kicli/kicli.h>
#include <ydb/core/base/table_index.h>
#include <ydb/core/protos/schemeshard/operations.pb.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
Expand Down Expand Up @@ -1710,4 +1711,92 @@ Y_UNIT_TEST_SUITE(VectorIndexBuildTest) {
}
}

Y_UNIT_TEST(CreateBuildProposeReject) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::BUILD_INDEX, NLog::PRI_TRACE);

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "vectors"
Columns { Name: "id" Type: "Uint64" }
Columns { Name: "embedding" Type: "String" }
KeyColumnNames: [ "id" ]
)");
env.TestWaitNotification(runtime, txId);

NYdb::NTable::TGlobalIndexSettings globalIndexSettings;

std::unique_ptr<NYdb::NTable::TKMeansTreeSettings> kmeansTreeSettings;
{
Ydb::Table::KMeansTreeSettings proto;
UNIT_ASSERT(google::protobuf::TextFormat::ParseFromString(R"(
settings {
metric: DISTANCE_COSINE
vector_type: VECTOR_TYPE_FLOAT
vector_dimension: 1024
}
levels: 5
clusters: 4
)", &proto));
using T = NYdb::NTable::TKMeansTreeSettings;
kmeansTreeSettings = std::make_unique<T>(T::FromProto(proto));
}

const auto maxShards = DescribePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/vectors")
.GetPathDescription().GetDomainDescription().GetSchemeLimits().GetMaxShardsInPath();

TBlockEvents<TEvSchemeShard::TEvModifySchemeTransaction> blocker(runtime, [&](auto& ev) {
auto& modifyScheme = *ev->Get()->Record.MutableTransaction(0);
if (modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpInitiateBuildIndexImplTable) {
auto& op = *modifyScheme.MutableCreateTable();
// make shard count exceed the limit to fail the operation
op.SetUniformPartitionsCount(maxShards+1);
}
return false;
});

const ui64 buildIndexTx = ++txId;
AsyncBuildVectorIndex(runtime, buildIndexTx, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", "index1", {"embedding"});

env.TestWaitNotification(runtime, buildIndexTx);

{
auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx);
Cout << "BuildIndex 1 " << buildIndexOperation.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL_C(
buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_REJECTED,
buildIndexOperation.DebugString()
);
UNIT_ASSERT_STRING_CONTAINS(buildIndexOperation.DebugString(), "Invalid partition count specified");
}

blocker.Stop().Unblock();

{
auto result = ReadSystemTable(runtime, TTestTxConfig::SchemeShard, "SnapshotTables", {"Id", "TableOwnerId", "TableLocalId"}, {"Id"});
auto value = NClient::TValue::Create(result);
auto rowCount = value["Result"]["List"].Size();
UNIT_ASSERT_VALUES_EQUAL_C(rowCount, 0, "Snapshot is not removed after rejecting index build");
}

// The next index build should succeed

const ui64 buildIndexTx2 = ++txId;
AsyncBuildVectorIndex(runtime, buildIndexTx2, TTestTxConfig::SchemeShard, "/MyRoot", "/MyRoot/vectors", "index1", {"embedding"});
env.TestWaitNotification(runtime, buildIndexTx2);

{
auto buildIndexOperation = TestGetBuildIndex(runtime, TTestTxConfig::SchemeShard, "/MyRoot", buildIndexTx2);
Cout << "BuildIndex 2 " << buildIndexOperation.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL_C(
buildIndexOperation.GetIndexBuild().GetState(), Ydb::Table::IndexBuildState::STATE_DONE,
buildIndexOperation.DebugString()
);
}

}

}
Loading