diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp index 3c04dcdff64a..f064ea4ad3a7 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.cpp @@ -48,4 +48,9 @@ bool TLongTxTransactionOperator::DoParse(TColumnShard& /*owner*/, const TString& return true; } +void TLongTxTransactionOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) { + const auto& txInfo = GetTxInfo(); + ctx.Send(txInfo.Source, BuildProposeResultEvent(owner).release()); +} + } diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h index bc253acacabb..4933095c81de 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -24,6 +24,8 @@ namespace NKikimr::NColumnShard { virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { } + virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override; + virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override { } virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { diff --git a/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp b/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp index 1fd63abaf084..2a48ca49a279 100644 --- a/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp @@ -3,12 +3,25 @@ namespace NKikimr::NColumnShard { void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) { + if (owner.CurrentSchemeShardId) { + AFL_VERIFY(owner.CurrentSchemeShardId); + ctx.Send(MakePipePerNodeCacheID(false), + new TEvPipeCache::TEvForward(BuildProposeResultEvent(owner).release(), (ui64)owner.CurrentSchemeShardId, true)); + } else { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "scheme_shard_tablet_not_initialized")("source", GetTxInfo().Source); + ctx.Send(GetTxInfo().Source, BuildProposeResultEvent(owner).release()); + } +} + +std::unique_ptr IProposeTxOperator::BuildProposeResultEvent(const TColumnShard& owner) const { const auto& txInfo = GetTxInfo(); - std::unique_ptr evResult = std::make_unique( - owner.TabletID(), txInfo.TxKind, txInfo.TxId, GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage()); + std::unique_ptr evResult = + std::make_unique(owner.TabletID(), txInfo.TxKind, txInfo.TxId, + GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage()); if (IsFail()) { owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_ERROR); - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())("tx_id", txInfo.TxId); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())( + "tx_id", txInfo.TxId); } else { evResult->Record.SetMinStep(txInfo.MinStep); evResult->Record.SetMaxStep(txInfo.MaxStep); @@ -16,8 +29,10 @@ void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& c evResult->Record.MutableDomainCoordinators()->CopyFrom(owner.ProcessingParams->GetCoordinators()); } owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_SUCCESS); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())( + "tx_id", txInfo.TxId); } - ctx.Send(txInfo.Source, evResult.release()); + return evResult; } } diff --git a/ydb/core/tx/columnshard/transactions/operators/propose_tx.h b/ydb/core/tx/columnshard/transactions/operators/propose_tx.h index 84b2f7e8db66..d867e71bad9b 100644 --- a/ydb/core/tx/columnshard/transactions/operators/propose_tx.h +++ b/ydb/core/tx/columnshard/transactions/operators/propose_tx.h @@ -12,6 +12,7 @@ class IProposeTxOperator: public TTxController::ITransactionOperator { virtual bool DoCheckTxInfoForReply(const TFullTxInfo& originalTxInfo) const override { return GetTxInfo() == originalTxInfo; } + std::unique_ptr BuildProposeResultEvent(const TColumnShard& owner) const; virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override; virtual bool DoCheckAllowUpdate(const TFullTxInfo& currentTxInfo) const override { if (!currentTxInfo.SeqNo || !GetTxInfo().SeqNo) { diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.cpp b/ydb/core/tx/columnshard/transactions/operators/schema.cpp index d7c0107c2a9b..d4019542bf1e 100644 --- a/ydb/core/tx/columnshard/transactions/operators/schema.cpp +++ b/ydb/core/tx/columnshard/transactions/operators/schema.cpp @@ -40,7 +40,17 @@ class TWaitEraseTablesTxSubscriber: public NSubscriber::ISubscriber { } }; -NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { +TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) { + auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo()); + auto lastSeqNo = owner.LastSchemaSeqNo; + + // Check if proposal is outdated + if (seqNo < lastSeqNo) { + auto errorMessage = TStringBuilder() << "Ignoring outdated schema tx proposal at tablet " << owner.TabletID() << " txId " << GetTxId() + << " ssId " << owner.CurrentSchemeShardId << " seqNo " << seqNo << " lastSeqNo " << lastSeqNo; + return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage); + } + switch (SchemaTxBody.TxBody_case()) { case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: { @@ -67,21 +77,6 @@ NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator: break; } - auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo()); - auto lastSeqNo = owner.LastSchemaSeqNo; - - // Check if proposal is outdated - if (seqNo < lastSeqNo) { - auto errorMessage = TStringBuilder() - << "Ignoring outdated schema tx proposal at tablet " - << owner.TabletID() - << " txId " << GetTxId() - << " ssId " << owner.CurrentSchemeShardId - << " seqNo " << seqNo - << " lastSeqNo " << lastSeqNo; - return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage); - } - owner.UpdateSchemaSeqNo(seqNo, txc); return TProposeResult(); } diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index 0aa92a857b09..2ef465fc7359 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -278,7 +278,10 @@ TDuration TTxController::GetTxCompleteLag(ui64 timecastStep) const { TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) { auto it = Operators.find(txId); if (it == Operators.end()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_plan_tx")("tx_id", txId); return EPlanResult::Skipped; + } else { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep); } auto& txInfo = it->second->MutableTxInfo(); if (txInfo.PlanStep == 0) { @@ -304,8 +307,8 @@ void TTxController::OnTabletInit() { std::shared_ptr TTxController::StartProposeOnExecute( const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) { - NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")( - "tx_info", txInfo.DebugString())("tx_info", txInfo.DebugString()); + NActors::TLogContextGuard lGuard = + NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")("tx_info", txInfo.DebugString()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start"); std::shared_ptr txOperator( TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));