Skip to content

Commit

Permalink
Merge e13ab13 into 679ef9f
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Feb 12, 2024
2 parents 679ef9f + e13ab13 commit d7f0c59
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 29 deletions.
35 changes: 6 additions & 29 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,41 +226,18 @@ void TDataShard::OnStopGuardStarting(const TActorContext &ctx) {
// Handle immediate ops that have completed BuildAndWaitDependencies
for (const auto &kv : Pipeline.GetImmediateOps()) {
const auto &op = kv.second;
// Send reject result immediately, because we cannot control when
// a new datashard tablet may start and block us from committing
// anything new. The usual progress queue is too slow for that.
if (!op->Result() && !op->HasResultSentFlag()) {
auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(op->GetKind());
auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
TString rejectReason = TStringBuilder()
<< "Rejecting immediate tx "
<< op->GetTxId()
<< " because datashard "
<< TabletID()
<< " is restarting";
auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
kind, TabletID(), op->GetTxId(), rejectStatus);
result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);

ctx.Send(op->GetTarget(), result.Release(), 0, op->GetCookie());

IncCounter(COUNTER_PREPARE_OVERLOADED);
IncCounter(COUNTER_PREPARE_COMPLETE);
op->SetResultSentFlag();
if (op->OnStopping(*this, ctx)) {
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}
// Add op to candidates because IsReadyToExecute just became true
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}

// Handle prepared ops by notifying about imminent shutdown
for (const auto &kv : TransQueue.GetTxsInFly()) {
const auto &op = kv.second;
if (op->GetTarget() && !op->HasCompletedFlag()) {
auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
TabletID(), op->GetTxId());
ctx.Send(op->GetTarget(), notify.Release(), 0, op->GetCookie());
if (op->OnStopping(*this, ctx)) {
Pipeline.AddCandidateOp(op);
PlanQueue.Progress(ctx);
}
}
}
Expand Down
41 changes: 41 additions & 0 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -937,4 +937,45 @@ void TActiveTransaction::UntrackMemory() const {
NActors::NMemory::TLabel<MemoryLabelActiveTransactionBody>::Sub(TxBody.size());
}

/**
 * Reacts to the datashard announcing that it is about to stop.
 *
 * Distributed (non-immediate) transactions only send a
 * TEvProposeTransactionRestart notification to their target, and only when
 * a target is set and the operation has not completed yet.
 *
 * Immediate transactions that have neither a prepared result nor the
 * result-sent flag are answered right away with an OVERLOADED reject, the
 * prepare counters are bumped and the result-sent flag is set.
 *
 * Returns true for immediate transactions and false for distributed ones.
 */
bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx) {
    if (!IsImmediate()) {
        // Distributed operations send notification when proposed
        if (GetTarget() && !HasCompletedFlag()) {
            auto restartNotice = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
                self.TabletID(), GetTxId());
            ctx.Send(GetTarget(), restartNotice.Release(), 0, GetCookie());
        }

        // Distributed ops avoid doing new work when stopping
        return false;
    }

    // Reply with a reject from here instead of going through the usual
    // progress queue: a new datashard tablet may start at any moment and
    // block this one from committing anything new, and the queue is too
    // slow for that.
    if (!HasResultSentFlag() && !Result()) {
        TString reason = TStringBuilder()
            << "Rejecting immediate tx " << GetTxId()
            << " because datashard " << self.TabletID()
            << " is restarting";

        auto reject = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
            static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind()),
            self.TabletID(),
            GetTxId(),
            NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED);
        reject->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, reason);
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, reason);

        ctx.Send(GetTarget(), reject.Release(), 0, GetCookie());

        self.IncCounter(COUNTER_PREPARE_OVERLOADED);
        self.IncCounter(COUNTER_PREPARE_COMPLETE);
        SetResultSentFlag();
    }

    // Immediate ops become ready when stopping flag is set
    return true;
}

}}
2 changes: 2 additions & 0 deletions ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,8 @@ class TActiveTransaction : public TOperation {
return ++PageFaultCount;
}

// Stop notification hook overridden from the base operation class.
// Returns whether the operation should be added to the pipeline candidates.
bool OnStopping(TDataShard& self, const TActorContext& ctx) override;

