Skip to content

Commit

Permalink
Merge branch 'stable-24-3-8-analytics' into stable-analytics-locks
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Sep 17, 2024
2 parents 47edad5 + ffe45cd commit 2a50bae
Show file tree
Hide file tree
Showing 696 changed files with 4,302 additions and 2,988 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ __pycache__/
*.pb.h
*.pb.cc

# Other generated
*.fbs.h

# MacOS specific
.DS_Store

Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
}

void FillCompileResult(std::unique_ptr<NKikimrKqp::TPreparedQuery> preparingQuery, NKikimrKqp::EQueryType queryType,
bool allowCache) {
bool allowCache, bool success) {
auto preparedQueryHolder = std::make_shared<TPreparedQueryHolder>(
preparingQuery.release(), AppData()->FunctionRegistry);
preparingQuery.release(), AppData()->FunctionRegistry, !success);
preparedQueryHolder->MutableLlvmSettings().Fill(Config, queryType);
KqpCompileResult->PreparedQuery = preparedQueryHolder;
KqpCompileResult->AllowCache = CanCacheQuery(KqpCompileResult->PreparedQuery->GetPhysicalQuery()) && allowCache;
Expand Down Expand Up @@ -500,7 +500,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

if (status == Ydb::StatusIds::SUCCESS) {
YQL_ENSURE(kqpResult.PreparingQuery);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache, true);

auto now = TInstant::Now();
auto duration = now - StartTime;
Expand All @@ -511,7 +511,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
<< ", duration: " << duration);
} else {
if (kqpResult.PreparingQuery) {
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache);
FillCompileResult(std::move(kqpResult.PreparingQuery), queryType, kqpResult.AllowCache, false);
}

LOG_ERROR_S(ctx, NKikimrServices::KQP_COMPILE_ACTOR, "Compilation failed"
Expand Down Expand Up @@ -610,6 +610,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableSpillingGenericQuery = serviceConfig.GetEnableQueryServiceSpilling();
kqpConfig.DefaultCostBasedOptimizationLevel = serviceConfig.GetDefaultCostBasedOptimizationLevel();
kqpConfig.EnableConstantFolding = serviceConfig.GetEnableConstantFolding();
kqpConfig.SetDefaultEnabledSpillingNodes(serviceConfig.GetEnableSpillingNodes());

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
ui64 defaultCostBasedOptimizationLevel = TableServiceConfig.GetDefaultCostBasedOptimizationLevel();
bool enableConstantFolding = TableServiceConfig.GetEnableConstantFolding();

TString enableSpillingNodes = TableServiceConfig.GetEnableSpillingNodes();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");

Expand Down Expand Up @@ -564,6 +566,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit ||
TableServiceConfig.GetIdxLookupJoinPointsLimit() != idxLookupPointsLimit ||
TableServiceConfig.GetEnableSpillingNodes() != enableSpillingNodes ||
TableServiceConfig.GetEnableQueryServiceSpilling() != enableQueryServiceSpilling ||
TableServiceConfig.GetEnableImplicitQueryParameterTypes() != enableImplicitQueryParameterTypes ||
TableServiceConfig.GetDefaultCostBasedOptimizationLevel() != defaultCostBasedOptimizationLevel ||
Expand Down
15 changes: 7 additions & 8 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
, std::shared_ptr<IKqpNodeState> state
, TIntrusivePtr<NRm::TTxState> tx
, TIntrusivePtr<NRm::TTaskState> task
, ui64 limit
, ui64 reasonableSpillingTreshold)
, ui64 limit)
: NYql::NDq::TGuaranteeQuotaManager(limit, limit)
, ResourceManager(std::move(resourceManager))
, MemoryPool(memoryPool)
, State(std::move(state))
, Tx(std::move(tx))
, Task(std::move(task))
, ReasonableSpillingTreshold(reasonableSpillingTreshold)
{
}

Expand Down Expand Up @@ -57,7 +55,7 @@ struct TMemoryQuotaManager : public NYql::NDq::TGuaranteeQuotaManager {
}

bool IsReasonableToUseSpilling() const override {
return Tx->GetExtraMemoryAllocatedSize() >= ReasonableSpillingTreshold;
return Task->IsReasonableToStartSpilling();
}

