Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KQP] Make TKqpColumnStatisticsRequester async #10224

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions ydb/core/kqp/opt/kqp_column_statistics_requester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
NKikimr::NStat::TRequest req;
req.ColumnTag = columnsMeta[column].Id;
req.PathId = pathId;
getStatisticsRequest->StatRequests.push_back(req);
getStatisticsRequest->StatRequests.push_back(std::move(req));

tableMetaByPathId[pathId].TableName = table;
tableMetaByPathId[pathId].ColumnNameByTag[req.ColumnTag.value()] = column;
Expand All @@ -113,15 +113,14 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:

using TRequest = NStat::TEvStatistics::TEvGetStatistics;
using TResponse = NStat::TEvStatistics::TEvGetStatisticsResult;
struct TResult : public NYql::IKikimrGateway::TGenericResult {
THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
};

auto promise = NewPromise<TResult>();
AsyncReadiness = NewPromise<void>();
auto promise = NewPromise<TColumnStatisticsResponse>();
auto callback = [tableMetaByPathId = std::move(tableMetaByPathId)]
(TPromise<TResult> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
(TPromise<TColumnStatisticsResponse> promise, NStat::TEvStatistics::TEvGetStatisticsResult&& response) mutable {
if (!response.Success) {
promise.SetValue(NYql::NCommon::ResultFromError<TResult>("can't get column statistics!"));
promise.SetValue(NYql::NCommon::ResultFromError<TColumnStatisticsResponse>("can't get column statistics!"));
return;
}

THashMap<TString, TOptimizerStatistics::TColumnStatMap> columnStatisticsByTableName;
Expand All @@ -133,30 +132,39 @@ IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoTransform(TExprNode:
columnStatistics.CountMinSketch = std::move(stat.CountMinSketch.CountMin);
}

promise.SetValue(TResult{.columnStatisticsByTableName = std::move(columnStatisticsByTableName)});
promise.SetValue(TColumnStatisticsResponse{.ColumnStatisticsByTableName = std::move(columnStatisticsByTableName)});
};
auto statServiceId = NStat::MakeStatServiceID(ActorSystem->NodeId);
IActor* requestHandler =
new TActorRequestHandler<TRequest, TResponse, TResult>(statServiceId, getStatisticsRequest.Release(), promise, callback);
auto actorId = ActorSystem
new TActorRequestHandler<TRequest, TResponse, TColumnStatisticsResponse>(statServiceId, getStatisticsRequest.Release(), promise, callback);
ActorSystem
->Register(requestHandler, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
Y_UNUSED(actorId);

auto res = promise.GetFuture().GetValueSync();
if (!res.Issues().Empty()) {
TStringStream ss;
res.Issues().PrintTo(ss);
YQL_CLOG(DEBUG, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
promise.GetFuture().Subscribe([this](auto result){ ColumnStatisticsResponse = result.ExtractValue(); AsyncReadiness.SetValue(); });

return TStatus::Async;
}

// Consumes the stored ColumnStatisticsResponse once the asynchronous request has completed.
// If the response carries issues, they are logged and nothing is written to TypesCtx;
// otherwise every table's column statistics are inserted into TypesCtx.ColumnStatisticsByTableName.
// Both return paths yield TStatus::Ok. The node and context arguments are unused.
IGraphTransformer::TStatus TKqpColumnStatisticsRequester::DoApplyAsyncChanges(TExprNode::TPtr, TExprNode::TPtr&, TExprContext&) {
// Precondition: the readiness promise has been set and a response has been stored.
Y_ENSURE(AsyncReadiness.IsReady() && ColumnStatisticsResponse.has_value());

// A response with issues is only logged (TRACE level); the transformer still reports Ok.
if (!ColumnStatisticsResponse->Issues().Empty()) {
TStringStream ss; ColumnStatisticsResponse->Issues().PrintTo(ss);
YQL_CLOG(TRACE, ProviderKikimr) << "Can't load columns statistics for request: " << ss.Str();
return IGraphTransformer::TStatus::Ok;
}

// NOTE(review): the two loop headers below look like the removed and added sides of a diff hunk
// (no local `res` is visible in this function) — confirm that only the second one is live.
for (auto&& [tableName, columnStatistics]: res.columnStatisticsByTableName) {
for (auto&& [tableName, columnStatistics]: ColumnStatisticsResponse->ColumnStatisticsByTableName) {
// Each per-table map is moved into a freshly allocated TColumnStatMap.
// NOTE(review): presumably the mapped type of TypesCtx.ColumnStatisticsByTableName takes ownership
// of the `new` result — confirm. If the bound key is const, std::move(tableName) copies — TODO confirm.
TypesCtx.ColumnStatisticsByTableName.insert(
{std::move(tableName), new TOptimizerStatistics::TColumnStatMap(std::move(columnStatistics))}
);
}

// NOTE(review): likewise, the two returns below appear to be the old and new spelling of one statement.
return IGraphTransformer::TStatus::Ok;
return TStatus::Ok;
}

// Returns the future associated with the AsyncReadiness promise.
// The expression node argument is not used.
TFuture<void> TKqpColumnStatisticsRequester::DoGetAsyncFuture(const TExprNode&) {
    TFuture<void> readiness = AsyncReadiness.GetFuture();
    return readiness;
}

bool TKqpColumnStatisticsRequester::BeforeLambdas(const TExprNode::TPtr& input) {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/kqp/opt/kqp_column_statistics_requester.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ using namespace NYql::NNodes;
* Then it requests column statistics for these attributes from the column statistics service
* and stores them in the TTypeAnnotationContext.
*/
class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
class TKqpColumnStatisticsRequester : public TGraphTransformerBase {
public:
TKqpColumnStatisticsRequester(
const TKikimrConfiguration::TPtr& config,
Expand All @@ -37,6 +37,10 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
// Main method of the transformer
IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;

NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final;

IGraphTransformer::TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final;

void Rewind() override {}

~TKqpColumnStatisticsRequester() override = default;
Expand All @@ -56,6 +60,16 @@ class TKqpColumnStatisticsRequester : public TSyncTransformerBase {
THashMap<TExprNode::TPtr, TExprNode::TPtr> KqpTableByExprNode;
THashMap<TString, THashSet<TString>> ColumnsByTableName;

//////////////////////////////////////////////////////////////
/* State used while waiting for the column statistics response */
// Outcome of a column statistics request: a TGenericResult extended with
// the column statistics maps collected per table name.
struct TColumnStatisticsResponse : NYql::IKikimrGateway::TGenericResult {
    // Table name -> column statistics map for that table.
    THashMap<TString, TOptimizerStatistics::TColumnStatMap> ColumnStatisticsByTableName;
};
std::optional<TColumnStatisticsResponse> ColumnStatisticsResponse;
NThreading::TPromise<void> AsyncReadiness;

//////////////////////////////////////////////////////////////

const TKikimrConfiguration::TPtr& Config;
TTypeAnnotationContext& TypesCtx;
TKikimrTablesData& Tables;
Expand Down
Loading