From 6bd76ff7f3ba5c89a8f0e23c1159e994dfcd69b7 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 9 Sep 2024 10:40:41 +0300 Subject: [PATCH] skip long tx writing to in control locks --- ydb/core/tx/columnshard/columnshard__write.cpp | 8 ++++---- ydb/core/tx/columnshard/columnshard_impl.h | 9 +++++++++ .../reader/plain_reader/constructor/read_metadata.cpp | 7 +++++-- ydb/core/tx/tx_proxy/rpc_long_tx.cpp | 8 ++++---- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 16dc43439613..3990e5941797 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -554,11 +554,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor ui64 lockId = 0; if (behaviour == EOperationBehaviour::NoTxWrite) { - static TAtomicCounter Counter = 0; - const ui64 shift = (ui64)1 << 47; - lockId = shift + Counter.Inc(); + lockId = BuildEphemeralTxId(); + } else if (behaviour == EOperationBehaviour::InTxWrite) { + lockId = record.GetTxId(); } else { - lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId(); + lockId = record.GetLockTxId(); } OperationsManager->RegisterLock(lockId, Generation()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 950c6c8655c7..003747753676 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -286,6 +286,12 @@ class TColumnShard void OnTieringModified(const std::optional pathId = {}); public: + ui64 BuildEphemeralTxId() { + static TAtomicCounter Counter = 0; + static constexpr ui64 shift = (ui64)1 << 47; + return shift | Counter.Inc(); + } + enum class EOverloadStatus { ShardTxInFly /* "shard_tx" */, ShardWritesInFly /* "shard_writes" */, @@ -494,6 +500,9 @@ class TColumnShard return ProgressTxController->GetTxCompleteLag(mediatorTime); } + bool HasLongTxWrites(const TInsertWriteId insertWriteId) const { + return LongTxWrites.contains(insertWriteId); + } TInsertWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const; TInsertWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId, const std::optional granuleShardingVersionId); void AddLongTxWrite(const TInsertWriteId writeId, ui64 txId); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp index 6848167e54db..eddfccd95ae0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp @@ -35,8 +35,11 @@ TConclusionStatus TReadMetadata::Init( if (LockId) { for (auto&& i : CommittedBlobs) { if (auto writeId = i.GetWriteIdOptional()) { - auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId); - AddWriteIdToCheck(*writeId, op->GetLockId()); + if (owner->HasLongTxWrites()) { + } else { + auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId); + AddWriteIdToCheck(*writeId, op->GetLockId()); + } } } } diff --git a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp index 449df1b6102e..557cf13c14cb 100644 --- a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp +++ b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp @@ -30,8 +30,7 @@ class TLongTxWriteBase: public TActorBootstrapped { public: TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId) - : TBase() - , DatabaseName(databaseName) + : DatabaseName(databaseName) , Path(path) , DedupId(dedupId) , LongTxId(longTxId) @@ -41,8 +40,8 @@ class TLongTxWriteBase: public TActorBootstrapped { } } - ~TLongTxWriteBase() { - MemoryInFlight.Sub(InFlightSize); + virtual ~TLongTxWriteBase() { + AFL_VERIFY(MemoryInFlight.Sub(InFlightSize) >= 0); } protected: @@ -66,6 +65,7 @@ class TLongTxWriteBase: public TActorBootstrapped { } auto accessor = ExtractDataAccessor(); + AFL_VERIFY(!InFlightSize); InFlightSize = accessor->GetSize(); const i64 sizeInFlight = MemoryInFlight.Add(InFlightSize); if (TLimits::MemoryInFlightWriting < (ui64)sizeInFlight && sizeInFlight != InFlightSize) {