Skip to content

Commit

Permalink
schemeshard: reject operations with too big local tx commit
Browse files Browse the repository at this point in the history
Add a commit redo size check for successfully ignited operations
as a precautionary measure to avoid an infinite loop of schemeshard
hitting the local tx commit redo size limit, restarting, attempting
to propose the persisted operation again, hitting the commit redo size
limit again, restarting, and so on.

This could happen with inherently massive operations such as
copy-tables used as a starting step of database export/backup.

Copying a large number of tables with a huge number of partitions can
result in a TTxOperationPropose local transaction so large that
it hits the limit imposed at the tablet executor level.
A tablet violating that limit is considered broken and will be
immediately stopped.
See ydb/core/tablet_flat/flat_executor.cpp,
NTabletFlatExecutor::TExecutor::ExecuteTransaction().

KIKIMR-21751
  • Loading branch information
ijon committed Jul 16, 2024
1 parent 02fde22 commit 49663a2
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 16 deletions.
125 changes: 110 additions & 15 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}

TOperation::TPtr operation = new TOperation(txId);
Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : record.GetTransaction()) {
auto quotaResult = operation->ConsumeQuota(transaction, context);
if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId)));
response->SetError(quotaResult.Status, quotaResult.Reason);
Operations.erase(txId);
return std::move(response);
}
}
Expand All @@ -131,7 +129,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
response->SetError(splitResult.Status, splitResult.Reason);
Operations.erase(txId);
return std::move(response);
}

Expand All @@ -140,11 +137,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request

const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;

bool prevProposeUndoSafe = true;

Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : transactions) {
auto parts = operation->ConstructParts(transaction, context);

if (parts.size() > 1) {
// les't allow altering impl index tables as part of consistent operation
// allow altering impl index tables as part of consistent operation
context.IsAllowedPrivateTables = true;
}

Expand Down Expand Up @@ -198,25 +199,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
<< ", with reason: " << response->Record.GetReason()
<< ", tx message: " << SecureDebugString(record));

context.OnComplete = {}; // recreate
context.DbChanges = {};
AbortOperationPropose(txId, context);

for (auto& toAbort : operation->Parts) {
toAbort->AbortPropose(context);
}
return std::move(response);
}

context.MemChanges.UnDo(context.SS);
context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx);
Operations.erase(txId);
// Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) {
prevProposeUndoSafe = false;

return std::move(response);
LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Operation part proposed ok, but propose itself is undo unsafe"
<< ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType())
<< ", opId: " << part->GetOperationId()
<< ", at schemeshard: " << selfId
);
}
}
}

return std::move(response);
}

void TSchemeShard::AbortOperationPropose(const TTxId txId, TOperationContext& context) {
Y_ABORT_UNLESS(Operations.contains(txId));
TOperation::TPtr operation = Operations.at(txId);

// Drop operation side effects, undo memory changes
// (Local db changes were already applied)
context.OnComplete = {};
context.DbChanges = {};

for (auto& i : operation->Parts) {
i->AbortPropose(context);
}

context.MemChanges.UnDo(context.SS);

// And remove aborted operation from existence
Operations.erase(txId);
}

// Rejects the operation txId: logs the rejection together with its reason,
// rolls back changes of the local database (DB.RollbackChanges()) and then
// aborts the operation propose at the schemeshard (AbortOperationPropose()).
void AbortOperation(TOperationContext& context, const TTxId txId, const TString& reason) {
    LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
        "TTxOperationPropose Execute"
            << ", txId: " << txId
            << ", operation is rejected and all changes reverted"
            << ", " << reason
            << ", at schemeshard: " << context.SS->SelfTabletId()
    );

    // Local database first ...
    context.GetTxc().DB.RollbackChanges();

    // ... then everything the operation has accumulated at the schemeshard
    context.SS->AbortOperationPropose(txId, context);
}

