Skip to content

Commit

Permalink
Merge 9262662 into 1a59bb5
Browse files Browse the repository at this point in the history
  • Loading branch information
ijon authored Jul 20, 2024
2 parents 1a59bb5 + 9262662 commit 0d1cb58
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 16 deletions.
125 changes: 110 additions & 15 deletions ydb/core/tx/schemeshard/schemeshard__operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,12 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}

TOperation::TPtr operation = new TOperation(txId);
Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : record.GetTransaction()) {
auto quotaResult = operation->ConsumeQuota(transaction, context);
if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId)));
response->SetError(quotaResult.Status, quotaResult.Reason);
Operations.erase(txId);
return std::move(response);
}
}
Expand All @@ -139,7 +137,6 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
response->SetError(splitResult.Status, splitResult.Reason);
Operations.erase(txId);
return std::move(response);
}

Expand All @@ -148,11 +145,15 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request

const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;

bool prevProposeUndoSafe = true;

Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose

for (const auto& transaction : transactions) {
auto parts = operation->ConstructParts(transaction, context);

if (parts.size() > 1) {
// les't allow altering impl index tables as part of consistent operation
// allow altering impl index tables as part of consistent operation
context.IsAllowedPrivateTables = true;
}

Expand Down Expand Up @@ -206,25 +207,77 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
<< ", with reason: " << response->Record.GetReason()
<< ", tx message: " << PrintSecurely(record));

context.OnComplete = {}; // recreate
context.DbChanges = {};
AbortOperationPropose(txId, context);

for (auto& toAbort : operation->Parts) {
toAbort->AbortPropose(context);
}
return std::move(response);
}

context.MemChanges.UnDo(context.SS);
context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx);
Operations.erase(txId);
// Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) {
prevProposeUndoSafe = false;

return std::move(response);
LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Operation part proposed ok, but propose itself is undo unsafe"
<< ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType())
<< ", opId: " << part->GetOperationId()
<< ", at schemeshard: " << selfId
);
}
}
}

return std::move(response);
}

void TSchemeShard::AbortOperationPropose(const TTxId txId, TOperationContext& context) {
Y_ABORT_UNLESS(Operations.contains(txId));
TOperation::TPtr operation = Operations.at(txId);

// Drop operation side effects, undo memory changes
// (Local db changes were already applied)
context.OnComplete = {};
context.DbChanges = {};

for (auto& i : operation->Parts) {
i->AbortPropose(context);
}

context.MemChanges.UnDo(context.SS);

// And remove aborted operation from existence
Operations.erase(txId);
}

void AbortOperation(TOperationContext& context, const TTxId txId, const TString& reason) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", txId: " << txId
<< ", operation is rejected and all changes reverted"
<< ", " << reason
<< ", at schemeshard: " << context.SS->SelfTabletId()
);

context.GetTxc().DB.RollbackChanges();
context.SS->AbortOperationPropose(txId, context);
}

bool IsCommitRedoSizeOverLimit(TString* reason, TOperationContext& context) {
// MaxCommitRedoMB is the ICB control shared with NTabletFlatExecutor::TExecutor.
// We subtract from MaxCommitRedoMB additional 1MB for anything extra
// that executor/tablet may (or may not) add under the hood
const ui64 limitBytes = (context.SS->MaxCommitRedoMB - 1) << 20; // MB to bytes
const ui64 commitRedoBytes = context.GetTxc().DB.GetCommitRedoBytes();
if (commitRedoBytes >= limitBytes) {
*reason = TStringBuilder()
<< "local tx commit redo size generated by IgniteOperation() is more than allowed limit: "
<< "commit redo size " << commitRedoBytes
<< ", limit " << limitBytes
<< ", excess " << (commitRedoBytes - limitBytes)
;
return true;
}
return false;
}

struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;

Expand All @@ -244,6 +297,7 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
TTabletId selfId = Self->SelfTabletId();
auto txId = TTxId(Request->Get()->Record.GetTxId());

LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TTxOperationPropose Execute"
Expand All @@ -254,7 +308,6 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti

auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken());
if (tokenParseError) {
auto txId = Request->Get()->Record.GetTxId();
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token");
return true;
}
Expand All @@ -266,10 +319,52 @@ struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransacti
TStorageChanges dbChanges;
TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)};

//NOTE: Successful IgniteOperation will leave created operation in Self->Operations and accumulated changes in the context.
// Unsuccessful IgniteOperation will leave no operation and context will also be clean.
Response = Self->IgniteOperation(*Request->Get(), context);

OnComplete.ApplyOnExecute(Self, txc, ctx);
//NOTE: Successfully created operation also must be checked for the size of this local tx.
//
// Limitation on a commit redo size of local transactions is imposed at the tablet executor level
// (See ydb/core/tablet_flat/flat_executor.cpp, NTabletFlatExecutor::TExecutor::ExecuteTransaction()).
// And a tablet violating that limit is considered broken and will be stopped unconditionally and immediately.
//
// So even if operation was ignited successfully, it's local tx size still must be checked
// as a precaution measure to avoid infinite loop of schemeshard restarting, attempting to propose
// persisted operation again, hitting commit redo size limit and restarting again.
//
// On unsuccessful check, local tx should be rolled back, operation should be rejected and
// all accumulated changes dropped or reverted.
//

// Actually build commit redo (dbChanges could be empty)
dbChanges.Apply(Self, txc, ctx);

