Skip to content

Commit

Permalink
[KQP] Make TKqpColumnStatisticsRequester async (#10224)
Browse files Browse the repository at this point in the history
  • Loading branch information
pashandor789 authored Oct 9, 2024
1 parent 246cf31 commit 36f2bde
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 19 deletions.
44 changes: 26 additions & 18 deletions ydb/core/kqp/opt/kqp_column_statistics_requester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
NKikimr::NStat::TRequest req;
req.ColumnTag = columnsMeta[column].Id;
req.PathId = pathId;
getStatisticsRequest->StatRequests.push_back(req);
getStatisticsRequest->StatRequests.push_back(std::move(req));

tableMetaByPathId[pathId].TableName = table;
tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
Expand All @@ -113,15 +113,14 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:

using TRequest = NStat::TEvStatistics::TEvGetStatistics;
using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
struct TResult : public NYql::IKikimrGateway::TGenericResult {
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
};

auto promise = NewPromise<TResult>();
AsyncReadiness = NewPromise<void>();
auto promise = NewPromise<TColumnStatisticsResponse>();
auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
(TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
(TPromise<TColumnStatisticsResponse> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
if (!response.Success) {
promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
promise.SetValue(NYql::NCommon::ResultFromError<TColumnStatisticsResponse>("can't get column statistics!"));
return;
}

THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
Expand All @@ -133,30 +132,39 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
}

promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)});
};
auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
IActor* requestHandler =
new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
auto actorId = ActorSystem
new TActorRequestHandler<TRequest, TResponse, TColumnStatisticsResponse>(statServiceId, getStatisticsRequest.Release(), promise, callback);
ActorSystem
->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
Y_UNUSED(actorId);

auto res = promise.GetFuture().GetValueSync();
if (!res.Issues().Empty()) {
TStringStream ss;
res.Issues().PrintTo(ss);
YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); });

return TStatus::Async;
}

IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) {
Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value());

if (!ColumnStatisticsResponse->Issues().Empty()) {
TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss);
YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
return IGraphTransformer::TStatus::Ok;
}

for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) {
TypesCtx.ColumnStatisticsByTableName.insert(
{std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
);
}

return IGraphTransformer::TStatus::Ok;
return TStatus::Ok;
}

TFuture<void> TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) {
return AsyncReadiness.GetFuture();
}

bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/kqp/opt/kqp_column_statistics_requester.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using namespace NYql::NNodes;
* Then it requests column statistics for these attributes from the column statistics service
* and stores it into a TTypeAnnotationContext.
*/
class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
class TKqpColumnStatisticsRequester : public TGraphTransformerBase {
public:
TKqpColumnStatisticsRequester(
const TKikimrConfiguration::TPtr& config,
Expand All @@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
// Main method of the transformer
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;

NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final;

IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;

void Rewind() override {}

~TKqpColumnStatisticsRequester() override = default;
Expand All @@ -56,6 +60,16 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
THashMap<TExprNode::TPtr, TExprNode::TPtr> KqpTableByExprNode;
THashMap<TString, THashSet<TString>> ColumnsByTableName;

//////////////////////////////////////////////////////////////
/* for waiting response with column statistics */
struct TColumnStatisticsResponse : public NYql::IKikimrGateway::TGenericResult {
THashMap<TString, TOptimizerStatistics::TColumnStatMap> ColumnStatisticsByTableName;
};
std::optional<TColumnStatisticsResponse> ColumnStatisticsResponse;
NThreading::TPromise<void> AsyncReadiness;

//////////////////////////////////////////////////////////////

const TKikimrConfiguration::TPtr& Config;
TTypeAnnotationContext& TypesCtx;
TKikimrTablesData& Tables;
Expand Down

0 comments on commit 36f2bde

Please sign in to comment.