Skip to content

Commit

Permalink
Merge 6bd76ff into e546733
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 9, 2024
2 parents e546733 + 6bd76ff commit 9fc96cc
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 10 deletions.
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

ui64 lockId = 0;
if (behaviour == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
const ui64 shift = (ui64)1 << 47;
lockId = shift + Counter.Inc();
lockId = BuildEphemeralTxId();
} else if (behaviour == EOperationBehaviour::InTxWrite) {
lockId = record.GetTxId();
} else {
lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId();
lockId = record.GetLockTxId();
}

OperationsManager->RegisterLock(lockId, Generation());
Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ class TColumnShard
void OnTieringModified(const std::optional<ui64> pathId = {});

public:
ui64 BuildEphemeralTxId() {
static TAtomicCounter Counter = 0;
static constexpr ui64 shift = (ui64)1 << 47;
return shift | Counter.Inc();
}

enum class EOverloadStatus {
ShardTxInFly /* "shard_tx" */,
ShardWritesInFly /* "shard_writes" */,
Expand Down Expand Up @@ -494,6 +500,9 @@ class TColumnShard
return ProgressTxController->GetTxCompleteLag(mediatorTime);
}

bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
return LongTxWrites.contains(insertWriteId);
}
TInsertWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId) const;
TInsertWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId, const std::optional<ui32> granuleShardingVersionId);
void AddLongTxWrite(const TInsertWriteId writeId, ui64 txId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ TConclusionStatus TReadMetadata::Init(
if (LockId) {
for (auto&& i : CommittedBlobs) {
if (auto writeId = i.GetWriteIdOptional()) {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
if (owner->HasLongTxWrites()) {
} else {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {

public:
TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId)
: TBase()
, DatabaseName(databaseName)
: DatabaseName(databaseName)
, Path(path)
, DedupId(dedupId)
, LongTxId(longTxId)
Expand All @@ -41,8 +40,8 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
}
}

~TLongTxWriteBase() {
MemoryInFlight.Sub(InFlightSize);
virtual ~TLongTxWriteBase() {
AFL_VERIFY(MemoryInFlight.Sub(InFlightSize) >= 0);
}

protected:
Expand All @@ -66,6 +65,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
}

auto accessor = ExtractDataAccessor();
AFL_VERIFY(!InFlightSize);
InFlightSize = accessor->GetSize();
const i64 sizeInFlight = MemoryInFlight.Add(InFlightSize);
if (TLimits::MemoryInFlightWriting < (ui64)sizeInFlight && sizeInFlight != InFlightSize) {
Expand Down

0 comments on commit 9fc96cc

Please sign in to comment.