Skip to content

Commit

Permalink
Added async queries
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jun 11, 2024
1 parent e82ac97 commit 6f6984f
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 38 deletions.
42 changes: 32 additions & 10 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ struct TExecutionOptions {
enum class EExecutionCase {
GenericScript,
GenericQuery,
YqlScript
YqlScript,
AsyncQuery
};

std::vector<TString> ScriptQueries;
Expand All @@ -40,7 +41,16 @@ struct TExecutionOptions {
const TString TraceId = "kqprun_" + CreateGuidAsString();

bool HasResults() const {
return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) {
return false;
}

for (EExecutionCase executionCase : ExecutionCases) {
if (executionCase != EExecutionCase::AsyncQuery) {
return true;
}
}
return false;
}

EExecutionCase GetExecutionCase(size_t index) const {
Expand Down Expand Up @@ -71,14 +81,16 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
Sleep(executionOptions.LoopDelay);
}

Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script";
if (numberQueries > 1) {
Cout << " " << id;
}
if (numberLoops != 1) {
Cout << ", loop " << queryId / numberQueries;
if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script";
if (numberQueries > 1) {
Cout << " " << id;
}
if (numberLoops != 1) {
Cout << ", loop " << queryId / numberQueries;
}
Cout << "..." << colors.Default() << Endl;
}
Cout << "..." << colors.Default() << Endl;

switch (executionOptions.GetExecutionCase(id)) {
case TExecutionOptions::EExecutionCase::GenericScript:
Expand Down Expand Up @@ -108,8 +120,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
}
break;

case TExecutionOptions::EExecutionCase::AsyncQuery:
runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId);
break;
}
}
runner.WaitAsyncQueries();

if (executionOptions.HasResults()) {
try {
Expand Down Expand Up @@ -351,7 +368,8 @@ class TMain : public TMainClassArgs {
TChoices<TExecutionOptions::EExecutionCase> executionCase({
{"script", TExecutionOptions::EExecutionCase::GenericScript},
{"query", TExecutionOptions::EExecutionCase::GenericQuery},
{"yql-script", TExecutionOptions::EExecutionCase::YqlScript}
{"yql-script", TExecutionOptions::EExecutionCase::YqlScript},
{"async", TExecutionOptions::EExecutionCase::AsyncQuery}
});
options.AddLongOption('C', "execution-case", "Type of query for -p argument")
.RequiredArgument("query-type")
Expand All @@ -361,6 +379,10 @@ class TMain : public TMainClassArgs {
TString choice(option->CurValOrDef());
ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice));
});
options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
.RequiredArgument("uint")
.DefaultValue(RunnerOptions.InFlightLimit)
.StoreResult(&RunnerOptions.InFlightLimit);

TChoices<NKikimrKqp::EQueryAction> scriptAction({
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},
Expand Down
17 changes: 7 additions & 10 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ namespace {
class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMock> {
public:
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
TProgressCallback progressCallback)
: Request_(std::move(request))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
, ResultSizeLimit_(std::numeric_limits<i64>::max())
, ResultSets_(resultSets)
, ProgressCallback_(progressCallback)
{
if (resultRowsLimit) {
Expand Down Expand Up @@ -79,7 +77,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
}

void Handle(NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
Promise_.SetValue(std::move(ev));
Promise_.SetValue(TQueryResponse{.Response = std::move(ev), .ResultSets = std::move(ResultSets_)});
PassAway();
}

Expand All @@ -91,12 +89,12 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo

private:
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
NThreading::TPromise<TQueryResponse> Promise_;
ui64 ResultRowsLimit_;
ui64 ResultSizeLimit_;
std::vector<Ydb::ResultSet>& ResultSets_;
std::vector<ui64> ResultSetSizes_;
TProgressCallback ProgressCallback_;
std::vector<Ydb::ResultSet> ResultSets_;
std::vector<ui64> ResultSetSizes_;
};

class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
Expand Down Expand Up @@ -186,10 +184,9 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaite
} // anonymous namespace

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
TProgressCallback progressCallback) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback);
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, progressCallback);
}

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount) {
Expand Down
8 changes: 6 additions & 2 deletions ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

namespace NKqpRun {

struct TQueryResponse {
NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response;
std::vector<Ydb::ResultSet> ResultSets;
};

using TProgressCallback = std::function<void(const NKikimrKqp::TEvExecuterProgress&)>;

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets,
NThreading::TPromise<TQueryResponse> promise, ui64 resultRowsLimit, ui64 resultSizeLimit,
TProgressCallback progressCallback);

NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, i32 expectedNodeCount);
Expand Down
2 changes: 2 additions & 0 deletions ydb/tests/tools/kqprun/src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct TRunnerOptions {
NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default;
ETraceOptType TraceOptType = ETraceOptType::Disabled;

ui64 InFlightLimit = 0;

TYdbSetupSettings YdbSettings;
};

