From 6f6984ff5f908c79c0c2028cde8400c4953d1909 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 11 Jun 2024 17:00:54 +0000 Subject: [PATCH] Added async queries --- ydb/tests/tools/kqprun/kqprun.cpp | 42 +++++++++--- ydb/tests/tools/kqprun/src/actors.cpp | 17 ++--- ydb/tests/tools/kqprun/src/actors.h | 8 ++- ydb/tests/tools/kqprun/src/common.h | 2 + ydb/tests/tools/kqprun/src/kqp_runner.cpp | 83 +++++++++++++++++++++++ ydb/tests/tools/kqprun/src/kqp_runner.h | 4 ++ ydb/tests/tools/kqprun/src/ydb_setup.cpp | 45 +++++++----- ydb/tests/tools/kqprun/src/ydb_setup.h | 9 +++ 8 files changed, 172 insertions(+), 38 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b9cb1c9646c6..b48172fa6c42 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -24,7 +24,8 @@ struct TExecutionOptions { enum class EExecutionCase { GenericScript, GenericQuery, - YqlScript + YqlScript, + AsyncQuery }; std::vector ScriptQueries; @@ -40,7 +41,16 @@ struct TExecutionOptions { const TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { - return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; + if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { + return false; + } + + for (EExecutionCase executionCase : ExecutionCases) { + if (executionCase != EExecutionCase::AsyncQuery) { + return true; + } + } + return false; } EExecutionCase GetExecutionCase(size_t index) const { @@ -71,14 +81,16 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner Sleep(executionOptions.LoopDelay); } - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; - if (numberQueries > 1) { - Cout << " " << id; - } - if (numberLoops != 1) { - Cout << ", loop " << queryId / numberQueries; + if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + if (numberQueries > 1) { + Cout << " " << id; + } + if (numberLoops != 1) { + Cout << ", loop " << queryId / numberQueries; + } + Cout << "..." << colors.Default() << Endl; } - Cout << "..." << colors.Default() << Endl; switch (executionOptions.GetExecutionCase(id)) { case TExecutionOptions::EExecutionCase::GenericScript: @@ -108,8 +120,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } break; + + case TExecutionOptions::EExecutionCase::AsyncQuery: + runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId); + break; } } + runner.WaitAsyncQueries(); if (executionOptions.HasResults()) { try { @@ -351,7 +368,8 @@ class TMain : public TMainClassArgs { TChoices executionCase({ {"script", TExecutionOptions::EExecutionCase::GenericScript}, {"query", TExecutionOptions::EExecutionCase::GenericQuery}, - {"yql-script", TExecutionOptions::EExecutionCase::YqlScript} + {"yql-script", TExecutionOptions::EExecutionCase::YqlScript}, + {"async", TExecutionOptions::EExecutionCase::AsyncQuery} }); options.AddLongOption('C', "execution-case", "Type of query for -p argument") .RequiredArgument("query-type") @@ -361,6 +379,10 @@ class TMain : public TMainClassArgs { TString choice(option->CurValOrDef()); ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice)); }); + options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.InFlightLimit) + .StoreResult(&RunnerOptions.InFlightLimit); TChoices scriptAction({ {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 7ddf0fc4bc7e..3ead3c724998 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -11,14 +11,12 @@ namespace { class TRunScriptActorMock : public NActors::TActorBootstrapped { public: TRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, TProgressCallback progressCallback) : Request_(std::move(request)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) , ResultSizeLimit_(std::numeric_limits::max()) - , ResultSets_(resultSets) , ProgressCallback_(progressCallback) { if (resultRowsLimit) { @@ -79,7 +77,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped Request_; - NThreading::TPromise Promise_; + NThreading::TPromise Promise_; ui64 ResultRowsLimit_; ui64 ResultSizeLimit_; - std::vector& ResultSets_; - std::vector ResultSetSizes_; TProgressCallback ProgressCallback_; + std::vector ResultSets_; + std::vector ResultSetSizes_; }; class TResourcesWaiterActor : public NActors::TActorBootstrapped { @@ -186,10 +184,9 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, TProgressCallback progressCallback) { - return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback); + return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, progressCallback); } NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index a222f4b3e3b0..f0a8ef3d5b2d 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -4,11 +4,15 @@ namespace NKqpRun { +struct TQueryResponse { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response; + std::vector ResultSets; +}; + using TProgressCallback = std::function; NActors::IActor* CreateRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, TProgressCallback progressCallback); NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index dd315c974e2c..8aab9649936d 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -55,6 +55,8 @@ struct TRunnerOptions { NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; ETraceOptType TraceOptType = ETraceOptType::Disabled; + ui64 InFlightLimit = 0; + TYdbSetupSettings YdbSettings; }; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index da3ff9c4e353..4c890d38dea3 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -4,6 +4,8 @@ #include #include +#include + #include #include @@ -86,6 +88,34 @@ void PrintStatistics(const TString& fullStat, const THashMap& flat //// TKqpRunner::TImpl class TKqpRunner::TImpl { + struct TAsyncState { + ui64 OnStartRequest() { + InFlight++; + MaxInFlight = std::max(MaxInFlight, InFlight); + return RequestId++; + } + + void OnRequestFinished(bool success) { + InFlight--; + if (success) { + Completed++; + } else { + Failed++; + } + } + + TString GetInfoString() const { + return TStringBuilder() << "completed: " << Completed << ", failed: " << Failed << ", in flight: " << InFlight << ", max in flight: " << MaxInFlight << ", spend time: " << TInstant::Now() - Start; + } + + const TInstant Start = TInstant::Now(); + ui64 RequestId = 1; + ui64 MaxInFlight = 0; + ui64 InFlight = 0; + ui64 Completed = 0; + ui64 Failed = 0; + }; + public: enum class EQueryType { ScriptQuery, @@ -163,6 +193,45 @@ class TKqpRunner::TImpl { return true; } + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + TGuard lock(Mutex_); + + if (Options_.InFlightLimit && AsyncState_.InFlight >= Options_.InFlightLimit) { + AwaitInFlight_.WaitI(Mutex_); + } + + ui64 requestId = AsyncState_.OnStartRequest(); + RunningQueries_[requestId] = YdbSetup_.QueryRequestAsync(query, action, traceId, nullptr).Subscribe([this, requestId](const NThreading::TFuture& f) { + TGuard lock(Mutex_); + + auto response = f.GetValue().Response; + AsyncState_.OnRequestFinished(response.IsSuccess()); + if (response.IsSuccess()) { + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; + } else { + Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.Status << ". " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.Issues.ToString() << CoutColors_.Default(); + } + + if (AsyncState_.InFlight < Options_.InFlightLimit) { + AwaitInFlight_.Signal(); + } + if (!AsyncState_.InFlight) { + AwaitFinish_.Signal(); + } + RunningQueries_.erase(requestId); + }).IgnoreResult(); + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; + } + + void WaitAsyncQueries() { + TGuard lock(Mutex_); + + if (AsyncState_.InFlight) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for async queries..." << CoutColors_.Default() << Endl; + AwaitFinish_.WaitI(Mutex_); + } + } + bool FetchScriptResults() { TYdbSetup::StopTraceOpt(); @@ -373,6 +442,12 @@ class TKqpRunner::TImpl { TString ExecutionOperation_; TExecutionMeta ExecutionMeta_; std::vector ResultSets_; + + TMutex Mutex_; + TCondVar AwaitInFlight_; + TCondVar AwaitFinish_; + TAsyncState AsyncState_; + std::unordered_map> RunningQueries_; }; @@ -394,10 +469,18 @@ bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction act return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery); } +void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->ExecuteQueryAsync(query, action, traceId); +} + bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); } +void TKqpRunner::WaitAsyncQueries() { + Impl_->WaitAsyncQueries(); +} + bool TKqpRunner::FetchScriptResults() { return Impl_->FetchScriptResults(); } diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index b263bbedbf55..bcff9cc8d01a 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -16,8 +16,12 @@ class TKqpRunner { bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void WaitAsyncQueries(); + bool FetchScriptResults(); bool ForgetExecutionOperation(); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index e98786344c44..cd0f7261b3e8 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -60,6 +60,20 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred TString YqlToken_; }; +TRequestResult GetQueryResult(TQueryResponse response, TQueryMeta& meta, std::vector& resultSets) { + resultSets = std::move(response.ResultSets); + + auto queryOperationResponse = response.Response->Get()->Record.GetRef(); + const auto& responseRecord = queryOperationResponse.GetResponse(); + + meta.Ast = responseRecord.GetQueryAst(); + if (const auto& plan = responseRecord.GetQueryPlan()) { + meta.Plan = plan; + } + + return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); +} + } // anonymous namespace @@ -218,7 +232,7 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector& resultSets, TProgressCallback progressCallback) const { + NThreading::TFuture QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { auto event = MakeHolder(); FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); @@ -226,12 +240,12 @@ class TYdbSetup::TImpl { event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); } - auto promise = NThreading::NewPromise(); + auto promise = NThreading::NewPromise(); auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, progressCallback)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, progressCallback), RandomNumber(Settings_.NodeCount)); - return promise.GetFuture().GetValueSync(); + return promise.GetFuture(); } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { @@ -256,7 +270,7 @@ class TYdbSetup::TImpl { auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max()); - GetRuntime()->Register(fetchActor); + GetRuntime()->Register(fetchActor, RandomNumber(Settings_.NodeCount)); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -288,7 +302,7 @@ class TYdbSetup::TImpl { template typename TResponse::TPtr RunKqpProxyRequest(THolder event) const { NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); - NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId()); + NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount))); GetRuntime()->Send(kqpProxy, edgeActor, event.Release()); @@ -378,17 +392,16 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer } TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { - resultSets.clear(); - - auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, progressCallback)->Get()->Record.GetRef(); - const auto& responseRecord = queryOperationResponse.GetResponse(); - - meta.Ast = responseRecord.GetQueryAst(); - if (const auto& plan = responseRecord.GetQueryPlan()) { - meta.Plan = plan; - } + auto response = Impl_->QueryRequestAsync(query, action, traceId, progressCallback).GetValueSync(); + return GetQueryResult(std::move(response), meta, resultSets); +} - return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); +NThreading::TFuture TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { + return Impl_->QueryRequestAsync(query, action, traceId, progressCallback).Apply([](const NThreading::TFuture& f) { + TQueryResult result; + result.Response = GetQueryResult(f.GetValue(), result.Meta, result.ResultSets); + return result; + }); } TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index 745cdae8a659..4d707f28b050 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -47,6 +47,13 @@ struct TRequestResult { }; +struct TQueryResult { + TRequestResult Response; + TQueryMeta Meta; + std::vector ResultSets; +}; + + class TYdbSetup { public: explicit TYdbSetup(const TYdbSetupSettings& settings); @@ -57,6 +64,8 @@ class TYdbSetup { TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; + NThreading::TFuture QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const; + TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const;