From fb42bd43e65d12c242df9eeceddc57ed7c7d16cf Mon Sep 17 00:00:00 2001 From: Pavel Ivanov Date: Tue, 8 Oct 2024 16:18:16 +0000 Subject: [PATCH 1/2] [KQP] Make TKqpColumnStatisticsRequester async --- .../opt/kqp_column_statistics_requester.cpp | 46 +++++++++++-------- .../kqp/opt/kqp_column_statistics_requester.h | 18 +++++++- 2 files changed, 45 insertions(+), 19 deletions(-) diff --git a/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp b/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp index 33f46b6f54e8..1f7ff650962e 100644 --- a/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp +++ b/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp @@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: NKikimr::NStat::TRequest req; req.ColumnTag = columnsMeta[column].Id; req.PathId = pathId; - getStatisticsRequest->StatRequests.push_back(req); + getStatisticsRequest->StatRequests.push_back(std::move(req)); tableMetaByPathId[pathId].TableName = table; tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column; @@ -113,15 +113,13 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: using TRequest = NStat::TEvStatistics::TEvGetStatistics; using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult; - struct TResult : public NYql::IKikimrGateway::TGenericResult { - THashMap columnStatisticsByTableName; - }; - auto promise = NewPromise(); + auto promise = NewPromise(); auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)] - (TPromise promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { + (TPromise promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { if (!response.Success) { - promise.SetValue(NYql::NCommon::ResultFromError("can't get column statistics!")); + promise.SetValue(NYql::NCommon::ResultFromError("can't get column statistics!")); + return; } THashMap columnStatisticsByTableName; @@ -133,30 +131,42 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin); } - promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)}); + promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)}); }; auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId); IActor* requestHandler = - new TActorRequestHandler(statServiceId, getStatisticsRequest.Release(), promise, callback); - auto actorId = ActorSystem + new TActorRequestHandler(statServiceId, getStatisticsRequest.Release(), promise, callback); + ActorSystem ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData()->UserPoolId); - Y_UNUSED(actorId); - auto res = promise.GetFuture().GetValueSync(); - if (!res.Issues().Empty()) { - TStringStream ss; - res.Issues().PrintTo(ss); - YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); + FutureWithColumnStatistics = promise.GetFuture(); + FutureWithColumnStatistics.Subscribe([this](auto){ AsyncReadiness.SetValue(); }); + + return TStatus::Async; +} + +IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) { + Y_ENSURE(FutureWithColumnStatistics.IsReady()); + + auto columnStatisticsResponse = FutureWithColumnStatistics.ExtractValue(); + + if (!columnStatisticsResponse.Issues().Empty()) { + TStringStream ss; columnStatisticsResponse.Issues().PrintTo(ss); + YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); return IGraphTransformer::TStatus::Ok; } - for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) { + for (auto&& [tableName, columnStatistics]: columnStatisticsResponse.ColumnStatisticsByTableName) { TypesCtx.ColumnStatisticsByTableName.insert( {std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))} ); } - return IGraphTransformer::TStatus::Ok; + return TStatus::Ok; +} + +TFuture TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) { + return AsyncReadiness.GetFuture(); } bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) { diff --git a/ydb/core/kqp/opt/kqp_column_statistics_requester.h b/ydb/core/kqp/opt/kqp_column_statistics_requester.h index 0f55c2ff06a5..10d9930bbce3 100644 --- a/ydb/core/kqp/opt/kqp_column_statistics_requester.h +++ b/ydb/core/kqp/opt/kqp_column_statistics_requester.h @@ -18,7 +18,7 @@ using namespace NYql::NNodes; * Then it requests column statistics for these attributes from the column statistics service * and stores it into a TTypeAnnotationContext. */ -class TKqpColumnStatisticsRequester : public TSyncTransformerBase { +class TKqpColumnStatisticsRequester : public TGraphTransformerBase { public: TKqpColumnStatisticsRequester( const TKikimrConfiguration::TPtr& config, @@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase { // Main method of the transformer IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + NThreading::TFuture DoGetAsyncFuture(const TExprNode& input) final; + + IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() override {} ~TKqpColumnStatisticsRequester() override = default; @@ -56,6 +60,18 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase { THashMap KqpTableByExprNode; THashMap> ColumnsByTableName; + ////////////////////////////////////////////////////////////// + /* for waiting response with column statistics */ + struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult { + THashMap ColumnStatisticsByTableName; + }; + + NThreading::TFuture FutureWithColumnStatistics; + // NThreading::TPromise Promise; + NThreading::TPromise AsyncReadiness; + + ////////////////////////////////////////////////////////////// + const TKikimrConfiguration::TPtr& Config; TTypeAnnotationContext& TypesCtx; TKikimrTablesData& Tables; From a2c01b41cc683f11c49fb551567fff5889fb4f18 Mon Sep 17 00:00:00 2001 From: Pavel Ivanov Date: Tue, 8 Oct 2024 19:20:46 +0000 Subject: [PATCH 2/2] fix --- .../kqp/opt/kqp_column_statistics_requester.cpp | 14 ++++++-------- ydb/core/kqp/opt/kqp_column_statistics_requester.h | 4 +--- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp b/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp index 1f7ff650962e..534b3c8dbdd1 100644 --- a/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp +++ b/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp @@ -114,6 +114,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: using TRequest = NStat::TEvStatistics::TEvGetStatistics; using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult; + AsyncReadiness = NewPromise(); auto promise = NewPromise(); auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)] (TPromise promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { @@ -139,24 +140,21 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: ActorSystem ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData()->UserPoolId); - FutureWithColumnStatistics = promise.GetFuture(); - FutureWithColumnStatistics.Subscribe([this](auto){ AsyncReadiness.SetValue(); }); + promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); }); return TStatus::Async; } IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) { - Y_ENSURE(FutureWithColumnStatistics.IsReady()); + Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value()); - auto columnStatisticsResponse = FutureWithColumnStatistics.ExtractValue(); - - if (!columnStatisticsResponse.Issues().Empty()) { - TStringStream ss; columnStatisticsResponse.Issues().PrintTo(ss); + if (!ColumnStatisticsResponse->Issues().Empty()) { + TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss); YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); return IGraphTransformer::TStatus::Ok; } - for (auto&& [tableName, columnStatistics]: columnStatisticsResponse.ColumnStatisticsByTableName) { + for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) { TypesCtx.ColumnStatisticsByTableName.insert( {std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))} ); diff --git a/ydb/core/kqp/opt/kqp_column_statistics_requester.h b/ydb/core/kqp/opt/kqp_column_statistics_requester.h index 10d9930bbce3..5eac50b6a834 100644 --- a/ydb/core/kqp/opt/kqp_column_statistics_requester.h +++ b/ydb/core/kqp/opt/kqp_column_statistics_requester.h @@ -65,9 +65,7 @@ class TKqpColumnStatisticsRequester : public TGraphTransformerBase { struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult { THashMap ColumnStatisticsByTableName; }; - - NThreading::TFuture FutureWithColumnStatistics; - // NThreading::TPromise Promise; + std::optional ColumnStatisticsResponse; NThreading::TPromise AsyncReadiness; //////////////////////////////////////////////////////////////