Skip to content

Commit

Permalink
Allow to add changefeed to index table via AlterTable (ydb-platform#6883
Browse files Browse the repository at this point in the history
)
  • Loading branch information
CyberROFL committed Jul 26, 2024
1 parent 5649cd9 commit 171e293
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 13 deletions.
77 changes: 69 additions & 8 deletions ydb/core/grpc_services/rpc_alter_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
break;

case EOp::Attribute:
PrepareAlterUserAttrubutes();
case EOp::AddChangefeed:
case EOp::DropChangefeed:
GetProxyServices();
break;

case EOp::AddChangefeed:
case EOp::DropIndex:
case EOp::DropChangefeed:
case EOp::RenameIndex:
AlterTable(ctx);
break;
Expand Down Expand Up @@ -197,7 +197,7 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
Navigate(msg->Services.SchemeCache, ctx);
}

void PrepareAlterUserAttrubutes() {
void GetProxyServices() {
using namespace NTxProxy;
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvGetProxyServicesRequest);
}
Expand All @@ -222,13 +222,38 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
auto ev = CreateNavigateForPath(DatabaseName);
{
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
entry.Path = paths;
}

Send(schemeCache, ev);
}

void Navigate(const TTableId& pathId, const TActorContext& ctx) {
DatabaseName = Request_->GetDatabaseName()
.GetOrElse(DatabaseFromDomain(AppData()));

auto ev = CreateNavigateForPath(DatabaseName);
{
auto& entry = static_cast<TEvTxProxySchemeCache::TEvNavigateKeySet*>(ev)->Request->ResultSet.emplace_back();
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
entry.TableId = pathId;
entry.ShowPrivatePath = true;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
}

Send(MakeSchemeCacheID(), ev);
}

static bool IsChangefeedOperation(EOp type) {
switch (type) {
case EOp::AddChangefeed:
case EOp::DropChangefeed:
return true;
default:
return false;
}
}

void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
TXLOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult"
<< ", errors# " << ev->Get()->Request.Get()->ErrorCount);
Expand All @@ -251,13 +276,48 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

Y_ABORT_UNLESS(!resp->ResultSet.empty());
const auto& entry = resp->ResultSet.back();

