From 7af1c5befe3a69723ad57f7c1c36e3682cb66a85 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 7 Sep 2024 13:42:24 +0300 Subject: [PATCH 01/12] fix --- ydb/core/kqp/session_actor/kqp_query_state.h | 4 ++++ .../kqp/session_actor/kqp_session_actor.cpp | 18 +++++++----------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 0a614cf8c7e0..cd1c79e36c11 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -164,6 +164,10 @@ class TKqpQueryState : public TNonCopyable { ui32 StatementResultIndex = 0; ui32 StatementResultSize = 0; + bool HasOlapTable = false; + bool HasOltpTable = false; + bool HasTableWrite = false; + TMaybe CommandTagName; NKikimrKqp::EQueryAction GetAction() const { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 61e61b8cbca3..98bd899ad17f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -854,10 +854,10 @@ class TKqpSessionActor : public TActorBootstrapped { } const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); - HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - if (HasOlapTable && HasOltpTable && HasTableWrite) { + QueryState->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); + QueryState->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + QueryState->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + if (QueryState->HasOlapTable && QueryState->HasOltpTable && QueryState->HasTableWrite) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "Write transactions between column and row tables are disabled at current time."); return false; @@ -1292,12 +1292,12 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - const bool useEvWrite = ((HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!HasOlapTable && Settings.TableService.GetEnableOltpSink())) + const bool useEvWrite = ((QueryState->HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!QueryState->HasOlapTable && Settings.TableService.GetEnableOltpSink())) && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY - || (!HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) - || (!HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); + || (!QueryState->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) + || (!QueryState->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr(), RequestCounters, Settings.TableService, @@ -2593,10 +2593,6 @@ class TKqpSessionActor : public TActorBootstrapped { TActorId KqpTempTablesAgentActor; std::shared_ptr> CompilationCookie; - bool HasOlapTable = false; - bool HasOltpTable = false; - bool HasTableWrite = false; - TGUCSettings::TPtr GUCSettings; }; From 5099a2d1bab3deb750e867935f5b1a210cecee1e Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 7 Sep 2024 14:13:33 +0300 Subject: [PATCH 02/12] test --- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 38d623f84ae6..8f56f0c37bba 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3155,6 +3155,24 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { insertResult.GetIssues().ToString().Contains("Write transactions between column and row tables are disabled at current time"), insertResult.GetIssues().ToString()); } + + { + auto session = client.GetSession().GetValueSync().GetSession(); + { + auto insertResult = session.ExecuteQuery(R"( + REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(insertResult.IsSuccess()); + } + { + auto insertResult = session.ExecuteQuery(R"( + REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + UNIT_ASSERT(insertResult.IsSuccess()); + } + } } Y_UNIT_TEST(TableSink_ReplaceFromSelectLargeOlap) { From d8afaaa7a626bd248d48bf3cb43aba5e79646714 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 7 Sep 2024 19:32:01 +0300 Subject: [PATCH 03/12] fix --- .github/config/muted_ya.txt | 1 + .../kqp/compile_service/kqp_compile_actor.cpp | 1 + .../compile_service/kqp_compile_service.cpp | 2 + .../kqp/executer_actor/kqp_data_executer.cpp | 13 ++--- ydb/core/kqp/executer_actor/kqp_executer.h | 2 +- ydb/core/kqp/provider/yql_kikimr_settings.h | 1 + .../kqp/session_actor/kqp_session_actor.cpp | 17 ++++-- ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp | 52 +++++++++++++++++++ ydb/core/protos/table_service_config.proto | 2 + 9 files changed, 78 insertions(+), 13 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index a9f79d684ee7..382072811f52 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -21,6 +21,7 @@ ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns ydb/core/kqp/ut/scheme KqpScheme.AlterAsyncReplication ydb/core/kqp/ut/scheme KqpScheme.QueryWithAlter ydb/core/kqp/ut/service [*/*]* +ydb/core/kqp/ut/service KqpQueryService.TableSink_Htap ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index c3cee8822f21..70dc9981bdad 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -601,6 +601,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.EnableCreateTableAs = serviceConfig.GetEnableCreateTableAs(); kqpConfig.EnableOlapSink = serviceConfig.GetEnableOlapSink(); kqpConfig.EnableOltpSink = serviceConfig.GetEnableOltpSink(); + kqpConfig.EnableHtapTx = serviceConfig.GetEnableHtapTx(); kqpConfig.BlockChannelsMode = serviceConfig.GetBlockChannelsMode(); kqpConfig.IdxLookupJoinsPrefixPointLimit = serviceConfig.GetIdxLookupJoinPointsLimit(); kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling(); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index a78c153a7d53..a875e0e85f54 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -523,6 +523,7 @@ class TKqpCompileService : public TActorBootstrapped { bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault(); bool enableOlapSink = TableServiceConfig.GetEnableOlapSink(); bool enableOltpSink = TableServiceConfig.GetEnableOltpSink(); + bool enableHtapTx = TableServiceConfig.GetEnableHtapTx(); bool enableCreateTableAs = TableServiceConfig.GetEnableCreateTableAs(); auto blockChannelsMode = TableServiceConfig.GetBlockChannelsMode(); @@ -556,6 +557,7 @@ class TKqpCompileService : public TActorBootstrapped { TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault || TableServiceConfig.GetEnableOlapSink() != enableOlapSink || TableServiceConfig.GetEnableOltpSink() != enableOltpSink || + TableServiceConfig.GetEnableHtapTx() != enableHtapTx || TableServiceConfig.GetEnableCreateTableAs() != enableCreateTableAs || TableServiceConfig.GetBlockChannelsMode() != blockChannelsMode || TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit || diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 96d79c3ab4b3..f69849c75a92 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -128,12 +128,11 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, + const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) - , EnableOlapSink(enableOlapSink) , UseEvWrite(useEvWrite) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) @@ -1895,9 +1894,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseGetTables()); - if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) - || (!EnableOlapSink && hasOlapSink)) { + if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))) { auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables."; LOG_E(error); ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED, @@ -2791,7 +2788,6 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; @@ -2836,13 +2832,12 @@ class TKqpDataExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, - const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, - enableOlapSink, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); + useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index bec2aa2c593c..ace6c985023f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -102,7 +102,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, + const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); IActor* CreateKqpSchemeExecuter( diff --git a/ydb/core/kqp/provider/yql_kikimr_settings.h b/ydb/core/kqp/provider/yql_kikimr_settings.h index 0eaebbcb63df..d4d851c0fd61 100644 --- a/ydb/core/kqp/provider/yql_kikimr_settings.h +++ b/ydb/core/kqp/provider/yql_kikimr_settings.h @@ -167,6 +167,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi ui64 IdxLookupJoinsPrefixPointLimit = 1; bool EnableOlapSink = false; bool EnableOltpSink = false; + bool EnableHtapTx = false; NKikimrConfig::TTableServiceConfig_EBlockChannelsMode BlockChannelsMode; bool EnableSpillingGenericQuery = false; ui32 DefaultCostBasedOptimizationLevel = 4; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 98bd899ad17f..142bde89ed3e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -857,7 +857,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryState->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); QueryState->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); QueryState->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - if (QueryState->HasOlapTable && QueryState->HasOltpTable && QueryState->HasTableWrite) { + if (QueryState->HasOlapTable && QueryState->HasOltpTable && QueryState->HasTableWrite && !Settings.TableService.GetEnableHtapTx()) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "Write transactions between column and row tables are disabled at current time."); return false; @@ -1292,7 +1292,18 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - const bool useEvWrite = ((QueryState->HasOlapTable && Settings.TableService.GetEnableOlapSink()) || (!QueryState->HasOlapTable && Settings.TableService.GetEnableOltpSink())) + const bool useEvWrite = ( + (QueryState->HasOlapTable // olap only + && !QueryState->HasOltpTable + && Settings.TableService.GetEnableOlapSink()) + || (QueryState->HasOltpTable // oltp only + && !QueryState->HasOlapTable + && Settings.TableService.GetEnableOltpSink()) + || (QueryState->HasOlapTable // htap + && QueryState->HasOltpTable + && Settings.TableService.GetEnableOlapSink() + && Settings.TableService.GetEnableOltpSink() + && Settings.TableService.GetEnableHtapTx())) && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY @@ -1303,7 +1314,7 @@ class TKqpSessionActor : public TActorBootstrapped { RequestCounters, Settings.TableService, AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - Settings.TableService.GetEnableOlapSink(), useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); + useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 8f56f0c37bba..0af07b0f5a2c 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -3037,10 +3037,62 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(TableSink_Htap) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); + + const TString query = R"( + CREATE TABLE `/Root/ColumnShard` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10); + + CREATE TABLE `/Root/DataShard` ( + Col1 Uint64 NOT NULL, + Col2 String, + Col3 Int32 NOT NULL, + PRIMARY KEY (Col1) + ) + WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + )"; + + auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + + auto client = kikimr.GetQueryClient(); + { + auto prepareResult = client.ExecuteQuery(R"( + UPSERT INTO `/Root/ColumnShard` (Col1, Col2, Col3) VALUES + (1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13); + UPSERT INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES + (10u, "test1", 10), (20u, "test2", 11), (30u, "test3", 12), (40u, NULL, 13); + INSERT INTO `/Root/ColumnShard` SELECT * FROM `/Root/DataShard`; + REPLACE INTO `/Root/DataShard` SELECT * FROM `/Root/ColumnShard`; + SELECT * FROM `/Root/ColumnShard`; + SELECT * FROM `/Root/DataShard`; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + } + Y_UNIT_TEST(TableSink_BadTransactions) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); + appConfig.MutableTableServiceConfig()->SetEnableHtapTx(false); auto settings = TKikimrSettings() .SetAppConfig(appConfig) .SetWithSampleTables(false); diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto index ef3a61cb74a5..48e8bc9f96b4 100644 --- a/ydb/core/protos/table_service_config.proto +++ b/ydb/core/protos/table_service_config.proto @@ -321,4 +321,6 @@ message TTableServiceConfig { optional TComputeSchedulerSettings ComputeSchedulerSettings = 70; optional bool EnableRowsDuplicationCheck = 69 [ default = false ]; + + optional bool EnableHtapTx = 71 [default = false]; }; From 457e995700462bce0cf25d15d4bc1cbc17b7d03d Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 7 Sep 2024 20:36:29 +0300 Subject: [PATCH 04/12] fix --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 13 ++++++++----- ydb/core/kqp/executer_actor/kqp_executer.h | 2 +- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 3 +-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index f69849c75a92..01554e0ed2a2 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -127,12 +127,13 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, + const TActorId& creator, const TIntrusivePtr& userRequestContext, const bool forceArbiterCommit, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) + , ForceArbiterCommit(forceArbiterCommit) , UseEvWrite(useEvWrite) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) @@ -2405,9 +2406,10 @@ class TKqpDataExecuter : public TKqpExecuterBase= minArbiterMeshSize && - AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters()) + AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())) { std::vector candidates; candidates.reserve(receivingShardsSet.size()); @@ -2788,6 +2790,7 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; @@ -2832,12 +2835,12 @@ class TKqpDataExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, - const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, const bool forceArbiterCommit, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, - useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); + forceArbiterCommit, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index ace6c985023f..5d87fd248bda 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -102,7 +102,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool useEvWrite, ui32 statementResultIndex, + const bool forceArbiterCommit, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); IActor* CreateKqpSchemeExecuter( diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 142bde89ed3e..bbbf9bccb9f7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1302,7 +1302,6 @@ class TKqpSessionActor : public TActorBootstrapped { || (QueryState->HasOlapTable // htap && QueryState->HasOltpTable && Settings.TableService.GetEnableOlapSink() - && Settings.TableService.GetEnableOltpSink() && Settings.TableService.GetEnableHtapTx())) && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY @@ -1314,7 +1313,7 @@ class TKqpSessionActor : public TActorBootstrapped { RequestCounters, Settings.TableService, AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); + QueryState->HasOlapTable, useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); From 655364e7abf374b28cb4e4546dda960827920f21 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Sat, 7 Sep 2024 20:54:09 +0300 Subject: [PATCH 05/12] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 51 ++++++++++++++---------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index e95d62c979d1..3d50427549bf 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -391,8 +391,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got UNSPECIFIED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Unspecified error for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::UNSPECIFIED, getIssues()); return; @@ -411,8 +412,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got ABORTED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Aborted for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::ABORTED, getIssues()); return; @@ -430,8 +432,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RetryResolveTable(); } else { RuntimeError( - TStringBuilder() << "Got INTERNAL ERROR for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Internal error for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::INTERNAL_ERROR, getIssues()); } @@ -445,8 +448,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got DISK_SPACE_EXHAUSTED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Disk space exhausted for table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::PRECONDITION_FAILED, getIssues()); return; @@ -461,8 +465,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu // TODO: support waiting if (!InconsistentTx) { RuntimeError( - TStringBuilder() << "Got OVERLOADED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::OVERLOADED, getIssues()); } @@ -475,8 +480,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got CANCELLED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Cancelled request to table `" + << SchemeEntry->TableId.PathId.ToString() << "`." + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::CANCELLED, getIssues()); return; @@ -488,8 +494,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got BAD REQUEST for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Bad request. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::BAD_REQUEST, getIssues()); return; @@ -505,8 +512,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RetryResolveTable(); } else { RuntimeError( - TStringBuilder() << "Got SCHEME CHANGED for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Scheme changed. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::SCHEME_ERROR, getIssues()); } @@ -519,8 +527,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); RuntimeError( - TStringBuilder() << "Got LOCKS BROKEN for table `" - << SchemeEntry->TableId.PathId.ToString() << "`.", + TStringBuilder() << "Transaction locks invalidated. Table `" + << SchemeEntry->TableId.PathId.ToString() << "`. " + << getIssues().ToOneLineString(), NYql::NDqProto::StatusIds::ABORTED, getIssues()); return; @@ -545,7 +554,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu for (const auto& lock : ev->Get()->Record.GetTxLocks()) { if (!LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock)) { RuntimeError( - TStringBuilder() << "Got LOCKS BROKEN for table `" + TStringBuilder() << "Transaction locks invalidated. Table `" << SchemeEntry->TableId.PathId.ToString() << "`.", NYql::NDqProto::StatusIds::ABORTED, NYql::TIssues{}); @@ -713,7 +722,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu RetryShard(ev->Get()->TabletId, std::nullopt); } else { RuntimeError( - TStringBuilder() << "Error while delivering message to tablet " << ev->Get()->TabletId, + TStringBuilder() + << "Error writing to table `" << SchemeEntry->TableId.PathId.ToString() << "`" + << ": can't deliver message to tablet " << ev->Get()->TabletId << ".", NYql::NDqProto::StatusIds::UNAVAILABLE); } } From f8caeeaea087c62ed0eda81ab46c86119694a294 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 9 Sep 2024 12:26:23 +0300 Subject: [PATCH 06/12] prepare --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 8 +++----- ydb/core/kqp/executer_actor/kqp_executer.h | 2 +- ydb/core/kqp/session_actor/kqp_session_actor.cpp | 2 +- ydb/core/protos/data_events.proto | 6 +++++- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 01554e0ed2a2..46655c79fae5 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2406,8 +2406,7 @@ class TKqpDataExecuter : public TKqpExecuterBase= minArbiterMeshSize && AppData()->FeatureFlags.GetEnableVolatileTransactionArbiters())) { @@ -2790,7 +2789,6 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; @@ -2835,12 +2833,12 @@ class TKqpDataExecuter : public TKqpExecuterBase& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, - const TIntrusivePtr& userRequestContext, const bool forceArbiterCommit, const bool useEvWrite, ui32 statementResultIndex, + const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, - forceArbiterCommit, useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); + useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 5d87fd248bda..ace6c985023f 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -102,7 +102,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool forceArbiterCommit, const bool useEvWrite, ui32 statementResultIndex, + const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); IActor* CreateKqpSchemeExecuter( diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index bbbf9bccb9f7..899666f846e7 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1313,7 +1313,7 @@ class TKqpSessionActor : public TActorBootstrapped { RequestCounters, Settings.TableService, AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - QueryState->HasOlapTable, useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); + useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index ab04523e4081..33f2122387c3 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -37,6 +37,10 @@ message TKqpLocks { // This may only be used with generic readsets without any other data and // currently limited to volatile transactions. optional uint64 ArbiterShard = 5; + + optional uint64 ArbiterColumnShard = 6; + repeated uint64 SendingColumnShards = 7; + repeated uint64 ReceivingColumnShards = 8; } message TTableId { @@ -105,7 +109,7 @@ message TEvWrite { optional uint32 LockNodeId = 5; optional TKqpLocks Locks = 6; - // Other params + // Other paramss optional uint64 OverloadSubscribe = 7; // Writes are performed "over" the specified snapshot when specified From 726b6bdbba267b55fd362e20403a70f9cfdabaf2 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 9 Sep 2024 15:24:55 +0300 Subject: [PATCH 07/12] fix --- ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 3 +-- ydb/core/kqp/executer_actor/kqp_executer.h | 7 +++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 46655c79fae5..b2ff861827d9 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -127,13 +127,12 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, const bool forceArbiterCommit, + const TActorId& creator, const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) - , ForceArbiterCommit(forceArbiterCommit) , UseEvWrite(useEvWrite) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index ace6c985023f..53893c585f41 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -13,6 +13,12 @@ namespace NKikimr { namespace NKqp { +struct TTableInfo { + bool IsOlap = false; + TString Path; +}; +using TShardIdToTableInfoPtr = std::shared_ptr>>; + struct TEvKqpExecuter { struct TEvTxRequest : public TEventPB {}; @@ -27,6 +33,7 @@ struct TEvKqpExecuter { NLWTrace::TOrbit Orbit; IKqpGateway::TKqpSnapshot Snapshot; std::optional BrokenLockPathId; + TShardIdToTableInfoPtr ShardIdToTableInfo; ui64 ResultRowsCount = 0; ui64 ResultRowsBytes = 0; From 5e4eea8110d83646167e3821771f95cc11f7069e Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 9 Sep 2024 15:27:43 +0300 Subject: [PATCH 08/12] fix --- ydb/core/protos/data_events.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index 33f2122387c3..8dae8b034510 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -109,7 +109,7 @@ message TEvWrite { optional uint32 LockNodeId = 5; optional TKqpLocks Locks = 6; - // Other paramss + // Other params optional uint64 OverloadSubscribe = 7; // Writes are performed "over" the specified snapshot when specified From 1294a58f2eb78642618e3e8e91e5888c19e0f901 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 9 Sep 2024 18:35:50 +0300 Subject: [PATCH 09/12] fix --- ydb/core/kqp/common/kqp_tx.h | 8 ++++ .../kqp/executer_actor/kqp_data_executer.cpp | 44 +++++++++++++------ ydb/core/kqp/executer_actor/kqp_executer.h | 11 ++--- .../kqp/executer_actor/kqp_executer_impl.cpp | 17 +++---- .../kqp/executer_actor/kqp_executer_impl.h | 16 +++++-- ydb/core/kqp/session_actor/kqp_query_state.h | 2 + .../kqp/session_actor/kqp_session_actor.cpp | 2 +- 7 files changed, 66 insertions(+), 34 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 290838d60f82..6969dd87fd55 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -317,6 +317,14 @@ struct TTxId { } }; +struct TTableInfo { + bool IsOlap = false; + TString Path; +}; + +using TShardIdToTableInfo = THashMap; +using TShardIdToTableInfoPtr = std::shared_ptr; + } template<> diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index b2ff861827d9..5622455c3856 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -129,13 +129,14 @@ class TKqpDataExecuter : public TKqpExecuterBase& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, const std::optional& federatedQuerySetup, - const TGUCSettings::TPtr& GUCSettings) + const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo) : TBase(std::move(request), database, userToken, counters, tableServiceConfig, userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult) , AsyncIoFactory(std::move(asyncIoFactory)) , UseEvWrite(useEvWrite) , FederatedQuerySetup(federatedQuerySetup) , GUCSettings(GUCSettings) + , ShardIdToTableInfo(shardIdToTableInfo) { Target = creator; @@ -215,34 +216,46 @@ class TKqpDataExecuter : public TKqpExecuterBaseRecord.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS); Counters->TxProxyMon->ReportStatusOK->Inc(); - auto addLocks = [this](const auto& data) { + auto addLocks = [this](const ui64 taskId, const auto& data) { if (data.GetData().template Is()) { NKikimrTxDataShard::TEvKqpInputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); for (auto& lock : info.GetLocks()) { Locks.push_back(lock); + + const auto& task = TasksGraph.GetTask(taskId); + const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); + auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()]; + info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap); + info.Path = stageInfo.Meta.TablePath; } } else if (data.GetData().template Is()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; YQL_ENSURE(data.GetData().UnpackTo(&info), "Failed to unpack settings"); for (auto& lock : info.GetLocks()) { Locks.push_back(lock); + + const auto& task = TasksGraph.GetTask(taskId); + const auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); + auto& info = (*ShardIdToTableInfo)[lock.GetDataShard()]; + info.IsOlap = (stageInfo.Meta.TableKind == ETableKind::Olap); + info.Path = stageInfo.Meta.TablePath; } } }; - for (auto& [_, data] : ExtraData) { - for (const auto& source : data.GetSourcesExtraData()) { - addLocks(source); + for (auto& [_, extraData] : ExtraData) { + for (const auto& source : extraData.Data.GetSourcesExtraData()) { + addLocks(extraData.TaskId, source); } - for (const auto& transform : data.GetInputTransformsData()) { - addLocks(transform); + for (const auto& transform : extraData.Data.GetInputTransformsData()) { + addLocks(extraData.TaskId, transform); } - for (const auto& sink : data.GetSinksExtraData()) { - addLocks(sink); + for (const auto& sink : extraData.Data.GetSinksExtraData()) { + addLocks(extraData.TaskId, sink); } - if (data.HasComputeExtraData()) { - addLocks(data.GetComputeExtraData()); + if (extraData.Data.HasComputeExtraData()) { + addLocks(extraData.TaskId, extraData.Data.GetComputeExtraData()); } } @@ -1963,6 +1976,10 @@ class TKqpDataExecuter : public TKqpExecuterBase FederatedQuerySetup; const TGUCSettings::TPtr GUCSettings; + TShardIdToTableInfoPtr ShardIdToTableInfo; bool HasExternalSources = false; bool SecretSnapshotRequired = false; @@ -2833,11 +2851,11 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, tableServiceConfig, std::move(asyncIoFactory), creator, userRequestContext, - useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings); + useEvWrite, statementResultIndex, federatedQuerySetup, GUCSettings, shardIdToTableInfo); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 53893c585f41..77fe65116bb6 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -13,12 +14,6 @@ namespace NKikimr { namespace NKqp { -struct TTableInfo { - bool IsOlap = false; - TString Path; -}; -using TShardIdToTableInfoPtr = std::shared_ptr>>; - struct TEvKqpExecuter { struct TEvTxRequest : public TEventPB {}; @@ -33,7 +28,7 @@ struct TEvKqpExecuter { NLWTrace::TOrbit Orbit; IKqpGateway::TKqpSnapshot Snapshot; std::optional BrokenLockPathId; - TShardIdToTableInfoPtr ShardIdToTableInfo; + //TShardIdToTableInfoPtr ShardIdToTableInfo; ui64 ResultRowsCount = 0; ui64 ResultRowsBytes = 0; @@ -110,7 +105,7 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, const TIntrusivePtr& userRequestContext, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpSchemeExecuter( TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index c7b4c26adb5e..efc8467cc71b 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -81,16 +81,17 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings) + const bool useEvWrite, ui32 statementResultIndex, + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + const TShardIdToTableInfoPtr& shardIdToTableInfo) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, tableServiceConfig, std::move(asyncIoFactory), creator, - userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr + userRequestContext, useEvWrite, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo ); } @@ -112,8 +113,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter( std::move(request), database, userToken, counters, false, tableServiceConfig, std::move(asyncIoFactory), creator, - userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, - federatedQuerySetup, /*GUCSettings*/nullptr + userRequestContext, useEvWrite, statementResultIndex, + federatedQuerySetup, /*GUCSettings*/nullptr, shardIdToTableInfo ); case NKqpProto::TKqpPhyTx::TYPE_SCAN: @@ -127,8 +128,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt return CreateKqpDataExecuter( std::move(request), database, userToken, counters, true, tableServiceConfig, std::move(asyncIoFactory), creator, - userRequestContext, enableOlapSink, useEvWrite, statementResultIndex, - federatedQuerySetup, GUCSettings + userRequestContext, useEvWrite, statementResultIndex, + federatedQuerySetup, GUCSettings, shardIdToTableInfo ); default: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 9edf42cfc24b..e2023a3414c4 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -417,7 +417,9 @@ class TKqpExecuterBase : public TActorBootstrapped { case NYql::NDqProto::COMPUTE_STATE_FINISHED: // Don't finalize stats twice. if (Planner->CompletedCA(taskId, computeActor)) { - ExtraData[computeActor].Swap(state.MutableExtraData()); + auto& extraData = ExtraData[computeActor]; + extraData.TaskId = taskId; + extraData.Data.Swap(state.MutableExtraData()); Stats->AddComputeActorStats( computeActor.NodeId(), @@ -1977,7 +1979,12 @@ class TKqpExecuterBase : public TActorBootstrapped { TActorId KqpTableResolverId; TActorId KqpShardsResolverId; - THashMap ExtraData; + + struct TExtraData { + ui64 TaskId; + NYql::NDqProto::TComputeActorExtraData Data; + }; + THashMap ExtraData; TInstant StartResolveTime; TInstant LastResourceUsageUpdate; @@ -2025,8 +2032,9 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const NKikimrConfig::TTableServiceConfig& tableServiceConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const TActorId& creator, const TIntrusivePtr& userRequestContext, - const bool enableOlapSink, const bool useEvWrite, ui32 statementResultIndex, - const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings); + const bool useEvWrite, ui32 statementResultIndex, + const std::optional& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings, + const TShardIdToTableInfoPtr& shardIdToTableInfo); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr& userToken, TKqpRequestCounters::TPtr counters, diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index cd1c79e36c11..498cf1f55e59 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -168,6 +168,8 @@ class TKqpQueryState : public TNonCopyable { bool HasOltpTable = false; bool HasTableWrite = false; + TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared(); + TMaybe CommandTagName; NKikimrKqp::EQueryAction GetAction() const { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 899666f846e7..599bc4139f2d 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1313,7 +1313,7 @@ class TKqpSessionActor : public TActorBootstrapped { RequestCounters, Settings.TableService, AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings); + useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, QueryState->ShardIdToTableInfo); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); From d89aa387bf9d21e42cdac541f866f91b1f64518c Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Mon, 9 Sep 2024 22:47:47 +0300 Subject: [PATCH 10/12] shard --- ydb/core/kqp/common/kqp_tx.cpp | 29 ++++++++++++++++--- ydb/core/kqp/common/kqp_tx.h | 3 +- .../kqp/executer_actor/kqp_data_executer.cpp | 16 +++++----- ydb/core/kqp/executer_actor/kqp_executer.h | 3 +- .../kqp/session_actor/kqp_session_actor.cpp | 2 ++ 5 files changed, 40 insertions(+), 13 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 67baf1a92e91..90ca0f90763b 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -14,17 +14,17 @@ NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const if (pathId.OwnerId() != 0) { auto table = txCtx.TableByIdMap.FindPtr(pathId); if (!table) { - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); } - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << *table << "`"); } else { // Olap tables don't return SchemeShard in locks, thus we use tableId here. for (const auto& [pathId, table] : txCtx.TableByIdMap) { if (pathId.TableId() == pathId.TableId()) { - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << "`" << table << "`"); } } - return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table."); } } @@ -36,6 +36,27 @@ TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpT invalidatedLock.GetPathId())); } +NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId) { + TStringBuilder message; + message << "Transaction locks invalidated."; + + if (auto it = shardIdToTableInfo.find(shardId); it != std::end(shardIdToTableInfo)) { + message << " Tables: "; + bool first = true; + for (const auto& path : it->second.Pathes) { + if (!first) { + message << ", "; + first = false; + } + message << "`" << path << "`"; + } + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message); + } else { + message << " Unknown table."; + } + return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message); +} + std::pair> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx) { diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 6969dd87fd55..6a20b25e71b2 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -319,7 +319,7 @@ struct TTxId { struct TTableInfo { bool IsOlap = false; - TString Path; + THashSet Pathes; }; using TShardIdToTableInfo = THashMap; @@ -441,6 +441,7 @@ class TTransactionsCache { }; NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId); +NYql::TIssue GetLocksInvalidatedIssue(const TShardIdToTableInfo& shardIdToTableInfo, const ui64& shardId); std::pair> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 5622455c3856..b2d5ac107df5 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -208,9 +208,8 @@ class TKqpDataExecuter : public TKqpExecuterBaseBrokenLockShardId); + return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); } ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS); @@ -227,7 +226,7 @@ class TKqpDataExecuter : public TKqpExecuterBase()) { NKikimrKqp::TEvKqpOutputActorResultInfo info; @@ -239,7 +238,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseState = TShardState::EState::Finished; Counters->TxProxyMon->TxResultAborted->Inc(); LocksBroken = true; + ResponseEv->BrokenLockShardId = shardId; + if (!res->Record.GetTxLocks().empty()) { ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( res->Record.GetTxLocks(0).GetSchemeShard(), @@ -1268,9 +1269,10 @@ class TKqpDataExecuter : public TKqpExecuterBaseTxProxyMon->TxResultAborted->Inc(); // TODO: dedicated counter? LocksBroken = true; + ResponseEv->BrokenLockShardId = shardId; // todo: without responseEv if (!res->Record.GetTxLocks().empty()) { - ResponseEv->BrokenLockPathId = TKikimrPathId( + ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( res->Record.GetTxLocks(0).GetSchemeShard(), res->Record.GetTxLocks(0).GetPathId()); return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {}); @@ -1979,7 +1981,7 @@ class TKqpDataExecuter : public TKqpExecuterBase BrokenLockPathId; - //TShardIdToTableInfoPtr ShardIdToTableInfo; + std::optional BrokenLockShardId; + ui64 ResultRowsCount = 0; ui64 ResultRowsBytes = 0; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 599bc4139f2d..13d7f0cdae1f 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1472,6 +1472,8 @@ class TKqpSessionActor : public TActorBootstrapped { case Ydb::StatusIds::ABORTED: { if (ev->BrokenLockPathId) { issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId)); + } else if (ev->BrokenLockShardId) { + issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->ShardIdToTableInfo, *ev->BrokenLockShardId)); } break; } From b34288f071982e3b812657f442c979eda0cf4e8f Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 10 Sep 2024 12:53:03 +0300 Subject: [PATCH 11/12] fix --- ydb/core/kqp/common/kqp_tx.h | 22 +++++++---- ydb/core/kqp/session_actor/kqp_query_state.h | 6 --- .../kqp/session_actor/kqp_session_actor.cpp | 38 +++++++++---------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 6a20b25e71b2..d5b2d7173ba7 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -121,6 +121,14 @@ struct TDeferredEffects { friend class TKqpTransactionContext; }; +struct TTableInfo { + bool IsOlap = false; + THashSet Pathes; +}; + +using TShardIdToTableInfo = THashMap; +using TShardIdToTableInfoPtr = std::shared_ptr; + class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { public: explicit TKqpTransactionContext(bool implicit, const NMiniKQL::IFunctionRegistry* funcRegistry, @@ -285,6 +293,12 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { TTxAllocatorState::TPtr TxAlloc; IKqpGateway::TKqpSnapshotHandle SnapshotHandle; + + bool HasOlapTable = false; + bool HasOltpTable = false; + bool HasTableWrite = false; + + TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared(); }; struct TTxId { @@ -317,14 +331,6 @@ struct TTxId { } }; -struct TTableInfo { - bool IsOlap = false; - THashSet Pathes; -}; - -using TShardIdToTableInfo = THashMap; -using TShardIdToTableInfoPtr = std::shared_ptr; - } template<> diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 498cf1f55e59..0a614cf8c7e0 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -164,12 +164,6 @@ class TKqpQueryState : public TNonCopyable { ui32 StatementResultIndex = 0; ui32 StatementResultSize = 0; - bool HasOlapTable = false; - bool HasOltpTable = false; - bool HasTableWrite = false; - - TShardIdToTableInfoPtr ShardIdToTableInfo = std::make_shared(); - TMaybe CommandTagName; NKikimrKqp::EQueryAction GetAction() const { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 13d7f0cdae1f..9b55c0ebb128 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -854,10 +854,10 @@ class TKqpSessionActor : public TActorBootstrapped { } const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - QueryState->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); - QueryState->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - QueryState->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); - if (QueryState->HasOlapTable && QueryState->HasOltpTable && QueryState->HasTableWrite && !Settings.TableService.GetEnableHtapTx()) { + QueryState->TxCtx->HasOlapTable |= ::NKikimr::NKqp::HasOlapTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery); + QueryState->TxCtx->HasOltpTable |= ::NKikimr::NKqp::HasOltpTableReadInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + QueryState->TxCtx->HasTableWrite |= ::NKikimr::NKqp::HasOlapTableWriteInTx(phyQuery) || ::NKikimr::NKqp::HasOltpTableWriteInTx(phyQuery); + if (QueryState->TxCtx->HasOlapTable && QueryState->TxCtx->HasOltpTable && QueryState->TxCtx->HasTableWrite && !Settings.TableService.GetEnableHtapTx()) { ReplyQueryError(Ydb::StatusIds::PRECONDITION_FAILED, "Write transactions between column and row tables are disabled at current time."); return false; @@ -1112,7 +1112,7 @@ class TKqpSessionActor : public TActorBootstrapped { txCtx.HasImmediateEffects = true; txCtx.ClearDeferredEffects(); - SendToExecuter(std::move(request)); + SendToExecuter(QueryState->TxCtx.Get(), std::move(request)); } bool ExecutePhyTx(const TKqpPhyTxHolder::TConstPtr& tx, bool commit) { @@ -1245,7 +1245,7 @@ class TKqpSessionActor : public TActorBootstrapped { txCtx.Locks.Size(), request.AcquireLocksTxId.Defined()); - SendToExecuter(std::move(request)); + SendToExecuter(QueryState->TxCtx.Get(), std::move(request)); ++QueryState->CurrentTx; return false; } @@ -1280,7 +1280,7 @@ class TKqpSessionActor : public TActorBootstrapped { return results; } - void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request, bool isRollback = false) { + void SendToExecuter(TKqpTransactionContext* txCtx, IKqpGateway::TExecPhysicalRequest&& request, bool isRollback = false) { if (QueryState) { request.Orbit = std::move(QueryState->Orbit); QueryState->StatementResultSize = GetResultsCount(request); @@ -1292,28 +1292,28 @@ class TKqpSessionActor : public TActorBootstrapped { request.ResourceManager_ = ResourceManager_; LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - const bool useEvWrite = ( - (QueryState->HasOlapTable // olap only - && !QueryState->HasOltpTable + bool useEvWrite = ( + (txCtx->HasOlapTable // olap only + && !txCtx->HasOltpTable && Settings.TableService.GetEnableOlapSink()) - || (QueryState->HasOltpTable // oltp only - && !QueryState->HasOlapTable + || (txCtx->HasOltpTable // oltp only + && !txCtx->HasOlapTable && Settings.TableService.GetEnableOltpSink()) - || (QueryState->HasOlapTable // htap - && QueryState->HasOltpTable + || (txCtx->HasOlapTable // htap + && txCtx->HasOltpTable && Settings.TableService.GetEnableOlapSink() && Settings.TableService.GetEnableHtapTx())) && (request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_UNDEFINED || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_QUERY || request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY - || (!QueryState->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) - || (!QueryState->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); + || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_SQL_DML) + || (!txCtx->HasOlapTable && request.QueryType == NKikimrKqp::EQueryType::QUERY_TYPE_PREPARED_DML)); auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr(), RequestCounters, Settings.TableService, AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, SelfId(), QueryState ? QueryState->UserRequestContext : MakeIntrusive("", Settings.Database, SessionId), - useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, QueryState->ShardIdToTableInfo); + useEvWrite, QueryState ? QueryState->StatementResultIndex : 0, FederatedQuerySetup, GUCSettings, txCtx->ShardIdToTableInfo); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -1473,7 +1473,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (ev->BrokenLockPathId) { issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx, *ev->BrokenLockPathId)); } else if (ev->BrokenLockShardId) { - issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->ShardIdToTableInfo, *ev->BrokenLockShardId)); + issues.AddIssue(GetLocksInvalidatedIssue(*QueryState->TxCtx->ShardIdToTableInfo, *ev->BrokenLockShardId)); } break; } @@ -2081,7 +2081,7 @@ class TKqpSessionActor : public TActorBootstrapped { request.DataShardLocks[dsLock.GetDataShard()].emplace_back(dsLock); } - SendToExecuter(std::move(request), true); + SendToExecuter(txCtx, std::move(request), true); } void ResetTxState() { From 657ef377879ae628877a036df728c21bd96cc109 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 10 Sep 2024 15:18:17 +0300 Subject: [PATCH 12/12] locks-errors --- .../kqp/executer_actor/kqp_data_executer.cpp | 1 + ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp | 40 ++++++++----------- ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp | 4 +- 3 files changed, 18 insertions(+), 27 deletions(-) diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index b2d5ac107df5..c8bdcceb7596 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -508,6 +508,7 @@ class TKqpDataExecuter : public TKqpExecuterBaseState == TShardState::EState::Preparing); Counters->TxProxyMon->TxResultAborted->Inc(); LocksBroken = true; + ResponseEv->BrokenLockShardId = shardId; if (!res->Record.GetTxLocks().empty()) { ResponseEv->BrokenLockPathId = NYql::TKikimrPathId( diff --git a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp index 45428d86c8a7..4d2361114b15 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_locks_ut.cpp @@ -43,12 +43,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -98,12 +96,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { auto commitResult = tx1->Commit().GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(commitResult.GetStatus(), EStatus::ABORTED, commitResult.GetIssues().ToString()); commitResult.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), commitResult.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(commitResult.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), commitResult.GetIssues().ToString()); result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -197,12 +193,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); result = session1.ExecuteQuery(Q1_(R"( SELECT * FROM Test WHERE Group = 11; @@ -258,12 +252,10 @@ Y_UNIT_TEST_SUITE(KqpSinkLocks) { )"), TTxControl::Tx(tx1->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); result.GetIssues().PrintTo(Cerr); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, - [] (const NYql::TIssue& issue) { - return issue.GetMessage().Contains("/Root/Test"); - }), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, + [] (const NYql::TIssue& issue) { + return issue.GetMessage().Contains("/Root/Test"); + }), result.GetIssues().ToString()); result = session1.ExecuteQuery(Q1_(R"( SELECT * FROM Test WHERE Group = 11; diff --git a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp index a0ac82ad151d..159d8154a544 100644 --- a/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_sink_mvcc_ut.cpp @@ -227,9 +227,7 @@ Y_UNIT_TEST_SUITE(KqpSinkMvcc) { )"), TTxControl::Tx(tx->GetId()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); - if (!GetIsOlap()) { - UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); - } + UNIT_ASSERT_C(HasIssue(result.GetIssues(), NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED), result.GetIssues().ToString()); } };