Skip to content

Commit

Permalink
Allow streams on index table (ydb-platform#6827)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jul 26, 2024
1 parent 90805a4 commit bc978cc
Show file tree
Hide file tree
Showing 19 changed files with 512 additions and 411 deletions.
5 changes: 0 additions & 5 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -913,10 +913,6 @@ message TCreateCdcStream {
optional TCdcStreamDescription StreamDescription = 2;
optional uint64 RetentionPeriodSeconds = 3 [default = 86400]; // 1d by default
optional uint32 TopicPartitions = 4;
oneof IndexMode {
google.protobuf.Empty AllIndexes = 5; // Create topic per each index
string IndexName = 6;
}
}

message TAlterCdcStream {
Expand Down Expand Up @@ -1631,7 +1627,6 @@ message TIndexBuildControl {

message TLockConfig {
optional string Name = 1;
optional bool AllowIndexImplLock = 2;
}

message TLockGuard {
Expand Down
35 changes: 22 additions & 13 deletions ydb/core/tx/schemeshard/schemeshard__operation_alter_cdc_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,12 @@ class TAlterCdcStream: public TSubOperation {
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderOperation();

if (checks && !tablePath.IsInsideTableIndexPath()) {
checks.IsCommonSensePath();
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
Expand Down Expand Up @@ -370,10 +373,13 @@ class TAlterCdcStreamAtTable: public TSubOperation {
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderDeleting()
.NotUnderOperation();

if (checks && !tablePath.IsInsideTableIndexPath()) {
checks.IsCommonSensePath();
}

if (!checks) {
result->SetError(checks.GetStatus(), checks.GetError());
return result;
Expand Down Expand Up @@ -476,10 +482,10 @@ class TAlterCdcStreamAtTable: public TSubOperation {
} // anonymous

std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
const TOperationId& opId,
const TPath& workingDirPath,
const TString& tableName,
const TString& streamName)
const TOperationId& opId,
const TPath& workingDirPath,
const TString& tableName,
const TString& streamName)
{
const auto tablePath = workingDirPath.Child(tableName);
{
Expand All @@ -492,9 +498,12 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
.NotDeleted()
.IsTable()
.NotAsyncReplicaTable()
.IsCommonSensePath()
.NotUnderOperation();

if (checks && !tablePath.IsInsideTableIndexPath()) {
checks.IsCommonSensePath();
}

if (!checks) {
return CreateReject(opId, checks.GetStatus(), checks.GetError());
}
Expand All @@ -521,11 +530,11 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
}

void DoAlterStream(
const NKikimrSchemeOp::TAlterCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath,
TVector<ISubOperation::TPtr>& result)
TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TAlterCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath)
{
{
auto outTx = TransactionTemplate(tablePath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl);
Expand Down Expand Up @@ -601,7 +610,7 @@ TVector<ISubOperation::TPtr> CreateAlterCdcStream(TOperationId opId, const TTxTr

TVector<ISubOperation::TPtr> result;

DoAlterStream(op, opId, workingDirPath, tablePath, result);
DoAlterStream(result, op, opId, workingDirPath, tablePath);

if (op.HasGetReady()) {
auto outTx = TransactionTemplate(workingDirPath.PathString(), NKikimrSchemeOp::EOperationType::ESchemeOpDropLock);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
#include "schemeshard__operation_common.h"
#include "schemeshard__operation_create_cdc_stream.h" // for TStreamPaths
#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"

Expand All @@ -17,10 +17,10 @@ std::variant<TStreamPaths, ISubOperation::TPtr> DoAlterStreamPathChecks(
const TString& streamName);

void DoAlterStream(
TVector<ISubOperation::TPtr>& result,
const NKikimrSchemeOp::TAlterCdcStream& op,
const TOperationId& opId,
const TPath& workingDirPath,
const TPath& tablePath,
TVector<ISubOperation::TPtr>& result);
const TPath& tablePath);

} // namespace NKikimr::NSchemesShard::NCdc
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#include "schemeshard__operation_part.h"
#include "schemeshard__operation_alter_cdc_stream.h"
#include "schemeshard__operation_common.h"
#include "schemeshard__operation_part.h"
#include "schemeshard_impl.h"

#include "schemeshard__operation_alter_cdc_stream.h"

#include <ydb/core/tx/schemeshard/backup/constants.h>

#include <ydb/core/engine/mkql_proto.h>
Expand Down Expand Up @@ -111,7 +110,7 @@ TVector<ISubOperation::TPtr> CreateAlterContinuousBackup(TOperationId opId, cons

TVector<ISubOperation::TPtr> result;

NCdc::DoAlterStream(alterCdcStreamOp, opId, workingDirPath, tablePath, result);
NCdc::DoAlterStream(result, alterCdcStreamOp, opId, workingDirPath, tablePath);

if (cbOp.GetActionCase() == NKikimrSchemeOp::TAlterContinuousBackup::kTakeIncrementalBackup) {
DoCreateIncBackupTable(opId, backupTablePath, schema, result);
Expand Down
Loading

0 comments on commit bc978cc

Please sign in to comment.