Skip to content

Commit

Permalink
Merge 7aade9f into 4002b4c
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jun 19, 2024
2 parents 4002b4c + 7aade9f commit b61bbb3
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 7 deletions.
9 changes: 8 additions & 1 deletion ydb/core/tx/datashard/datashard_repl_apply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,20 @@ class TDataShard::TTxApplyReplicationChanges : public TTransactionBase<TDataShar
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
Y_UNUSED(ctx);

if (Self->State != TShardState::Ready && !Self->IsReplicated()) {
if (Self->State != TShardState::Ready) {
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_WRONG_STATE);
return true;
}

if (!Self->IsReplicated()) {
Result = MakeHolder<TEvDataShard::TEvApplyReplicationChangesResult>(
NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED,
NKikimrTxDataShard::TEvApplyReplicationChangesResult::REASON_BAD_REQUEST);
return true;
}

const auto& msg = Ev->Get()->Record;

const auto& tableId = msg.GetTableId();
Expand Down
23 changes: 23 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,29 @@ Y_UNIT_TEST_SUITE(DataShardReplication) {
);
}

Y_UNIT_TEST(ApplyChangesToCommonTable) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);
CreateShardedTable(server, sender, "/Root", "table-1", TShardedTableOptions());

auto shards = GetTableShards(server, sender, "/Root/table-1");
auto tableId = ResolveTableId(server, sender, "/Root/table-1");

ApplyChanges(server, shards.at(0), tableId, "my-source", {
TChange{ .Offset = 0, .WriteTxId = 0, .Key = 1, .Value = 11 },
}, NKikimrTxDataShard::TEvApplyReplicationChangesResult::STATUS_REJECTED);
}

}

} // namespace NKikimr
3 changes: 2 additions & 1 deletion ydb/core/tx/replication/controller/dst_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class TDstCreator: public TActorBootstrapped<TDstCreator> {
if (bootstrap) {
GetTableProfiles();
} else {
Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(SrcPath, {}));
Send(YdbProxy, new TEvYdbProxy::TEvDescribeTableRequest(SrcPath, NYdb::NTable::TDescribeTableSettings()
.WithKeyShardBoundary(true)));
}
break;
}
Expand Down
33 changes: 33 additions & 0 deletions ydb/core/tx/replication/controller/dst_creator_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Y_UNIT_TEST_SUITE(DstCreator) {
},
.ReplicationConfig = Nothing(),
}));

env.GetRuntime().Register(CreateDstCreator(
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replicated"
Expand All @@ -93,6 +94,38 @@ Y_UNIT_TEST_SUITE(DstCreator) {
UNIT_ASSERT_VALUES_EQUAL(replicatedSelf.GetOwner(), "user@builtin");
}

Y_UNIT_TEST(SamePartitionCount) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);

env.CreateTable("/Root", *MakeTableDescription({
.Name = "Table",
.KeyColumns = {"key"},
.Columns = {
{.Name = "key", .Type = "Uint32"},
{.Name = "value", .Type = "Utf8"},
},
.ReplicationConfig = Nothing(),
.UniformPartitions = 2,
}));

env.GetRuntime().Register(CreateDstCreator(
env.GetSender(), env.GetSchemeshardId("/Root/Table"), env.GetYdbProxy(), env.GetPathId("/Root"),
1 /* rid */, 1 /* tid */, TReplication::ETargetKind::Table, "/Root/Table", "/Root/Replicated"
));

auto ev = env.GetRuntime().GrabEdgeEvent<TEvPrivate::TEvCreateDstResult>(env.GetSender());
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Status, NKikimrScheme::StatusSuccess);

auto originalDesc = env.GetDescription("/Root/Table");
const auto& originalTable = originalDesc.GetPathDescription();
UNIT_ASSERT_VALUES_EQUAL(originalTable.TablePartitionsSize(), 2);

auto replicatedDesc = env.GetDescription("/Root/Replicated");
const auto& replicatedTable = replicatedDesc.GetPathDescription();
UNIT_ASSERT_VALUES_EQUAL(originalTable.TablePartitionsSize(), replicatedTable.TablePartitionsSize());
}

Y_UNIT_TEST(NonExistentSrc) {
TEnv env;
env.GetRuntime().SetLogPriority(NKikimrServices::REPLICATION_CONTROLLER, NLog::PRI_TRACE);
Expand Down
19 changes: 15 additions & 4 deletions ydb/core/tx/replication/service/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
event->Record.SetSource(source);
}

Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, false));
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), TabletId, true, ++SubscribeCookie));
Become(&TThis::StateWaitingStatus);
}

Expand All @@ -117,7 +117,11 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
<< ": status# " << static_cast<ui32>(record.GetStatus())
<< ", reason# " << static_cast<ui32>(record.GetReason())
<< ", error# " << record.GetErrorDescription());
return Leave(IsHardError(record.GetReason()));
if (IsHardError(record.GetReason())) {
return Leave(true);
} else {
return DelayedLeave();
}
}
}

Expand All @@ -133,11 +137,16 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
}

void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
if (TabletId == ev->Get()->TabletId) {
Leave();
if (TabletId == ev->Get()->TabletId && ev->Cookie == SubscribeCookie) {
DelayedLeave();
}
}

void DelayedLeave() {
static constexpr TDuration delay = TDuration::MilliSeconds(50);
this->Schedule(delay, new TEvents::TEvWakeup());
}

void Leave(bool hardError = false) {
LOG_I("Leave"
<< ": hard error# " << hardError);
Expand Down Expand Up @@ -177,6 +186,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
sFunc(TEvents::TEvWakeup, Leave);
sFunc(TEvents::TEvPoison, PassAway);
}
}
Expand All @@ -188,6 +198,7 @@ class TTablePartitionWriter: public TActorBootstrapped<TTablePartitionWriter> {
mutable TMaybe<TString> LogPrefix;

TActorId LeaderPipeCache;
ui64 SubscribeCookie = 0;
TMemoryPool MemoryPool;

}; // TTablePartitionWriter
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/replication/ut_helpers/test_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ void TTestTableDescription::SerializeTo(NKikimrSchemeOp::TTableDescription& prot
if (ReplicationConfig) {
ReplicationConfig->SerializeTo(*proto.MutableReplicationConfig());
}

if (UniformPartitions) {
proto.SetUniformPartitionsCount(*UniformPartitions);
}
}

THolder<NKikimrSchemeOp::TTableDescription> MakeTableDescription(const TTestTableDescription& desc) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/ut_helpers/test_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct TTestTableDescription {
TVector<TString> KeyColumns;
TVector<TColumn> Columns;
TMaybe<TReplicationConfig> ReplicationConfig = TReplicationConfig::Default();
TMaybe<ui32> UniformPartitions = Nothing();

void SerializeTo(NKikimrSchemeOp::TTableDescription& proto) const;
};
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/sql/v1/sql_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2395,7 +2395,7 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {

Y_UNIT_TEST(AlterTableAddIndexWithIsNotSupported) {
ExpectFailWithError("USE plato; ALTER TABLE table ADD INDEX idx LOCAL WITH (a=b, c=d, e=f) ON (col)",
"<main>:1:40: Error: local: alternative is not implemented yet: 714:7: local_index\n");
"<main>:1:40: Error: local: alternative is not implemented yet: 715:7: local_index\n");
}

Y_UNIT_TEST(OptionalAliases) {
Expand Down

0 comments on commit b61bbb3

Please sign in to comment.