Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3184 support create table as select in kqprun #4412

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/tests/tools/kqprun/configuration/app_config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ LogConfig {

QueryServiceConfig {
MdbTransformHost: false
ProgressStatsPeriodMs: 1000
QueryArtifactsCompressionMethod: "zstd_6"
ScriptResultRowsLimit: 0
ScriptResultSizeLimit: 10485760
Expand Down Expand Up @@ -108,6 +109,8 @@ ResourceBrokerConfig {
TableServiceConfig {
BindingsMode: BM_DROP
CompileTimeoutMs: 600000
EnableCreateTableAs: true
EnablePerStatementQueryExecution: true
SessionsLimitPerNode: 1000

QueryLimits {
Expand Down
85 changes: 49 additions & 36 deletions ydb/tests/tools/kqprun/kqprun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct TExecutionOptions {
YqlScript
};

TString ScriptQuery;
std::vector<TString> ScriptQueries;
TString SchemeQuery;

bool ForgetExecution = false;
Expand All @@ -35,7 +35,7 @@ struct TExecutionOptions {
TString TraceId = "kqprun";

bool HasResults() const {
return ScriptQuery && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE;
}
};

Expand All @@ -53,11 +53,11 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
}
}

if (executionOptions.ScriptQuery) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script..." << colors.Default() << Endl;
for (size_t id = 0; id < executionOptions.ScriptQueries.size(); ++id) {
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script" << (executionOptions.ScriptQueries.size() > 1 ? TStringBuilder() << " " << id : TString()) << "..." << colors.Default() << Endl;
switch (executionOptions.ClearExecution) {
case TExecutionOptions::EClearExecutionCase::Disabled:
if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed";
}
Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching script results..." << colors.Default() << Endl;
Expand All @@ -73,13 +73,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
break;

case TExecutionOptions::EClearExecutionCase::GenericQuery:
if (!runner.ExecuteQuery(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
}
break;

case TExecutionOptions::EClearExecutionCase::YqlScript:
if (!runner.ExecuteYqlScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) {
ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed";
}
break;
Expand All @@ -106,6 +106,16 @@ THolder<TFileOutput> SetupDefaultFileOutput(const TString& filePath, IOutputStre
}


template <typename EnumType>
EnumType GetCaseVariant(const TString& optionName, const TString& caseName, const std::map<TString, EnumType>& casesMap) {
auto it = casesMap.find(caseName);
if (it == casesMap.end()) {
ythrow yexception() << "Option '" << optionName << "' has no case '" << caseName << "'";
}
return it->second;
}


void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) {
TString variableTemplate = TStringBuilder() << "${" << variableName << "}";
for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) {
Expand Down Expand Up @@ -143,7 +153,7 @@ void RunMain(int argc, const char* argv[]) {
TExecutionOptions executionOptions;
NKqpRun::TRunnerOptions runnerOptions;

TString scriptQueryFile;
std::vector<TString> scriptQueryFiles;
TString schemeQueryFile;
TString resultOutputFile = "-";
TString schemeQueryAstFile;
Expand All @@ -169,7 +179,7 @@ void RunMain(int argc, const char* argv[]) {
options.AddLongOption('p', "script-query", "Script query to execute")
.Optional()
.RequiredArgument("FILE")
.StoreResult(&scriptQueryFile);
.AppendTo(&scriptQueryFiles);
options.AddLongOption('s', "scheme-query", "Scheme query to execute")
.Optional()
.RequiredArgument("FILE")
Expand Down Expand Up @@ -273,27 +283,30 @@ void RunMain(int argc, const char* argv[]) {

// Execution options

if (!schemeQueryFile && !scriptQueryFile) {
if (!schemeQueryFile && scriptQueryFiles.empty()) {
ythrow yexception() << "Nothing to execute";
}

if (schemeQueryFile) {
executionOptions.SchemeQuery = TFileInput(schemeQueryFile).ReadAll();
ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, yqlToken, executionOptions.SchemeQuery);
}
if (scriptQueryFile) {
executionOptions.ScriptQuery = TFileInput(scriptQueryFile).ReadAll();

executionOptions.ScriptQueries.reserve(scriptQueryFiles.size());
for (const TString& scriptQueryFile : scriptQueryFiles) {
executionOptions.ScriptQueries.emplace_back(TFileInput(scriptQueryFile).ReadAll());
}

executionOptions.ClearExecution =
(clearExecutionType == TStringBuf("query")) ? TExecutionOptions::EClearExecutionCase::GenericQuery
: (clearExecutionType == TStringBuf("yql-script")) ? TExecutionOptions::EClearExecutionCase::YqlScript
: (clearExecutionType == TStringBuf("disabled")) ? TExecutionOptions::EClearExecutionCase::Disabled
: TExecutionOptions::EClearExecutionCase::Disabled;
executionOptions.ClearExecution = GetCaseVariant<TExecutionOptions::EClearExecutionCase>("clear-execution", clearExecutionType, {
{"query", TExecutionOptions::EClearExecutionCase::GenericQuery},
{"yql-script", TExecutionOptions::EClearExecutionCase::YqlScript},
{"disabled", TExecutionOptions::EClearExecutionCase::Disabled}
});

executionOptions.ScriptQueryAction =
(scriptQueryAction == TStringBuf("execute")) ? NKikimrKqp::QUERY_ACTION_EXECUTE
: (scriptQueryAction == TStringBuf("explain")) ? NKikimrKqp::QUERY_ACTION_EXPLAIN
: NKikimrKqp::QUERY_ACTION_EXECUTE;
executionOptions.ScriptQueryAction = GetCaseVariant<NKikimrKqp::EQueryAction>("script-action", scriptQueryAction, {
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},
{"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN}
});

// Runner options

Expand All @@ -302,24 +315,24 @@ void RunMain(int argc, const char* argv[]) {
THolder<TFileOutput> scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput);
THolder<TFileOutput> scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput);

runnerOptions.TraceOptType =
(traceOptType == TStringBuf("all")) ? NKqpRun::TRunnerOptions::ETraceOptType::All
: (traceOptType == TStringBuf("scheme")) ? NKqpRun::TRunnerOptions::ETraceOptType::Scheme
: (traceOptType == TStringBuf("script")) ? NKqpRun::TRunnerOptions::ETraceOptType::Script
: (traceOptType == TStringBuf("disabled")) ? NKqpRun::TRunnerOptions::ETraceOptType::Disabled
: NKqpRun::TRunnerOptions::ETraceOptType::All;
runnerOptions.TraceOptType = GetCaseVariant<NKqpRun::TRunnerOptions::ETraceOptType>("trace-opt", traceOptType, {
{"all", NKqpRun::TRunnerOptions::ETraceOptType::All},
{"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme},
{"script", NKqpRun::TRunnerOptions::ETraceOptType::Script},
{"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled}
});
runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled;

runnerOptions.ResultOutputFormat =
(resultOutputFormat == TStringBuf("rows")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson
: (resultOutputFormat == TStringBuf("full")) ? NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson
: NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson;
runnerOptions.ResultOutputFormat = GetCaseVariant<NKqpRun::TRunnerOptions::EResultOutputFormat>("result-format", resultOutputFormat, {
{"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson},
{"full", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}
});

runnerOptions.PlanOutputFormat =
(planOutputFormat == TStringBuf("pretty")) ? NYdb::NConsoleClient::EOutputFormat::Pretty
: (planOutputFormat == TStringBuf("table")) ? NYdb::NConsoleClient::EOutputFormat::PrettyTable
: (planOutputFormat == TStringBuf("json")) ? NYdb::NConsoleClient::EOutputFormat::JsonUnicode
: NYdb::NConsoleClient::EOutputFormat::Default;
runnerOptions.PlanOutputFormat = GetCaseVariant<NYdb::NConsoleClient::EOutputFormat>("plan-format", planOutputFormat, {
{"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty},
{"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable},
{"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode},
});

// Ydb settings

Expand Down
13 changes: 10 additions & 3 deletions ydb/tests/tools/kqprun/src/actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
public:
TRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets)
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan)
: Request_(std::move(request))
, Promise_(promise)
, ResultRowsLimit_(std::numeric_limits<ui64>::max())
, ResultSizeLimit_(std::numeric_limits<i64>::max())
, ResultSets_(resultSets)
, QueryPlan_(queryPlan)
{
if (resultRowsLimit) {
ResultRowsLimit_ = resultRowsLimit;
Expand All @@ -36,6 +37,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
STRICT_STFUNC(StateFunc,
hFunc(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData, Handle);
hFunc(NKikimr::NKqp::TEvKqp::TEvQueryResponse, Handle);
hFunc(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress, Handle);
)

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
Expand Down Expand Up @@ -73,20 +75,25 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped<TRunScriptActorMo
PassAway();
}

void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvExecuterProgress::TPtr& ev) {
QueryPlan_ = ev->Get()->Record.GetQueryPlan();
}

private:
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> Request_;
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> Promise_;
ui64 ResultRowsLimit_;
ui64 ResultSizeLimit_;
std::vector<Ydb::ResultSet>& ResultSets_;
TString& QueryPlan_;
};

} // anonymous namespace

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets);
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan) {
return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, queryPlan);
}

} // namespace NKqpRun
2 changes: 1 addition & 1 deletion ydb/tests/tools/kqprun/src/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace NKqpRun {

NActors::IActor* CreateRunScriptActorMock(THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> request,
NThreading::TPromise<NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr> promise,
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets);
ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector<Ydb::ResultSet>& resultSets, TString& queryPlan);

} // namespace NKqpRun
26 changes: 18 additions & 8 deletions ydb/tests/tools/kqprun/src/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "ydb_setup.h"

#include <library/cpp/colorizer/colors.h>
#include <library/cpp/json/json_reader.h>

#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/lib/ydb_cli/common/format.h>
Expand Down Expand Up @@ -74,6 +75,8 @@ class TKqpRunner::TImpl {

PrintScriptAst(meta.Ast);

PrintScriptPlan(meta.Plan);

if (!status.IsSuccess()) {
Cerr << CerrColors_.Red() << "Failed to execute query, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
return false;
Expand All @@ -83,8 +86,6 @@ class TKqpRunner::TImpl {
Cerr << CerrColors_.Red() << "Request finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

PrintScriptPlan(meta.Plan);

return true;
}

Expand Down Expand Up @@ -130,6 +131,7 @@ class TKqpRunner::TImpl {

private:
bool WaitScriptExecutionOperation() {
ExecutionMeta_ = TExecutionMeta();
TRequestResult status;
while (true) {
status = YdbSetup_.GetScriptExecutionOperationRequest(ExecutionOperation_, ExecutionMeta_);
Expand All @@ -148,6 +150,8 @@ class TKqpRunner::TImpl {

PrintScriptAst(ExecutionMeta_.Ast);

PrintScriptPlan(ExecutionMeta_.Plan);

if (!status.IsSuccess() || ExecutionMeta_.ExecutionStatus != NYdb::NQuery::EExecStatus::Completed) {
Cerr << CerrColors_.Red() << "Failed to execute script, invalid final status, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
return false;
Expand All @@ -157,8 +161,6 @@ class TKqpRunner::TImpl {
Cerr << CerrColors_.Red() << "Request finished with issues:" << CerrColors_.Default() << Endl << status.Issues.ToString() << Endl;
}

PrintScriptPlan(ExecutionMeta_.Plan);

return true;
}

Expand Down Expand Up @@ -189,12 +191,20 @@ class TKqpRunner::TImpl {
}

void PrintScriptPlan(const TString& plan) const {
if (Options_.ScriptQueryPlanOutput) {
Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;
if (!Options_.ScriptQueryPlanOutput || !plan) {
return;
}

NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput);
printer.Print(plan);
NJson::TJsonValue planJson;
NJson::ReadJsonTree(plan, &planJson, true);
if (!planJson.GetMapSafe().contains("meta")) {
return;
}

Cout << CoutColors_.Cyan() << "Writing script query plan" << CoutColors_.Default() << Endl;

NYdb::NConsoleClient::TQueryPlanPrinter printer(Options_.PlanOutputFormat, true, *Options_.ScriptQueryPlanOutput);
printer.Print(plan);
}

void PrintScriptResult(const Ydb::ResultSet& resultSet) const {
Expand Down
Loading
Loading