From 79bc3c243b0896ca2988aab3808dc82dbfa25ab8 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 19 Sep 2024 12:30:50 +0300 Subject: [PATCH] [24-3-8] analytics: locks fixes & basic HTAP (#9117) --- .github/config/muted_ya.txt | 5 + ydb/core/kqp/common/kqp_tx.cpp | 29 +++- ydb/core/kqp/common/kqp_tx.h | 51 ++++++ .../kqp/compile_service/kqp_compile_actor.cpp | 1 + .../compile_service/kqp_compile_service.cpp | 2 + .../kqp/executer_actor/kqp_data_executer.cpp | 148 ++++++++++++------ ydb/core/kqp/executer_actor/kqp_executer.h | 8 +- .../kqp/executer_actor/kqp_executer_impl.cpp | 19 +-- .../kqp/executer_actor/kqp_executer_impl.h | 17 +- .../kqp/node_service/kqp_node_service.cpp | 22 +++ .../effects/kqp_opt_phy_delete_index.cpp | 10 +- .../effects/kqp_opt_phy_effects_impl.h | 4 +- .../physical/effects/kqp_opt_phy_indexes.cpp | 4 +- .../effects/kqp_opt_phy_insert_index.cpp | 15 +- .../effects/kqp_opt_phy_upsert_index.cpp | 15 +- .../kqp/opt/physical/kqp_opt_phy_helpers.cpp | 4 + ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h | 3 + ydb/core/kqp/provider/yql_kikimr_settings.h | 1 + ydb/core/kqp/runtime/kqp_write_actor.cpp | 96 ++++++------ .../kqp/runtime/kqp_write_actor_settings.cpp | 28 ++++ .../kqp/runtime/kqp_write_actor_settings.h | 28 ++++ ydb/core/kqp/runtime/kqp_write_table.cpp | 2 +- ydb/core/kqp/runtime/ya.make | 1 + .../kqp/session_actor/kqp_session_actor.cpp | 43 +++-- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 70 +++++++++ ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp | 2 + ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp | 1 + ydb/core/kqp/ut/tx/kqp_sink_common.h | 5 +- ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp | 41 ++--- ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp | 99 +++++++++++- ydb/core/protos/data_events.proto | 4 + ydb/core/protos/table_service_config.proto | 20 ++- 32 files changed, 618 insertions(+), 180 deletions(-) create mode 100644 ydb/core/kqp/runtime/kqp_write_actor_settings.cpp create mode 100644 ydb/core/kqp/runtime/kqp_write_actor_settings.h diff --git 
a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index ce8392c1c381..aa54aa1413fc 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -18,6 +18,11 @@ ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssueInteractiveTx+withSink ydb/core/kqp/ut/tx KqpLocksTricky.TestNoLocksIssue+withSink ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink ydb/core/kqp/ut/tx KqpSinkTx.InvalidateOnError +ydb/core/kqp/ut/tx KqpSinkMvcc.ReadWriteTxFailsOnConcurrentWrite3 +ydb/core/kqp/ut/tx KqpSinkMvcc.OltpNamedStatement +ydb/core/kqp/ut/tx KqpSinkMvcc.OlapNamedStatement +ydb/core/kqp/ut/tx KqpSinkMvcc.OlapMultiSinks +ydb/core/kqp/ut/tx KqpSinkMvcc.OltpMultiSinks ydb/core/kqp/ut/query KqpLimits.QueryReplySize ydb/core/kqp/ut/query KqpQuery.QueryTimeout ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index a4cb09bc8f1a..cd5549de76a9 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const if (pathId.OwnerId() != 0) { auto table = txCtx.TableByIdMap.FindPtr(pathId); if (!table) { - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); } - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`"); } else { // Olap tables don't return SchemeShard in locks, thus we use tableId here. 
for (const auto& [pathId, table] : txCtx.TableByIdMap) { if (pathId.TableId() == pathId.TableId()) { - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`"); } } - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); } } @@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT invalidatedLock.GetPathId())); } +NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) { + TStringBuilder message; + message << "Transaction locks invalidated."; + + if (auto tableInfoPtr = shardIdToTableInfo.GetPtr(shardId); tableInfoPtr) { + message << " Tables: "; + bool first = true; + for (const auto& path : tableInfoPtr->Pathes) { + if (!first) { + message << ", "; + } + first = false; // Every path after the first one is preceded by a separator. + message << "`" << path << "`"; + } + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message); + } else { + message << " Unknown table."; + } + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message); +} + std::pair> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx) { diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index a668c0ea4977..ae355f3dc36d 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -121,6 +121,50 @@ struct TDeferredEffects { friend class TKqpTransactionContext; }; +struct TTableInfo { + bool IsOlap = false; + THashSet Pathes; +}; + + +class TShardIdToTableInfo { +public: + const TTableInfo& Get(ui64 shardId) const { + const auto* result = GetPtr(shardId); + AFL_ENSURE(result); + return *result; + } + + const TTableInfo* GetPtr(ui64 
shardId) const { + auto it = ShardIdToInfo.find(shardId); + return it != std::end(ShardIdToInfo) + ? &it->second + : nullptr; + } + + void Add(ui64 shardId, bool isOlap, const TString& path) { + const auto [stringsIter, _] = Strings.insert(path); + const TStringBuf pathBuf = *stringsIter; + auto infoIter = ShardIdToInfo.find(shardId); + if (infoIter != std::end(ShardIdToInfo)) { + AFL_ENSURE(infoIter->second.IsOlap == isOlap); + infoIter->second.Pathes.insert(pathBuf); + } else { + ShardIdToInfo.emplace( + shardId, + TTableInfo{ + .IsOlap = isOlap, + .Pathes = {pathBuf}, + }); + } + } + +private: + THashMap ShardIdToInfo; + std::unordered_set Strings;// Pointers aren't invalidated. +}; +using TShardIdToTableInfoPtr = std::shared_ptr; + class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { public: explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry, @@ -287,6 +331,12 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { TTxAllocatorState::TPtr TxAlloc; IKqpGateway::TKqpSnapshotHandle SnapshotHandle; + + bool HasOlapTable = false; + bool HasOltpTable = false; + bool HasTableWrite = false; + + TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared(); }; struct TTxId { @@ -435,6 +485,7 @@ class TTransactionsCache { }; NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId); +NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId); std::pair> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx); diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 7f1a4f2e4bb7..652504402ca2 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -603,6 +603,7 @@ void 
ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs(); kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink(); kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink(); + kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx(); kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode(); kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit(); kqpConfig.OldLookupJoinBehaviour = serviceConfig.GetOldLookupJoinBehaviour(); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index add71b771dc7..7311fba18377 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -525,6 +525,7 @@ class TKqpCompileService : public TActorBootstrapped { bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault(); bool enableOlapSink = TableServiceConfig.GetEnableOlapSink(); bool enableOltpSink = TableServiceConfig.GetEnableOltpSink(); + bool enableHtapTx = TableServiceConfig.GetEnableHtapTx(); bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs(); auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode(); @@ -558,6 +559,7 @@ class TKqpCompileService : public TActorBootstrapped { TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault || TableServiceConfig.GetEnableOlapSink() != enableOlapSink || TableServiceConfig.GetEnableOltpSink() != enableOltpSink || + TableServiceConfig.GetEnableHtapTx() != enableHtapTx || TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs || TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode || TableServiceConfig.GetOldLookupJoinBehaviour() != oldLookupJoinBehaviour || diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 
7133090d97f0..07771776059e 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -129,15 +129,16 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, - const TGUCSettings::TPtr& GUCSettings) + const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, + const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) - , EnableOlapSink(enableOlapSink) , UseEvWrite(useEvWrite) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) + , ShardIdToTableInfo(shardIdToTableInfo) + , HtapTx(htapTx) { Target = creator; @@ -210,42 +211,49 @@ class TKqpDataExecuter : public TKqpExecuterBaseBrokenLockShardId); + return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); } ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS); Counters->TxProxyMon->ReportStatusOK->Inc(); - auto addLocks = [this](const auto& data) { + auto addLocks = [this](const ui64 taskId, const auto& data) { if (data.GetData().template Is()) { NKikimrTxDataShard::TEvKqpInputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); for (auto& lock : info.GetLocks()) { Locks.push_back(lock); + + const auto& task = TasksGraph.GetTask(taskId); + const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); + ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath); } } else if (data.GetData().template Is()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; 
YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); for (auto& lock : info.GetLocks()) { Locks.push_back(lock); + + const auto& task = TasksGraph.GetTask(taskId); + const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); + ShardIdToTableInfo->Add(lock.GetDataShard(), stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath); } } }; - for (auto& [_, data] : ExtraData) { - for (const auto& source : data.GetSourcesExtraData()) { - addLocks(source); + for (auto& [_, extraData] : ExtraData) { + for (const auto& source : extraData.Data.GetSourcesExtraData()) { + addLocks(extraData.TaskId, source); } - for (const auto& transform : data.GetInputTransformsData()) { - addLocks(transform); + for (const auto& transform : extraData.Data.GetInputTransformsData()) { + addLocks(extraData.TaskId, transform); } - for (const auto& sink : data.GetSinksExtraData()) { - addLocks(sink); + for (const auto& sink : extraData.Data.GetSinksExtraData()) { + addLocks(extraData.TaskId, sink); } - if (data.HasComputeExtraData()) { - addLocks(data.GetComputeExtraData()); + if (extraData.Data.HasComputeExtraData()) { + addLocks(extraData.TaskId, extraData.Data.GetComputeExtraData()); } } @@ -499,12 +507,15 @@ class TKqpDataExecuter : public TKqpExecuterBaseState == TShardState::EState::Preparing); Counters->TxProxyMon->TxResultAborted->Inc(); LocksBroken = true; + ResponseEv->BrokenLockShardId = shardId; - YQL_ENSURE(!res->Record.GetTxLocks().empty()); - ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( - res->Record.GetTxLocks(0).GetSchemeShard(), - res->Record.GetTxLocks(0).GetPathId()); + if (!res->Record.GetTxLocks().empty()) { + ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( + res->Record.GetTxLocks(0).GetSchemeShard(), + res->Record.GetTxLocks(0).GetPathId()); + } ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); + return; } default: { @@ -1191,11 +1202,16 @@ class TKqpDataExecuter : public TKqpExecuterBaseState = TShardState::EState::Finished; 
Counters->TxProxyMon->TxResultAborted->Inc(); LocksBroken = true; - YQL_ENSURE(!res->Record.GetTxLocks().empty()); - ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( - res->Record.GetTxLocks(0).GetSchemeShard(), - res->Record.GetTxLocks(0).GetPathId()); - ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); + ResponseEv->BrokenLockShardId = shardId; + + if (!res->Record.GetTxLocks().empty()) { + ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( + res->Record.GetTxLocks(0).GetSchemeShard(), + res->Record.GetTxLocks(0).GetPathId()); + return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); + } + CheckExecutionComplete(); + return; } default: { @@ -1250,9 +1266,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseTxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter? LocksBroken = true; + ResponseEv->BrokenLockShardId = shardId; if (!res->Record.GetTxLocks().empty()) { - ResponseEv->BrokenLockPathId = TKikimrPathId( + ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( res->Record.GetTxLocks(0).GetSchemeShard(), res->Record.GetTxLocks(0).GetPathId()); return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); @@ -1884,9 +1901,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTables()); - if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) - || (!EnableOlapSink && hasOlapSink)) { + if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, @@ -1955,6 +1970,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseAdd(task.Meta.ShardId, stageInfo.Meta.TableKind == ETableKind::Olap, stageInfo.Meta.TablePath); } else if (stageInfo.Meta.IsSysView()) { computeTasks.emplace_back(task.Id); } else { @@ -2330,7 +2347,10 @@ class TKqpDataExecuter : public TKqpExecuterBase sendingShardsSet; absl::flat_hash_set receivingShardsSet; + absl::flat_hash_set 
sendingColumnShardsSet; + absl::flat_hash_set receivingColumnShardsSet; ui64 arbiter = 0; + std::optional columnShardArbiter; // Gather shards that need to send/receive readsets (shards with effects) if (needCommit) { @@ -2350,17 +2370,32 @@ class TKqpDataExecuter : public TKqpExecuterBaseHasLocks()) { - // Locks may be broken so shards with locks need to send readsets - sendingShardsSet.insert(shardId); - } - if (ShardsWithEffects.contains(shardId)) { - // Volatile transactions may abort effects, so they send readsets - if (VolatileTx) { + if (ShardIdToTableInfo->Get(shardId).IsOlap && HtapTx) { + if (tx->HasLocks()) { + // Locks may be broken so shards with locks need to send readsets + sendingColumnShardsSet.insert(shardId); + } + if (ShardsWithEffects.contains(shardId)) { + // Volatile transactions may abort effects, so they send readsets + if (VolatileTx) { + sendingColumnShardsSet.insert(shardId); + } + // Effects are only applied when all locks are valid + receivingColumnShardsSet.insert(shardId); + } + } else { + if (tx->HasLocks()) { + // Locks may be broken so shards with locks need to send readsets sendingShardsSet.insert(shardId); } - // Effects are only applied when all locks are valid - receivingShardsSet.insert(shardId); + if (ShardsWithEffects.contains(shardId)) { + // Volatile transactions may abort effects, so they send readsets + if (VolatileTx) { + sendingShardsSet.insert(shardId); + } + // Effects are only applied when all locks are valid + receivingShardsSet.insert(shardId); + } } } @@ -2397,9 +2432,9 @@ class TKqpDataExecuter : public TKqpExecuterBase= minArbiterMeshSize && - AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters()) + AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())) { std::vector candidates; candidates.reserve(receivingShardsSet.size()); @@ -2411,10 +2446,20 @@ class TKqpDataExecuter : public TKqpExecuterBase= minArbiterMeshSize) { // Select a random arbiter - ui32 index = 
RandomNumber(candidates.size()); + const ui32 index = RandomNumber(candidates.size()); arbiter = candidates.at(index); } } + + if (!receivingColumnShardsSet.empty()) { + const ui32 index = RandomNumber(receivingColumnShardsSet.size()); + auto arbiterIterator = std::begin(receivingColumnShardsSet); + std::advance(arbiterIterator, index); + columnShardArbiter = *arbiterIterator; + + sendingShardsSet.insert(*columnShardArbiter); + receivingShardsSet.insert(*columnShardArbiter); + } } @@ -2426,6 +2471,12 @@ class TKqpDataExecuter : public TKqpExecuterBase sendingColumnShards(sendingColumnShardsSet.begin(), sendingColumnShardsSet.end()); + NProtoBuf::RepeatedField receivingColumnShards(receivingColumnShardsSet.begin(), receivingColumnShardsSet.end()); + + std::sort(sendingColumnShards.begin(), sendingColumnShards.end()); + std::sort(receivingColumnShards.begin(), receivingColumnShards.end()); + for (auto& [shardId, shardTx] : datashardTxs) { shardTx->MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); *shardTx->MutableLocks()->MutableSendingShards() = sendingShards; @@ -2439,9 +2490,14 @@ class TKqpDataExecuter : public TKqpExecuterBaseMutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); *tx->MutableLocks()->MutableSendingShards() = sendingShards; *tx->MutableLocks()->MutableReceivingShards() = receivingShards; + *tx->MutableLocks()->MutableSendingColumnShards() = sendingColumnShards; + *tx->MutableLocks()->MutableReceivingColumnShards() = receivingColumnShards; if (arbiter) { tx->MutableLocks()->SetArbiterShard(arbiter); } + if (columnShardArbiter) { + tx->MutableLocks()->SetArbiterColumnShard(*columnShardArbiter); + } } for (auto& [_, tx] : topicTxs) { @@ -2781,10 +2837,11 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; + TShardIdToTableInfoPtr ShardIdToTableInfo; + const bool HtapTx = false; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2828,12 +2885,13 @@ 
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) + const bool useEvWrite, ui32 statementResultIndex, + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, userRequestContext, - enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); + useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 203f6666214a..03d739b4f6c0 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -27,6 +28,8 @@ struct TEvKqpExecuter { NLWTrace::TOrbit Orbit; IKqpGateway::TKqpSnapshot Snapshot; std::optional BrokenLockPathId; + std::optional BrokenLockShardId; + ui64 ResultRowsCount = 0; ui64 ResultRowsBytes = 0; @@ -104,8 +107,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const 
TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const bool useEvWrite, ui32 statementResultIndex, + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 40adee90bd9c..c1c617a3a277 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -39,7 +39,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch ResultRowsBytes += rows.Size(); auto guard = AllocState->TypeEnv.BindAllocator(); auto& result = TxResults[idx]; - if (rows.RowCount() || !result.IsStream) { + if (rows.RowCount()) { NDq::TDqDataSerializer dataSerializer( AllocState->TypeEnv, AllocState->HolderFactory, static_cast(rows.Proto.GetTransportVersion())); @@ -83,16 +83,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) + const bool useEvWrite, ui32 statementResultIndex, + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx) { if (request.Transactions.empty()) { 
// commit-only or rollback-only data transaction return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr + userRequestContext, useEvWrite, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx ); } @@ -114,8 +115,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr + userRequestContext, useEvWrite, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo, htapTx ); case NKqpProto::TKqpPhyTx::TYPE_SCAN: @@ -129,8 +130,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter( std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, - userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, - federatedQuerySetup, GUCSettings + userRequestContext, useEvWrite, statementResultIndex, + federatedQuerySetup, GUCSettings, shardIdToTableInfo, htapTx ); default: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index b49814fdf457..dd02138d83ad 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -417,7 +417,10 @@ class TKqpExecuterBase : public TActorBootstrapped { case NYql::NDqProto::COMPUTE_STATE_FINISHED: // Don't finalize stats twice. 
if (Planner->CompletedCA(taskId, computeActor)) { - ExtraData[computeActor].Swap(state.MutableExtraData()); + auto& extraData = ExtraData[computeActor]; + extraData.TaskId = taskId; + extraData.Data.Swap(state.MutableExtraData()); + Stats->AddComputeActorStats( computeActor.NodeId(), @@ -1941,7 +1944,12 @@ class TKqpExecuterBase : public TActorBootstrapped { TActorId KqpTableResolverId; TActorId KqpShardsResolverId; - THashMap ExtraData; + + struct TExtraData { + ui64 TaskId; + NYql::NDqProto::TComputeActorExtraData Data; + }; + THashMap ExtraData; TInstant StartResolveTime; TInstant LastResourceUsageUpdate; @@ -1989,8 +1997,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const bool useEvWrite, ui32 statementResultIndex, + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + const TShardIdToTableInfoPtr& shardIdToTableInfo, const bool htapTx); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/node_service/kqp_node_service.cpp b/ydb/core/kqp/node_service/kqp_node_service.cpp index 162806f1de15..dd996b3249d4 100644 --- a/ydb/core/kqp/node_service/kqp_node_service.cpp +++ b/ydb/core/kqp/node_service/kqp_node_service.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -81,6 +82,9 @@ class TKqpNodeService : public TActorBootstrapped { if (config.HasIteratorReadQuotaSettings()) { SetIteratorReadsQuotaSettings(config.GetIteratorReadQuotaSettings()); } + if 
(config.HasWriteActorSettings()) { + SetWriteActorSettings(config.GetWriteActorSettings()); + } SchedulerOptions = { .AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()), @@ -418,6 +422,24 @@ class TKqpNodeService : public TActorBootstrapped { SetReadIteratorBackoffSettings(ptr); } + void SetWriteActorSettings(const NKikimrConfig::TTableServiceConfig::TWriteActorSettings& settings) { + auto ptr = MakeIntrusive(); + + ptr->InFlightMemoryLimitPerActorBytes = settings.GetInFlightMemoryLimitPerActorBytes(); + ptr->MemoryLimitPerMessageBytes = settings.GetMemoryLimitPerMessageBytes(); + ptr->MaxBatchesPerMessage = settings.GetMaxBatchesPerMessage(); + + ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartRetryDelayMs()); + ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxRetryDelayMs()); + ptr->UnsertaintyRatio = settings.GetUnsertaintyRatio(); + ptr->Multiplier = settings.GetMultiplier(); + + ptr->MaxWriteAttempts = settings.GetMaxWriteAttempts(); + ptr->MaxResolveAttempts = settings.GetMaxResolveAttempts(); + + NKikimr::NKqp::SetWriteActorSettings(ptr); + } + void HandleWork(TEvents::TEvUndelivered::TPtr& ev) { switch (ev->Get()->SourceType) { case TEvKqpNode::TEvStartKqpTasksResponse::EventType: { diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp index 87cca8148424..aade00fdafd3 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_delete_index.cpp @@ -88,14 +88,18 @@ TExprBase KqpBuildDeleteIndexStages(TExprBase node, TExprContext& ctx, const TKq effects.emplace_back(tableDelete); for (const auto& [tableNode, indexDesc] : indexes) { - THashSet indexTableColumns; + THashSet indexTableColumnsSet; + TVector indexTableColumns; for (const auto& column : indexDesc->KeyColumns) { - 
YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); } for (const auto& column : pk) { - indexTableColumns.insert(column); + if (indexTableColumnsSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } } auto deleteIndexKeys = MakeRowsFromDict(lookupDict.Cast(), pk, indexTableColumns, del.Pos(), ctx); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h index bbb42ef50236..c56ab9096fb4 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_effects_impl.h @@ -62,11 +62,11 @@ NYql::NNodes::TCoLambda MakeRowsPayloadSelector(const NYql::NNodes::TCoAtomList& const NYql::TKikimrTableDescription& table, NYql::TPositionHandle pos, NYql::TExprContext& ctx); NYql::NNodes::TExprBase MakeRowsFromDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + const TVector& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); // Same as MakeRowsFromDict but skip rows which marked as non changed (true in second tuple) NYql::NNodes::TExprBase MakeRowsFromTupleDict(const NYql::NNodes::TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); + const TVector& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); NYql::NNodes::TMaybeNode MakeConditionalInsertRows(const NYql::NNodes::TExprBase& input, const NYql::TKikimrTableDescription& table, const TMaybe>& inputColumn, bool abortOnError, diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp index 3bb2e9a50dc2..891ae7164183 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp +++ 
b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_indexes.cpp @@ -238,7 +238,7 @@ TMaybeNode PrecomputeTableLookupDict(const TDqPhyPrecompute& l } TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, TPositionHandle pos, TExprContext& ctx) + const TVector& columns, TPositionHandle pos, TExprContext& ctx) { THashSet dictKeysSet(dictKeys.begin(), dictKeys.end()); auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple")); @@ -296,7 +296,7 @@ TExprBase MakeRowsFromDict(const TDqPhyPrecompute& dict, const TVector& } TExprBase MakeRowsFromTupleDict(const TDqPhyPrecompute& dict, const TVector& dictKeys, - const THashSet& columns, TPositionHandle pos, TExprContext& ctx) + const TVector& columns, TPositionHandle pos, TExprContext& ctx) { THashSet dictKeysSet(dictKeys.begin(), dictKeys.end()); auto dictTupleArg = TCoArgument(ctx.NewArgument(pos, "dict_tuple")); diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp index 6553dc98514b..b158dbb9ab73 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_insert_index.cpp @@ -12,7 +12,7 @@ using namespace NYql::NNodes; namespace { TExprBase MakeInsertIndexRows(const TDqPhyPrecompute& inputRows, const TKikimrTableDescription& table, - const THashSet& inputColumns, const THashSet& indexColumns, + const THashSet& inputColumns, const TVector& indexColumns, TPositionHandle pos, TExprContext& ctx) { auto inputRowArg = TCoArgument(ctx.NewArgument(pos, "input_row")); @@ -113,19 +113,24 @@ TExprBase KqpBuildInsertIndexStages(TExprBase node, TExprContext& ctx, const TKq effects.emplace_back(upsertTable); for (const auto& [tableNode, indexDesc] : indexes) { - THashSet indexTableColumns; + THashSet indexTableColumnsSet; + TVector indexTableColumns; for (const auto& column : indexDesc->KeyColumns) { - 
YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); } for (const auto& column : table.Metadata->KeyColumnNames) { - indexTableColumns.insert(column); + if (indexTableColumnsSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } } for (const auto& column : indexDesc->DataColumns) { if (inputColumnsSet.contains(column)) { - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); } } diff --git a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp index 032bff0e69e9..6fa52f0e5cde 100644 --- a/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp +++ b/ydb/core/kqp/opt/physical/effects/kqp_opt_phy_upsert_index.cpp @@ -150,7 +150,7 @@ TExprBase MakeNonexistingRowsFilter(const TDqPhyPrecompute& inputRows, const TDq TExprBase MakeUpsertIndexRows(TKqpPhyUpsertIndexMode mode, const TDqPhyPrecompute& inputRows, const TDqPhyPrecompute& lookupDict, const THashSet& inputColumns, - const THashSet& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos, + const TVector& indexColumns, const TKikimrTableDescription& table, TPositionHandle pos, TExprContext& ctx, bool opt) { // Check if we can update index table from just input data @@ -686,9 +686,11 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, for (const auto& [tableNode, indexDesc] : indexes) { bool indexKeyColumnsUpdated = false; - THashSet indexTableColumns; + THashSet indexTableColumnsSet; + TVector indexTableColumns; for (const auto& column : indexDesc->KeyColumns) { - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); if (mode == TKqpPhyUpsertIndexMode::UpdateOn && 
table.GetKeyColumnIndex(column)) { // Table PK cannot be updated, so don't consider PK columns update as index update @@ -701,7 +703,9 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, } for (const auto& column : pk) { - indexTableColumns.insert(column); + if (indexTableColumnsSet.insert(column).second) { + indexTableColumns.emplace_back(column); + } } auto indexTableColumnsWithoutData = indexTableColumns; @@ -710,7 +714,8 @@ TMaybeNode KqpPhyUpsertIndexEffectsImpl(TKqpPhyUpsertIndexMode mode, bool optUpsert = true; for (const auto& column : indexDesc->DataColumns) { // TODO: Conder not fetching/updating data columns without input value. - YQL_ENSURE(indexTableColumns.emplace(column).second); + YQL_ENSURE(indexTableColumnsSet.emplace(column).second); + indexTableColumns.emplace_back(column); if (inputColumnsSet.contains(column)) { indexDataColumnsUpdated = true; diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp index 18a439af0c21..779788fbad9c 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_helpers.cpp @@ -83,6 +83,10 @@ TCoAtomList BuildColumnsList(const THashSet& columns, TPositionHandl return BuildColumnsListImpl(columns, pos, ctx); } +TCoAtomList BuildColumnsList(const TVector& columns, NYql::TPositionHandle pos,NYql::TExprContext& ctx) { + return BuildColumnsListImpl(columns, pos, ctx); +} + TCoAtomList BuildColumnsList(const TVector& columns, TPositionHandle pos, TExprContext& ctx) { return BuildColumnsListImpl(columns, pos, ctx); } diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h index 0e22dbac8e4f..867694931fa9 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_impl.h @@ -19,6 +19,9 @@ NYql::NNodes::TMaybeNode BuildLookupKeysPrecompu NYql::NNodes::TCoAtomList BuildColumnsList(const THashSet& columns, 
NYql::TPositionHandle pos, NYql::TExprContext& ctx); +NYql::NNodes::TCoAtomList BuildColumnsList(const TVector& columns, NYql::TPositionHandle pos, + NYql::TExprContext& ctx); + NYql::NNodes::TCoAtomList BuildColumnsList(const TVector& columns, NYql::TPositionHandle pos, NYql::TExprContext& ctx); diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index c3963f079fcc..f9e46de62fe3 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -172,6 +172,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi bool OldLookupJoinBehaviour = true; bool EnableOlapSink = false; bool EnableOltpSink = false; + bool EnableHtapTx = false; NKikimrConfig::TTableServiceConfig_EBlockChannelsMode BlockChannelsMode; bool EnableSpillingGenericQuery = false; ui32 DefaultCostBasedOptimizationLevel = 4; diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 085b02a1f68f..d57096e8c9dc 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1,6 +1,7 @@ #include "kqp_write_actor.h" #include "kqp_write_table.h" +#include "kqp_write_actor_settings.h" #include #include @@ -23,32 +24,14 @@ namespace { - constexpr i64 kInFlightMemoryLimitPerActor = 64_MB; - constexpr i64 kMemoryLimitPerMessage = 64_MB; - constexpr i64 kMaxBatchesPerMessage = 8; - - struct TWriteActorBackoffSettings { - TDuration StartRetryDelay = TDuration::MilliSeconds(250); - TDuration MaxRetryDelay = TDuration::Seconds(5); - double UnsertaintyRatio = 0.5; - double Multiplier = 2.0; - - ui64 MaxWriteAttempts = 32; - ui64 MaxResolveAttempts = 5; - }; - - const TWriteActorBackoffSettings* BackoffSettings() { - return Singleton(); - } - - TDuration CalculateNextAttemptDelay(ui64 attempt) { - auto delay = BackoffSettings()->StartRetryDelay; + TDuration CalculateNextAttemptDelay(const 
NKikimr::NKqp::TWriteActorSettings& settings, ui64 attempt) { + auto delay = settings.StartRetryDelay; for (ui64 index = 0; index < attempt; ++index) { - delay *= BackoffSettings()->Multiplier; + delay *= settings.Multiplier; } - delay *= 1 + BackoffSettings()->UnsertaintyRatio * (1 - 2 * RandomNumber()); - delay = Min(delay, BackoffSettings()->MaxRetryDelay); + delay *= 1 + settings.UnsertaintyRatio * (1 - 2 * RandomNumber()); + delay = Min(delay, settings.MaxRetryDelay); return delay; } @@ -133,6 +116,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu TIntrusivePtr counters) : LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ". ") , Settings(std::move(settings)) + , MessageSettings(GetWriteActorSettings()) , OutputIndex(args.OutputIndex) , Callbacks(args.Callback) , Counters(counters) @@ -149,6 +133,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu Settings.GetImmediateTx()) , InconsistentTx( Settings.GetInconsistentTx()) + , MemoryLimit(MessageSettings.InFlightMemoryLimitPerActorBytes) { YQL_ENSURE(std::holds_alternative(TxId)); YQL_ENSURE(!ImmediateTx); @@ -248,9 +233,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu } void PlanResolveTable() { - CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(ResolveAttempts)); + CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(MessageSettings, ResolveAttempts)); TlsActivationContext->Schedule( - CalculateNextAttemptDelay(ResolveAttempts), + CalculateNextAttemptDelay(MessageSettings, ResolveAttempts), new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0)); } @@ -262,7 +247,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu SchemeEntry.reset(); SchemeRequest.reset(); - if (ResolveAttempts++ >= BackoffSettings()->MaxResolveAttempts) { + if (ResolveAttempts++ >= MessageSettings.MaxResolveAttempts) { CA_LOG_E(TStringBuilder() << "Too many table resolve attempts for table " << 
TableId << "."); RuntimeError( @@ -391,8 +376,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got UNSPECIFIED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Unspecified error for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::UNSPECIFIED, getIssues()); return; @@ -411,8 +397,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got ABORTED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Aborted for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::ABORTED, getIssues()); return; @@ -430,8 +417,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RetryResolveTable(); } else { RuntimeError( - TStringBuilder() << "Got INTERNAL ERROR for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Internal error for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::INTERNAL_ERROR, getIssues()); } @@ -447,8 +435,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu // TODO: support waiting if (!InconsistentTx) { RuntimeError( - TStringBuilder() << "Got OVERLOADED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::OVERLOADED, getIssues()); } @@ -461,8 +450,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." 
<< getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got CANCELLED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Cancelled request to table `" + << SchemeEntry->TableId.PathId.ToString() << "`." + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::CANCELLED, getIssues()); return; @@ -474,8 +464,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got BAD REQUEST for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Bad request. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::BAD_REQUEST, getIssues()); return; @@ -491,8 +482,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RetryResolveTable(); } else { RuntimeError( - TStringBuilder() << "Got SCHEME CHANGED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Scheme changed. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::SCHEME_ERROR, getIssues()); } @@ -505,8 +497,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got LOCKS BROKEN for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Transaction locks invalidated. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. 
" + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::ABORTED, getIssues()); return; @@ -531,7 +524,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu for (const auto& lock : ev->Get()->Record.GetTxLocks()) { if (!LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock)) { RuntimeError( - TStringBuilder() << "Got LOCKS BROKEN for table `" + TStringBuilder() << "Transaction locks invalidated. Table `" << SchemeEntry->TableId.PathId.ToString() << "`.", NYql::NDqProto::StatusIds::ABORTED, NYql::TIssues{}); @@ -573,7 +566,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu void SendDataToShard(const ui64 shardId) { const auto metadata = ShardedWriteController->GetMessageMetadata(shardId); YQL_ENSURE(metadata); - if (metadata->SendAttempts >= BackoffSettings()->MaxWriteAttempts) { + if (metadata->SendAttempts >= MessageSettings.MaxWriteAttempts) { CA_LOG_E("ShardId=" << shardId << " for table '" << Settings.GetTable().GetPath() << "': retry limit exceeded." 
@@ -643,7 +636,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu if (InconsistentTx) { TlsActivationContext->Schedule( - CalculateNextAttemptDelay(metadata->SendAttempts), + CalculateNextAttemptDelay(MessageSettings, metadata->SendAttempts), new IEventHandle( SelfId(), SelfId(), @@ -699,7 +692,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RetryShard(ev->Get()->TabletId, std::nullopt); } else { RuntimeError( - TStringBuilder() << "Error while delivering message to tablet " << ev->Get()->TabletId, + TStringBuilder() + << "Error writing to table `" << SchemeEntry->TableId.PathId.ToString() << "`" + << ": can't deliver message to tablet " << ev->Get()->TabletId << ".", NYql::NDqProto::StatusIds::UNAVAILABLE); } } @@ -735,11 +730,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu try { ShardedWriteController = CreateShardedWriteController( TShardedWriteControllerSettings { - .MemoryLimitTotal = kInFlightMemoryLimitPerActor, - .MemoryLimitPerMessage = kMemoryLimitPerMessage, + .MemoryLimitTotal = MessageSettings.InFlightMemoryLimitPerActorBytes, + .MemoryLimitPerMessage = MessageSettings.MemoryLimitPerMessageBytes, .MaxBatchesPerMessage = (SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable ? 
1 - : kMaxBatchesPerMessage), + : MessageSettings.MaxBatchesPerMessage), }, std::move(columnsMetadata), TypeEnv, @@ -775,6 +770,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu TString LogPrefix; const NKikimrKqp::TKqpTableSinkSettings Settings; + TWriteActorSettings MessageSettings; const ui64 OutputIndex; NYql::NDq::TDqAsyncStats EgressStats; NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr; @@ -795,7 +791,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu THashMap LocksInfo; bool Finished = false; - const i64 MemoryLimit = kInFlightMemoryLimitPerActor; + const i64 MemoryLimit; IShardedWriteControllerPtr ShardedWriteController = nullptr; }; diff --git a/ydb/core/kqp/runtime/kqp_write_actor_settings.cpp b/ydb/core/kqp/runtime/kqp_write_actor_settings.cpp new file mode 100644 index 000000000000..5c7478f33bf9 --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_write_actor_settings.cpp @@ -0,0 +1,28 @@ +#include "kqp_write_actor_settings.h" + +#include +#include + + +namespace NKikimr { +namespace NKqp { + +struct TWriteActorDefaultSettings { + THotSwap SettingsPtr; + + TWriteActorDefaultSettings() { + SettingsPtr.AtomicStore(new TWriteActorSettings()); + } + +}; + +TWriteActorSettings GetWriteActorSettings() { + return *Singleton()->SettingsPtr.AtomicLoad(); +} + +void SetWriteActorSettings(TIntrusivePtr ptr) { + Singleton()->SettingsPtr.AtomicStore(ptr); +} + +} +} diff --git a/ydb/core/kqp/runtime/kqp_write_actor_settings.h b/ydb/core/kqp/runtime/kqp_write_actor_settings.h new file mode 100644 index 000000000000..328dcd5120a7 --- /dev/null +++ b/ydb/core/kqp/runtime/kqp_write_actor_settings.h @@ -0,0 +1,28 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr { +namespace NKqp { + +struct TWriteActorSettings : TAtomicRefCount { + i64 InFlightMemoryLimitPerActorBytes = 64_MB; + i64 MemoryLimitPerMessageBytes = 64_MB; + i64 MaxBatchesPerMessage = 1000; + + TDuration StartRetryDelay = TDuration::Seconds(1); 
+ TDuration MaxRetryDelay = TDuration::Seconds(10); + double UnsertaintyRatio = 0.5; + double Multiplier = 2.0; + + ui64 MaxWriteAttempts = 100; + ui64 MaxResolveAttempts = 5; +}; + +TWriteActorSettings GetWriteActorSettings(); +void SetWriteActorSettings(TIntrusivePtr ptr); + +} +} diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 21dc4b1f2734..ee9bffcce2ac 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -18,7 +18,7 @@ namespace NKqp { namespace { constexpr ui64 DataShardMaxOperationBytes = 8_MB; -constexpr ui64 ColumnShardMaxOperationBytes = 8_MB; +constexpr ui64 ColumnShardMaxOperationBytes = 64_MB; constexpr ui64 MaxUnshardedBatchBytes = 0_MB; class IPayloadSerializer : public TThrRefBase { diff --git a/ydb/core/kqp/runtime/ya.make b/ydb/core/kqp/runtime/ya.make index 615ccbc9cbf4..d4192b460ff7 100644 --- a/ydb/core/kqp/runtime/ya.make +++ b/ydb/core/kqp/runtime/ya.make @@ -22,6 +22,7 @@ SRCS( kqp_stream_lookup_worker.h kqp_tasks_runner.cpp kqp_transport.cpp + kqp_write_actor_settings.cpp kqp_write_actor.cpp kqp_write_table.cpp ) diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 4e40cc4da0d8..1fc69f5d0b7e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -851,10 +851,10 @@ class TKqpSessionActor : public TActorBootstrapped { } const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); - HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - if (HasOlapTable && HasOltpTable && HasTableWrite) 
{ + QueryState->TxCtx->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); + QueryState->TxCtx->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + QueryState->TxCtx->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite && !Settings.TableService.GetEnableHtapTx()) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "Write transactions between column and row tables are disabled at current time."); return false; @@ -1108,7 +1108,7 @@ class TKqpSessionActor : public TActorBootstrapped { txCtx.HasImmediateEffects = true; txCtx.ClearDeferredEffects(); - SendToExecuter(std::move(request)); + SendToExecuter(QueryState->TxCtx.Get(), std::move(request)); } bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) { @@ -1227,7 +1227,7 @@ class TKqpSessionActor : public TActorBootstrapped { } request.TopicOperations = std::move(txCtx.TopicOperations); - } else if (QueryState->ShouldAcquireLocks(tx)) { + } else if (QueryState->ShouldAcquireLocks(tx) && (!txCtx.HasOlapTable || Settings.TableService.GetEnableOlapSink())) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); if (txCtx.HasUncommittedChangesRead || Config->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { @@ -1243,7 +1243,7 @@ class TKqpSessionActor : public TActorBootstrapped { txCtx.Locks.Size(), request.AcquireLocksTxId.Defined()); - SendToExecuter(std::move(request)); + SendToExecuter(QueryState->TxCtx.Get(), std::move(request)); ++QueryState->CurrentTx; return false; } @@ -1278,7 +1278,7 @@ class TKqpSessionActor : public TActorBootstrapped { return results; } - void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request, bool isRollback = false) { + void 
SendToExecuter(TKqpTransactionContext* txCtx, IKqpGateway::TExecPhysicalRequest&& request, bool isRollback = false) { if (QueryState) { request.Orbit = std::move(QueryState->Orbit); QueryState->StatementResultSize = GetResultsCount(request); @@ -1290,18 +1290,29 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - const bool useEvWrite = ((HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!HasOlapTable && Settings.TableService.GetEnableOltpSink())) + bool useEvWrite = ( + (txCtx->HasOlapTable // olap only + && !txCtx->HasOltpTable + && Settings.TableService.GetEnableOlapSink()) + || (txCtx->HasOltpTable // oltp only + && !txCtx->HasOlapTable + && Settings.TableService.GetEnableOltpSink()) + || (txCtx->HasOlapTable // htap + && txCtx->HasOltpTable + && Settings.TableService.GetEnableOlapSink() + && Settings.TableService.GetEnableHtapTx())) && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY - || (!HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) - || (!HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); + || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) + || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr(), RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), AsyncIoFactory, QueryState ? 
QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); + useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo, + Settings.TableService.GetEnableHtapTx()); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1460,6 +1471,8 @@ class TKqpSessionActor : public TActorBootstrapped { case Ydb::StatusIds::ABORTED: { if (ev->BrokenLockPathId) { issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId)); + } else if (ev->BrokenLockShardId) { + issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx->ShardIdToTableInfo, *ev->BrokenLockShardId)); } break; } @@ -2043,7 +2056,7 @@ class TKqpSessionActor : public TActorBootstrapped { request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock); } - SendToExecuter(std::move(request), true); + SendToExecuter(txCtx, std::move(request), true); } void ResetTxState() { @@ -2574,10 +2587,6 @@ class TKqpSessionActor : public TActorBootstrapped { TActorId KqpTempTablesAgentActor; std::shared_ptr> CompilationCookie; - bool HasOlapTable = false; - bool HasOltpTable = false; - bool HasTableWrite = false; - TGUCSettings::TPtr GUCSettings; }; diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 94bc489411bd..c875fe47cd87 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -2855,10 +2855,62 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(TableSink_Htap) { + NKikimrConfig::TAppConfig appConfig; + 
appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/DataShard` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + { + auto prepareResult = client.ExecuteQuery(R"( + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13); + INSERT INTO `/Root/ColumnShard` SELECT * FROM `/Root/DataShard`; + REPLACE INTO `/Root/DataShard` SELECT * FROM `/Root/ColumnShard`; + SELECT * FROM `/Root/ColumnShard`; + SELECT * FROM `/Root/DataShard`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + } + Y_UNIT_TEST(TableSink_BadTransactions) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); 
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig) .SetWithSampleTables(false); @@ -2973,6 +3025,24 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } + + { + auto session = client.GetSession().GetValueSync().GetSession(); + { + auto insertResult = session.ExecuteQuery(R"( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(insertResult.IsSuccess()); + } + { + auto insertResult = session.ExecuteQuery(R"( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(insertResult.IsSuccess()); + } + } } Y_UNIT_TEST(TableSink_ReplaceFromSelectLargeOlap) { diff --git a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp index 9ac7d3fabee7..96172c69d7a0 100644 --- a/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_locks_tricky_ut.cpp @@ -32,6 +32,7 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) { Y_UNIT_TEST_TWIN(TestNoLocksIssue, withSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); auto setting = NKikimrKqp::TKqpSetting(); TKikimrSettings settings; @@ -130,6 +131,7 @@ Y_UNIT_TEST_SUITE(KqpLocksTricky) { Y_UNIT_TEST_TWIN(TestNoLocksIssueInteractiveTx, withSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + 
appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); auto setting = NKikimrKqp::TKqpSetting(); TKikimrSettings settings; diff --git a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp index f4eb9ee7a9ce..d7497273491f 100644 --- a/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_mvcc_ut.cpp @@ -132,6 +132,7 @@ Y_UNIT_TEST_SUITE(KqpSnapshotRead) { Y_UNIT_TEST_TWIN(ReadOnlyTxWithIndexCommitsOnConcurrentWrite, withSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOltpSink(withSink); + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); TKikimrRunner kikimr( TKikimrSettings() .SetAppConfig(appConfig) diff --git a/ydb/core/kqp/ut/tx/kqp_sink_common.h b/ydb/core/kqp/ut/tx/kqp_sink_common.h index 082f35e3d3f9..2d4a7e48e9cb 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_common.h +++ b/ydb/core/kqp/ut/tx/kqp_sink_common.h @@ -18,12 +18,13 @@ class TTableDataModificationTester { std::unique_ptr Kikimr; YDB_ACCESSOR(bool, IsOlap, false); YDB_ACCESSOR(bool, FastSnapshotExpiration, false); + YDB_ACCESSOR(bool, DisableSinks, false); virtual void DoExecute() = 0; public: void Execute() { - AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); - AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + AppConfig.MutableTableServiceConfig()->SetEnableOlapSink(!DisableSinks); + AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(!DisableSinks); AppConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); auto settings = TKikimrSettings().SetAppConfig(AppConfig).SetWithSampleTables(false); if (FastSnapshotExpiration) { diff --git a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp index c434317e3a07..ec02a3211882 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp @@ -43,12 +43,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), 
TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -92,13 +90,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { auto commitResult = tx1->Commit().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); commitResult.GetIssues().PrintTo(Cerr); - UNIT_ASSERT_C(commitResult.GetIssues().Size() != 0, commitResult.GetIssues().ToString()); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), commitResult.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), commitResult.GetIssues().ToString()); result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -182,12 +177,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - 
UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); result = session1.ExecuteQuery(Q1_(R"( SELECT * FROM Test WHERE Group = 11; @@ -237,12 +230,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); result = session1.ExecuteQuery(Q1_(R"( SELECT * FROM Test WHERE Group = 11; diff --git a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp index d748bc4bc304..e50a9652b303 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp @@ -208,9 +208,7 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) { )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), 
result.GetIssues().ToString()); } }; @@ -270,6 +268,101 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) { tester.SetIsOlap(false); tester.Execute(); } + + Y_UNIT_TEST(OlapReadWriteTxFailsOnConcurrentWrite3) { + TReadWriteTxFailsOnConcurrentWrite3 tester; + tester.SetIsOlap(true); + tester.Execute(); + } + + class TNamedStatement : public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + + { + auto result = session1.ExecuteQuery(Q_(R"( + $data = SELECT * FROM `/Root/KV`; + DELETE FROM `/Root/KV` WHERE 1=1; + SELECT COUNT(*) FROM `/Root/KV`; + SELECT COUNT(*) FROM $data; + DELETE FROM `/Root/KV` ON SELECT 424242u AS Key, "One" As Value; + UPSERT INTO `/Root/KV` (Key, Value) VALUES (424242u, "One"); + SELECT COUNT(*) FROM `/Root/KV`; + SELECT COUNT(*) FROM $data; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(0))); + CompareYson(R"([[0u]])", FormatResultSetYson(result.GetResultSet(1))); + CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(2))); + CompareYson(R"([[1u]])", FormatResultSetYson(result.GetResultSet(3))); + } + } + }; + + Y_UNIT_TEST(OltpNamedStatementNoSink) { + TNamedStatement tester; + tester.SetDisableSinks(true); + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OltpNamedStatement) { + TNamedStatement tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OlapNamedStatement) { + TNamedStatement tester; + tester.SetIsOlap(true); + tester.Execute(); + } + + class TMultiSinks: public TTableDataModificationTester { + protected: + void DoExecute() override { + auto client = Kikimr->GetQueryClient(); + + auto session1 = client.GetSession().GetValueSync().GetSession(); + + { + auto 
result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "1"); + UPSERT INTO `/Root/KV` (Key, Value) VALUES (1u, "2"); + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + { + auto result = session1.ExecuteQuery(Q_(R"( + SELECT Value FROM `/Root/KV` WHERE Key = 1u; + )"), TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([[["2"]]])", FormatResultSetYson(result.GetResultSet(0))); + } + } + }; + + Y_UNIT_TEST(OltpMultiSinksNoSinks) { + TMultiSinks tester; + tester.SetDisableSinks(true); + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OltpMultiSinks) { + TMultiSinks tester; + tester.SetIsOlap(false); + tester.Execute(); + } + + Y_UNIT_TEST(OlapMultiSinks) { + TMultiSinks tester; + tester.SetIsOlap(true); + tester.Execute(); + } } } // namespace NKqp diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index 627296a769a1..677318e04837 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -37,6 +37,10 @@ message TKqpLocks { // This may only be used with generic readsets without any other data and // currently limited to volatile transactions. 
optional uint64 ArbiterShard = 5; + + optional uint64 ArbiterColumnShard = 6; + repeated uint64 SendingColumnShards = 7; + repeated uint64 ReceivingColumnShards = 8; } message TTableId { diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index 2906289160dc..afa136bbf5f1 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -285,7 +285,7 @@ message TTableServiceConfig { optional bool EnableOlapSink = 55 [default = false]; optional bool EnablePerStatementQueryExecution = 56 [default = false]; - optional bool EnableCreateTableAs = 57 [default = true]; + optional bool EnableCreateTableAs = 57 [default = false]; optional uint64 IdxLookupJoinPointsLimit = 58 [default = 3]; optional bool OldLookupJoinBehaviour = 59 [default = false]; @@ -321,4 +321,22 @@ message TTableServiceConfig { optional TComputeSchedulerSettings ComputeSchedulerSettings = 70; optional bool EnableRowsDuplicationCheck = 69 [ default = false ]; + + optional bool EnableHtapTx = 71 [default = false]; + + message TWriteActorSettings { + optional uint64 InFlightMemoryLimitPerActorBytes = 1 [ default = 67108864 ]; + optional uint64 MemoryLimitPerMessageBytes = 2 [ default = 67108864 ]; + optional uint64 MaxBatchesPerMessage = 3 [ default = 1000 ]; + + optional uint64 StartRetryDelayMs = 4 [ default = 1000 ]; + optional uint64 MaxRetryDelayMs = 5 [ default = 10000 ]; + optional double UnsertaintyRatio = 6 [ default = 0.5 ]; + optional double Multiplier = 7 [ default = 2.0 ]; + + optional uint64 MaxWriteAttempts = 8 [ default = 100 ]; + optional uint64 MaxResolveAttempts = 9 [ default = 5 ]; + } + + optional TWriteActorSettings WriteActorSettings = 72; };