if (Self->Operations.contains(txId)) {
Y_ABORT_UNLESS(Response->IsDone() || Response->IsAccepted() || Response->IsConditionalAccepted());

// Check local tx commit redo size
TString reason;
if (IsCommitRedoSizeOverLimit(&reason, context)) {
Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusSchemeError, ui64(txId), ui64(selfId), reason);

AbortOperation(context, txId, reason);

if (!context.IsUndoChangesSafe()) {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "TTxOperationPropose Execute"
<< ", opId: " << txId
<< ", operation should be rejected and all changes be reverted"
<< ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
<< ", message: " << PrintSecurely(Request->Get()->Record)
<< ", at schemeshard: " << context.SS->SelfTabletId()
);
}
}
}

// Apply accumulated changes (changes could be empty)
OnComplete.ApplyOnExecute(Self, txc, ctx);

return true;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4358,6 +4358,8 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping");
appData->Icb->RegisterSharedControl(FillAllocatePQ, "SchemeShard_FillAllocatePQ");

appData->Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");

AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable();
appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable");

Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ class TSchemeShard
TControlWrapper DisablePublicationsOfDropping;
TControlWrapper FillAllocatePQ;

// Shared with NTabletFlatExecutor::TExecutor
TControlWrapper MaxCommitRedoMB;

TSplitSettings SplitSettings;

struct TTenantInitState {
Expand Down Expand Up @@ -350,6 +353,8 @@ class TSchemeShard
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};

THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context);
void AbortOperationPropose(const TTxId txId, TOperationContext& context);

THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId,
const TString& body, const TActorContext& ctx) const;

Expand Down Expand Up @@ -399,7 +404,7 @@ class TSchemeShard
return MakeLocalId(NextLocalPathId);
}

TPathId AllocatePathId () {
TPathId AllocatePathId() {
TPathId next = PeekNextPathId();
++NextLocalPathId;
return next;
Expand Down
113 changes: 113 additions & 0 deletions ydb/core/tx/schemeshard/ut_base/ut_commit_redo_limit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>

using namespace NKikimr;
using namespace NSchemeShard;
using namespace NSchemeShardUT_Private;

Y_UNIT_TEST_SUITE(TSchemeShardCheckProposeSize) {

//TODO: can't check all operations as many of them do not implement
// TSubOperation::AbortPropose() properly and will abort.

Y_UNIT_TEST(CopyTable) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);

// Take control over MaxCommitRedoMB ICB setting.
// Drop down its min-value limit to be able to set it as low as test needs.
TControlWrapper MaxCommitRedoMB;
{
runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
MaxCommitRedoMB.Reset(200, 1, 4096);
}

ui64 txId = 100;

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "table"
Columns { Name: "key" Type: "Uint64"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// 1. Set MaxCommitRedoMB to 1 and try to create table.
//
// (Check at the operation's Propose tests commit redo size against (MaxCommitRedoMB - 1)
// to give 1MB leeway to executer/tablet inner stuff to may be do "something extra".
// So MaxCommitRedoMB = 1 means effective 0 for the size of operation's commit.)
{
MaxCommitRedoMB = 1;
AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table");
TestModificationResults(runtime, txId,
{{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}}
);
env.TestWaitNotification(runtime, txId);
}

// 2. Set MaxCommitRedoMB back to high value and try again.
{
MaxCommitRedoMB = 200;
AsyncCopyTable(runtime, ++txId, "/MyRoot", "table-copy", "/MyRoot/table");
env.TestWaitNotification(runtime, txId);
}
}

Y_UNIT_TEST(CopyTables) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);

// Take control over MaxCommitRedoMB ICB setting.
// Drop down its min-value limit to be able to set it as low as test needs.
TControlWrapper MaxCommitRedoMB;
{
runtime.GetAppData().Icb->RegisterSharedControl(MaxCommitRedoMB, "TabletControls.MaxCommitRedoMB");
MaxCommitRedoMB.Reset(200, 1, 4096);
}

const ui64 tables = 100;
const ui64 shardsPerTable = 1;

ui64 txId = 100;

for (ui64 i : xrange(tables)) {
TestCreateTable(runtime, ++txId, "/MyRoot", Sprintf(
R"(
Name: "table-%lu"
Columns { Name: "key" Type: "Uint64"}
Columns { Name: "value" Type: "Utf8"}
KeyColumnNames: ["key"]
UniformPartitionsCount: %lu
)",
i,
shardsPerTable
));
env.TestWaitNotification(runtime, txId);
}

auto testCopyTables = [](auto& runtime, ui64 txId, ui64 tables) {
TVector<TEvTx*> schemeTxs;
for (ui64 i : xrange(tables)) {
schemeTxs.push_back(CopyTableRequest(txId, "/MyRoot", Sprintf("table-%lu-copy", i), Sprintf("/MyRoot/table-%lu", i)));
}
AsyncSend(runtime, TTestTxConfig::SchemeShard, CombineSchemeTransactions(schemeTxs));
};

// 1. Set MaxCommitRedoMB to 1 and try to copy tables.
{
MaxCommitRedoMB = 1;
testCopyTables(runtime, ++txId, tables);
TestModificationResults(runtime, txId,
{{NKikimrScheme::StatusSchemeError, "local tx commit redo size generated by IgniteOperation() is more than allowed limit"}}
);
}

// 2. Set MaxCommitRedoMB back to high value and try again.
{
MaxCommitRedoMB = 200;
testCopyTables(runtime, ++txId, tables);
TestModificationResults(runtime, txId, {{NKikimrScheme::StatusAccepted}});
}
}

}
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/ut_base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ SRCS(
ut_info_types.cpp
ut_allocate_pq.cpp
ut_table_pg_types.cpp
ut_commit_redo_limit.cpp
)

END()

0 comments on commit 0d1cb58

Please sign in to comment.