TString MemoryConsumptionDetails() const override {
Expand Down Expand Up @@ -88,7 +86,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::atomic<ui64> MkqlLightProgramMemoryLimit = 0;
std::atomic<ui64> MkqlHeavyProgramMemoryLimit = 0;
std::atomic<ui64> MinChannelBufferSize = 0;
std::atomic<ui64> ReasonableSpillingTreshold = 0;
std::atomic<ui64> MinMemAllocSize = 8_MB;
std::atomic<ui64> MinMemFreeSize = 32_MB;

Expand All @@ -109,7 +106,6 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
MkqlLightProgramMemoryLimit.store(config.GetMkqlLightProgramMemoryLimit());
MkqlHeavyProgramMemoryLimit.store(config.GetMkqlHeavyProgramMemoryLimit());
MinChannelBufferSize.store(config.GetMinChannelBufferSize());
ReasonableSpillingTreshold.store(config.GetReasonableSpillingTreshold());
MinMemAllocSize.store(config.GetMinMemAllocSize());
MinMemFreeSize.store(config.GetMinMemFreeSize());
}
Expand Down Expand Up @@ -164,14 +160,17 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
std::move(args.State),
std::move(args.TxInfo),
std::move(task),
limit,
ReasonableSpillingTreshold.load());
limit);

auto runtimeSettings = args.RuntimeSettings;
runtimeSettings.ExtraMemoryAllocationPool = args.MemoryPool;
runtimeSettings.UseSpilling = args.WithSpilling;
runtimeSettings.StatsMode = args.StatsMode;

if (runtimeSettings.UseSpilling) {
args.Task->SetEnableSpilling(runtimeSettings.UseSpilling);
}

if (args.Deadline) {
runtimeSettings.Timeout = args.Deadline - TAppData::TimeProvider->Now();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ struct IKqpNodeComputeActorFactory {
const TInstant& Deadline;
const bool ShareMailbox;
const TMaybe<NYql::NDqProto::TRlPath>& RlPath;

TComputeStagesWithScan* ComputesByStages = nullptr;
std::shared_ptr<IKqpNodeState> State = nullptr;
TComputeActorSchedulingOptions SchedulingOptions = {};
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ using namespace NYql::NDq;

class TKqpTaskRunnerExecutionContext : public TDqTaskRunnerExecutionContext {
public:
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUp))
TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, TWakeUpCallback&& wakeUpCallback, TErrorCallback&& errorCallback)
: TDqTaskRunnerExecutionContext(txId, std::move(wakeUpCallback), std::move(errorCallback))
, WithSpilling_(withSpilling)
{
}
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ void TKqpComputeActor::DoBootstrap() {
auto taskRunner = MakeDqTaskRunner(TBase::GetAllocatorPtr(), execCtx, settings, logger);
SetTaskRunner(taskRunner);

auto wakeup = [this]{ ContinueExecute(); };
auto wakeupCallback = [this]{ ContinueExecute(); };
auto errorCallback = [this](const TString& error){ SendError(error); };
try {
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeupCallback), std::move(errorCallback)));
} catch (const NMiniKQL::TKqpEnsureFail& e) {
InternalError((TIssuesIds::EIssueCode) e.GetCode(), e.GetMessage());
return;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ void TKqpScanComputeActor::DoBootstrap() {
TBase::SetTaskRunner(taskRunner);

auto wakeup = [this] { ContinueExecute(); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup)));
auto errorCallback = [this](const TString& error){ SendError(error); };
TBase::PrepareTaskRunner(TKqpTaskRunnerExecutionContext(std::get<ui64>(TxId), RuntimeSettings.UseSpilling, std::move(wakeup), std::move(errorCallback)));

ComputeCtx.AddTableScan(0, Meta, GetStatsMode());
ScanData = &ComputeCtx.GetTableScan(0);
Expand Down
117 changes: 74 additions & 43 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,49 +207,15 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
);
}

bool LogStatsByLongTasks() const {
return Stats->CollectStatsByLongTasks && HasOlapTable;
}

void FillResponseStats(Ydb::StatusIds::StatusCode status) {
auto& response = *ResponseEv->Record.MutableResponse();

response.SetStatus(status);

if (Stats) {
ReportEventElapsedTime();

Stats->FinishTs = TInstant::Now();
Stats->Finish();

if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) {
for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
const auto& tx = Request.Transactions[txId].Body;
auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
}
}

if (LogStatsByLongTasks()) {
const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
if (!txPlansWithStats.empty()) {
LOG_N("Full stats: " << txPlansWithStats);
}
}

Stats.reset();
}
}

