Skip to content

Commit

Permalink
YQ-3184 support create table as select in kqprun (#4412)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored May 13, 2024
1 parent 8cbb144 commit 3c85e58
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 64 deletions.
3 changes: 3 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ LogConfig {

QueryServiceConfig {
MdbTransformHost: false
ProgressStatsPeriodMs: 1000
QueryArtifactsCompressionMethod: "zstd_6"
ScriptResultRowsLimit: 0
ScriptResultSizeLimit: 10485760
Expand Down Expand Up @@ -108,6 +109,8 @@ ResourceBrokerConfig {
TableServiceConfig {
BindingsMode: BM_DROP
CompileTimeoutMs: 600000
EnableCreateTableAs: true
EnablePerStatementQueryExecution: true
SessionsLimitPerNode: 1000

QueryLimits {
Expand Down
85 changes: 49 additions & 36 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct TExecutionOptions {
YqlScript
};

TString ScriptQuery;
std::vector<TString> ScriptQueries;
TString SchemeQuery;

bool ForgetExecution = false;
Expand All @@ -35,7 +35,7 @@ struct TExecutionOptions {
TString TraceId = "kqprun";

bool HasResults() const {
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
}
};

Expand All @@ -53,11 +53,11 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
}
}

if (executionOptions.ScriptQuery) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script..." << colors.Default() << Endl;
for (size_t id = 0; id < executionOptions.ScriptQueries.size(); ++id) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script" << (executionOptions.ScriptQueries.size() > 1 ? TStringBuilder() << " " << id : TString()) << "..." << colors.Default() << Endl;
switch (executionOptions.ClearExecution) {
case TExecutionOptions::EClearExecutionCase::Disabled:
if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
}
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
Expand All @@ -73,13 +73,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
break;

case TExecutionOptions::EClearExecutionCase::GenericQuery:
if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
}
break;

case TExecutionOptions::EClearExecutionCase::YqlScript:
if (!runner.ExecuteYqlScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
}
break;
Expand All @@ -106,6 +106,16 @@ THolder<TFileOutput> SetupDefaultFileOutput(const TString& filePath, IOutputStre
}


template <typename EnumType>
EnumType GetCaseVariant(const TString& optionName, const TString& caseName, const std::map<TString, EnumType>& casesMap) {
auto it = casesMap.find(caseName);
if (it == casesMap.end()) {
ythrow yexception() << "Option '" << optionName << "' has no case '" << caseName << "'";
}
return it->second;
}


void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) {
TString variableTemplate = TStringBuilder() << "${" << variableName << "}";
for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) {
Expand Down Expand Up @@ -143,7 +153,7 @@ void RunMain(int argc, const char* argv[]) {
TExecutionOptions executionOptions;
NKqpRun::TRunnerOptions runnerOptions;

TString scriptQueryFile;
std::vector<TString> scriptQueryFiles;
TString schemeQueryFile;
TString resultOutputFile = "-";
TString schemeQueryAstFile;
Expand All @@ -169,7 +179,7 @@ void RunMain(int argc, const char* argv[]) {
options.AddLongOption('p', "script-query", "Script query to execute")
.Optional()
.RequiredArgument("FILE")
.StoreResult(&scriptQueryFile);
.AppendTo(&scriptQueryFiles);
options.AddLongOption('s', "scheme-query", "Scheme query to execute")
.Optional()
.RequiredArgument("FILE")
Expand Down Expand Up @@ -273,27 +283,30 @@ void RunMain(int argc, const char* argv[]) {

// Execution options

if (!schemeQueryFile && !scriptQueryFile) {
if (!schemeQueryFile && scriptQueryFiles.empty()) {
ythrow yexception() << "Nothing to execute";
}

if (schemeQueryFile) {
executionOptions.SchemeQuery = TFileInput(schemeQueryFile).ReadAll();
ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, yqlToken, executionOptions.SchemeQuery);
}
if (scriptQueryFile) {
executionOptions.ScriptQuery = TFileInput(scriptQueryFile).ReadAll();

executionOptions.ScriptQueries.reserve(scriptQueryFiles.size());
for (const TString& scriptQueryFile : scriptQueryFiles) {
executionOptions.ScriptQueries.emplace_back(TFileInput(scriptQueryFile).ReadAll());
}

executionOptions.ClearExecution =
(clearExecutionType == TStringBuf("query")) ? TExecutionOptions::EClearExecutionCase::GenericQuery
: (clearExecutionType == TStringBuf("yql-script")) ? TExecutionOptions::EClearExecutionCase::YqlScript
: (clearExecutionType == TStringBuf("disabled")) ? TExecutionOptions::EClearExecutionCase::Disabled
: TExecutionOptions::EClearExecutionCase::Disabled;
executionOptions.ClearExecution = GetCaseVariant<TExecutionOptions::EClearExecutionCase>("clear-execution", clearExecutionType, {
{"query", TExecutionOptions::EClearExecutionCase::GenericQuery},
{"yql-script", TExecutionOptions::EClearExecutionCase::YqlScript},
{"disabled", TExecutionOptions::EClearExecutionCase::Disabled}
});

executionOptions.ScriptQueryAction =
(scriptQueryAction == TStringBuf("execute")) ? NKikimrKqp::QUERY_ACTION_EXECUTE
: (scriptQueryAction == TStringBuf("explain")) ? NKikimrKqp::QUERY_ACTION_EXPLAIN
: NKikimrKqp::QUERY_ACTION_EXECUTE;
executionOptions.ScriptQueryAction = GetCaseVariant<NKikimrKqp::EQueryAction>("script-action", scriptQueryAction, {
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},
{"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN}
});

// Runner options

Expand All @@ -302,24 +315,24 @@ void RunMain(int argc, const char* argv[]) {
THolder<TFileOutput> scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput);
THolder<TFileOutput> scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput);

runnerOptions.TraceOptType =
(traceOptType == TStringBuf("all")) ? NKqpRun::TRunnerOptions::ETraceOptType::All
: (traceOptType == TStringBuf("scheme")) ? NKqpRun::TRunnerOptions::ETraceOptType::Scheme
: (traceOptType == TStringBuf("script")) ? NKqpRun::TRunnerOptions::ETraceOptType::Script
: (traceOptType == TStringBuf("disabled")) ? NKqpRun::TRunnerOptions::ETraceOptType::Disabled
: NKqpRun::TRunnerOptions::ETraceOptType::All;
runnerOptions.TraceOptType = GetCaseVariant<NKqpRun::TRunnerOptions::ETraceOptType>("trace-opt", traceOptType, {
{"all", NKqpRun::TRunnerOptions::ETraceOptType::All},
{"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme},
{"script", NKqpRun::TRunnerOptions::ETraceOptType::Script},
{"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled}
});
runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled;

runnerOptions.ResultOutputFormat =
(resultOutputFormat == TStringBuf("rows")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson
: (resultOutputFormat == TStringBuf("full")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson
: NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson;
runnerOptions.ResultOutputFormat = GetCaseVariant<NKqpRun::TRunnerOptions::EResultOutputFormat>("result-format", resultOutputFormat, {
{"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson},
{"full", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}
});

runnerOptions.PlanOutputFormat =
(planOutputFormat == TStringBuf("pretty")) ? NYdb::NConsoleClient::EOutputFormat::Pretty
: (planOutputFormat == TStringBuf("table")) ? NYdb::NConsoleClient::EOutputFormat::PrettyTable
: (planOutputFormat == TStringBuf("json")) ? NYdb::NConsoleClient::EOutputFormat::JsonUnicode
: NYdb::NConsoleClient::EOutputFormat::Default;
runnerOptions.PlanOutputFormat = GetCaseVariant<NYdb::NConsoleClient::EOutputFormat>("plan-format", planOutputFormat, {
{"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty},
{"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable},
{"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode},
});

// Ydb settings

Expand Down
13 changes: 10 additions & 3 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
public:
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets)
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan)
: Request_(std::move(request))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
, ResultSizeLimit_(std::numeric_limits<i64>::max())
, ResultSets_(resultSets)
, QueryPlan_(queryPlan)
{
if (resultRowsLimit) {
ResultRowsLimit_ = resultRowsLimit;
Expand All @@ -36,6 +37,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
STRICT_STFUNC(StateFunc,
hFunc(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData, Handle);
hFunc(NKikimr::NKqp::TEvKqp::TEvQueryResponse, Handle);
hFunc(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
)

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
Expand Down Expand Up @@ -73,20 +75,25 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
PassAway();
}

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
QueryPlan_ = ev->Get()->Record.GetQueryPlan();
}

private:
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
ui64 ResultRowsLimit_;
ui64 ResultSizeLimit_;
std::vector<Ydb::ResultSet>& ResultSets_;
TString& QueryPlan_;
};

} // anonymous namespace

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets);
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, queryPlan);
}

} // namespace NKqpRun
2 changes: 1 addition & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace NKqpRun {

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets);
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan);

} // namespace NKqpRun
26 changes: 18 additions & 8 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "ydb_setup.h"

#include <library/cpp/colorizer/colors.h>
#include <library/cpp/json/json_reader.h>

#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/lib/ydb_cli/common/format.h>
Expand Down Expand Up @@ -74,6 +75,8 @@ class TKqpRunner::TImpl {

PrintScriptAst(meta.Ast);

PrintScriptPlan(meta.Plan);

if (!status.IsSuccess()) {
Cerr << CerrColors_.Red() << "Failed to execute query, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
return false;
Expand All @@ -83,8 +86,6 @@ class TKqpRunner::TImpl {
Cerr << CerrColors_.Red() << "Request finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

PrintScriptPlan(meta.Plan);

return true;
}

Expand Down Expand Up @@ -130,6 +131,7 @@ class TKqpRunner::TImpl {

private:
bool WaitScriptExecutionOperation() {
ExecutionMeta_ = TExecutionMeta();
TRequestResult status;
while (true) {
status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionOperation_, ExecutionMeta_);
Expand All @@ -148,6 +150,8 @@ class TKqpRunner::TImpl {

PrintScriptAst(ExecutionMeta_.Ast);

PrintScriptPlan(ExecutionMeta_.Plan);

if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) {
Cerr << CerrColors_.Red() << "Failed to execute script, invalid final status, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
return false;
Expand All @@ -157,8 +161,6 @@ class TKqpRunner::TImpl {
Cerr << CerrColors_.Red() << "Request finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

PrintScriptPlan(ExecutionMeta_.Plan);

return true;
}

Expand Down Expand Up @@ -189,12 +191,20 @@ class TKqpRunner::TImpl {
}

void PrintScriptPlan(const TString& plan) const {
if (Options_.ScriptQueryPlanOutput) {
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
if (!Options_.ScriptQueryPlanOutput || !plan) {
return;
}

NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput);
printer.Print(plan);
NJson::TJsonValue planJson;
NJson::ReadJsonTree(plan, &planJson, true);
if (!planJson.GetMapSafe().contains("meta")) {
return;
}

Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;

NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput);
printer.Print(plan);
}

void PrintScriptResult(const Ydb::ResultSet& resultSet) const {
Expand Down
Loading

0 comments on commit 3c85e58

Please sign in to comment.