diff --git a/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp b/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp index 33f46b6f54e8..534b3c8dbdd1 100644 --- a/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp +++ b/ydb/core/kqp/opt/kqp_column_statistics_requester.cpp @@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: NKikimr::NStat::TRequest req; req.ColumnTag = columnsMeta[column].Id; req.PathId = pathId; - getStatisticsRequest->StatRequests.push_back(req); + getStatisticsRequest->StatRequests.push_back(std::move(req)); tableMetaByPathId[pathId].TableName = table; tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column; @@ -113,15 +113,14 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: using TRequest = NStat::TEvStatistics::TEvGetStatistics; using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult; - struct TResult : public NYql::IKikimrGateway::TGenericResult { - THashMap columnStatisticsByTableName; - }; - auto promise = NewPromise(); + AsyncReadiness = NewPromise(); + auto promise = NewPromise(); auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)] - (TPromise promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { + (TPromise promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable { if (!response.Success) { - promise.SetValue(NYql::NCommon::ResultFromError("can't get column statistics!")); + promise.SetValue(NYql::NCommon::ResultFromError("can't get column statistics!")); + return; } THashMap columnStatisticsByTableName; @@ -133,30 +132,39 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode: columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin); } - promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)}); + promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)}); }; auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId); IActor* requestHandler = - new TActorRequestHandler(statServiceId, getStatisticsRequest.Release(), promise, callback); - auto actorId = ActorSystem + new TActorRequestHandler(statServiceId, getStatisticsRequest.Release(), promise, callback); + ActorSystem ->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData()->UserPoolId); - Y_UNUSED(actorId); - auto res = promise.GetFuture().GetValueSync(); - if (!res.Issues().Empty()) { - TStringStream ss; - res.Issues().PrintTo(ss); - YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); + promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); }); + + return TStatus::Async; +} + +IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) { + Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value()); + + if (!ColumnStatisticsResponse->Issues().Empty()) { + TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss); + YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str(); return IGraphTransformer::TStatus::Ok; } - for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) { + for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) { TypesCtx.ColumnStatisticsByTableName.insert( {std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))} ); } - return IGraphTransformer::TStatus::Ok; + return TStatus::Ok; +} + +TFuture TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) { + return AsyncReadiness.GetFuture(); } bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) { diff --git a/ydb/core/kqp/opt/kqp_column_statistics_requester.h b/ydb/core/kqp/opt/kqp_column_statistics_requester.h index 0f55c2ff06a5..5eac50b6a834 100644 --- a/ydb/core/kqp/opt/kqp_column_statistics_requester.h +++ b/ydb/core/kqp/opt/kqp_column_statistics_requester.h @@ -18,7 +18,7 @@ using namespace NYql::NNodes; * Then it requests column statistics for these attributes from the column statistics service * and stores it into a TTypeAnnotationContext. */ -class TKqpColumnStatisticsRequester : public TSyncTransformerBase { +class TKqpColumnStatisticsRequester : public TGraphTransformerBase { public: TKqpColumnStatisticsRequester( const TKikimrConfiguration::TPtr& config, @@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase { // Main method of the transformer IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + NThreading::TFuture DoGetAsyncFuture(const TExprNode& input) final; + + IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final; + void Rewind() override {} ~TKqpColumnStatisticsRequester() override = default; @@ -56,6 +60,16 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase { THashMap KqpTableByExprNode; THashMap> ColumnsByTableName; + ////////////////////////////////////////////////////////////// + /* for waiting response with column statistics */ + struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult { + THashMap ColumnStatisticsByTableName; + }; + std::optional ColumnStatisticsResponse; + NThreading::TPromise AsyncReadiness; + + ////////////////////////////////////////////////////////////// + const TKikimrConfiguration::TPtr& Config; TTypeAnnotationContext& TypesCtx; TKikimrTablesData& Tables;