void Finalize() {
YQL_ENSURE(!AlreadyReplied);

if (LocksBroken) {
YQL_ENSURE(ResponseEv->BrokenLockShardId);
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
}

auto& response = *ResponseEv->Record.MutableResponse();

FillResponseStats(Ydb::StatusIds::SUCCESS);
ResponseEv->Record.MutableResponse()->SetStatus(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();

auto addLocks = [this](const ui64 taskId, const auto& data) {
Expand Down Expand Up @@ -297,7 +263,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (LockHandle) {
ResponseEv->LockHandle = std::move(LockHandle);
}
BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
BuildLocks(*ResponseEv->Record.MutableResponse()->MutableResult()->MutableLocks(), Locks);
}

auto resultSize = ResponseEv->GetByteSize();
Expand All @@ -323,9 +289,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

ExecuterSpan.EndOk();

Request.Transactions.crop(0);
LOG_D("Sending response to: " << Target << ", results: " << ResponseEv->ResultsSize());
Send(Target, ResponseEv.release());
AlreadyReplied = true;
PassAway();
}

Expand Down Expand Up @@ -365,6 +329,8 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
return "WaitSnapshotState";
} else if (func == &TThis::WaitResolveState) {
return "WaitResolveState";
} else if (func == &TThis::WaitShutdownState) {
return "WaitShutdownState";
} else {
return TBase::CurrentStateFuncName();
}
Expand Down Expand Up @@ -597,7 +563,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (ev->Get()->Record.GetState() == NDqProto::COMPUTE_STATE_FAILURE) {
CancelProposal(0);
}
HandleComputeStats(ev);
HandleComputeState(ev);
}

void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
Expand Down Expand Up @@ -1078,7 +1044,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
hFunc(TEvTxProxy::TEvProposeTransactionStatus, HandleExecute);
hFunc(TEvDqCompute::TEvState, HandleComputeStats);
hFunc(TEvDqCompute::TEvState, HandleComputeState);
hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleChannelData);
hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleStreamAck);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
Expand Down Expand Up @@ -2741,6 +2707,23 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

void Shutdown() override {
if (Planner) {
if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
LOG_I("Shutdown immediately - nothing to wait");
PassAway();
} else {
this->Become(&TThis::WaitShutdownState);
LOG_I("Waiting for shutdown of " << Planner->GetPendingComputeTasks().size() << " tasks and "
<< Planner->GetPendingComputeActors().size() << " compute actors");
// TODO(ilezhankin): the CA awaiting timeout should be configurable.
TActivationContext::Schedule(TDuration::Seconds(10), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvPoison));
}
} else {
PassAway();
}
}

void PassAway() override {
auto totalTime = TInstant::Now() - StartTime;
Counters->Counters->DataTxTotalTimeHistogram->Collect(totalTime.MilliSeconds());
Expand All @@ -2758,6 +2741,54 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
TBase::PassAway();
}

STATEFN(WaitShutdownState) {
switch(ev->GetTypeRewrite()) {
hFunc(TEvDqCompute::TEvState, HandleShutdown);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleShutdown);
hFunc(TEvents::TEvPoison, HandleShutdown);
default:
LOG_E("Unexpected event: " << ev->GetTypeName()); // ignore all other events
}
}

void HandleShutdown(TEvDqCompute::TEvState::TPtr& ev) {
HandleComputeStats(ev);

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

void HandleShutdown(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
const auto nodeId = ev->Get()->NodeId;
LOG_N("Node has disconnected while shutdown: " << nodeId);

YQL_ENSURE(Planner);

for (const auto& task : TasksGraph.GetTasks()) {
if (task.Meta.NodeId == nodeId && !task.Meta.Completed) {
if (task.ComputeActorId) {
Planner->CompletedCA(task.Id, task.ComputeActorId);
} else {
Planner->TaskNotStarted(task.Id);
}
}
}

if (Planner->GetPendingComputeTasks().empty() && Planner->GetPendingComputeActors().empty()) {
PassAway();
}
}

void HandleShutdown(TEvents::TEvPoison::TPtr& ev) {
// Self-poison means timeout - don't wait anymore.
LOG_I("Timed out on waiting for Compute Actors to finish - forcing shutdown");

if (ev->Sender == SelfId()) {
PassAway();
}
}

private:
void ReplyTxStateUnknown(ui64 shardId) {
auto message = TStringBuilder() << "Tx state unknown for shard " << shardId << ", txid " << TxId;
Expand Down
Loading

0 comments on commit 2a50bae

Please sign in to comment.