Expand Down
83 changes: 83 additions & 0 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <library/cpp/colorizer/colors.h>
#include <library/cpp/json/json_reader.h>

#include <util/system/condvar.h>

#include <ydb/core/blob_depot/mon_main.h>
#include <ydb/core/fq/libs/compute/common/utils.h>

Expand Down Expand Up @@ -86,6 +88,34 @@ void PrintStatistics(const TString& fullStat, const THashMap<TString, i64>& flat
//// TKqpRunner::TImpl

class TKqpRunner::TImpl {
struct TAsyncState {
ui64 OnStartRequest() {
InFlight++;
MaxInFlight = std::max(MaxInFlight, InFlight);
return RequestId++;
}

void OnRequestFinished(bool success) {
InFlight--;
if (success) {
Completed++;
} else {
Failed++;
}
}

TString GetInfoString() const {
return TStringBuilder() << "completed: " << Completed << ", failed: " << Failed << ", in flight: " << InFlight << ", max in flight: " << MaxInFlight << ", spend time: " << TInstant::Now() - Start;
}

const TInstant Start = TInstant::Now();
ui64 RequestId = 1;
ui64 MaxInFlight = 0;
ui64 InFlight = 0;
ui64 Completed = 0;
ui64 Failed = 0;
};

public:
enum class EQueryType {
ScriptQuery,
Expand Down Expand Up @@ -163,6 +193,45 @@ class TKqpRunner::TImpl {
return true;
}

void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) {
TGuard<TMutex> lock(Mutex_);

if (Options_.InFlightLimit && AsyncState_.InFlight >= Options_.InFlightLimit) {
AwaitInFlight_.WaitI(Mutex_);
}

ui64 requestId = AsyncState_.OnStartRequest();
RunningQueries_[requestId] = YdbSetup_.QueryRequestAsync(query, action, traceId, nullptr).Subscribe([this, requestId](const NThreading::TFuture<NKqpRun::TQueryResult>& f) {
TGuard<TMutex> lock(Mutex_);

auto response = f.GetValue().Response;
AsyncState_.OnRequestFinished(response.IsSuccess());
if (response.IsSuccess()) {
Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n";
} else {
Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.Status << ". " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.Issues.ToString() << CoutColors_.Default();
}

if (AsyncState_.InFlight < Options_.InFlightLimit) {
AwaitInFlight_.Signal();
}
if (!AsyncState_.InFlight) {
AwaitFinish_.Signal();
}
RunningQueries_.erase(requestId);
}).IgnoreResult();
Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n";
}

void WaitAsyncQueries() {
TGuard<TMutex> lock(Mutex_);

if (AsyncState_.InFlight) {
Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for async queries..." << CoutColors_.Default() << Endl;
AwaitFinish_.WaitI(Mutex_);
}
}

bool FetchScriptResults() {
TYdbSetup::StopTraceOpt();

Expand Down Expand Up @@ -373,6 +442,12 @@ class TKqpRunner::TImpl {
TString ExecutionOperation_;
TExecutionMeta ExecutionMeta_;
std::vector<Ydb::ResultSet> ResultSets_;

TMutex Mutex_;
TCondVar AwaitInFlight_;
TCondVar AwaitFinish_;
TAsyncState AsyncState_;
std::unordered_map<ui64, NThreading::TFuture<void>> RunningQueries_;
};


Expand All @@ -394,10 +469,18 @@ bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction act
return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery);
}

void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
Impl_->ExecuteQueryAsync(query, action, traceId);
}

bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery);
}

void TKqpRunner::WaitAsyncQueries() {
Impl_->WaitAsyncQueries();
}

bool TKqpRunner::FetchScriptResults() {
return Impl_->FetchScriptResults();
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/tests/tools/kqprun/src/kqp_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ class TKqpRunner {

bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const;

void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const;

bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const;

void WaitAsyncQueries();

bool FetchScriptResults();

bool ForgetExecutionOperation();
Expand Down
45 changes: 29 additions & 16 deletions ydb/tests/tools/kqprun/src/ydb_setup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred
TString YqlToken_;
};

TRequestResult GetQueryResult(TQueryResponse response, TQueryMeta& meta, std::vector<Ydb::ResultSet>& resultSets) {
resultSets = std::move(response.ResultSets);

auto queryOperationResponse = response.Response->Get()->Record.GetRef();
const auto& responseRecord = queryOperationResponse.GetResponse();

meta.Ast = responseRecord.GetQueryAst();
if (const auto& plan = responseRecord.GetQueryPlan()) {
meta.Plan = plan;
}

return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());
}

} // anonymous namespace


Expand Down Expand Up @@ -218,20 +232,20 @@ class TYdbSetup::TImpl {
return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvScriptRequest, NKikimr::NKqp::TEvKqp::TEvScriptResponse>(std::move(event));
}

NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector<Ydb::ResultSet>& resultSets, TProgressCallback progressCallback) const {
NThreading::TFuture<TQueryResponse> QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest>();
FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record);

if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) {
event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs));
}

auto promise = NThreading::NewPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr>();
auto promise = NThreading::NewPromise<TQueryResponse>();
auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit();
auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit();
GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, progressCallback));
GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, progressCallback), RandomNumber(Settings_.NodeCount));

return promise.GetFuture().GetValueSync();
return promise.GetFuture();
}

NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
Expand All @@ -256,7 +270,7 @@ class TYdbSetup::TImpl {
auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit();
NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max());

GetRuntime()->Register(fetchActor);
GetRuntime()->Register(fetchActor, RandomNumber(Settings_.NodeCount));

return GetRuntime()->GrabEdgeEvent<NKikimr::NKqp::TEvFetchScriptResultsResponse>(edgeActor);
}
Expand Down Expand Up @@ -288,7 +302,7 @@ class TYdbSetup::TImpl {
template <typename TRequest, typename TResponse>
typename TResponse::TPtr RunKqpProxyRequest(THolder<TRequest> event) const {
NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor();
NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId());
NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount)));

GetRuntime()->Send(kqpProxy, edgeActor, event.Release());

Expand Down Expand Up @@ -378,17 +392,16 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer
}

TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector<Ydb::ResultSet>& resultSets, TProgressCallback progressCallback) const {
resultSets.clear();

auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, progressCallback)->Get()->Record.GetRef();
const auto& responseRecord = queryOperationResponse.GetResponse();

meta.Ast = responseRecord.GetQueryAst();
if (const auto& plan = responseRecord.GetQueryPlan()) {
meta.Plan = plan;
}
auto response = Impl_->QueryRequestAsync(query, action, traceId, progressCallback).GetValueSync();
return GetQueryResult(std::move(response), meta, resultSets);
}

return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());
NThreading::TFuture<TQueryResult> TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const {
return Impl_->QueryRequestAsync(query, action, traceId, progressCallback).Apply([](const NThreading::TFuture<TQueryResponse>& f) {
TQueryResult result;
result.Response = GetQueryResult(f.GetValue(), result.Meta, result.ResultSets);
return result;
});
}

TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector<Ydb::ResultSet>& resultSets) const {
Expand Down
Loading

0 comments on commit 6f6984f

Please sign in to comment.