Skip to content

Commit

Permalink
24-1: Fix read iterator local snapshot consistency. Fixes #2885. (#3074)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Mar 22, 2024
1 parent fe6d594 commit 112be1a
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 112 deletions.
6 changes: 6 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2107,6 +2107,12 @@ TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdge
<< " promoting UnprotectedReadEdge to " << version);
SnapshotManager.PromoteUnprotectedReadEdge(version);

// Make sure pending distributed transactions are marked incomplete,
// since we just protected up to and including version from writes,
// we need to make sure new immediate conflicting writes are blocked
// and don't perform writes with out-of-order versions.
res.HadWrites |= Pipeline.MarkPlannedLogicallyIncompleteUpTo(version, txc);

// We want to promote the complete edge when protected reads are
// used or when we're already writing something anyway.
if (res.HadWrites) {
Expand Down
222 changes: 116 additions & 106 deletions ydb/core/tx/datashard/datashard__read_iterator.cpp

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions ydb/core/tx/datashard/datashard_dep_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ namespace {
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() < b.TxId);
}

bool IsLessEqual(const TOperation& a, const TRowVersion& b) {
return a.GetStep() < b.Step || (a.GetStep() == b.Step && a.GetTxId() <= b.TxId);
bool IsEqual(const TOperation& a, const TRowVersion& b) {
return a.GetStep() == b.Step && a.GetTxId() == b.TxId;
}
}

Expand Down Expand Up @@ -799,8 +799,10 @@ void TDependencyTracker::TMvccDependencyTrackingLogic::AddOperation(const TOpera
Y_ABORT_UNLESS(!conflict.IsImmediate());
if (snapshot.IsMax()) {
conflict.AddImmediateConflict(op);
} else if (snapshotRepeatable ? IsLessEqual(conflict, snapshot) : IsLess(conflict, snapshot)) {
} else if (IsLess(conflict, snapshot)) {
op->AddDependency(&conflict);
} else if (IsEqual(conflict, snapshot)) {
op->AddRepeatableReadConflict(&conflict);
}
};

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ TPipeline::~TPipeline()
pr.second->ClearSpecialDependencies();
pr.second->ClearPlannedConflicts();
pr.second->ClearImmediateConflicts();
pr.second->ClearRepeatableReadConflicts();
}
}

Expand Down Expand Up @@ -487,6 +488,7 @@ void TPipeline::UnblockNormalDependencies(const TOperation::TPtr &op)
op->ClearDependencies();
op->ClearPlannedConflicts();
op->ClearImmediateConflicts();
op->ClearRepeatableReadConflicts();
DepTracker.RemoveOperation(op);
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/datashard/datashard_ut_common_kqp.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,13 @@ namespace NKqpHelpers {
return FormatResult(result);
}

inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
inline auto KqpSimpleSendCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
Y_ABORT_UNLESS(!txId.empty(), "commit on empty transaction");
auto response = AwaitResponse(runtime, SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */)));
return SendRequest(runtime, MakeSimpleRequestRPC(query, sessionId, txId, true /* commitTx */));
}

inline TString KqpSimpleCommit(TTestActorRuntime& runtime, const TString& sessionId, const TString& txId, const TString& query) {
auto response = AwaitResponse(runtime, KqpSimpleSendCommit(runtime, sessionId, txId, query));
if (response.operation().status() != Ydb::StatusIds::SUCCESS) {
return TStringBuilder() << "ERROR: " << response.operation().status();
}
Expand Down
Loading

0 comments on commit 112be1a

Please sign in to comment.