Skip to content

Commit

Permalink
Stable 24 3 8 analytics (#9022)
Browse files Browse the repository at this point in the history
Co-authored-by: Aidar Samerkhanov <[email protected]>
Co-authored-by: Vitalii Gridnev <[email protected]>
Co-authored-by: Tony-Romanov <[email protected]>
Co-authored-by: Whompe <[email protected]>
Co-authored-by: Vladislav Lukachik <[email protected]>
Co-authored-by: Whompe <[email protected]>
  • Loading branch information
7 people authored Sep 16, 2024
1 parent 04ee172 commit ffe45cd
Show file tree
Hide file tree
Showing 120 changed files with 2,001 additions and 1,210 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();

TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");

Expand All @@ -562,6 +564,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
Expand Down
15 changes: 7 additions & 8 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
, std::shared_ptr<IKqpNodeState> state
, TIntrusivePtr<NRm::TTxState> tx
, TIntrusivePtr<NRm::TTaskState> task
, ui64 limit
, ui64 reasonableSpillingTreshold)
, ui64 limit)
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
, ResourceManager(std::move(resourceManager))
, MemoryPool(memoryPool)
, State(std::move(state))
, Tx(std::move(tx))
, Task(std::move(task))
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
{
}

Expand Down Expand Up @@ -57,7 +55,7 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
}

bool IsReasonableToUseSpilling() const override {
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
return Task->IsReasonableToStartSpilling();
}

TString MemoryConsumptionDetails() const override {
Expand Down Expand Up @@ -88,7 +86,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
std::atomic<ui64> MinChannelBufferSize = 0;
std::atomic<ui64> ReasonableSpillingTreshold = 0;
std::atomic<ui64> MinMemAllocSize = 8_MB;
std::atomic<ui64> MinMemFreeSize = 32_MB;

Expand All @@ -109,7 +106,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
MinMemAllocSize.store(config.GetMinMemAllocSize());
MinMemFreeSize.store(config.GetMinMemFreeSize());
}
Expand Down Expand Up @@ -164,14 +160,17 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::move(args.State),
std::move(args.TxInfo),
std::move(task),
limit,
ReasonableSpillingTreshold.load());
limit);

auto runtimeSettings = args.RuntimeSettings;
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
runtimeSettings.UseSpilling = args.WithSpilling;
runtimeSettings.StatsMode = args.StatsMode;

if (runtimeSettings.UseSpilling) {
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
}

