Skip to content

Commit

Permalink
24-1: Cleanup persistent locks on table rename (#4979)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Jun 3, 2024
1 parent 4dd5b42 commit 62b7bff
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 8 deletions.
5 changes: 4 additions & 1 deletion ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "datashard_impl.h"
#include "datashard_txs.h"
#include "datashard_locks_db.h"
#include "probes.h"

#include <ydb/core/base/interconnect_channels.h>
Expand Down Expand Up @@ -1620,7 +1621,9 @@ TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxD
newTableInfo->StatsUpdateInProgress = false;
newTableInfo->StatsNeedUpdate = true;

RemoveUserTable(prevId);
TDataShardLocksDb locksDb(*this, txc);

RemoveUserTable(prevId, &locksDb);
AddUserTable(newId, newTableInfo);

for (auto& [_, record] : ChangesQueue) {
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1607,10 +1607,10 @@ class TDataShard
return nullptr;
}

void RemoveUserTable(const TPathId& tableId) {
TableInfos.erase(tableId.LocalPathId);
SysLocks.RemoveSchema(tableId);
void RemoveUserTable(const TPathId& tableId, ILocksDb* locksDb) {
SysLocks.RemoveSchema(tableId, locksDb);
Pipeline.GetDepTracker().RemoveSchema(tableId);
TableInfos.erase(tableId.LocalPathId);
}

void AddUserTable(const TPathId& tableId, TUserTable::TPtr tableInfo) {
Expand Down
13 changes: 12 additions & 1 deletion ydb/core/tx/datashard/datashard_locks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,23 @@ void TLockLocker::UpdateSchema(const TPathId& tableId, const TUserTable& tableIn
table->UpdateKeyColumnsTypes(tableInfo.KeyColumnTypes);
}

void TLockLocker::RemoveSchema(const TPathId& tableId) {
void TLockLocker::RemoveSchema(const TPathId& tableId, ILocksDb* db) {
// Make sure all persistent locks are removed from the database
for (auto& pr : Locks) {
if (pr.second->IsPersistent()) {
pr.second->PersistRemoveLock(db);
}
pr.second->OnRemoved();
}

Tables.erase(tableId);
Y_ABORT_UNLESS(Tables.empty());
Locks.clear();
ShardLocks.clear();
ExpireQueue.Clear();
BrokenLocks.Clear();
BrokenPersistentLocks.Clear();
BrokenLocksCount_ = 0;
CleanupPending.clear();
CleanupCandidates.clear();
PendingSubscribeLocks.clear();
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/datashard/datashard_locks.h
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ class TLockLocker {
}

void UpdateSchema(const TPathId& tableId, const TUserTable& tableInfo);
void RemoveSchema(const TPathId& tableId);
void RemoveSchema(const TPathId& tableId, ILocksDb* db);
bool ForceShardLock(const TPathId& tableId) const;
bool ForceShardLock(const TIntrusiveList<TTableLocks, TTableLocksReadListTag>& readTables) const;

Expand Down Expand Up @@ -840,8 +840,8 @@ class TSysLocks {
Locker.UpdateSchema(tableId, tableInfo);
}

void RemoveSchema(const TPathId& tableId) {
Locker.RemoveSchema(tableId);
void RemoveSchema(const TPathId& tableId, ILocksDb* db) {
Locker.RemoveSchema(tableId, db);
}

TVector<TLock> ApplyLocks();
Expand Down
77 changes: 77 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4896,6 +4896,83 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { int32_value: 2 } items { int32_value: 20 } }");
}

Y_UNIT_TEST(UncommittedChangesRenameTable) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetDomainPlanResolution(100)
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);

InitRoot(server, sender);

TDisableDataShardLogBatching disableDataShardLogBatching;

UNIT_ASSERT_VALUES_EQUAL(
KqpSchemeExec(runtime, R"(
CREATE TABLE `/Root/table1` (key int, value int, PRIMARY KEY (key));
)"),
"SUCCESS");

ExecSQL(server, sender, "UPSERT INTO `/Root/table1` (key, value) VALUES (2, 22);");

TString sessionId = CreateSessionRPC(runtime);
TString txId;
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleBegin(runtime, sessionId, txId, R"(
UPSERT INTO `/Root/table1` (key, value) VALUES (1, 11), (3, 33);
SELECT key, value FROM `/Root/table1` ORDER BY key;
)"),
"{ items { int32_value: 1 } items { int32_value: 11 } }, "
"{ items { int32_value: 2 } items { int32_value: 22 } }, "
"{ items { int32_value: 3 } items { int32_value: 33 } }");

auto shards = GetTableShards(server, sender, "/Root/table1");
auto tableId1 = ResolveTableId(server, sender, "/Root/table1");

// Check shard has open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId1.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(!ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}

WaitTxNotification(server, sender, AsyncMoveTable(server, "/Root/table1", "/Root/table1moved"));
auto tableId2 = ResolveTableId(server, sender, "/Root/table1moved");

runtime.SimulateSleep(TDuration::Seconds(1));

// Check shard doesn't have open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}

RebootTablet(runtime, shards.at(0), sender);

// The original table was removed
// We must not be able to commit the transaction
UNIT_ASSERT_VALUES_EQUAL(
KqpSimpleCommit(runtime, sessionId, txId, "SELECT 1"),
"ERROR: ABORTED");

runtime.SimulateSleep(TDuration::Seconds(1));

// Check shard doesn't have open transactions
{
runtime.SendToPipe(shards.at(0), sender, new TEvDataShard::TEvGetOpenTxs(tableId2.PathId));
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvGetOpenTxsResult>(sender);
UNIT_ASSERT_C(ev->Get()->OpenTxs.empty(), "at shard " << shards.at(0));
}
}

}

} // namespace NKikimr

0 comments on commit 62b7bff

Please sign in to comment.