Move datashard shutdown handling into a virtual TOperation::OnStopping hook.

* TOperation::OnStopping: the default implementation does nothing and
  returns true, so the operation is added to the pipeline as a candidate.
* TActiveTransaction::OnStopping: immediate ops are rejected right away
  (OVERLOADED status with a WRONG_SHARD_STATE error) unless a result
  already exists or was already sent, and return true; distributed ops
  notify their target about the restart (only when a target is set and the
  op is not completed) and return false.
* TDataShard::OnStopGuardStarting: both loops now call op->OnStopping()
  and add the op as a candidate / progress the plan queue only when it
  returns true.
* New unit test TestShardRestartDuringWaitingRead: a read that waits on an
  unresolved volatile tx is expected to succeed across a graceful restart
  of the shard.

diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index f71f7231e516..6539b5a4243e 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -226,41 +226,18 @@ void TDataShard::OnStopGuardStarting(const TActorContext &ctx) {
     // Handle immediate ops that have completed BuildAndWaitDependencies
     for (const auto &kv : Pipeline.GetImmediateOps()) {
         const auto &op = kv.second;
-        // Send reject result immediately, because we cannot control when
-        // a new datashard tablet may start and block us from commiting
-        // anything new. The usual progress queue is too slow for that.
-        if (!op->Result() && !op->HasResultSentFlag()) {
-            auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(op->GetKind());
-            auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
-            TString rejectReason = TStringBuilder()
-                << "Rejecting immediate tx "
-                << op->GetTxId()
-                << " because datashard "
-                << TabletID()
-                << " is restarting";
-            auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
-                kind, TabletID(), op->GetTxId(), rejectStatus);
-            result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
-            LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
-
-            ctx.Send(op->GetTarget(), result.Release(), 0, op->GetCookie());
-
-            IncCounter(COUNTER_PREPARE_OVERLOADED);
-            IncCounter(COUNTER_PREPARE_COMPLETE);
-            op->SetResultSentFlag();
+        if (op->OnStopping(*this, ctx)) {
+            Pipeline.AddCandidateOp(op);
+            PlanQueue.Progress(ctx);
         }
-        // Add op to candidates because IsReadyToExecute just became true
-        Pipeline.AddCandidateOp(op);
-        PlanQueue.Progress(ctx);
     }
 
     // Handle prepared ops by notifying about imminent shutdown
     for (const auto &kv : TransQueue.GetTxsInFly()) {
         const auto &op = kv.second;
-        if (op->GetTarget() && !op->HasCompletedFlag()) {
-            auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
-                TabletID(), op->GetTxId());
-            ctx.Send(op->GetTarget(), notify.Release(), 0, op->GetCookie());
+        if (op->OnStopping(*this, ctx)) {
+            Pipeline.AddCandidateOp(op);
+            PlanQueue.Progress(ctx);
         }
     }
 }
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index b54f21d39a82..a4a2c4ea5b5a 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -937,4 +937,45 @@ void TActiveTransaction::UntrackMemory() const {
     NActors::NMemory::TLabel::Sub(TxBody.size());
 }
 
+bool TActiveTransaction::OnStopping(TDataShard& self, const TActorContext& ctx) {
+    if (IsImmediate()) {
+        // Send reject result immediately, because we cannot control when
+        // a new datashard tablet may start and block us from commiting
+        // anything new. The usual progress queue is too slow for that.
+        if (!HasResultSentFlag() && !Result()) {
+            auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(GetKind());
+            auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
+            TString rejectReason = TStringBuilder()
+                << "Rejecting immediate tx "
+                << GetTxId()
+                << " because datashard "
+                << self.TabletID()
+                << " is restarting";
+            auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
+                kind, self.TabletID(), GetTxId(), rejectStatus);
+            result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
+            LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);
+
+            ctx.Send(GetTarget(), result.Release(), 0, GetCookie());
+
+            self.IncCounter(COUNTER_PREPARE_OVERLOADED);
+            self.IncCounter(COUNTER_PREPARE_COMPLETE);
+            SetResultSentFlag();
+        }
+
+        // Immediate ops become ready when stopping flag is set
+        return true;
+    } else {
+        // Distributed operations send notification when proposed
+        if (GetTarget() && !HasCompletedFlag()) {
+            auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
+                self.TabletID(), GetTxId());
+            ctx.Send(GetTarget(), notify.Release(), 0, GetCookie());
+        }
+
+        // Distributed ops avoid doing new work when stopping
+        return false;
+    }
+}
+
 }}
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h
index cca5b12b876f..247b5335c0c9 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.h
+++ b/ydb/core/tx/datashard/datashard_active_transaction.h
@@ -603,6 +603,8 @@ class TActiveTransaction : public TOperation {
         return ++PageFaultCount;
     }
 
+    bool OnStopping(TDataShard& self, const TActorContext& ctx) override;
+
 private:
     void TrackMemory() const;
     void UntrackMemory() const;