if (args.Deadline) {
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct IKqpNodeComputeActorFactory {
const TInstant& Deadline;
const bool ShareMailbox;
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;

TComputeStagesWithScan* ComputesByStages = nullptr;
std::shared_ptr<IKqpNodeState> State = nullptr;
TComputeActorSchedulingOptions SchedulingOptions = {};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ using namespace NYql::NDq;

class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
, WithSpilling_(withSpilling)
{
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
auto wakeupCallback = [this]{ ContinueExecute(); };
auto errorCallback = [this](const TString& error){ SendError(error); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ void TKqpScanComputeActor::DoBootstrap() {
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
auto errorCallback = [this](const TString& error){ SendError(error); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));

ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
Expand Down
11 changes: 9 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
request.SetStartAllOrFail(true);
if (UseDataQueryPool) {
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::DATA);
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
} else {
request.MutableRuntimeSettings()->SetExecType(NYql::NDqProto::TComputeRuntimeSettings::SCAN);
request.MutableRuntimeSettings()->SetUseSpilling(WithSpilling);
Expand Down Expand Up @@ -432,8 +433,14 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
NYql::NDqProto::TDqTask* taskDesc = ArenaSerializeTaskToProto(TasksGraph, task, true);
NYql::NDq::TComputeRuntimeSettings settings;
if (!TxInfo) {
double memoryPoolPercent = 100;
if (UserRequestContext->PoolConfig.has_value()) {
memoryPoolPercent = UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode;
}

TxInfo = MakeIntrusive<NRm::TTxState>(
TxId, TInstant::Now(), ResourceManager_->GetCounters());
TxId, TInstant::Now(), ResourceManager_->GetCounters(),
UserRequestContext->PoolId, memoryPoolPercent, Database);
}

auto startResult = CaFactory_->CreateKqpComputeActor({
Expand All @@ -454,7 +461,7 @@ TString TKqpPlanner::ExecuteDataComputeTask(ui64 taskId, ui32 computeTasksSize)
.StatsMode = GetDqStatsMode(StatsMode),
.Deadline = Deadline,
.ShareMailbox = (computeTasksSize <= 1),
.RlPath = Nothing()
.RlPath = Nothing(),
});

if (const auto* rmResult = std::get_if<NRm::TKqpRMAllocateResult>(&startResult)) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
}

TIntrusivePtr<NRm::TTxState> txInfo = MakeIntrusive<NRm::TTxState>(
txId, TInstant::Now(), ResourceManager_->GetCounters());
txId, TInstant::Now(), ResourceManager_->GetCounters(),
msg.GetSchedulerGroup(), msg.GetMemoryPoolPercent(),
msg.GetDatabase());

const ui32 tasksCount = msg.GetTasks().size();
for (auto& dqTask: *msg.MutableTasks()) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/logical/kqp_opt_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
true, // defaultWatermarksMode
true); // syncActor
} else {
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey());
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
output = DqRewriteAggregate(node, ctx, TypesCtx, false, KqpCtx.Config->HasOptEnableOlapPushdown() || KqpCtx.Config->HasOptUseFinalizeByKey(), KqpCtx.Config->HasOptUseFinalizeByKey(), spillingSettings.IsAggregationSpillingEnabled());
}
if (output) {
DumpAppliedRule("RewriteAggregate", node.Ptr(), output.Cast().Ptr(), ctx);
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
}

TMaybeNode<TExprBase> ExpandAggregatePhase(TExprBase node, TExprContext& ctx) {
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false);
NDq::TSpillingSettings spillingSettings(KqpCtx.Config->GetEnabledSpillingNodes());
auto output = ExpandAggregatePeepholeImpl(node.Ptr(), ctx, TypesCtx, KqpCtx.Config->HasOptUseFinalizeByKey(), false, spillingSettings.IsAggregationSpillingEnabled());
DumpAppliedRule("ExpandAggregatePhase", node.Ptr(), output, ctx);
return TExprBase(output);
}
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,10 +815,11 @@ class TKiSourceCallableExecutionTransformer : public TAsyncCallbackTransformer<T

TVector<TExprBase> fakeReads;
auto paramsType = NDq::CollectParameters(programLambda, ctx);
NDq::TSpillingSettings spillingSettings{SessionCtx->Config().GetEnabledSpillingNodes()};
lambda = NDq::BuildProgram(
programLambda, *paramsType, compiler, SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry(),
ctx, fakeReads);
ctx, fakeReads, spillingSettings);

NKikimr::NMiniKQL::TProgramBuilder programBuilder(SessionCtx->Query().QueryData->GetAllocState()->TypeEnv,
*SessionCtx->Query().QueryData->GetAllocState()->HolderFactory.GetFunctionRegistry());
Expand Down
31 changes: 29 additions & 2 deletions ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/table_service_config.pb.h>
#include <util/generic/size_literals.h>
#include <util/string/split.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>