private:
void TrackMemory() const;
void UntrackMemory() const;
Expand Down
66 changes: 66 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_order.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3386,6 +3386,72 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
}
}

// Scenario: a distributed upsert into table-1 and table-2 is left unresolved
// by intercepting its readsets, a SELECT over table-1 is started while that
// upsert is pending, and then the table-1 shard is gracefully restarted.
// The test expects the SELECT to still return both rows (keys 1 and 3), i.e.
// the read must observe the distributed upsert, and no debug assert to fire.
Y_UNIT_TEST(TestShardRestartDuringWaitingRead) {
TPortManager pm;
NKikimrConfig::TAppConfig app;
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false)
.SetAppConfig(app)
// We read from an unresolved volatile tx
.SetEnableDataShardVolatileTransactions(true);

Tests::TServer::TPtr server = new TServer(serverSettings);
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

InitRoot(server, sender);

// Two single-shard tables so that the upsert below spans two shards
// (assumes the last argument is the shard count -- TODO confirm)
CreateShardedTable(server, sender, "/Root", "table-1", 1);
CreateShardedTable(server, sender, "/Root", "table-2", 1);
auto table1shards = GetTableShards(server, sender, "/Root/table-1");

// Seed one committed row per table
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10)"));
ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20)"));

// Block readset exchange
// The observer takes ownership of every TEvReadSet it sees and parks it in
// readSets (presumably this prevents delivery -- verify against AddObserver)
std::vector<std::unique_ptr<IEventHandle>> readSets;
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
readSets.emplace_back(ev.Release());
});

// Start a distributed write to both tables
TString sessionId = CreateSessionRPC(runtime, "/Root");
auto upsertResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
)", sessionId, /* txId */ "", /* commitTx */ true),
"/Root");
// Wait until at least 4 readsets have been captured
// NOTE(review): the threshold of 4 is not explained here -- confirm why 4
WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");

// Start reading the first table
TString readSessionId = CreateSessionRPC(runtime, "/Root");
auto readResult = SendRequest(
runtime,
MakeSimpleRequestRPC(R"(
SELECT key, value FROM `/Root/table-1`
ORDER BY key;
)", readSessionId, /* txId */ "", /* commitTx */ true),
"/Root");

// Sleep to ensure read is properly waiting
runtime.SimulateSleep(TDuration::Seconds(1));

// Gracefully restart the first table shard
// The readset observer is removed first so that it no longer captures
// readsets sent from this point on
blockReadSets.Remove();
GracefulRestartTablet(runtime, table1shards[0], sender);

// Read succeeds because it is automatically retried
// No assert should be triggered in debug builds
UNIT_ASSERT_VALUES_EQUAL(
FormatResult(AwaitResponse(runtime, std::move(readResult))),
"{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
"{ items { uint32_value: 3 } items { uint32_value: 30 } }");
}

Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/datashard/operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,12 @@ void TOperation::SetFinishProposeTs() noexcept
SetFinishProposeTs(AppData()->MonotonicTimeProvider->Now());
}

/**
 * Default stop hook: sends nothing and touches no state.
 *
 * Always returns true, because the operation may have become ready and
 * should therefore be offered to the pipeline as a candidate.
 */
bool TOperation::OnStopping(TDataShard& /* self */, const TActorContext& /* ctx */)
{
    // No notifications by default; just ask to be added to candidates
    constexpr bool addToCandidates = true;
    return addToCandidates;
}

} // namespace NDataShard
} // namespace NKikimr
12 changes: 12 additions & 0 deletions ydb/core/tx/datashard/operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ class TOperation
return OperationSpan.GetTraceId();
}

/**
 * Called when the datashard is going to stop soon.
 *
 * An operation may override this method to send notifications or results
 * signalling that the operation will never complete. When a result is
 * sent, the operation is expected to set its ResultSentFlag.
 *
 * @param self the datashard that is stopping
 * @param ctx  actor context used for sending events and logging
 * @return true when the operation should be added to the pipeline as a
 *         candidate for execution, false otherwise
 */
virtual bool OnStopping(TDataShard& self, const TActorContext& ctx);

protected:
TOperation()
: TOperation(TBasicOpInfo())
Expand Down

0 comments on commit d7f0c59

Please sign in to comment.