diff --git a/ydb/core/testlib/actors/test_runtime.cpp b/ydb/core/testlib/actors/test_runtime.cpp index 764063d504f3..126f18930f4b 100644 --- a/ydb/core/testlib/actors/test_runtime.cpp +++ b/ydb/core/testlib/actors/test_runtime.cpp @@ -174,7 +174,7 @@ namespace NActors { } if (NeedMonitoring && !SingleSysEnv) { - ui16 port = GetPortManager().GetPort(); + ui16 port = MonitoringPortOffset ? MonitoringPortOffset + nodeIndex : GetPortManager().GetPort(); node->Mon.Reset(new NActors::TSyncHttpMon({ .Port = port, .Threads = 10, diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 3d745d2b977b..79105fd14889 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -225,7 +225,7 @@ namespace Tests { NKikimr::SetupChannelProfiles(app); - Runtime->SetupMonitoring(); + Runtime->SetupMonitoring(Settings->MonitoringPortOffset); Runtime->SetLogBackend(Settings->LogBackend); Runtime->AddAppDataInit([this](ui32 nodeIdx, NKikimr::TAppData& appData) { diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 98629760b8ae..0fe0120d0b08 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -106,6 +106,7 @@ namespace Tests { ui16 Port; ui16 GrpcPort = 0; int GrpcMaxMessageSize = 0; // 0 - default (4_MB), -1 - no limit + ui16 MonitoringPortOffset = 0; NKikimrProto::TAuthConfig AuthConfig; NKikimrPQ::TPQConfig PQConfig; NKikimrPQ::TPQClusterDiscoveryConfig PQClusterDiscoveryConfig; @@ -159,6 +160,7 @@ namespace Tests { TServerSettings& SetGrpcPort(ui16 value) { GrpcPort = value; return *this; } TServerSettings& SetGrpcMaxMessageSize(int value) { GrpcMaxMessageSize = value; return *this; } + TServerSettings& SetMonitoringPortOffset(ui16 value) { MonitoringPortOffset = value; return *this; } TServerSettings& SetSupportsRedirect(bool value) { SupportsRedirect = value; return *this; } TServerSettings& SetTracePath(const TString& value) { TracePath = value; return *this; } TServerSettings& SetDomain(ui32 value) { Domain = value; return *this; } diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index 200690ad98fc..c93d8ea27ff6 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -1594,8 +1594,9 @@ namespace NActors { return node->DynamicCounters; } - void TTestActorRuntimeBase::SetupMonitoring() { + void TTestActorRuntimeBase::SetupMonitoring(ui16 monitoringPortOffset) { NeedMonitoring = true; + MonitoringPortOffset = monitoringPortOffset; } void TTestActorRuntimeBase::SendInternal(TAutoPtr ev, ui32 nodeIndex, bool viaActorSystem) { diff --git a/ydb/library/actors/testlib/test_runtime.h b/ydb/library/actors/testlib/test_runtime.h index d6b20ca9a357..eef097c1b7b9 100644 --- a/ydb/library/actors/testlib/test_runtime.h +++ b/ydb/library/actors/testlib/test_runtime.h @@ -295,7 +295,7 @@ namespace NActors { void EnableScheduleForActor(const TActorId& actorId, bool allow = true); bool IsScheduleForActorEnabled(const TActorId& actorId) const; TIntrusivePtr GetDynamicCounters(ui32 nodeIndex = 0); - void SetupMonitoring(); + void SetupMonitoring(ui16 monitoringPortOffset = 0); using TEventObserverCollection = std::list& event)>>; class TEventObserverHolder { @@ -655,6 +655,7 @@ namespace NActors { TIntrusivePtr DispatcherRandomProvider; TAutoPtr LogBackend; bool NeedMonitoring; + ui16 MonitoringPortOffset = 0; TIntrusivePtr RandomProvider; TIntrusivePtr TimeProvider; diff --git a/ydb/tests/fq/yt/kqprun.py b/ydb/tests/fq/yt/kqprun.py index 3179fee6bee7..4e0f86b9d1be 100644 --- a/ydb/tests/fq/yt/kqprun.py +++ b/ydb/tests/fq/yt/kqprun.py @@ -37,7 +37,7 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T cmd += '--emulate-yt ' \ '--exclude-linked-udfs ' \ - '--clear-execution query ' \ + '--execution-case query ' \ '--app-config=%(config_file)s ' \ '--script-query=%(program_file)s ' \ '--scheme-query=%(scheme_file)s ' \ diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 30ad08e628e8..50d92cdd2827 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -6,36 +6,56 @@ #include #include +#include #include #include +#include + #include #include #include #include #include -#include struct TExecutionOptions { - enum class EClearExecutionCase { - Disabled, + enum class EExecutionCase { + GenericScript, GenericQuery, - YqlScript + YqlScript, + AsyncQuery }; std::vector ScriptQueries; TString SchemeQuery; + ui32 LoopCount = 1; + TDuration LoopDelay; + bool ForgetExecution = false; - EClearExecutionCase ClearExecution = EClearExecutionCase::Disabled; + std::vector ExecutionCases; NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; - TString TraceId = "kqprun_" + CreateGuidAsString(); + const TString TraceId = "kqprun_" + CreateGuidAsString(); bool HasResults() const { - return !ScriptQueries.empty() && ScriptQueryAction == NKikimrKqp::QUERY_ACTION_EXECUTE; + if (ScriptQueries.empty() || ScriptQueryAction != NKikimrKqp::QUERY_ACTION_EXECUTE) { + return false; + } + + for (EExecutionCase executionCase : ExecutionCases) { + if (executionCase != EExecutionCase::AsyncQuery) { + return true; + } + } + return false; + } + + EExecutionCase GetExecutionCase(size_t index) const { + Y_ABORT_UNLESS(!ExecutionCases.empty()); + return ExecutionCases[std::min(index, ExecutionCases.size() - 1)]; } }; @@ -53,10 +73,28 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } } - for (size_t id = 0; id < executionOptions.ScriptQueries.size(); ++id) { - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script" << (executionOptions.ScriptQueries.size() > 1 ? TStringBuilder() << " " << id : TString()) << "..." << colors.Default() << Endl; - switch (executionOptions.ClearExecution) { - case TExecutionOptions::EClearExecutionCase::Disabled: + const size_t numberQueries = executionOptions.ScriptQueries.size(); + const size_t numberLoops = executionOptions.LoopCount; + for (size_t queryId = 0; queryId < numberQueries * numberLoops || numberLoops == 0; ++queryId) { + size_t id = queryId % numberQueries; + if (id == 0 && queryId > 0) { + Sleep(executionOptions.LoopDelay); + } + + const auto executionCase = executionOptions.GetExecutionCase(id); + if (executionCase != TExecutionOptions::EExecutionCase::AsyncQuery) { + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing script"; + if (numberQueries > 1) { + Cout << " " << id; + } + if (numberLoops != 1) { + Cout << ", loop " << queryId / numberQueries; + } + Cout << "..." << colors.Default() << Endl; + } + + switch (executionCase) { + case TExecutionOptions::EExecutionCase::GenericScript: if (!runner.ExecuteScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Script execution failed"; } @@ -72,19 +110,24 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } break; - case TExecutionOptions::EClearExecutionCase::GenericQuery: + case TExecutionOptions::EExecutionCase::GenericQuery: if (!runner.ExecuteQuery(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; } break; - case TExecutionOptions::EClearExecutionCase::YqlScript: + case TExecutionOptions::EExecutionCase::YqlScript: if (!runner.ExecuteYqlScript(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId)) { ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Yql script execution failed"; } break; + + case TExecutionOptions::EExecutionCase::AsyncQuery: + runner.ExecuteQueryAsync(executionOptions.ScriptQueries[id], executionOptions.ScriptQueryAction, executionOptions.TraceId); + break; } } + runner.WaitAsyncQueries(); if (executionOptions.HasResults()) { try { @@ -111,37 +154,6 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner } -THolder SetupDefaultFileOutput(const TString& filePath, IOutputStream*& stream) { - THolder fileHolder; - if (filePath == "-") { - stream = &Cout; - } else if (filePath) { - fileHolder.Reset(new TFileOutput(filePath)); - stream = fileHolder.Get(); - } - return fileHolder; -} - - -template -EnumType GetCaseVariant(const TString& optionName, const TString& caseName, const std::map& casesMap) { - auto it = casesMap.find(caseName); - if (it == casesMap.end()) { - ythrow yexception() << "Option '" << optionName << "' has no case '" << caseName << "'"; - } - return it->second; -} - - -void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { - TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; - for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { - query.replace(position, variableTemplate.size(), variableValue); - position += variableValue.size(); - } -} - - TIntrusivePtr CreateFunctionRegistry(const TString& udfsDirectory, TVector udfsPaths, bool excludeLinkedUdfs) { if (!udfsDirectory.empty() || !udfsPaths.empty()) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); @@ -149,7 +161,7 @@ TIntrusivePtr CreateFunctionRegistr } NKikimr::NMiniKQL::FindUdfsInDir(udfsDirectory, &udfsPaths); - auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); + auto functionRegistry = NKikimr::NMiniKQL::CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, udfsPaths)->Clone(); if (excludeLinkedUdfs) { for (const auto& wrapper : NYql::NUdf::GetStaticUdfModuleWrapperList()) { @@ -166,253 +178,298 @@ TIntrusivePtr CreateFunctionRegistr } -void RunMain(int argc, const char* argv[]) { - TExecutionOptions executionOptions; - NKqpRun::TRunnerOptions runnerOptions; - - std::vector scriptQueryFiles; - TString schemeQueryFile; - TString resultOutputFile = "-"; - TString schemeQueryAstFile; - TString scriptQueryAstFile; - TString scriptQueryPlanFile; - TString inProgressStatisticsFile; - TString logFile = "-"; - TString appConfigFile = "./configuration/app_config.conf"; - std::vector tablesMappingList; - - TString clearExecutionType = "disabled"; - TString traceOptType = "disabled"; - TString scriptQueryAction = "execute"; - TString planOutputFormat = "pretty"; - TString resultOutputFormat = "rows"; - i64 resultsRowsLimit = 1000; - bool emulateYt = false; - - TVector udfsPaths; - TString udfsDirectory; - bool excludeLinkedUdfs = false; - - NLastGetopt::TOpts options = NLastGetopt::TOpts::Default(); - options.AddLongOption('p', "script-query", "Script query to execute") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&scriptQueryFiles); - options.AddLongOption('s', "scheme-query", "Scheme query to execute") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&schemeQueryFile); - options.AddLongOption('c', "app-config", "File with app config (TAppConfig)") - .Optional() - .RequiredArgument("FILE") - .DefaultValue(appConfigFile) - .StoreResult(&appConfigFile); - options.AddLongOption('t', "table", "File with table (can be used by YT with -E flag), table@file") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&tablesMappingList); - - options.AddLongOption("log-file", "File with execution logs (use '-' to write in stderr)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&logFile); - options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&resultOutputFile); - options.AddLongOption("scheme-ast-file", "File with scheme query ast (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&schemeQueryAstFile); - options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryAstFile); - options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&scriptQueryPlanFile); - options.AddLongOption("in-progress-statistics", "File with script inprogress statistics") - .Optional() - .RequiredArgument("FILE") - .StoreResult(&inProgressStatisticsFile); - - options.AddLongOption('C', "clear-execution", "Execute script query without creating additional tables, one of { query | yql-script }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(clearExecutionType) - .StoreResult(&clearExecutionType); - options.AddLongOption('F', "forget", "Forget script execution operation after fetching results, cannot be used with -C") - .Optional() - .NoArgument() - .DefaultValue(executionOptions.ForgetExecution) - .SetFlag(&executionOptions.ForgetExecution); - options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation, one of { scheme | script | all }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(traceOptType) - .StoreResult(&traceOptType); - options.AddLongOption('A', "script-action", "Script query execute action, one of { execute | explain }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(scriptQueryAction) - .StoreResult(&scriptQueryAction); - options.AddLongOption('P', "plan-format", "Script query plan format, one of { pretty | table | json }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(planOutputFormat) - .StoreResult(&planOutputFormat); - options.AddLongOption('R', "result-format", "Script query result format, one of { rows | full-json | full-proto }") - .Optional() - .RequiredArgument("STR") - .DefaultValue(resultOutputFormat) - .StoreResult(&resultOutputFormat); - options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") - .Optional() - .RequiredArgument("INT") - .DefaultValue(resultsRowsLimit) - .StoreResult(&resultsRowsLimit); - options.AddLongOption('E', "emulate-yt", "Emulate YT tables") - .Optional() - .NoArgument() - .DefaultValue(emulateYt) - .SetFlag(&emulateYt); - options.AddLongOption('N', "node-count", "Number of nodes to create") - .Optional() - .RequiredArgument("INT") - .DefaultValue(runnerOptions.YdbSettings.NodeCount) - .StoreResult(&runnerOptions.YdbSettings.NodeCount); - options.AddLongOption('M', "monitoring", "Enable embedded UI access and run kqprun as deamon") - .Optional() - .NoArgument() - .DefaultValue(runnerOptions.YdbSettings.MonitoringEnabled) - .SetFlag(&runnerOptions.YdbSettings.MonitoringEnabled); - - options.AddLongOption('u', "udf", "Load shared library with UDF by given path") - .Optional() - .RequiredArgument("FILE") - .AppendTo(&udfsPaths); - options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") - .Optional() - .RequiredArgument("PATH") - .StoreResult(&udfsDirectory); - options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") - .Optional() - .NoArgument() - .DefaultValue(excludeLinkedUdfs) - .SetFlag(&excludeLinkedUdfs); - - NLastGetopt::TOptsParseResult parsedOptions(&options, argc, argv); - - // Environment variables - - const TString& yqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); - - // Execution options - - if (!schemeQueryFile && scriptQueryFiles.empty() && !runnerOptions.YdbSettings.MonitoringEnabled) { - ythrow yexception() << "Nothing to execute"; - } - - if (schemeQueryFile) { - executionOptions.SchemeQuery = TFileInput(schemeQueryFile).ReadAll(); - ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, yqlToken, executionOptions.SchemeQuery); - } - - executionOptions.ScriptQueries.reserve(scriptQueryFiles.size()); - for (const TString& scriptQueryFile : scriptQueryFiles) { - executionOptions.ScriptQueries.emplace_back(TFileInput(scriptQueryFile).ReadAll()); - } - - executionOptions.ClearExecution = GetCaseVariant("clear-execution", clearExecutionType, { - {"query", TExecutionOptions::EClearExecutionCase::GenericQuery}, - {"yql-script", TExecutionOptions::EClearExecutionCase::YqlScript}, - {"disabled", TExecutionOptions::EClearExecutionCase::Disabled} - }); - - executionOptions.ScriptQueryAction = GetCaseVariant("script-action", scriptQueryAction, { - {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, - {"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN} - }); +class TMain : public TMainClassArgs { + inline static const TString YqlToken = GetEnv(NKqpRun::YQL_TOKEN_VARIABLE); + inline static std::vector> FileHolders; - // Runner options + TExecutionOptions ExecutionOptions; + NKqpRun::TRunnerOptions RunnerOptions; - THolder resultFileHolder = SetupDefaultFileOutput(resultOutputFile, runnerOptions.ResultOutput); - THolder schemeQueryAstFileHolder = SetupDefaultFileOutput(schemeQueryAstFile, runnerOptions.SchemeQueryAstOutput); - THolder scriptQueryAstFileHolder = SetupDefaultFileOutput(scriptQueryAstFile, runnerOptions.ScriptQueryAstOutput); - THolder scriptQueryPlanFileHolder = SetupDefaultFileOutput(scriptQueryPlanFile, runnerOptions.ScriptQueryPlanOutput); + THashMap TablesMapping; + TVector UdfsPaths; + TString UdfsDirectory; + bool ExcludeLinkedUdfs = false; + ui64 ResultsRowsLimit = 1000; + bool EmulateYt = false; - if (inProgressStatisticsFile) { - runnerOptions.InProgressStatisticsOutputFile = inProgressStatisticsFile; + static TString LoadFile(const TString& file) { + return TFileInput(file).ReadAll(); } - runnerOptions.TraceOptType = GetCaseVariant("trace-opt", traceOptType, { - {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, - {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, - {"script", NKqpRun::TRunnerOptions::ETraceOptType::Script}, - {"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled} - }); - runnerOptions.YdbSettings.TraceOptEnabled = runnerOptions.TraceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; - - runnerOptions.ResultOutputFormat = GetCaseVariant("result-format", resultOutputFormat, { - {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, - {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, - {"full-proto", NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto} - }); - - runnerOptions.PlanOutputFormat = GetCaseVariant("plan-format", planOutputFormat, { - {"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty}, - {"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable}, - {"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode}, - }); - - // Ydb settings - - if (runnerOptions.YdbSettings.NodeCount < 1) { - ythrow yexception() << "Number of nodes less than one"; + static void ReplaceTemplate(const TString& variableName, const TString& variableValue, TString& query) { + TString variableTemplate = TStringBuilder() << "${" << variableName << "}"; + for (size_t position = query.find(variableTemplate); position != TString::npos; position = query.find(variableTemplate, position)) { + query.replace(position, variableTemplate.size(), variableValue); + position += variableValue.size(); + } } - if (logFile != "-") { - runnerOptions.YdbSettings.LogOutputFile = logFile; - std::remove(logFile.c_str()); + static IOutputStream* GetDefaultOutput(const TString& file) { + if (file == "-") { + return &Cout; + } + if (file) { + FileHolders.emplace_back(new TFileOutput(file)); + return FileHolders.back().get(); + } + return nullptr; } - runnerOptions.YdbSettings.YqlToken = yqlToken; - runnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(udfsDirectory, udfsPaths, excludeLinkedUdfs).Get(); + template + class TChoices { + public: + explicit TChoices(std::map choicesMap) + : ChoicesMap(std::move(choicesMap)) + {} - TString appConfigData = TFileInput(appConfigFile).ReadAll(); - if (!google::protobuf::TextFormat::ParseFromString(appConfigData, &runnerOptions.YdbSettings.AppConfig)) { - ythrow yexception() << "Bad format of app configuration"; - } + TResult operator()(const TString& choice) const { + return ChoicesMap.at(choice); + } - if (resultsRowsLimit < 0) { - ythrow yexception() << "Results rows limit less than zero"; - } - runnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(resultsRowsLimit); - - if (emulateYt) { - THashMap tablesMapping; - for (const auto& tablesMappingItem: tablesMappingList) { - TStringBuf tableName; - TStringBuf filePath; - TStringBuf(tablesMappingItem).Split('@', tableName, filePath); - if (tableName.empty() || filePath.empty()) { - ythrow yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.Root/plato.Input@input.txt"; + TVector GetChoices() const { + TVector choices; + choices.reserve(ChoicesMap.size()); + for (const auto& [choice, _] : ChoicesMap) { + choices.emplace_back(choice); } - tablesMapping[tableName] = filePath; + return choices; } - const auto& fileStorageConfig = runnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); - auto fileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)})); - auto ytFileServices = NYql::NFile::TYtFileServices::Make(runnerOptions.YdbSettings.FunctionRegistry.Get(), tablesMapping, fileStorage); - runnerOptions.YdbSettings.YtGateway = NYql::CreateYtFileGateway(ytFileServices); - runnerOptions.YdbSettings.ComputationFactory = NYql::NFile::GetYtFileFactory(ytFileServices); - } else if (!tablesMappingList.empty()) { - ythrow yexception() << "Tables mapping is not supported without emulate YT mode"; + private: + const std::map ChoicesMap; + }; + +protected: + void RegisterOptions(NLastGetopt::TOpts& options) override { + options.SetTitle("KqpRun -- tool to execute queries by using kikimr provider (instead of dq provider in DQrun tool)"); + options.AddHelpOption('h'); + options.SetFreeArgsNum(0); + + // Inputs + + options.AddLongOption('s', "scheme-query", "Scheme query to execute (typically DDL/DCL query)") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + ExecutionOptions.SchemeQuery = LoadFile(option->CurVal()); + ReplaceTemplate(NKqpRun::YQL_TOKEN_VARIABLE, YqlToken, ExecutionOptions.SchemeQuery); + }); + options.AddLongOption('p', "script-query", "Script query to execute (typically DML query)") + .RequiredArgument("file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + ExecutionOptions.ScriptQueries.emplace_back(LoadFile(option->CurVal())); + }); + + options.AddLongOption('t', "table", "File with input table (can be used by YT with -E flag), table@file") + .RequiredArgument("table@file") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TStringBuf tableName; + TStringBuf filePath; + TStringBuf(option->CurVal()).Split('@', tableName, filePath); + if (tableName.empty() || filePath.empty()) { + ythrow yexception() << "Incorrect table mapping, expected form table@file, e.g. yt.Root/plato.Input@input.txt"; + } + if (TablesMapping.contains(tableName)) { + ythrow yexception() << "Got duplicate table name: " << tableName; + } + TablesMapping[tableName] = filePath; + }); + + options.AddLongOption('c', "app-config", "File with app config (TAppConfig for ydb tennant)") + .RequiredArgument("file") + .DefaultValue("./configuration/app_config.conf") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + TString file(option->CurValOrDef()); + if (!google::protobuf::TextFormat::ParseFromString(LoadFile(file), &RunnerOptions.YdbSettings.AppConfig)) { + ythrow yexception() << "Bad format of app configuration"; + } + }); + + options.AddLongOption('u', "udf", "Load shared library with UDF by given path") + .RequiredArgument("file") + .EmplaceTo(&UdfsPaths); + options.AddLongOption("udfs-dir", "Load all shared libraries with UDFs found in given directory") + .RequiredArgument("directory") + .StoreResult(&UdfsDirectory); + options.AddLongOption("exclude-linked-udfs", "Exclude linked udfs when same udf passed from -u or --udfs-dir") + .NoArgument() + .SetFlag(&ExcludeLinkedUdfs); + + // Outputs + + options.AddLongOption("log-file", "File with execution logs (writes in stderr if empty)") + .RequiredArgument("file") + .StoreResult(&RunnerOptions.YdbSettings.LogOutputFile) + .Handler1([](const NLastGetopt::TOptsParser* option) { + if (const TString& file = option->CurVal()) { + std::remove(file.c_str()); + } + }); + TChoices traceOpt({ + {"all", NKqpRun::TRunnerOptions::ETraceOptType::All}, + {"scheme", NKqpRun::TRunnerOptions::ETraceOptType::Scheme}, + {"script", NKqpRun::TRunnerOptions::ETraceOptType::Script}, + {"disabled", NKqpRun::TRunnerOptions::ETraceOptType::Disabled} + }); + options.AddLongOption('T', "trace-opt", "print AST in the begin of each transformation") + .RequiredArgument("trace-opt-query") + .DefaultValue("disabled") + .Choices(traceOpt.GetChoices()) + .StoreMappedResultT(&RunnerOptions.TraceOptType, [this, traceOpt](const TString& choise) { + auto traceOptType = traceOpt(choise); + RunnerOptions.YdbSettings.TraceOptEnabled = traceOptType != NKqpRun::TRunnerOptions::ETraceOptType::Disabled; + return traceOptType; + }); + + options.AddLongOption("result-file", "File with script execution results (use '-' to write in stdout)") + .RequiredArgument("file") + .DefaultValue("-") + .StoreMappedResultT(&RunnerOptions.ResultOutput, &GetDefaultOutput); + options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results") + .RequiredArgument("uint") + .DefaultValue(ResultsRowsLimit) + .StoreResult(&ResultsRowsLimit); + TChoices resultFormat({ + {"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson}, + {"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson}, + {"full-proto", NKqpRun::TRunnerOptions::EResultOutputFormat::FullProto} + }); + options.AddLongOption('R', "result-format", "Script query result format") + .RequiredArgument("result-format") + .DefaultValue("rows") + .Choices(resultFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.ResultOutputFormat, resultFormat); + + options.AddLongOption("scheme-ast-file", "File with scheme query ast (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.SchemeQueryAstOutput, &GetDefaultOutput); + + options.AddLongOption("script-ast-file", "File with script query ast (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryAstOutput, &GetDefaultOutput); + + options.AddLongOption("script-plan-file", "File with script query plan (use '-' to write in stdout)") + .RequiredArgument("file") + .StoreMappedResultT(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput); + options.AddLongOption("script-statistics", "File with script inprogress statistics") + .RequiredArgument("file") + .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile); + TChoices planFormat({ + {"pretty", NYdb::NConsoleClient::EOutputFormat::Pretty}, + {"table", NYdb::NConsoleClient::EOutputFormat::PrettyTable}, + {"json", NYdb::NConsoleClient::EOutputFormat::JsonUnicode}, + }); + options.AddLongOption('P', "plan-format", "Script query plan format") + .RequiredArgument("plan-format") + .DefaultValue("pretty") + .Choices(planFormat.GetChoices()) + .StoreMappedResultT(&RunnerOptions.PlanOutputFormat, planFormat); + + // Pipeline settings + + TChoices executionCase({ + {"script", TExecutionOptions::EExecutionCase::GenericScript}, + {"query", TExecutionOptions::EExecutionCase::GenericQuery}, + {"yql-script", TExecutionOptions::EExecutionCase::YqlScript}, + {"async", TExecutionOptions::EExecutionCase::AsyncQuery} + }); + options.AddLongOption('C', "execution-case", "Type of query for -p argument") + .RequiredArgument("query-type") + .DefaultValue("script") + .Choices(executionCase.GetChoices()) + .Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + ExecutionOptions.ExecutionCases.emplace_back(executionCase(choice)); + }); + options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.YdbSettings.InFlightLimit) + .StoreResult(&RunnerOptions.YdbSettings.InFlightLimit); + + TChoices scriptAction({ + {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, + {"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN} + }); + options.AddLongOption('A', "script-action", "Script query execute action") + .RequiredArgument("script-action") + .DefaultValue("execute") + .Choices(scriptAction.GetChoices()) + .StoreMappedResultT(&ExecutionOptions.ScriptQueryAction, scriptAction); + + options.AddLongOption('F', "forget", "Forget script execution operation after fetching results") + .NoArgument() + .SetFlag(&ExecutionOptions.ForgetExecution); + + options.AddLongOption("loop-count", "Number of runs of the script query (use 0 to start infinite loop)") + .RequiredArgument("uint") + .DefaultValue(ExecutionOptions.LoopCount) + .StoreResult(&ExecutionOptions.LoopCount); + options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") + .RequiredArgument("uint") + .DefaultValue(1000) + .StoreMappedResultT(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds); + + // Cluster settings + + options.AddLongOption('N', "node-count", "Number of nodes to create") + .RequiredArgument("uint") + .DefaultValue(RunnerOptions.YdbSettings.NodeCount) + .StoreMappedResultT(&RunnerOptions.YdbSettings.NodeCount, [](ui32 nodeCount) { + if (nodeCount < 1) { + ythrow yexception() << "Number of nodes less than one"; + } + return nodeCount; + }); + + options.AddLongOption('M', "monitoring", "Embedded UI port (use 0 to start on random free port), if used kqprun will be runs as daemon") + .RequiredArgument("uint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + if (const TString& port = option->CurVal()) { + RunnerOptions.YdbSettings.MonitoringEnabled = true; + RunnerOptions.YdbSettings.MonitoringPortOffset = FromString(port); + } + }); + + options.AddLongOption('E', "emulate-yt", "Emulate YT tables (use file gateway instead of native gateway)") + .NoArgument() + .SetFlag(&EmulateYt); + + TChoices> backtrace({ + {"heavy", &NKikimr::EnableYDBBacktraceFormat}, + {"light", []() { SetFormatBackTraceFn(FormatBackTrace); }} + }); + options.AddLongOption("backtrace", "Default backtrace format function") + .RequiredArgument("backtrace-type") + .DefaultValue("heavy") + .Choices(backtrace.GetChoices()) + .Handler1([backtrace](const NLastGetopt::TOptsParser* option) { + TString choice(option->CurValOrDef()); + backtrace(choice)(); + }); } - RunScript(executionOptions, runnerOptions); -} + int DoRun(NLastGetopt::TOptsParseResult&&) override { + if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled) { + ythrow yexception() << "Nothing to execute"; + } + + RunnerOptions.YdbSettings.YqlToken = YqlToken; + RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get(); + RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit); + + if (EmulateYt) { + const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage(); + auto fileStorage = WithAsync(CreateFileStorage(fileStorageConfig, {MakeYtDownloader(fileStorageConfig)})); + auto ytFileServices = NYql::NFile::TYtFileServices::Make(RunnerOptions.YdbSettings.FunctionRegistry.Get(), TablesMapping, fileStorage); + RunnerOptions.YdbSettings.YtGateway = NYql::CreateYtFileGateway(ytFileServices); + RunnerOptions.YdbSettings.ComputationFactory = NYql::NFile::GetYtFileFactory(ytFileServices); + } else if (!TablesMapping.empty()) { + ythrow yexception() << "Tables mapping is not supported without emulate YT mode"; + } + + RunScript(ExecutionOptions, RunnerOptions); + return 0; + } +}; void KqprunTerminateHandler() { @@ -442,7 +499,7 @@ int main(int argc, const char* argv[]) { signal(SIGSEGV, &SegmentationFaultHandler); try { - RunMain(argc, argv); + TMain().Run(argc, argv); } catch (...) { NColorizer::TColors colors = NColorizer::AutoColors(Cerr); diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 16dd8e0c1e28..6486938d1638 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -1,5 +1,7 @@ #include "actors.h" +#include + #include #include @@ -10,28 +12,25 @@ namespace { class TRunScriptActorMock : public NActors::TActorBootstrapped { public: - TRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, - TProgressCallback progressCallback) - : Request_(std::move(request)) + TRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback) + : TargetNode(request.TargetNode) + , Request_(std::move(request.Event)) , Promise_(promise) , ResultRowsLimit_(std::numeric_limits::max()) , ResultSizeLimit_(std::numeric_limits::max()) - , ResultSets_(resultSets) , ProgressCallback_(progressCallback) { - if (resultRowsLimit) { - ResultRowsLimit_ = resultRowsLimit; + if (request.ResultRowsLimit) { + ResultRowsLimit_ = request.ResultRowsLimit; } - if (resultSizeLimit) { - ResultSizeLimit_ = resultSizeLimit; + if (request.ResultSizeLimit) { + ResultSizeLimit_ = request.ResultSizeLimit; } } void Bootstrap() { NActors::ActorIdToProto(SelfId(), Request_->Record.MutableRequestActorId()); - Send(NKikimr::NKqp::MakeKqpProxyID(SelfId().NodeId()), std::move(Request_)); + Send(NKikimr::NKqp::MakeKqpProxyID(TargetNode), std::move(Request_)); Become(&TRunScriptActorMock::StateFunc); } @@ -50,30 +49,36 @@ class TRunScriptActorMock : public NActors::TActorBootstrappedGet()->Record.GetQueryResultIndex(); if (resultSetIndex >= ResultSets_.size()) { ResultSets_.resize(resultSetIndex + 1); + ResultSetSizes_.resize(resultSetIndex + 1, 0); } if (!ResultSets_[resultSetIndex].truncated()) { + ui64& resultSetSize = ResultSetSizes_[resultSetIndex]; for (auto& row : *ev->Get()->Record.MutableResultSet()->mutable_rows()) { if (static_cast(ResultSets_[resultSetIndex].rows_size()) >= ResultRowsLimit_) { ResultSets_[resultSetIndex].set_truncated(true); break; } - if (ResultSets_[resultSetIndex].ByteSizeLong() + row.ByteSizeLong() > ResultSizeLimit_) { + auto rowSize = row.ByteSizeLong(); + if (resultSetSize + rowSize > ResultSizeLimit_) { ResultSets_[resultSetIndex].set_truncated(true); break; } + resultSetSize += rowSize; *ResultSets_[resultSetIndex].add_rows() = std::move(row); } - *ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + if (!ResultSets_[resultSetIndex].columns_size()) { + *ResultSets_[resultSetIndex].mutable_columns() = ev->Get()->Record.GetResultSet().columns(); + } } Send(ev->Sender, response.Release()); } void Handle(NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - Promise_.SetValue(std::move(ev)); + Promise_.SetValue(TQueryResponse{.Response = std::move(ev), .ResultSets = std::move(ResultSets_)}); PassAway(); } @@ -84,33 +89,116 @@ class TRunScriptActorMock : public NActors::TActorBootstrapped Request_; - NThreading::TPromise Promise_; + ui32 TargetNode = 0; + std::unique_ptr Request_; + NThreading::TPromise Promise_; ui64 ResultRowsLimit_; ui64 ResultSizeLimit_; - std::vector& ResultSets_; TProgressCallback ProgressCallback_; + std::vector ResultSets_; + std::vector ResultSetSizes_; }; -class TResourcesWaiterActor : public NActors::TActorBootstrapped { - struct TEvPrivate { - enum EEv : ui32 { - EvResourcesInfo = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), +class TAsyncQueryRunnerActor : public NActors::TActor { + using TBase = NActors::TActor; + +public: + TAsyncQueryRunnerActor(ui64 inFlightLimit) + : TBase(&TAsyncQueryRunnerActor::StateFunc) + , InFlightLimit_(inFlightLimit) + { + RunningRequests_.reserve(InFlightLimit_); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvStartAsyncQuery, Handle); + hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle); + hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle); + ) + + void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) { + DelayedRequests_.emplace(std::move(ev)); + StartDelayedRequests(); + } + + void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) { + const ui64 requestId = ev->Get()->RequestId; + RunningRequests_.erase(requestId); + + const auto& response = ev->Get()->Result.Response->Get()->Record.GetRef(); + const auto status = response.GetYdbStatus(); + + if (status == Ydb::StatusIds::SUCCESS) { + Completed_++; + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + } else { + Failed_++; + NYql::TIssues issues; + NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues); + Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default(); + } + + StartDelayedRequests(); + TryFinalize(); + } - EvEnd - }; + void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) { + FinalizePromise_ = ev->Get()->FinalizePromise; + if (!TryFinalize()) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl; + } + } + +private: + void StartDelayedRequests() { + while (!DelayedRequests_.empty() && (!InFlightLimit_ || RunningRequests_.size() < InFlightLimit_)) { + auto request = std::move(DelayedRequests_.front()); + DelayedRequests_.pop(); + + auto promise = NThreading::NewPromise(); + Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr)); + RunningRequests_[RequestId_] = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture& f) { + Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); + }); + + MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size()); + Cout << TStringBuilder() << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + + RequestId_++; + request->Get()->StartPromise.SetValue(); + } + } + + bool TryFinalize() { + if (!FinalizePromise_ || !RunningRequests_.empty()) { + return false; + } - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + FinalizePromise_->SetValue(); + PassAway(); + return true; + } - struct TEvResourcesInfo : public NActors::TEventLocal { - explicit TEvResourcesInfo(i32 nodeCount) - : NodeCount(nodeCount) - {} + TString GetInfoString() const { + return TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; + } - const i32 NodeCount; - }; - }; +private: + const ui64 InFlightLimit_; + const TInstant StartTime_ = TInstant::Now(); + const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout); + + std::optional> FinalizePromise_; + std::queue DelayedRequests_; + std::unordered_map> RunningRequests_; + + ui64 RequestId_ = 1; + ui64 MaxInFlight_ = 0; + ui64 Completed_ = 0; + ui64 Failed_ = 0; +}; +class TResourcesWaiterActor : public NActors::TActorBootstrapped { static constexpr TDuration REFRESH_PERIOD = TDuration::MilliSeconds(10); public: @@ -178,11 +266,12 @@ class TResourcesWaiterActor : public NActors::TActorBootstrapped request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, - TProgressCallback progressCallback) { - return new TRunScriptActorMock(std::move(request), promise, resultRowsLimit, resultSizeLimit, resultSets, progressCallback); +NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback) { + return new TRunScriptActorMock(std::move(request), promise, progressCallback); +} + +NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit) { + return new TAsyncQueryRunnerActor(inFlightLimit); } NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount) { diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index a222f4b3e3b0..e7f0c38680f7 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -4,12 +4,73 @@ namespace NKqpRun { +struct TQueryResponse { + NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response; + std::vector ResultSets; +}; + +struct TQueryRequest { + std::unique_ptr Event; + ui32 TargetNode; + ui64 ResultRowsLimit; + ui64 ResultSizeLimit; +}; + +struct TEvPrivate { + enum EEv : ui32 { + EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvAsyncQueryFinished, + EvFinalizeAsyncQueryRunner, + + EvResourcesInfo, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvStartAsyncQuery : public NActors::TEventLocal { + TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise startPromise) + : Request(std::move(request)) + , StartPromise(startPromise) + {} + + TQueryRequest Request; + NThreading::TPromise StartPromise; + }; + + struct TEvAsyncQueryFinished : public NActors::TEventLocal { + TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result) + : RequestId(requestId) + , Result(std::move(result)) + {} + + const ui64 RequestId; + const TQueryResponse Result; + }; + + struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal { + explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise finalizePromise) + : FinalizePromise(finalizePromise) + {} + + NThreading::TPromise FinalizePromise; + }; + + struct TEvResourcesInfo : public NActors::TEventLocal { + explicit TEvResourcesInfo(i32 nodeCount) + : NodeCount(nodeCount) + {} + + const i32 NodeCount; + }; +}; + using TProgressCallback = std::function; -NActors::IActor* CreateRunScriptActorMock(THolder request, - NThreading::TPromise promise, - ui64 resultRowsLimit, ui64 resultSizeLimit, std::vector& resultSets, - TProgressCallback progressCallback); +NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise promise, TProgressCallback progressCallback); + +NActors::IActor* CreateAsyncQueryRunnerActor(ui64 inFlightLimit); NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise promise, i32 expectedNodeCount); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index e7dd43b8dc6d..31abec30971b 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -14,19 +14,22 @@ namespace NKqpRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; struct TYdbSetupSettings { - i32 NodeCount = 1; + ui32 NodeCount = 1; TString DomainName = "Root"; TDuration InitializationTimeout = TDuration::Seconds(10); bool MonitoringEnabled = false; + ui16 MonitoringPortOffset = 0; bool TraceOptEnabled = false; - TMaybe LogOutputFile; + TString LogOutputFile; TString YqlToken; TIntrusivePtr FunctionRegistry; NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; TIntrusivePtr YtGateway; NKikimrConfig::TAppConfig AppConfig; + + ui64 InFlightLimit = 0; }; @@ -44,11 +47,11 @@ struct TRunnerOptions { FullProto, // Columns, rows and types in proto string format }; - IOutputStream* ResultOutput = &Cout; + IOutputStream* ResultOutput = nullptr; IOutputStream* SchemeQueryAstOutput = nullptr; IOutputStream* ScriptQueryAstOutput = nullptr; IOutputStream* ScriptQueryPlanOutput = nullptr; - TMaybe InProgressStatisticsOutputFile; + TString InProgressStatisticsOutputFile; EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson; NYdb::NConsoleClient::EOutputFormat PlanOutputFormat = NYdb::NConsoleClient::EOutputFormat::Default; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 4901a512e26e..fd4b07a027b9 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -89,7 +89,8 @@ class TKqpRunner::TImpl { public: enum class EQueryType { ScriptQuery, - YqlScriptQuery + YqlScriptQuery, + AsyncQuery }; explicit TImpl(const TRunnerOptions& options) @@ -143,6 +144,10 @@ class TKqpRunner::TImpl { case EQueryType::YqlScriptQuery: status = YdbSetup_.YqlScriptRequest(query, action, traceId, meta, ResultSets_); break; + + case EQueryType::AsyncQuery: + YdbSetup_.QueryRequestAsync(query, action, traceId); + return true; } TYdbSetup::StopTraceOpt(); @@ -163,6 +168,10 @@ class TKqpRunner::TImpl { return true; } + void WaitAsyncQueries() const { + YdbSetup_.WaitAsyncQueries(); + } + bool FetchScriptResults() { TYdbSetup::StopTraceOpt(); @@ -194,12 +203,14 @@ class TKqpRunner::TImpl { } void PrintScriptResults() const { - Cout << CoutColors_.Cyan() << "Writing script query results" << CoutColors_.Default() << Endl; - for (size_t i = 0; i < ResultSets_.size(); ++i) { - if (ResultSets_.size() > 1) { - *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; + if (Options_.ResultOutput) { + Cout << CoutColors_.Cyan() << "Writing script query results" << CoutColors_.Default() << Endl; + for (size_t i = 0; i < ResultSets_.size(); ++i) { + if (ResultSets_.size() > 1) { + *Options_.ResultOutput << CoutColors_.Cyan() << "Result set " << i + 1 << ":" << CoutColors_.Default() << Endl; + } + PrintScriptResult(ResultSets_[i]); } - PrintScriptResult(ResultSets_[i]); } } @@ -295,7 +306,7 @@ class TKqpRunner::TImpl { void PrintScriptProgress(const TString& plan) const { if (Options_.InProgressStatisticsOutputFile) { - TFileOutput outputStream(*Options_.InProgressStatisticsOutputFile); + TFileOutput outputStream(Options_.InProgressStatisticsOutputFile); outputStream << TInstant::Now().ToIsoStringLocal() << " Script in progress statistics" << Endl; auto convertedPlan = plan; @@ -398,6 +409,14 @@ bool TKqpRunner::ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction return Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::YqlScriptQuery); } +void TKqpRunner::ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->ExecuteQuery(query, action, traceId, TImpl::EQueryType::AsyncQuery); +} + +void TKqpRunner::WaitAsyncQueries() const { + Impl_->WaitAsyncQueries(); +} + bool TKqpRunner::FetchScriptResults() { return Impl_->FetchScriptResults(); } diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index b263bbedbf55..3687a7cbda06 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -18,6 +18,10 @@ class TKqpRunner { bool ExecuteYqlScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + void ExecuteQueryAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + + void WaitAsyncQueries() const; + bool FetchScriptResults(); bool ForgetExecutionOperation(); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index d69a6e59d898..d7777d9a7237 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -69,7 +69,7 @@ class TYdbSetup::TImpl { private: TAutoPtr CreateLogBackend() const { if (Settings_.LogOutputFile) { - return NActors::CreateFileBackend(*Settings_.LogOutputFile); + return NActors::CreateFileBackend(Settings_.LogOutputFile); } else { return NActors::CreateStderrBackend(); } @@ -139,6 +139,7 @@ class TYdbSetup::TImpl { if (Settings_.MonitoringEnabled) { serverSettings.InitKikimrRunConfig(); + serverSettings.SetMonitoringPortOffset(Settings_.MonitoringPortOffset); } return serverSettings; @@ -198,7 +199,9 @@ class TYdbSetup::TImpl { WaitResourcesPublishing(); if (Settings_.MonitoringEnabled) { - Cout << CoutColors_.Cyan() << "Monitoring port: " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort() << Endl; + for (ui32 nodeIndex = 0; nodeIndex < Settings_.NodeCount; ++nodeIndex) { + Cout << CoutColors_.Cyan() << "Monitoring port" << (Settings_.NodeCount > 1 ? TStringBuilder() << " for node " << nodeIndex + 1 : TString()) << ": " << CoutColors_.Default() << Server_->GetRuntime()->GetMonPort(nodeIndex) << Endl; + } } } @@ -216,18 +219,10 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, std::vector& resultSets, TProgressCallback progressCallback) const { - auto event = MakeHolder(); - FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); - - if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { - event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); - } - - auto promise = NThreading::NewPromise(); - auto rowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(); - auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); - GetRuntime()->Register(CreateRunScriptActorMock(std::move(event), promise, rowsLimit, sizeLimit, resultSets, progressCallback)); + TQueryResponse QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TProgressCallback progressCallback) const { + auto request = GetQueryRequest(query, action, traceId); + auto promise = NThreading::NewPromise(); + GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback)); return promise.GetFuture().GetValueSync(); } @@ -254,7 +249,7 @@ class TYdbSetup::TImpl { auto sizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit(); NActors::IActor* fetchActor = NKikimr::NKqp::CreateGetScriptExecutionResultActor(edgeActor, Settings_.DomainName, executionId, resultSetId, 0, rowsLimit, sizeLimit, TInstant::Max()); - GetRuntime()->Register(fetchActor); + GetRuntime()->Register(fetchActor, RandomNumber(Settings_.NodeCount)); return GetRuntime()->GrabEdgeEvent(edgeActor); } @@ -266,6 +261,29 @@ class TYdbSetup::TImpl { return RunKqpProxyRequest(std::move(event)); } + void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) { + if (!AsyncQueryRunnerActorId_) { + AsyncQueryRunnerActorId_ = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings_.InFlightLimit)); + } + + auto request = GetQueryRequest(query, action, traceId); + auto startPromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); + + return startPromise.GetFuture().GetValueSync(); + } + + void WaitAsyncQueries() const { + if (!AsyncQueryRunnerActorId_) { + return; + } + + auto finalizePromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise)); + + return finalizePromise.GetFuture().GetValueSync(); + } + void StartTraceOpt() const { if (!Settings_.TraceOptEnabled) { ythrow yexception() << "Trace opt was disabled"; @@ -286,7 +304,7 @@ class TYdbSetup::TImpl { template typename TResponse::TPtr RunKqpProxyRequest(THolder event) const { NActors::TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); - NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId()); + NActors::TActorId kqpProxy = NKikimr::NKqp::MakeKqpProxyID(GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount))); GetRuntime()->Send(kqpProxy, edgeActor, event.Release()); @@ -316,6 +334,22 @@ class TYdbSetup::TImpl { } } + TQueryRequest GetQueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + auto event = std::make_unique(); + FillQueryRequest(query, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY, action, traceId, event->Record); + + if (auto progressStatsPeriodMs = Settings_.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { + event->SetProgressStatsPeriod(TDuration::MilliSeconds(progressStatsPeriodMs)); + } + + return { + .Event = std::move(event), + .TargetNode = GetRuntime()->GetNodeId(RandomNumber(Settings_.NodeCount)), + .ResultRowsLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultRowsLimit(), + .ResultSizeLimit = Settings_.AppConfig.GetQueryServiceConfig().GetScriptResultSizeLimit() + }; + } + private: TYdbSetupSettings Settings_; NColorizer::TColors CoutColors_; @@ -323,6 +357,8 @@ class TYdbSetup::TImpl { THolder Server_; THolder Client_; TPortManager PortManager_; + + std::optional AsyncQueryRunnerActorId_; }; @@ -378,9 +414,11 @@ TRequestResult TYdbSetup::ScriptRequest(const TString& script, NKikimrKqp::EQuer TRequestResult TYdbSetup::QueryRequest(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId, TQueryMeta& meta, std::vector& resultSets, TProgressCallback progressCallback) const { resultSets.clear(); - auto queryOperationResponse = Impl_->QueryRequest(query, action, traceId, resultSets, progressCallback)->Get()->Record.GetRef(); + TQueryResponse queryResponse = Impl_->QueryRequest(query, action, traceId, progressCallback); + const auto& queryOperationResponse = queryResponse.Response->Get()->Record.GetRef(); const auto& responseRecord = queryOperationResponse.GetResponse(); + resultSets = std::move(queryResponse.ResultSets); meta.Ast = responseRecord.GetQueryAst(); if (const auto& plan = responseRecord.GetQueryPlan()) { meta.Plan = plan; @@ -442,6 +480,14 @@ TRequestResult TYdbSetup::ForgetScriptExecutionOperationRequest(const TString& o return TRequestResult(forgetScriptExecutionOperationResponse->Get()->Status, forgetScriptExecutionOperationResponse->Get()->Issues); } +void TYdbSetup::QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + Impl_->QueryRequestAsync(query, action, traceId); +} + +void TYdbSetup::WaitAsyncQueries() const { + Impl_->WaitAsyncQueries(); +} + void TYdbSetup::StartTraceOpt() const { Impl_->StartTraceOpt(); } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index 745cdae8a659..017ab6e18ede 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -65,6 +65,10 @@ class TYdbSetup { TRequestResult ForgetScriptExecutionOperationRequest(const TString& operation) const; + void QueryRequestAsync(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const; + + void WaitAsyncQueries() const; + void StartTraceOpt() const; static void StopTraceOpt();