diff --git a/ydb/core/tx/datashard/datashard_ut_order.cpp b/ydb/core/tx/datashard/datashard_ut_order.cpp
index 194f826743e6..406020263c70 100644
--- a/ydb/core/tx/datashard/datashard_ut_order.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_order.cpp
@@ -3386,6 +3386,72 @@ Y_UNIT_TEST_TWIN(TestShardRestartPlannedCommitShouldSucceed, StreamLookup) {
     }
 }
 
+Y_UNIT_TEST(TestShardRestartDuringWaitingRead) {
+    TPortManager pm;
+    NKikimrConfig::TAppConfig app;
+    app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+    TServerSettings serverSettings(pm.GetPort(2134));
+    serverSettings.SetDomainName("Root")
+        .SetUseRealThreads(false)
+        .SetAppConfig(app)
+        // We read from an unresolved volatile tx
+        .SetEnableDataShardVolatileTransactions(true);
+
+    Tests::TServer::TPtr server = new TServer(serverSettings);
+    auto &runtime = *server->GetRuntime();
+    auto sender = runtime.AllocateEdgeActor();
+
+    InitRoot(server, sender);
+
+    CreateShardedTable(server, sender, "/Root", "table-1", 1);
+    CreateShardedTable(server, sender, "/Root", "table-2", 1);
+    auto table1shards = GetTableShards(server, sender, "/Root/table-1");
+
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 10)"));
+    ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-2` (key, value) VALUES (2, 20)"));
+
+    // Block readset exchange
+    std::vector<std::unique_ptr<IEventHandle>> readSets;
+    auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
+        readSets.emplace_back(ev.Release());
+    });
+
+    // Start a distributed write to both tables
+    TString sessionId = CreateSessionRPC(runtime, "/Root");
+    auto upsertResult = SendRequest(
+        runtime,
+        MakeSimpleRequestRPC(R"(
+            UPSERT INTO `/Root/table-1` (key, value) VALUES (3, 30);
+            UPSERT INTO `/Root/table-2` (key, value) VALUES (4, 40);
+            )", sessionId, /* txId */ "", /* commitTx */ true),
+        "/Root");
+    WaitFor(runtime, [&]{ return readSets.size() >= 4; }, "readsets");
+
+    // Start reading the first table
+    TString readSessionId = CreateSessionRPC(runtime, "/Root");
+    auto readResult = SendRequest(
+        runtime,
+        MakeSimpleRequestRPC(R"(
+            SELECT key, value FROM `/Root/table-1`
+            ORDER BY key;
+            )", readSessionId, /* txId */ "", /* commitTx */ true),
+        "/Root");
+
+    // Sleep to ensure read is properly waiting
+    runtime.SimulateSleep(TDuration::Seconds(1));
+
+    // Gracefully restart the first table shard
+    blockReadSets.Remove();
+    GracefulRestartTablet(runtime, table1shards[0], sender);
+
+    // Read succeeds because it is automatically retried
+    // No assert should be triggered in debug builds
+    UNIT_ASSERT_VALUES_EQUAL(
+        FormatResult(AwaitResponse(runtime, std::move(readResult))),
+        "{ items { uint32_value: 1 } items { uint32_value: 10 } }, "
+        "{ items { uint32_value: 3 } items { uint32_value: 30 } }");
+}
+
 Y_UNIT_TEST(TestShardSnapshotReadNoEarlyReply) {
     TPortManager pm;
     TServerSettings serverSettings(pm.GetPort(2134));
diff --git a/ydb/core/tx/datashard/operation.cpp b/ydb/core/tx/datashard/operation.cpp
index c217cbf5c6ca..f1779c24bf91 100644
--- a/ydb/core/tx/datashard/operation.cpp
+++ b/ydb/core/tx/datashard/operation.cpp
@@ -289,5 +289,12 @@ void TOperation::SetFinishProposeTs() noexcept
     SetFinishProposeTs(AppData()->MonotonicTimeProvider->Now());
 }
 
+bool TOperation::OnStopping(TDataShard&, const TActorContext&)
+{
+    // By default operations don't do anything when stopping
+    // However they may become ready so add to candidates
+    return true;
+}
+
 } // namespace NDataShard
 } // namespace NKikimr
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 7c7e5bf13769..201e22636a74 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -812,6 +812,18 @@ class TOperation
         return OperationSpan.GetTraceId();
     }
 
+    /**
+     * Called when datashard is going to stop soon
+     *
+     * Operation may override this method to support sending notifications or
+     * results signalling that the operation will never complete. When result
+     * is sent operation is supposed to set its ResultSentFlag.
+     *
+     * When this method returns true the operation will be added to the
+     * pipeline as a candidate for execution.
+     */
+    virtual bool OnStopping(TDataShard& self, const TActorContext& ctx);
+
 protected:
     TOperation()
         : TOperation(TBasicOpInfo())