From 2b7a4f6fbde37130d0c7e633f64dea9f5d748b34 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Mon, 10 Jun 2024 07:35:04 +0000 Subject: [PATCH 1/5] Fixed performance for -C query --- ydb/tests/tools/kqprun/src/actors.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 16dd8e0c1e28..7ddf0fc4bc7e 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -50,23 +50,29 @@ class TRunScriptActorMock : public NActors::TActorBootstrappedGet()->Record.GetQueryResultIndex(); if (resultSetIndex >= ResultSets_.size()) { ResultSets_.resize(resultSetIndex + 1); + ResultSetSizes_.resize(resultSetIndex + 1, 0); } if (!ResultSets_[resultSetIndex].truncated()) { + ui64& resultSetSize = ResultSetSizes_[resultSetIndex]; for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) { if (static_cast(ResultSets_[resultSetIndex].rows_size()) >= ResultRowsLimit_) { ResultSets_[resultSetIndex].set_truncated(true); break; } - if (ResultSets_[resultSetIndex].ByteSizeLong() + row.ByteSizeLong() > ResultSizeLimit_) { + auto rowSize = row.ByteSizeLong(); + if (resultSetSize + rowSize > ResultSizeLimit_) { ResultSets_[resultSetIndex].set_truncated(true); break; } + resultSetSize += rowSize; *ResultSets_[resultSetIndex].add_rows() = std::move(row); } - *ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + if (!ResultSets_[resultSetIndex].columns_size()) { + *ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + } } Send(ev->Sender, response.Release()); @@ -89,6 +95,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped& ResultSets_; + std::vector ResultSetSizes_; TProgressCallback ProgressCallback_; }; From 4eb393aaccade49ebc062a76a45339872bf7f2da Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 11 Jun 2024 07:55:24 +0000 Subject: [PATCH 2/5] Refactored arguments parsing --- ydb/core/testlib/actors/test_runtime.cpp | 2 +- ydb/core/testlib/test_client.cpp | 2 +- ydb/core/testlib/test_client.h | 2 + ydb/library/actors/testlib/test_runtime.cpp | 3 +- ydb/library/actors/testlib/test_runtime.h | 3 +- ydb/tests/fq/yt/kqprun.py | 2 +- ydb/tests/tools/kqprun/kqprun.cpp | 545 ++++++++++---------- ydb/tests/tools/kqprun/src/common.h | 9 +- ydb/tests/tools/kqprun/src/kqp_runner.cpp | 2 +- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 3 +- 10 files changed, 289 insertions(+), 284 deletions(-) diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 69009c3ce184..9f8d7ced7d28 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -170,7 +170,7 @@ namespace NActors { } if (NeedMonitoring && !SingleSysEnv) { - ui16 port = GetPortManager().GetPort(); + ui16 port = MonitoringPortOffset ? MonitoringPortOffset + nodeIndex : GetPortManager().GetPort(); node->Mon.Reset(new NActors::TSyncHttpMon({ .Port = port, .Threads = 10, diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index c855d300dd8c..94279d5265d8 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -227,7 +227,7 @@ namespace Tests { NKikimr::SetupChannelProfiles(app); - Runtime->SetupMonitoring(); + Runtime->SetupMonitoring(Settings->MonitoringPortOffset); Runtime->SetLogBackend(Settings->LogBackend); Runtime->AddAppDataInit([this](ui32 nodeIdx, NKikimr::TAppData& appData) { diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index fc88e99579d7..2b473b6b9fd8 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -105,6 +105,7 @@ namespace Tests { ui16 Port; ui16 GrpcPort = 0; int GrpcMaxMessageSize = 0; // 0 - default (4_MB), -1 - no limit + ui16 MonitoringPortOffset = 0; NKikimrProto::TAuthConfig AuthConfig; NKikimrPQ::TPQConfig PQConfig; NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig; @@ -157,6 +158,7 @@ namespace Tests { TServerSettings& SetGrpcPort(ui16 value) { GrpcPort = value; return *this; } TServerSettings& SetGrpcMaxMessageSize(int value) { GrpcMaxMessageSize = value; return *this; } + TServerSettings& SetMonitoringPortOffset(ui16 value) { MonitoringPortOffset = value; return *this; } TServerSettings& SetSupportsRedirect(bool value) { SupportsRedirect = value; return *this; } TServerSettings& SetTracePath(const TString& value) { TracePath = value; return *this; } TServerSettings& SetDomain(ui32 value) { Domain = value; return *this; } diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index 200690ad98fc..c93d8ea27ff6 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -1594,8 +1594,9 @@ namespace NActors { return node->DynamicCounters; } - void TTestActorRuntimeBase::SetupMonitoring() { + void TTestActorRuntimeBase::SetupMonitoring(ui16 monitoringPortOffset) { NeedMonitoring = true; + MonitoringPortOffset = monitoringPortOffset; } void TTestActorRuntimeBase::SendInternal(TAutoPtr ev, ui32 nodeIndex, bool viaActorSystem) { diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index d6b20ca9a357..eef097c1b7b9 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -295,7 +295,7 @@ namespace NActors { void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; TIntrusivePtr GetDynamicCounters(ui32 nodeIndex = 0); - void SetupMonitoring(); + void SetupMonitoring(ui16 monitoringPortOffset = 0); using TEventObserverCollection = std::list& event)>>; class TEventObserverHolder { @@ -655,6 +655,7 @@ namespace NActors { TIntrusivePtr DispatcherRandomProvider; TAutoPtr LogBackend; bool NeedMonitoring; + ui16 MonitoringPortOffset = 0; TIntrusivePtr RandomProvider; TIntrusivePtr TimeProvider; diff --git a/ydb/tests/fq/yt/kqprun.py b/ydb/tests/fq/yt/kqprun.py index 3179fee6bee7..4e0f86b9d1be 100644 --- a/ydb/tests/fq/yt/kqprun.py +++ b/ydb/tests/fq/yt/kqprun.py @@ -37,7 +37,7 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T cmd += '--emulate-yt ' \ '--exclude-linked-udfs ' \ - '--clear-execution query ' \ + '--execution-case query ' \ '--app-config=%(config_file)s ' \ '--script-query=%(program_file)s ' \ '--scheme-query=%(scheme_file)s ' \ diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 30ad08e628e8..1d85b47044cf 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -6,21 +6,23 @@ #include #include +#include #include #include +#include + #include #include #include #include #include -#include struct TExecutionOptions { - enum class EClearExecutionCase { - Disabled, + enum class EExecutionCase { + GenericScript, GenericQuery, YqlScript }; @@ -29,10 +31,10 @@ struct TExecutionOptions { TString SchemeQuery; bool ForgetExecution = false; - EClearExecutionCase ClearExecution = EClearExecutionCase::Disabled; + EExecutionCase ExecutionCase = EExecutionCase::GenericScript; NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; - TString TraceId = "kqprun_" + CreateGuidAsString(); + const TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; @@ -55,8 +57,8 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner for (size_t id = 0; id < executionOptions.ScriptQueries.size(); ++id) { Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script" << (executionOptions.ScriptQueries.size() > 1 ? TStringBuilder() << " " << id : TString()) << "..." << colors.Default() << Endl; - switch (executionOptions.ClearExecution) { - case TExecutionOptions::EClearExecutionCase::Disabled: + switch (executionOptions.ExecutionCase) { + case TExecutionOptions::EExecutionCase::GenericScript: if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } @@ -72,13 +74,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } break; - case TExecutionOptions::EClearExecutionCase::GenericQuery: + case TExecutionOptions::EExecutionCase::GenericQuery: if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } break; - case TExecutionOptions::EClearExecutionCase::YqlScript: + case TExecutionOptions::EExecutionCase::YqlScript: if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } @@ -111,37 +113,6 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } -THolder SetupDefaultFileOutput(const TString& filePath, IOutputStream*& stream) { - THolder fileHolder; - if (filePath == "-") { - stream = &Cout; - } else if (filePath) { - fileHolder.Reset(new TFileOutput(filePath)); - stream = fileHolder.Get(); - } - return fileHolder; -} - - -template -EnumType GetCaseVariant(const TString& optionName, const TString& caseName, const std::map& casesMap) { - auto it = casesMap.find(caseName); - if (it == casesMap.end()) { - ythrow yexception() << "Option '" << optionName << "' has no case '" << caseName << "'"; - } - return it->second; -} - - -void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { - TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; - for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { - query.replace(position, variableTemplate.size(), variableValue); - position += variableValue.size(); - } -} - - TIntrusivePtr CreateFunctionRegistry(const TString& udfsDirectory, TVector udfsPaths, bool excludeLinkedUdfs) { if (!udfsDirectory.empty() || !udfsPaths.empty()) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); @@ -149,7 +120,7 @@ TIntrusivePtr CreateFunctionRegistr } NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths); - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); if (excludeLinkedUdfs) { for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { @@ -166,253 +137,281 @@ TIntrusivePtr CreateFunctionRegistr } -void RunMain(int argc, const char* argv[]) { - TExecutionOptions executionOptions; - NKqpRun::TRunnerOptions runnerOptions; - - std::vector scriptQueryFiles; - TString schemeQueryFile; - TString resultOutputFile = "-"; - TString schemeQueryAstFile; - TString scriptQueryAstFile; - TString scriptQueryPlanFile; - TString inProgressStatisticsFile; - TString logFile = "-"; - TString appConfigFile = "./configuration/app_config.conf"; - std::vector tablesMappingList; - - TString clearExecutionType = "disabled"; - TString traceOptType = "disabled"; - TString scriptQueryAction = "execute"; - TString planOutputFormat = "pretty"; - TString resultOutputFormat = "rows"; - i64 resultsRowsLimit = 1000; - bool emulateYt = false; - - TVector udfsPaths; - TString udfsDirectory; - bool excludeLinkedUdfs = false; - - NLastGetopt::TOpts options = NLastGetopt::TOpts::Default(); - options.AddLongOption('p', "script-query", "Script query to execute") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&scriptQueryFiles); - options.AddLongOption('s', "scheme-query", "Scheme query to execute") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&schemeQueryFile); - options.AddLongOption('c', "app-config", "File with app config (TAppConfig)") - .Optional() - .RequiredArgument("FILE") - .DefaultValue(appConfigFile) - .StoreResult(&appConfigFile); - options.AddLongOption('t', "table", "File with table (can be used by YT with -E flag), table@file") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&tablesMappingList); - - options.AddLongOption("log-file", "File with execution logs (use '-' to write in stderr)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&logFile); - options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&resultOutputFile); - options.AddLongOption("scheme-ast-file", "File with scheme query ast (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&schemeQueryAstFile); - options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryAstFile); - options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryPlanFile); - options.AddLongOption("in-progress-statistics", "File with script inprogress statistics") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&inProgressStatisticsFile); - - options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(clearExecutionType) - .StoreResult(&clearExecutionType); - options.AddLongOption('F', "forget", "Forget script execution operation after fetching results, cannot be used with -C") - .Optional() - .NoArgument() - .DefaultValue(executionOptions.ForgetExecution) - .SetFlag(&executionOptions.ForgetExecution); - options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation, one of { scheme | script | all }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(traceOptType) - .StoreResult(&traceOptType); - options.AddLongOption('A', "script-action", "Script query execute action, one of { execute | explain }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(scriptQueryAction) - .StoreResult(&scriptQueryAction); - options.AddLongOption('P', "plan-format", "Script query plan format, one of { pretty | table | json }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(planOutputFormat) - .StoreResult(&planOutputFormat); - options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full-json | full-proto }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(resultOutputFormat) - .StoreResult(&resultOutputFormat); - options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") - .Optional() - .RequiredArgument("INT") - .DefaultValue(resultsRowsLimit) - .StoreResult(&resultsRowsLimit); - options.AddLongOption('E', "emulate-yt", "Emulate YT tables") - .Optional() - .NoArgument() - .DefaultValue(emulateYt) - .SetFlag(&emulateYt); - options.AddLongOption('N', "node-count", "Number of nodes to create") - .Optional() - .RequiredArgument("INT") - .DefaultValue(runnerOptions.YdbSettings.NodeCount) - .StoreResult(&runnerOptions.YdbSettings.NodeCount); - options.AddLongOption('M', "monitoring", "Enable embedded UI access and run kqprun as deamon") - .Optional() - .NoArgument() - .DefaultValue(runnerOptions.YdbSettings.MonitoringEnabled) - .SetFlag(&runnerOptions.YdbSettings.MonitoringEnabled); - - options.AddLongOption('u', "udf", "Load shared library with UDF by given path") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&udfsPaths); - options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") - .Optional() - .RequiredArgument("PATH") - .StoreResult(&udfsDirectory); - options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") - .Optional() - .NoArgument() - .DefaultValue(excludeLinkedUdfs) - .SetFlag(&excludeLinkedUdfs); - - NLastGetopt::TOptsParseResult parsedOptions(&options, argc, argv); - - // Environment variables - - const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); - - // Execution options - - if (!schemeQueryFile && scriptQueryFiles.empty() && !runnerOptions.YdbSettings.MonitoringEnabled) { - ythrow yexception() << "Nothing to execute"; - } - - if (schemeQueryFile) { - executionOptions.SchemeQuery = TFileInput(schemeQueryFile).ReadAll(); - ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, yqlToken, executionOptions.SchemeQuery); - } - - executionOptions.ScriptQueries.reserve(scriptQueryFiles.size()); - for (const TString& scriptQueryFile : scriptQueryFiles) { - executionOptions.ScriptQueries.emplace_back(TFileInput(scriptQueryFile).ReadAll()); - } - - executionOptions.ClearExecution = GetCaseVariant("clear-execution", clearExecutionType, { - {"query", TExecutionOptions::EClearExecutionCase::GenericQuery}, - {"yql-script", TExecutionOptions::EClearExecutionCase::YqlScript}, - {"disabled", TExecutionOptions::EClearExecutionCase::Disabled} - }); +class TMain : public TMainClassArgs { + inline static const TString YqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); + inline static TVector> FileHolders; - executionOptions.ScriptQueryAction = GetCaseVariant("script-action", scriptQueryAction, { - {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, - {"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN} - }); + TExecutionOptions ExecutionOptions; + NKqpRun::TRunnerOptions RunnerOptions; - // Runner options + THashMap TablesMapping; + TVector UdfsPaths; + TString UdfsDirectory; + bool ExcludeLinkedUdfs = false; + ui64 ResultsRowsLimit = 1000; + bool EmulateYt = false; - THolder resultFileHolder = SetupDefaultFileOutput(resultOutputFile, runnerOptions.ResultOutput); - THolder schemeQueryAstFileHolder = SetupDefaultFileOutput(schemeQueryAstFile, runnerOptions.SchemeQueryAstOutput); - THolder scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput); - THolder scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput); - - if (inProgressStatisticsFile) { - runnerOptions.InProgressStatisticsOutputFile = inProgressStatisticsFile; + static TString LoadFile(const TString& file) { + return TFileInput(file).ReadAll(); } - runnerOptions.TraceOptType = GetCaseVariant("trace-opt", traceOptType, { - {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, - {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, - {"script", NKqpRun::TRunnerOptions::ETraceOptType::Script}, - {"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled} - }); - runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; - - runnerOptions.ResultOutputFormat = GetCaseVariant("result-format", resultOutputFormat, { - {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, - {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, - {"full-proto", NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto} - }); - - runnerOptions.PlanOutputFormat = GetCaseVariant("plan-format", planOutputFormat, { - {"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty}, - {"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable}, - {"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode}, - }); - - // Ydb settings - - if (runnerOptions.YdbSettings.NodeCount < 1) { - ythrow yexception() << "Number of nodes less than one"; + static void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { + TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; + for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { + query.replace(position, variableTemplate.size(), variableValue); + position += variableValue.size(); + } } - if (logFile != "-") { - runnerOptions.YdbSettings.LogOutputFile = logFile; - std::remove(logFile.c_str()); + static IOutputStream* GetDefaultOutput(const TString& file) { + if (file == "-") { + return &Cout; + } + if (file) { + FileHolders.emplace_back(new TFileOutput(file)); + return FileHolders.back().get(); + } + return nullptr; } - runnerOptions.YdbSettings.YqlToken = yqlToken; - runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths, excludeLinkedUdfs).Get(); + template + class TChoices { + public: + explicit TChoices(std::map choicesMap) + : ChoicesMap(std::move(choicesMap)) + {} - TString appConfigData = TFileInput(appConfigFile).ReadAll(); - if (!google::protobuf::TextFormat::ParseFromString(appConfigData, &runnerOptions.YdbSettings.AppConfig)) { - ythrow yexception() << "Bad format of app configuration"; - } + TResult operator()(const TString& choice) const { + return ChoicesMap.at(choice); + } - if (resultsRowsLimit < 0) { - ythrow yexception() << "Results rows limit less than zero"; - } - runnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(resultsRowsLimit); - - if (emulateYt) { - THashMap tablesMapping; - for (const auto& tablesMappingItem: tablesMappingList) { - TStringBuf tableName; - TStringBuf filePath; - TStringBuf(tablesMappingItem).Split('@', tableName, filePath); - if (tableName.empty() || filePath.empty()) { - ythrow yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.Root/plato.Input@input.txt"; + TVector GetChoices() const { + TVector choices; + choices.reserve(ChoicesMap.size()); + for (const auto& [choice, _] : ChoicesMap) { + choices.emplace_back(choice); } - tablesMapping[tableName] = filePath; + return choices; } - const auto& fileStorageConfig = runnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); - auto fileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)})); - auto ytFileServices = NYql::NFile::TYtFileServices::Make(runnerOptions.YdbSettings.FunctionRegistry.Get(), tablesMapping, fileStorage); - runnerOptions.YdbSettings.YtGateway = NYql::CreateYtFileGateway(ytFileServices); - runnerOptions.YdbSettings.ComputationFactory = NYql::NFile::GetYtFileFactory(ytFileServices); - } else if (!tablesMappingList.empty()) { - ythrow yexception() << "Tables mapping is not supported without emulate YT mode"; + private: + const std::map ChoicesMap; + }; + +protected: + void RegisterOptions(NLastGetopt::TOpts& options) override { + options.SetTitle("KqpRun -- tool to execute queries by using kikimr provider (instead of dq provider in DQrun tool)"); + options.AddHelpOption('h'); + options.SetFreeArgsNum(0); + + // Inputs + + options.AddLongOption('s', "scheme-query", "Scheme query to execute (typically DDL/DCL query)") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + ExecutionOptions.SchemeQuery = LoadFile(option->CurVal()); + ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, YqlToken, ExecutionOptions.SchemeQuery); + }); + options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); + }); + + options.AddLongOption('t', "table", "File with table (can be used by YT with -E flag), table@file") + .RequiredArgument("table@file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TStringBuf tableName; + TStringBuf filePath; + TStringBuf(option->CurVal()).Split('@', tableName, filePath); + if (tableName.empty() || filePath.empty()) { + ythrow yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.Root/plato.Input@input.txt"; + } + if (TablesMapping.contains(tableName)) { + ythrow yexception() << "Got duplicate table name: " << tableName; + } + TablesMapping[tableName] = filePath; + }); + + options.AddLongOption('c', "app-config", "File with app config (TAppConfig for ydb tennant)") + .RequiredArgument("file") + .DefaultValue("./configuration/app_config.conf") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TString file(option->CurValOrDef()); + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &RunnerOptions.YdbSettings.AppConfig)) { + ythrow yexception() << "Bad format of app configuration"; + } + }); + + options.AddLongOption('u', "udf", "Load shared library with UDF by given path") + .RequiredArgument("file") + .EmplaceTo(&UdfsPaths); + options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") + .RequiredArgument("directory") + .StoreResult(&UdfsDirectory); + options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") + .NoArgument() + .SetFlag(&ExcludeLinkedUdfs); + + // Outputs + + options.AddLongOption("log-file", "File with execution logs (write in stderr if empty)") + .RequiredArgument("file") + .StoreResult(&RunnerOptions.YdbSettings.LogOutputFile) + .Handler1([](const NLastGetopt::TOptsParser* option) { + if (const TString& file = option->CurVal()) { + std::remove(file.c_str()); + } + }); + TChoices traceOpt({ + {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, + {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, + {"script", NKqpRun::TRunnerOptions::ETraceOptType::Script}, + {"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled} + }); + options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation") + .RequiredArgument("trace-opt-query") + .DefaultValue("disabled") + .Choices(traceOpt.GetChoices()) + .StoreMappedResultT(&RunnerOptions.TraceOptType, [this, traceOpt](const TString& choise) { + auto traceOptType = traceOpt(choise); + RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; + return traceOptType; + }); + + options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") + .RequiredArgument("file") + .DefaultValue("-") + .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); + options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") + .RequiredArgument("uint") + .DefaultValue(ResultsRowsLimit) + .StoreResult(&ResultsRowsLimit); + TChoices resultFormat({ + {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, + {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, + {"full-proto", NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto} + }); + options.AddLongOption('R', "result-format", "Script query result format") + .RequiredArgument("result-format") + .DefaultValue("rows") + .Choices(resultFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.ResultOutputFormat, resultFormat); + + options.AddLongOption("scheme-ast-file", "File with scheme query ast (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.SchemeQueryAstOutput, &GetDefaultOutput); + + options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryAstOutput, &GetDefaultOutput); + + options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); + options.AddLongOption("script-statistics", "File with script inprogress statistics") + .RequiredArgument("file") + .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile); + TChoices planFormat({ + {"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty}, + {"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable}, + {"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode}, + }); + options.AddLongOption('P', "plan-format", "Script query plan format") + .RequiredArgument("plan-format") + .DefaultValue("pretty") + .Choices(planFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); + + // Pipeline settings + + TChoices executionCase({ + {"script", TExecutionOptions::EExecutionCase::GenericScript}, + {"query", TExecutionOptions::EExecutionCase::GenericQuery}, + {"yql-script", TExecutionOptions::EExecutionCase::YqlScript} + }); + options.AddLongOption('C', "execution-case", "Type of query for -p argument") + .RequiredArgument("query-type") + .DefaultValue("script") + .Choices(executionCase.GetChoices()) + .StoreMappedResultT(&ExecutionOptions.ExecutionCase, executionCase); + + TChoices scriptAction({ + {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, + {"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN} + }); + options.AddLongOption('A', "script-action", "Script query execute action") + .RequiredArgument("script-action") + .DefaultValue("execute") + .Choices(scriptAction.GetChoices()) + .StoreMappedResultT(&ExecutionOptions.ScriptQueryAction, scriptAction); + + options.AddLongOption('F', "forget", "Forget script execution operation after fetching results") + .NoArgument() + .SetFlag(&ExecutionOptions.ForgetExecution); + + // Cluster settings + + options.AddLongOption('N', "node-count", "Number of nodes to create") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.YdbSettings.NodeCount) + .StoreMappedResultT(&RunnerOptions.YdbSettings.NodeCount, [](ui32 nodeCount) { + if (nodeCount < 1) { + ythrow yexception() << "Number of nodes less than one"; + } + return nodeCount; + }); + + options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be runs as daemon") + .RequiredArgument("uint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + if (const TString& port = option->CurVal()) { + RunnerOptions.YdbSettings.MonitoringEnabled = true; + RunnerOptions.YdbSettings.MonitoringPortOffset = FromString(port.c_str()); + } + }); + + options.AddLongOption('E', "emulate-yt", "Emulate YT tables (use file gateway instead of native gateway)") + .NoArgument() + .SetFlag(&EmulateYt); + + TChoices> backtrace({ + {"heavy", &NKikimr::EnableYDBBacktraceFormat}, + {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} + }); + options.AddLongOption("backtrace", "Default backtrace format function") + .RequiredArgument("backtrace-type") + .DefaultValue("heavy") + .Choices(backtrace.GetChoices()) + .Handler1([backtrace](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + backtrace(choice)(); + }); } - RunScript(executionOptions, runnerOptions); -} + int DoRun(NLastGetopt::TOptsParseResult&&) override { + if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) { + ythrow yexception() << "Nothing to execute"; + } + + RunnerOptions.YdbSettings.YqlToken = YqlToken; + RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); + RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit); + + if (EmulateYt) { + const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); + auto fileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)})); + auto ytFileServices = NYql::NFile::TYtFileServices::Make(RunnerOptions.YdbSettings.FunctionRegistry.Get(), TablesMapping, fileStorage); + RunnerOptions.YdbSettings.YtGateway = NYql::CreateYtFileGateway(ytFileServices); + RunnerOptions.YdbSettings.ComputationFactory = NYql::NFile::GetYtFileFactory(ytFileServices); + } else if (!TablesMapping.empty()) { + ythrow yexception() << "Tables mapping is not supported without emulate YT mode"; + } + + RunScript(ExecutionOptions, RunnerOptions); + return 0; + } +}; void KqprunTerminateHandler() { @@ -442,7 +441,7 @@ int main(int argc, const char* argv[]) { signal(SIGSEGV, &SegmentationFaultHandler); try { - RunMain(argc, argv); + TMain().Run(argc, argv); } catch (...) { NColorizer::TColors colors = NColorizer::AutoColors(Cerr); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index e7dd43b8dc6d..dd315c974e2c 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -14,13 +14,14 @@ namespace NKqpRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; struct TYdbSetupSettings { - i32 NodeCount = 1; + ui32 NodeCount = 1; TString DomainName = "Root"; TDuration InitializationTimeout = TDuration::Seconds(10); bool MonitoringEnabled = false; + ui16 MonitoringPortOffset = 0; bool TraceOptEnabled = false; - TMaybe LogOutputFile; + TString LogOutputFile; TString YqlToken; TIntrusivePtr FunctionRegistry; @@ -44,11 +45,11 @@ struct TRunnerOptions { FullProto, // Columns, rows and types in proto string format }; - IOutputStream* ResultOutput = &Cout; + IOutputStream* ResultOutput = nullptr; IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; - TMaybe InProgressStatisticsOutputFile; + TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 4901a512e26e..da3ff9c4e353 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -295,7 +295,7 @@ class TKqpRunner::TImpl { void PrintScriptProgress(const TString& plan) const { if (Options_.InProgressStatisticsOutputFile) { - TFileOutput outputStream(*Options_.InProgressStatisticsOutputFile); + TFileOutput outputStream(Options_.InProgressStatisticsOutputFile); outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; auto convertedPlan = plan; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index eb767fa31cdd..7d86d501494a 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -69,7 +69,7 @@ class TYdbSetup::TImpl { private: TAutoPtr CreateLogBackend() const { if (Settings_.LogOutputFile) { - return NActors::CreateFileBackend(*Settings_.LogOutputFile); + return NActors::CreateFileBackend(Settings_.LogOutputFile); } else { return NActors::CreateStderrBackend(); } @@ -138,6 +138,7 @@ class TYdbSetup::TImpl { if (Settings_.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); + serverSettings.SetMonitoringPortOffset(Settings_.MonitoringPortOffset); } return serverSettings; From e82ac973a0637a9b7f3f44c82a0fd97f275ebbaf Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 11 Jun 2024 12:43:09 +0000 Subject: [PATCH 3/5] Added loop option --- ydb/tests/tools/kqprun/kqprun.cpp | 45 +++++++++++++++++++++--- ydb/tests/tools/kqprun/src/ydb_setup.cpp | 4 ++- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 1d85b47044cf..b9cb1c9646c6 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -30,8 +30,11 @@ struct TExecutionOptions { std::vector ScriptQueries; TString SchemeQuery; + ui32 LoopCount = 1; + TDuration LoopDelay; + bool ForgetExecution = false; - EExecutionCase ExecutionCase = EExecutionCase::GenericScript; + std::vector ExecutionCases; NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; const TString TraceId = "kqprun_" + CreateGuidAsString(); @@ -39,6 +42,11 @@ struct TExecutionOptions { bool HasResults() const { return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; } + + EExecutionCase GetExecutionCase(size_t index) const { + Y_ABORT_UNLESS(!ExecutionCases.empty()); + return ExecutionCases[std::min(index, ExecutionCases.size() - 1)]; + } }; @@ -55,9 +63,24 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } } - for (size_t id = 0; id < executionOptions.ScriptQueries.size(); ++id) { - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script" << (executionOptions.ScriptQueries.size() > 1 ? TStringBuilder() << " " << id : TString()) << "..." << colors.Default() << Endl; - switch (executionOptions.ExecutionCase) { + const size_t numberQueries = executionOptions.ScriptQueries.size(); + const size_t numberLoops = executionOptions.LoopCount; + for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) { + size_t id = queryId % numberQueries; + if (id == 0 && queryId > 0) { + Sleep(executionOptions.LoopDelay); + } + + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + if (numberQueries > 1) { + Cout << " " << id; + } + if (numberLoops != 1) { + Cout << ", loop " << queryId / numberQueries; + } + Cout << "..." << colors.Default() << Endl; + + switch (executionOptions.GetExecutionCase(id)) { case TExecutionOptions::EExecutionCase::GenericScript: if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; @@ -334,7 +357,10 @@ class TMain : public TMainClassArgs { .RequiredArgument("query-type") .DefaultValue("script") .Choices(executionCase.GetChoices()) - .StoreMappedResultT(&ExecutionOptions.ExecutionCase, executionCase); + .Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice)); + }); TChoices scriptAction({ {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, @@ -350,6 +376,15 @@ class TMain : public TMainClassArgs { .NoArgument() .SetFlag(&ExecutionOptions.ForgetExecution); + options.AddLongOption("loop-count", "Number of runs of the script query (use 0 to start infinite loop)") + .RequiredArgument("uint") + .DefaultValue(ExecutionOptions.LoopCount) + .StoreResult(&ExecutionOptions.LoopCount); + options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") + .RequiredArgument("uint") + .DefaultValue(1000) + .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + // Cluster settings options.AddLongOption('N', "node-count", "Number of nodes to create") diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 7d86d501494a..e98786344c44 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -198,7 +198,9 @@ class TYdbSetup::TImpl { WaitResourcesPublishing(); if (Settings_.MonitoringEnabled) { - Cout << CoutColors_.Cyan() << "Monitoring port: " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort() << Endl; + for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) { + Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl; + } } } From 6f6984ff5f908c79c0c2028cde8400c4953d1909 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 11 Jun 2024 17:00:54 +0000 Subject: [PATCH 4/5] Added async queries --- ydb/tests/tools/kqprun/kqprun.cpp | 42 +++++++++--- ydb/tests/tools/kqprun/src/actors.cpp | 17 ++--- ydb/tests/tools/kqprun/src/actors.h | 8 ++- ydb/tests/tools/kqprun/src/common.h | 2 + ydb/tests/tools/kqprun/src/kqp_runner.cpp | 83 +++++++++++++++++++++++ ydb/tests/tools/kqprun/src/kqp_runner.h | 4 ++ ydb/tests/tools/kqprun/src/ydb_setup.cpp | 45 +++++++----- ydb/tests/tools/kqprun/src/ydb_setup.h | 9 +++ 8 files changed, 172 insertions(+), 38 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b9cb1c9646c6..b48172fa6c42 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -24,7 +24,8 @@ struct TExecutionOptions { enum class EExecutionCase { GenericScript, GenericQuery, - YqlScript + YqlScript, + AsyncQuery }; std::vector ScriptQueries; @@ -40,7 +41,16 @@ struct TExecutionOptions { const TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { - return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; + if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { + return false; + } + + for (EExecutionCase executionCase : ExecutionCases) { + if (executionCase != EExecutionCase::AsyncQuery) { + return true; + } + } + return false; } EExecutionCase GetExecutionCase(size_t index) const { @@ -71,14 +81,16 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner Sleep(executionOptions.LoopDelay); } - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; - if (numberQueries > 1) { - Cout << " " << id; - } - if (numberLoops != 1) { - Cout << ", loop " << queryId / numberQueries; + if (executionOptions.GetExecutionCase(id) != TExecutionOptions::EExecutionCase::AsyncQuery) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + if (numberQueries > 1) { + Cout << " " << id; + } + if (numberLoops != 1) { + Cout << ", loop " << queryId / numberQueries; + } + Cout << "..." << colors.Default() << Endl; } - Cout << "..." << colors.Default() << Endl; switch (executionOptions.GetExecutionCase(id)) { case TExecutionOptions::EExecutionCase::GenericScript: @@ -108,8 +120,13 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } break; + + case TExecutionOptions::EExecutionCase::AsyncQuery: + runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId); + break; } } + runner.WaitAsyncQueries(); if (executionOptions.HasResults()) { try { @@ -351,7 +368,8 @@ class TMain : public TMainClassArgs { TChoices executionCase({ {"script", TExecutionOptions::EExecutionCase::GenericScript}, {"query", TExecutionOptions::EExecutionCase::GenericQuery}, - {"yql-script", TExecutionOptions::EExecutionCase::YqlScript} + {"yql-script", TExecutionOptions::EExecutionCase::YqlScript}, + {"async", TExecutionOptions::EExecutionCase::AsyncQuery} }); options.AddLongOption('C', "execution-case", "Type of query for -p argument") .RequiredArgument("query-type") @@ -361,6 +379,10 @@ class TMain : public TMainClassArgs { TString choice(option->CurValOrDef()); ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice)); }); + options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.InFlightLimit) + .StoreResult(&RunnerOptions.InFlightLimit); TChoices scriptAction({ {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 7ddf0fc4bc7e..3ead3c724998 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -11,14 +11,12 @@ namespace { class TRunScriptActorMock : public NActors::TActorBootstrapped { public: TRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, TProgressCallback progressCallback) : Request_(std::move(request)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) , ResultSizeLimit_(std::numeric_limits::max()) - , ResultSets_(resultSets) , ProgressCallback_(progressCallback) { if (resultRowsLimit) { @@ -79,7 +77,7 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped Request_; - NThreading::TPromise Promise_; + NThreading::TPromise Promise_; ui64 ResultRowsLimit_; ui64 ResultSizeLimit_; - std::vector& ResultSets_; - std::vector ResultSetSizes_; TProgressCallback ProgressCallback_; + std::vector ResultSets_; + std::vector ResultSetSizes_; }; class TResourcesWaiterActor : public NActors::TActorBootstrapped { @@ -186,10 +184,9 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, TProgressCallback progressCallback) { - return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback); + return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, progressCallback); } NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index a222f4b3e3b0..f0a8ef3d5b2d 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -4,11 +4,15 @@ namespace NKqpRun { +struct TQueryResponse { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response; + std::vector ResultSets; +}; + using TProgressCallback = std::function; NActors::IActor* CreateRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, + NThreading::TPromise promise, ui64 resultRowsLimit, ui64 resultSizeLimit, TProgressCallback progressCallback); NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index dd315c974e2c..8aab9649936d 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -55,6 +55,8 @@ struct TRunnerOptions { NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; ETraceOptType TraceOptType = ETraceOptType::Disabled; + ui64 InFlightLimit = 0; + TYdbSetupSettings YdbSettings; }; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index da3ff9c4e353..4c890d38dea3 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -4,6 +4,8 @@ #include #include +#include + #include #include @@ -86,6 +88,34 @@ void PrintStatistics(const TString& fullStat, const THashMap& flat //// TKqpRunner::TImpl class TKqpRunner::TImpl { + struct TAsyncState { + ui64 OnStartRequest() { + InFlight++; + MaxInFlight = std::max(MaxInFlight, InFlight); + return RequestId++; + } + + void OnRequestFinished(bool success) { + InFlight--; + if (success) { + Completed++; + } else { + Failed++; + } + } + + TString GetInfoString() const { + return TStringBuilder() << "completed: " << Completed << ", failed: " << Failed << ", in flight: " << InFlight << ", max in flight: " << MaxInFlight << ", spend time: " << TInstant::Now() - Start; + } + + const TInstant Start = TInstant::Now(); + ui64 RequestId = 1; + ui64 MaxInFlight = 0; + ui64 InFlight = 0; + ui64 Completed = 0; + ui64 Failed = 0; + }; + public: enum class EQueryType { ScriptQuery, @@ -163,6 +193,45 @@ class TKqpRunner::TImpl { return true; } + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + TGuard lock(Mutex_); + + if (Options_.InFlightLimit && AsyncState_.InFlight >= Options_.InFlightLimit) { + AwaitInFlight_.WaitI(Mutex_); + } + + ui64 requestId = AsyncState_.OnStartRequest(); + RunningQueries_[requestId] = YdbSetup_.QueryRequestAsync(query, action, traceId, nullptr).Subscribe([this, requestId](const NThreading::TFuture& f) { + TGuard lock(Mutex_); + + auto response = f.GetValue().Response; + AsyncState_.OnRequestFinished(response.IsSuccess()); + if (response.IsSuccess()) { + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; + } else { + Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.Status << ". " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.Issues.ToString() << CoutColors_.Default(); + } + + if (AsyncState_.InFlight < Options_.InFlightLimit) { + AwaitInFlight_.Signal(); + } + if (!AsyncState_.InFlight) { + AwaitFinish_.Signal(); + } + RunningQueries_.erase(requestId); + }).IgnoreResult(); + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; + } + + void WaitAsyncQueries() { + TGuard lock(Mutex_); + + if (AsyncState_.InFlight) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for async queries..." << CoutColors_.Default() << Endl; + AwaitFinish_.WaitI(Mutex_); + } + } + bool FetchScriptResults() { TYdbSetup::StopTraceOpt(); @@ -373,6 +442,12 @@ class TKqpRunner::TImpl { TString ExecutionOperation_; TExecutionMeta ExecutionMeta_; std::vector ResultSets_; + + TMutex Mutex_; + TCondVar AwaitInFlight_; + TCondVar AwaitFinish_; + TAsyncState AsyncState_; + std::unordered_map> RunningQueries_; }; @@ -394,10 +469,18 @@ bool TKqpRunner::ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction act return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::ScriptQuery); } +void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->ExecuteQueryAsync(query, action, traceId); +} + bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); } +void TKqpRunner::WaitAsyncQueries() { + Impl_->WaitAsyncQueries(); +} + bool TKqpRunner::FetchScriptResults() { return Impl_->FetchScriptResults(); } diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index b263bbedbf55..bcff9cc8d01a 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -16,8 +16,12 @@ class TKqpRunner { bool ExecuteQuery(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void WaitAsyncQueries(); + bool FetchScriptResults(); bool ForgetExecutionOperation(); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index e98786344c44..cd0f7261b3e8 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -60,6 +60,20 @@ class TStaticSecuredCredentialsFactory : public NYql::ISecuredServiceAccountCred TString YqlToken_; }; +TRequestResult GetQueryResult(TQueryResponse response, TQueryMeta& meta, std::vector& resultSets) { + resultSets = std::move(response.ResultSets); + + auto queryOperationResponse = response.Response->Get()->Record.GetRef(); + const auto& responseRecord = queryOperationResponse.GetResponse(); + + meta.Ast = responseRecord.GetQueryAst(); + if (const auto& plan = responseRecord.GetQueryPlan()) { + meta.Plan = plan; + } + + return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); +} + } // anonymous namespace @@ -218,7 +232,7 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector& resultSets, TProgressCallback progressCallback) const { + NThreading::TFuture QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { auto event = MakeHolder(); FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); @@ -226,12 +240,12 @@ class TYdbSetup::TImpl { event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); } - auto promise = NThreading::NewPromise(); + auto promise = NThreading::NewPromise(); auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, progressCallback)); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, progressCallback), RandomNumber(Settings_.NodeCount)); - return promise.GetFuture().GetValueSync(); + return promise.GetFuture(); } NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { @@ -256,7 +270,7 @@ class TYdbSetup::TImpl { auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max()); - GetRuntime()->Register(fetchActor); + GetRuntime()->Register(fetchActor, RandomNumber(Settings_.NodeCount)); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -288,7 +302,7 @@ class TYdbSetup::TImpl { template typename TResponse::TPtr RunKqpProxyRequest(THolder event) const { NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); - NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId()); + NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount))); GetRuntime()->Send(kqpProxy, edgeActor, event.Release()); @@ -378,17 +392,16 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer } TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { - resultSets.clear(); - - auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, progressCallback)->Get()->Record.GetRef(); - const auto& responseRecord = queryOperationResponse.GetResponse(); - - meta.Ast = responseRecord.GetQueryAst(); - if (const auto& plan = responseRecord.GetQueryPlan()) { - meta.Plan = plan; - } + auto response = Impl_->QueryRequestAsync(query, action, traceId, progressCallback).GetValueSync(); + return GetQueryResult(std::move(response), meta, resultSets); +} - return TRequestResult(queryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); +NThreading::TFuture TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { + return Impl_->QueryRequestAsync(query, action, traceId, progressCallback).Apply([](const NThreading::TFuture& f) { + TQueryResult result; + result.Response = GetQueryResult(f.GetValue(), result.Meta, result.ResultSets); + return result; + }); } TRequestResult TYdbSetup::YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const { diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index 745cdae8a659..4d707f28b050 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -47,6 +47,13 @@ struct TRequestResult { }; +struct TQueryResult { + TRequestResult Response; + TQueryMeta Meta; + std::vector ResultSets; +}; + + class TYdbSetup { public: explicit TYdbSetup(const TYdbSetupSettings& settings); @@ -57,6 +64,8 @@ class TYdbSetup { TRequestResult QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const; + NThreading::TFuture QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const; + TRequestResult YqlScriptRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets) const; TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const; From ce10854e53282d43af22ed0aaafb3c60658bd837 Mon Sep 17 00:00:00 2001 From: Grigoriy Pisarenko Date: Tue, 11 Jun 2024 17:43:24 +0000 Subject: [PATCH 5/5] Fixed bugs --- ydb/tests/tools/kqprun/kqprun.cpp | 2 +- ydb/tests/tools/kqprun/src/kqp_runner.cpp | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index b48172fa6c42..84c17fa204db 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -424,7 +424,7 @@ class TMain : public TMainClassArgs { .Handler1([this](const NLastGetopt::TOptsParser* option) { if (const TString& port = option->CurVal()) { RunnerOptions.YdbSettings.MonitoringEnabled = true; - RunnerOptions.YdbSettings.MonitoringPortOffset = FromString(port.c_str()); + RunnerOptions.YdbSettings.MonitoringPortOffset = FromString(port); } }); diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 4c890d38dea3..abcc69bed91a 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -199,15 +199,16 @@ class TKqpRunner::TImpl { if (Options_.InFlightLimit && AsyncState_.InFlight >= Options_.InFlightLimit) { AwaitInFlight_.WaitI(Mutex_); } - ui64 requestId = AsyncState_.OnStartRequest(); + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; + RunningQueries_[requestId] = YdbSetup_.QueryRequestAsync(query, action, traceId, nullptr).Subscribe([this, requestId](const NThreading::TFuture& f) { TGuard lock(Mutex_); auto response = f.GetValue().Response; AsyncState_.OnRequestFinished(response.IsSuccess()); if (response.IsSuccess()) { - Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << Endl; } else { Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.Status << ". " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.Issues.ToString() << CoutColors_.Default(); } @@ -220,7 +221,6 @@ class TKqpRunner::TImpl { } RunningQueries_.erase(requestId); }).IgnoreResult(); - Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " started. " << CoutColors_.Yellow() << AsyncState_.GetInfoString() << CoutColors_.Default() << "\n"; } void WaitAsyncQueries() { @@ -263,12 +263,14 @@ class TKqpRunner::TImpl { } void PrintScriptResults() const { - Cout << CoutColors_.Cyan() << "Writing script query results" << CoutColors_.Default() << Endl; - for (size_t i = 0; i < ResultSets_.size(); ++i) { - if (ResultSets_.size() > 1) { - *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; + if (Options_.ResultOutput) { + Cout << CoutColors_.Cyan() << "Writing script query results" << CoutColors_.Default() << Endl; + for (size_t i = 0; i < ResultSets_.size(); ++i) { + if (ResultSets_.size() > 1) { + *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; + } + PrintScriptResult(ResultSets_[i]); } - PrintScriptResult(ResultSets_[i]); } }