namespace NYql {

Expand All @@ -23,6 +25,22 @@ EOptionalFlag GetOptionalFlagValue(const TMaybe<TType>& flag) {
return EOptionalFlag::Disabled;
}


ui64 ParseEnableSpillingNodes(const TString &v) {
ui64 res = 0;
TVector<TString> vec;
StringSplitter(v).SplitBySet(",;| ").AddTo(&vec);
for (auto& s: vec) {
if (s.empty()) {
throw yexception() << "Empty value item";
}

auto value = FromString<NDq::EEnabledSpillingNodes>(s);
res |= ui64(value);
}
return res;
}

static inline bool GetFlagValue(const TMaybe<bool>& flag) {
return flag ? flag.GetRef() : false;
}
Expand Down Expand Up @@ -73,6 +91,8 @@ TKikimrConfiguration::TKikimrConfiguration() {

REGISTER_SETTING(*this, OptUseFinalizeByKey);
REGISTER_SETTING(*this, CostBasedOptimizationLevel);
REGISTER_SETTING(*this, EnableSpillingNodes)
.Parser([](const TString& v) { return ParseEnableSpillingNodes(v); });

REGISTER_SETTING(*this, MaxDPccpDPTableSize);

Expand Down Expand Up @@ -126,10 +146,9 @@ bool TKikimrSettings::HasOptEnableOlapProvideComputeSharding() const {
}

bool TKikimrSettings::HasOptUseFinalizeByKey() const {
return GetOptionalFlagValue(OptUseFinalizeByKey.Get()) != EOptionalFlag::Disabled;
return GetFlagValue(OptUseFinalizeByKey.Get().GetOrElse(true)) != EOptionalFlag::Disabled;
}


EOptionalFlag TKikimrSettings::GetOptPredicateExtract() const {
return GetOptionalFlagValue(OptEnablePredicateExtract.Get());
}
Expand All @@ -151,4 +170,12 @@ TKikimrSettings::TConstPtr TKikimrConfiguration::Snapshot() const {
return std::make_shared<const TKikimrSettings>(*this);
}

void TKikimrConfiguration::SetDefaultEnabledSpillingNodes(const TString& node) {
DefaultEnableSpillingNodes = ParseEnableSpillingNodes(node);
}

ui64 TKikimrConfiguration::GetEnabledSpillingNodes() const {
return EnableSpillingNodes.Get().GetOrElse(DefaultEnableSpillingNodes);
}

}
5 changes: 5 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct TKikimrSettings {
NCommon::TConfSetting<TString, false> OptCardinalityHints;
NCommon::TConfSetting<TString, false> OptJoinAlgoHints;
NCommon::TConfSetting<TString, false> OptJoinOrderHints;
NCommon::TConfSetting<TString, false> OverrideStatistics;

/* Disable optimizer rules */
NCommon::TConfSetting<bool, false> OptDisableTopSort;
Expand Down Expand Up @@ -175,6 +176,10 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableSpillingGenericQuery = false;
ui32 DefaultCostBasedOptimizationLevel = 4;
bool EnableConstantFolding = true;
ui64 DefaultEnableSpillingNodes = 0;

void SetDefaultEnabledSpillingNodes(const TString& node);
ui64 GetEnabledSpillingNodes() const;
};

}
9 changes: 8 additions & 1 deletion ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/resource/resource.h>

#include <util/folder/dirut.h>

namespace NKikimr::NKqp {

Expand Down Expand Up @@ -236,9 +237,15 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());

if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
TString spillingRoot = cfg.GetRoot();
if (spillingRoot.empty()) {
spillingRoot = NYql::NDq::GetTmpSpillingRootForCurrentUser();
MakeDirIfNotExist(spillingRoot);
}

SpillingService = TlsActivationContext->ExecutorThread.RegisterActor(NYql::NDq::CreateDqLocalFileSpillingService(
NYql::NDq::TFileSpillingServiceConfig{
.Root = cfg.GetRoot(),
.Root = spillingRoot,
.MaxTotalSize = cfg.GetMaxTotalSize(),
.MaxFileSize = cfg.GetMaxFileSize(),
.MaxFilePartSize = cfg.GetMaxFilePartSize(),
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,8 +777,9 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
stageProto.SetIsEffectsStage(hasEffects || hasTxTableSink);

auto paramsType = CollectParameters(stage, ctx);
NDq::TSpillingSettings spillingSettings{Config->GetEnabledSpillingNodes()};
auto programBytecode = NDq::BuildProgram(stage.Program(), *paramsType, *KqlCompiler, TypeEnv, FuncRegistry,
ctx, {});
ctx, {}, spillingSettings);

auto& programProto = *stageProto.MutableProgram();
programProto.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
Expand Down
Loading

0 comments on commit ffe45cd

Please sign in to comment.