switch (entry.Kind) {
case NSchemeCache::TSchemeCacheNavigate::KindTable:
case NSchemeCache::TSchemeCacheNavigate::KindColumnTable:
case NSchemeCache::TSchemeCacheNavigate::KindExternalTable:
case NSchemeCache::TSchemeCacheNavigate::KindExternalDataSource:
case NSchemeCache::TSchemeCacheNavigate::KindView:
break; // table
case NSchemeCache::TSchemeCacheNavigate::KindIndex:
if (IsChangefeedOperation(OpType)) {
break;
}
[[fallthrough]];
default:
Request_->RaiseIssue(MakeIssue(NKikimrIssues::TIssuesIds::GENERIC_RESOLVE_ERROR, TStringBuilder()
<< "Unable to nagivate: " << JoinPath(entry.Path) << " status: PathNotTable"));
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

switch (OpType) {
case EOp::AddIndex:
return AlterTableAddIndexOp(resp, ctx);
case EOp::Attribute:
Y_ABORT_UNLESS(!resp->ResultSet.empty());
ResolvedPathId = resp->ResultSet.back().TableId.PathId;
return AlterTable(ctx);
case EOp::AddChangefeed:
case EOp::DropChangefeed:
if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindIndex) {
AlterTable(ctx);
} else if (auto list = entry.ListNodeEntry) {
if (list->Children.size() != 1) {
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

const auto& child = list->Children.at(0);
AlterTable(ctx, CanonizePath(ChildPath(NKikimr::SplitPath(GetProtoRequest()->path()), child.Name)));
} else {
Navigate(entry.TableId, ctx);
}
break;
default:
TXLOG_E("Got unexpected cache response");
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
Expand Down Expand Up @@ -351,13 +411,14 @@ class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTab
Die(ctx);
}

void AlterTable(const TActorContext &ctx) {
void AlterTable(const TActorContext &ctx, const TMaybe<TString>& overridePath = {}) {
const auto req = GetProtoRequest();
std::unique_ptr<TEvTxUserProxy::TEvProposeTransaction> proposeRequest = CreateProposeTransaction();
auto modifyScheme = proposeRequest->Record.MutableTransaction()->MutableModifyScheme();
modifyScheme->SetAllowAccessToPrivatePaths(overridePath.Defined());
Ydb::StatusIds::StatusCode code;
TString error;
if (!BuildAlterTableModifyScheme(req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
if (!BuildAlterTableModifyScheme(overridePath.GetOrElse(req->path()), req, modifyScheme, Profiles, ResolvedPathId, code, error)) {
NYql::TIssues issues;
issues.AddIssue(NYql::TIssue(error));
return Reply(code, issues, ctx);
Expand Down
36 changes: 36 additions & 0 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4075,6 +4075,42 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}

Y_UNIT_TEST(ChangefeedOnIndexTable) {
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

{
auto query = R"(
--!syntax_v1
CREATE TABLE `/Root/table` (
Key Uint64,
Value String,
PRIMARY KEY (Key),
INDEX SyncIndex GLOBAL SYNC ON (`Value`),
INDEX AsyncIndex GLOBAL ASYNC ON (`Value`)
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json);
{
auto result = session.AlterTable("/Root/table/AsyncIndex", TAlterTableSettings()
.AppendAddChangefeeds(changefeed)
).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString());
}
{
auto result = session.AlterTable("/Root/table/SyncIndex", TAlterTableSettings()
.AppendAddChangefeeds(changefeed)
).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
}

Y_UNIT_TEST(CreatedAt) {
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/scheme_board/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,6 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
default:
return false;
}
case NKikimrSchemeOp::EPathTypeTableIndex:
return true;
default:
return false;
}
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/ydb_convert/table_description.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ bool BuildAlterTableAddIndexRequest(const Ydb::Table::AlterTableRequest* req, NK
return true;
}

bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles,
bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req,
NKikimrSchemeOp::TModifyScheme* modifyScheme, const TTableProfiles& profiles,
const TPathId& resolvedPathId,
Ydb::StatusIds::StatusCode& code, TString& error)
{
Expand All @@ -187,7 +188,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
const auto OpType = *ops.begin();

try {
pathPair = SplitPathIntoWorkingDirAndName(req->path());
pathPair = SplitPathIntoWorkingDirAndName(path);
} catch (const std::exception&) {
code = Ydb::StatusIds::BAD_REQUEST;
return false;
Expand Down Expand Up @@ -230,7 +231,7 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
for(const auto& rename: req->rename_indexes()) {
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex);
auto& alter = *modifyScheme->MutableMoveIndex();
alter.SetTablePath(req->path());
alter.SetTablePath(path);
alter.SetSrcPath(rename.source_name());
alter.SetDstPath(rename.destination_name());
alter.SetAllowOverwrite(rename.replace_destination());
Expand Down Expand Up @@ -366,6 +367,11 @@ bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKiki
return true;
}

bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
const TTableProfiles& profiles, const TPathId& resolvedPathId, Ydb::StatusIds::StatusCode& code, TString& error)
{
return BuildAlterTableModifyScheme(req->path(), req, modifyScheme, profiles, resolvedPathId, code, error);
}

template <typename TColumn>
static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& column) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/ydb_convert/table_description.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ struct TPathId;


THashSet<EAlterOperationKind> GetAlterOperationKinds(const Ydb::Table::AlterTableRequest* req);
bool BuildAlterTableModifyScheme(const TString& path, const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
const TTableProfiles& profiles, const TPathId& resolvedPathId,
Ydb::StatusIds::StatusCode& status, TString& error);
bool BuildAlterTableModifyScheme(const Ydb::Table::AlterTableRequest* req, NKikimrSchemeOp::TModifyScheme* modifyScheme,
const TTableProfiles& profiles, const TPathId& resolvedPathId,
Ydb::StatusIds::StatusCode& status, TString& error);
Expand Down

0 comments on commit 171e293

Please sign in to comment.