// Compares the commit redo size reported by DB.GetCommitRedoBytes()
// against the limit derived from MaxCommitRedoMB.
//
// Returns true and writes a description into *reason when the size
// is greater than or equal to the limit.
// Returns false and leaves *reason untouched otherwise.
bool IsCommitRedoSizeOverLimit(TString* reason, TOperationContext& context) {
    // MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
    // 1MB is held back from it as a reserve for anything extra
    // that executor/tablet may (or may not) add under the hood
    const ui64 maxAllowedBytes = (context.SS->MaxCommitRedoMB - 1) << 20;  // MB to bytes
    const ui64 actualBytes = context.GetTxc().DB.GetCommitRedoBytes();

    if (actualBytes < maxAllowedBytes) {
        return false;
    }

    const ui64 excessBytes = actualBytes - maxAllowedBytes;
    *reason = TStringBuilder()
        << "local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
        << "commit redo size " << actualBytes
        << ", limit " << maxAllowedBytes
        << ", excess " << excessBytes
    ;
    return true;
}

struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;

Expand All @@ -236,6 +289,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
TTabletId selfId = Self->SelfTabletId();
auto txId = TTxId(Request->Get()->Record.GetTxId());

LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxOperationPropose Execute"
Expand All @@ -246,7 +300,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken());
if (tokenParseError) {
auto txId = Request->Get()->Record.GetTxId();
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token");
return true;
}
Expand All @@ -258,10 +311,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
TStorageChanges dbChanges;
TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)};

//NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
// Unsuccessful IgniteOperation will leave no operation and context will also be clean.
Response = Self->IgniteOperation(*Request->Get(), context);

OnComplete.ApplyOnExecute(Self, txc, ctx);
//NOTE: Successfully created operation also must be checked for the size of this local tx.
//
// Limitation on a commit redo size of local transactions is imposed at the tablet executor level
// (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
// And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
//
// So even if the operation was ignited successfully, its local tx size still must be checked
// as a precautionary measure to avoid an infinite loop of schemeshard restarting, attempting to propose
// the persisted operation again, hitting the commit redo size limit and restarting again.
//
// On unsuccessful check, local tx should be rolled back, operation should be rejected and
// all accumulated changes dropped or reverted.
//

// Actually build commit redo (dbChanges could be empty)
dbChanges.Apply(Self, txc, ctx);

if (Self->Operations.contains(txId)) {
Y_ABORT_UNLESS(Response->IsDone() || Response->IsAccepted() || Response->IsConditionalAccepted());

// Check local tx commit redo size
TString reason;
if (IsCommitRedoSizeOverLimit(&reason, context)) {
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64(txId), ui64(selfId), reason);

AbortOperation(context, txId, reason);

if (!context.IsUndoChangesSafe()) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", opId: " << txId
<< ", operation should be rejected and all changes be reverted"
<< ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
<< ", message: " << SecureDebugString(Request->Get()->Record)
<< ", at schemeshard: " << context.SS->SelfTabletId()
);
}
}
}

// Apply accumulated changes (changes could be empty)
OnComplete.ApplyOnExecute(Self, txc, ctx);

return true;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4462,6 +4462,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping");
appData->Icb->RegisterSharedControl(FillAllocatePQ, "SchemeShard_FillAllocatePQ");

appData->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");

AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable();
appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable");

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class TSchemeShard
TControlWrapper DisablePublicationsOfDropping;
TControlWrapper FillAllocatePQ;

// Shared with NTabletFlatExecutor::TExecutor
TControlWrapper MaxCommitRedoMB;

TSplitSettings SplitSettings;

struct TTenantInitState {
Expand Down Expand Up @@ -370,6 +373,8 @@ class TSchemeShard
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context);
void AbortOperationPropose(const TTxId txId, TOperationContext& context);

THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId,
const TString& body, const TActorContext& ctx) const;

Expand Down Expand Up @@ -419,7 +424,7 @@ class TSchemeShard
return MakeLocalId(NextLocalPathId);
}

TPathId AllocatePathId () {
TPathId AllocatePathId() {
TPathId next = PeekNextPathId();
++NextLocalPathId;
return next;
Expand Down

0 comments on commit 49663a2

Please sign in to comment.