From fcff2bc8045d1831cd2fac3fc4362e3512e5a73b Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 19 Jul 2024 21:50:18 +0300 Subject: [PATCH] Allow to add changefeed to index table via AlterTable (#6883) --- ydb/core/grpc_services/rpc_alter_table.cpp | 77 +++++++++++++++++++--- ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 36 ++++++++++ ydb/core/tx/scheme_board/cache.cpp | 2 - ydb/core/ydb_convert/table_description.cpp | 12 +++- ydb/core/ydb_convert/table_description.h | 3 + 5 files changed, 117 insertions(+), 13 deletions(-) diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp index b18ade6d3f0b..a33a030fc941 100644 --- a/ydb/core/grpc_services/rpc_alter_table.cpp +++ b/ydb/core/grpc_services/rpc_alter_table.cpp @@ -109,12 +109,12 @@ class TAlterTableRPC : public TRpcSchemeRequestActorServices.SchemeCache, ctx); } - void PrepareAlterUserAttrubutes() { + void GetProxyServices() { using namespace NTxProxy; Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest); } @@ -222,13 +222,38 @@ class TAlterTableRPC : public TRpcSchemeRequestActor(ev)->Request->ResultSet.emplace_back(); - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; entry.Path = paths; } Send(schemeCache, ev); } + void Navigate(const TTableId& pathId, const TActorContext& ctx) { + DatabaseName = Request_->GetDatabaseName() + .GetOrElse(DatabaseFromDomain(AppData())); + + auto ev = CreateNavigateForPath(DatabaseName); + { + auto& entry = static_cast(ev)->Request->ResultSet.emplace_back(); + entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId; + entry.TableId = pathId; + entry.ShowPrivatePath = true; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; + } + + Send(MakeSchemeCacheID(), ev); + } + + static bool IsChangefeedOperation(EOp type) { + switch (type) { + case EOp::AddChangefeed: + case EOp::DropChangefeed: + return true; + default: + return false; + } + } + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { TXLOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult" << ", errors# " << ev->Get()->Request.Get()->ErrorCount); @@ -251,13 +276,48 @@ class TAlterTableRPC : public TRpcSchemeRequestActorResultSet.empty()); + const auto& entry = resp->ResultSet.back(); + + switch (entry.Kind) { + case NSchemeCache::TSchemeCacheNavigate::KindTable: + case NSchemeCache::TSchemeCacheNavigate::KindColumnTable: + case NSchemeCache::TSchemeCacheNavigate::KindExternalTable: + case NSchemeCache::TSchemeCacheNavigate::KindExternalDataSource: + case NSchemeCache::TSchemeCacheNavigate::KindView: + break; // table + case NSchemeCache::TSchemeCacheNavigate::KindIndex: + if (IsChangefeedOperation(OpType)) { + break; + } + [[fallthrough]]; + default: + Request_->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TStringBuilder() + << "Unable to nagivate: " << JoinPath(entry.Path) << " status: PathNotTable")); + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + switch (OpType) { case EOp::AddIndex: return AlterTableAddIndexOp(resp, ctx); case EOp::Attribute: - Y_ABORT_UNLESS(!resp->ResultSet.empty()); ResolvedPathId = resp->ResultSet.back().TableId.PathId; return AlterTable(ctx); + case EOp::AddChangefeed: + case EOp::DropChangefeed: + if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindIndex) { + AlterTable(ctx); + } else if (auto list = entry.ListNodeEntry) { + if (list->Children.size() != 1) { + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + const auto& child = list->Children.at(0); + AlterTable(ctx, CanonizePath(ChildPath(NKikimr::SplitPath(GetProtoRequest()->path()), child.Name))); + } else { + Navigate(entry.TableId, ctx); + } + break; default: TXLOG_E("Got unexpected cache response"); return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); @@ -351,13 +411,14 @@ class TAlterTableRPC : public TRpcSchemeRequestActor& overridePath = {}) { const auto req = GetProtoRequest(); std::unique_ptr proposeRequest = CreateProposeTransaction(); auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme(); + modifyScheme->SetAllowAccessToPrivatePaths(overridePath.Defined()); Ydb::StatusIds::StatusCode code; TString error; - if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) { + if (!BuildAlterTableModifyScheme(overridePath.GetOrElse(req->path()), req, modifyScheme, Profiles, ResolvedPathId, code, error)) { NYql::TIssues issues; issues.AddIssue(NYql::TIssue(error)); return Reply(code, issues, ctx); diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index d68616aecab4..9de9ab98b3cf 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -3929,6 +3929,42 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } + Y_UNIT_TEST(ChangefeedOnIndexTable) { + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + auto query = R"( + --!syntax_v1 + CREATE TABLE `/Root/table` ( + Key Uint64, + Value String, + PRIMARY KEY (Key), + INDEX SyncIndex GLOBAL SYNC ON (`Value`), + INDEX AsyncIndex GLOBAL ASYNC ON (`Value`) + ); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json); + { + auto result = session.AlterTable("/Root/table/AsyncIndex", TAlterTableSettings() + .AppendAddChangefeeds(changefeed) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } + { + auto result = session.AlterTable("/Root/table/SyncIndex", TAlterTableSettings() + .AppendAddChangefeeds(changefeed) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + Y_UNIT_TEST(CreatedAt) { TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root")); diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 89f0796b905a..f64137b6e802 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -897,8 +897,6 @@ class TSchemeCache: public TMonitorableActor { default: return false; } - case NKikimrSchemeOp::EPathTypeTableIndex: - return true; default: return false; } diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 3ca7f16acd6b..ad2933dc179f 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -163,7 +163,8 @@ bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NK return true; } -bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, +bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, + NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& code, TString& error) { @@ -184,7 +185,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki const auto OpType = *ops.begin(); try { - pathPair = SplitPathIntoWorkingDirAndName(req->path()); + pathPair = SplitPathIntoWorkingDirAndName(path); } catch (const std::exception&) { code = Ydb::StatusIds::BAD_REQUEST; return false; @@ -227,7 +228,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki for(const auto& rename: req->rename_indexes()) { modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex); auto& alter = *modifyScheme->MutableMoveIndex(); - alter.SetTablePath(req->path()); + alter.SetTablePath(path); alter.SetSrcPath(rename.source_name()); alter.SetDstPath(rename.destination_name()); alter.SetAllowOverwrite(rename.replace_destination()); @@ -352,6 +353,11 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki return true; } +bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, + const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& code, TString& error) +{ + return BuildAlterTableModifyScheme(req->path(), req, modifyScheme, profiles, resolvedPathId, code, error); +} template static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& column) { diff --git a/ydb/core/ydb_convert/table_description.h b/ydb/core/ydb_convert/table_description.h index 8cbbe385214d..91c56497132d 100644 --- a/ydb/core/ydb_convert/table_description.h +++ b/ydb/core/ydb_convert/table_description.h @@ -30,6 +30,9 @@ struct TPathId; THashSet GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req); +bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, + const TTableProfiles& profiles, const TPathId& resolvedPathId, + Ydb::StatusIds::StatusCode& status, TString& error); bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& status, TString& error);