Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQL-17542 finalize split sync async ca #1689

8 changes: 2 additions & 6 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,22 +156,18 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvFetcherFinished::TPtr& ev)
}
}

void TKqpScanComputeActor::PollSources(std::any prev) {
void TKqpScanComputeActor::PollSources(ui64 prevFreeSpace) {
if (!ScanData || ScanData->IsFinished()) {
return;
}
const auto hasNewMemoryPred = [&]() {
if (!prev.has_value()) {
return false;
}
const ui64 freeSpace = CalculateFreeSpace();
const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
return freeSpace > prevFreeSpace;
};
if (!hasNewMemoryPred() && ScanData->GetStoredBytes()) {
return;
}
const ui32 freeSpace = CalculateFreeSpace();
const ui64 freeSpace = CalculateFreeSpace();
CA_LOG_D("POLL_SOURCES:START:" << Fetchers.size() << ";fs=" << freeSpace);
for (auto&& i : Fetchers) {
Send(i, new TEvScanExchange::TEvAckData(freeSpace));
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
: 0ul;
}

std::any GetSourcesState() override {
ui64 GetSourcesState() {
if (!ScanData) {
return 0;
}
return CalculateFreeSpace();
}

void PollSources(std::any prev) override;
void PollSources(ui64 prevFreeSpace);

void PassAway() override {
if (TaskRunner) {
Expand Down
24 changes: 6 additions & 18 deletions ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,18 +461,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
return inputChannel->FreeSpace;
}

TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() override {
return TypeEnv->BindAllocator();
}

std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> MaybeBindAllocator() override {
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard;
if (TypeEnv) {
guard.emplace(TypeEnv->BindAllocator());
}
return guard;
}

void OnTaskRunnerCreated(NTaskRunnerActor::TEvTaskRunnerCreateFinished::TPtr& ev) {
const auto& secureParams = ev->Get()->SecureParams;
const auto& taskParams = ev->Get()->TaskParams;
Expand All @@ -483,7 +471,7 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
Stat->AddCounters2(ev->Get()->Sensors);
}
TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges);
FillIoMaps(holderFactory, typeEnv, secureParams, taskParams, readRanges, nullptr);

{
// say "Hello" to executer
Expand Down Expand Up @@ -517,7 +505,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC

MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit;
ProfileStats = std::move(ev->Get()->ProfileStats);
auto sourcesState = GetSourcesState();
auto status = ev->Get()->RunStatus;

CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState
Expand All @@ -536,10 +523,6 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
}
}

if (status != ERunStatus::Finished) {
PollSources(std::move(sourcesState));
}

if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
WatermarksTracker.PopPendingWatermark();
Expand Down Expand Up @@ -801,6 +784,11 @@ class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor, TC
return TaskRunnerStats.Get();
}

const NYql::NDq::TDqMeteringStats* GetMeteringStats() override {
// TODO: support async CA
return nullptr;
}

template<typename TSecond>
TVector<ui32> GetIds(const THashMap<ui64, TSecond>& collection) {
TVector<ui32> ids;
Expand Down
3 changes: 0 additions & 3 deletions ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,6 @@ struct TComputeMemoryLimits {
IMemoryQuotaManager::TPtr MemoryQuotaManager;
};

//temporary flag to integarate changes in interface
#define Y_YQL_DQ_TASK_RUNNER_REQUIRES_ALLOCATOR 1

using TTaskRunnerFactory = std::function<
TIntrusivePtr<IDqTaskRunner>(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TDqTaskSettings& task, NDqProto::EDqStatsMode statsMode, const TLogFunc& logFunc)
>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ struct TComputeActorAsyncInputHelper {
Pause(*watermark);
}
}
const bool emptyBatch = batch.empty();
AsyncInputPush(std::move(batch), space, finished);
if (!batch.empty()) {
if (!emptyBatch) {
// If we have read some data, we must run such reading again
// to process the case when async input notified us about new data
// but we haven't read all of it.
Expand Down
Loading
Loading