Skip to content

Commit

Permalink
use ss tablet id for schemeshard and sender for tx_proxy (#8684)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 4, 2024
1 parent 65e10f5 commit 53df48f
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ bool TLongTxTransactionOperator::DoParse(TColumnShard& /*owner*/, const TString&
return true;
}

void TLongTxTransactionOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) {
const auto& txInfo = GetTxInfo();
ctx.Send(txInfo.Source, BuildProposeResultEvent(owner).release());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace NKikimr::NColumnShard {
virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {

}
virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override;

virtual void DoFinishProposeOnExecute(TColumnShard& /*owner*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) override {
}
virtual void DoFinishProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override {
Expand Down
23 changes: 19 additions & 4 deletions ydb/core/tx/columnshard/transactions/operators/propose_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,36 @@
namespace NKikimr::NColumnShard {

void IProposeTxOperator::DoSendReply(TColumnShard& owner, const TActorContext& ctx) {
if (owner.CurrentSchemeShardId) {
AFL_VERIFY(owner.CurrentSchemeShardId);
ctx.Send(MakePipePerNodeCacheID(false),
new TEvPipeCache::TEvForward(BuildProposeResultEvent(owner).release(), (ui64)owner.CurrentSchemeShardId, true));
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "scheme_shard_tablet_not_initialized")("source", GetTxInfo().Source);
ctx.Send(GetTxInfo().Source, BuildProposeResultEvent(owner).release());
}
}

std::unique_ptr<NKikimr::TEvColumnShard::TEvProposeTransactionResult> IProposeTxOperator::BuildProposeResultEvent(const TColumnShard& owner) const {
const auto& txInfo = GetTxInfo();
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> evResult = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(
owner.TabletID(), txInfo.TxKind, txInfo.TxId, GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage());
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> evResult =
std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(owner.TabletID(), txInfo.TxKind, txInfo.TxId,
GetProposeStartInfoVerified().GetStatus(), GetProposeStartInfoVerified().GetStatusMessage());
if (IsFail()) {
owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_ERROR);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())("tx_id", txInfo.TxId);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())(
"tx_id", txInfo.TxId);
} else {
evResult->Record.SetMinStep(txInfo.MinStep);
evResult->Record.SetMaxStep(txInfo.MaxStep);
if (owner.ProcessingParams) {
evResult->Record.MutableDomainCoordinators()->CopyFrom(owner.ProcessingParams->GetCoordinators());
}
owner.Counters.GetTabletCounters()->IncCounter(COUNTER_PREPARE_SUCCESS);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("message", GetProposeStartInfoVerified().GetStatusMessage())("tablet_id", owner.TabletID())(
"tx_id", txInfo.TxId);
}
ctx.Send(txInfo.Source, evResult.release());
return evResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class IProposeTxOperator: public TTxController::ITransactionOperator {
virtual bool DoCheckTxInfoForReply(const TFullTxInfo& originalTxInfo) const override {
return GetTxInfo() == originalTxInfo;
}
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> BuildProposeResultEvent(const TColumnShard& owner) const;
virtual void DoSendReply(TColumnShard& owner, const TActorContext& ctx) override;
virtual bool DoCheckAllowUpdate(const TFullTxInfo& currentTxInfo) const override {
if (!currentTxInfo.SeqNo || !GetTxInfo().SeqNo) {
Expand Down
27 changes: 11 additions & 16 deletions ydb/core/tx/columnshard/transactions/operators/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,17 @@ class TWaitEraseTablesTxSubscriber: public NSubscriber::ISubscriber {
}
};

NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
TTxController::TProposeResult TSchemaTransactionOperator::DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) {
auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo());
auto lastSeqNo = owner.LastSchemaSeqNo;

// Check if proposal is outdated
if (seqNo < lastSeqNo) {
auto errorMessage = TStringBuilder() << "Ignoring outdated schema tx proposal at tablet " << owner.TabletID() << " txId " << GetTxId()
<< " ssId " << owner.CurrentSchemeShardId << " seqNo " << seqNo << " lastSeqNo " << lastSeqNo;
return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage);
}

switch (SchemaTxBody.TxBody_case()) {
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard:
{
Expand All @@ -67,21 +77,6 @@ NKikimr::NColumnShard::TTxController::TProposeResult TSchemaTransactionOperator:
break;
}

auto seqNo = SeqNoFromProto(SchemaTxBody.GetSeqNo());
auto lastSeqNo = owner.LastSchemaSeqNo;

// Check if proposal is outdated
if (seqNo < lastSeqNo) {
auto errorMessage = TStringBuilder()
<< "Ignoring outdated schema tx proposal at tablet "
<< owner.TabletID()
<< " txId " << GetTxId()
<< " ssId " << owner.CurrentSchemeShardId
<< " seqNo " << seqNo
<< " lastSeqNo " << lastSeqNo;
return TProposeResult(NKikimrTxColumnShard::EResultStatus::SCHEMA_CHANGED, errorMessage);
}

owner.UpdateSchemaSeqNo(seqNo, txc);
return TProposeResult();
}
Expand Down
7 changes: 5 additions & 2 deletions ydb/core/tx/columnshard/transactions/tx_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ TDuration TTxController::GetTxCompleteLag(ui64 timecastStep) const {
TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
auto it = Operators.find(txId);
if (it == Operators.end()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_plan_tx")("tx_id", txId);
return EPlanResult::Skipped;
} else {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep);
}
auto& txInfo = it->second->MutableTxInfo();
if (txInfo.PlanStep == 0) {
Expand All @@ -304,8 +307,8 @@ void TTxController::OnTabletInit() {

std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartProposeOnExecute(
const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")(
"tx_info", txInfo.DebugString())("tx_info", txInfo.DebugString());
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")("tx_info", txInfo.DebugString());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
std::shared_ptr<TTxController::ITransactionOperator> txOperator(
TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));
Expand Down

0 comments on commit 53df48f

Please sign in to comment.