Skip to content

Commit

Permalink
Added lease for queued requests table
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jun 20, 2024
1 parent 62cb0fa commit 3a45605
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 55 deletions.
61 changes: 42 additions & 19 deletions ydb/core/kqp/workload_service/kqp_workload_service_queues.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class TStateBase : public IState {
public:
TStateBase(const TActorContext& actorContext, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, NMonitoring::TDynamicCounterPtr counters)
: Counters(counters)
, PoolConfig(poolConfig)
, ActorContext(actorContext)
, PoolId(poolId)
, CancelAfter(poolConfig.QueryCancelAfter)
, PoolSizeLimit(GetMaxPoolSize(poolConfig))
, InFlightLimit(GetMaxInFlight(poolConfig))
, PoolConfig(poolConfig)
, CancelAfter(poolConfig.QueryCancelAfter)
{
RegisterCounters();
}
Expand All @@ -69,7 +69,9 @@ class TStateBase : public IState {
}

LOG_D("received new request, worker id: " << workerActorId << ", session id: " << sessionId);
ActorContext.Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(PoolId, sessionId));
if (CancelAfter) {
ActorContext.Schedule(CancelAfter, new TEvPrivate::TEvCancelRequest(PoolId, sessionId));
}

TRequest* request = &LocalSessions.insert({sessionId, TRequest(workerActorId, sessionId)}).first->second;
LocalDelayedRequests->Inc();
Expand Down Expand Up @@ -145,7 +147,7 @@ class TStateBase : public IState {
if (!request->Started && request->State != TRequest::EState::Finishing) {
if (request->State == TRequest::EState::Canceling && status == Ydb::StatusIds::SUCCESS) {
status = Ydb::StatusIds::CANCELLED;
issues.AddIssue("Delay deadline exceeded");
issues.AddIssue(TStringBuilder() << "Delay deadline exceeded in pool " << PoolId);
}
ReplyContinue(request, status, issues);
return;
Expand Down Expand Up @@ -190,6 +192,21 @@ class TStateBase : public IState {
return LocalInFlight;
}

TMaybe<TInstant> GetWaitDeadline(TInstant startTime) {
if (!CancelAfter) {
return Nothing();
}
return startTime + CancelAfter;
}

NYql::TIssue GroupIssues(const TString& message, NYql::TIssues issues) {
NYql::TIssue rootIssue(message);
for (const NYql::TIssue& issue : issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
return rootIssue;
}

TString LogPrefix() const {
return TStringBuilder() << "PoolId: " << PoolId << ", ";
}
Expand Down Expand Up @@ -245,14 +262,15 @@ class TStateBase : public IState {
protected:
NMonitoring::TDynamicCounterPtr Counters;

const NResourcePool::TPoolSettings PoolConfig;
const TActorContext ActorContext;
const TString PoolId;
const TDuration CancelAfter;
const ui64 PoolSizeLimit;
const ui64 InFlightLimit;

private:
const NResourcePool::TPoolSettings PoolConfig;
const TDuration CancelAfter;

ui64 LocalInFlight = 0;
std::unordered_map<TString, TRequest> LocalSessions;

Expand Down Expand Up @@ -337,9 +355,12 @@ class TFifoState : public TStateBase {

void RefreshState(bool refreshRequired = false) override {
RefreshRequired |= refreshRequired;
DoCleanupRequests();
if (!PreparingFinished) {
return;
}

if (RunningOperation || !PreparingFinished) {
DoCleanupRequests();
if (RunningOperation) {
return;
}

Expand Down Expand Up @@ -418,17 +439,17 @@ class TFifoState : public TStateBase {

if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
LOG_E("failed to delay request " << ev->Get()->Status << ", session id: " << ev->Get()->SessionId << ", issues: " << ev->Get()->Issues.ToOneLineString());
ForUnfinished(ev->Get()->SessionId, [this, ev](TRequest* request) {
ReplyContinue(request, ev->Get()->Status, ev->Get()->Issues);
NYql::TIssue issue = GroupIssues("Failed to put request in queue", ev->Get()->Issues);
ForUnfinished(ev->Get()->SessionId, [this, ev, issue](TRequest* request) {
ReplyContinue(request, ev->Get()->Status, {issue});
});
RefreshRequired = true;
return;
}

LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);

GlobalState.DelayedRequests++;
GlobalDelayedRequests->Inc();
LOG_D("succefully delayed request, session id: " << ev->Get()->SessionId);

DoStartDelayedRequest();
RefreshState();
Expand All @@ -440,9 +461,10 @@ class TFifoState : public TStateBase {
const TString& sessionId = ev->Get()->SessionId;
if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
LOG_E("failed start request " << ev->Get()->Status << ", session id: " << sessionId << ", issues: " << ev->Get()->Issues.ToOneLineString());
ForUnfinished(sessionId, [this, ev](TRequest* request) {
NYql::TIssue issue = GroupIssues("Failed to start request", ev->Get()->Issues);
ForUnfinished(sessionId, [this, ev, issue](TRequest* request) {
AddFinishedRequest(request->SessionId);
ReplyContinue(request, ev->Get()->Status, ev->Get()->Issues);
ReplyContinue(request, ev->Get()->Status, {issue});
});
RefreshState();
return;
Expand All @@ -467,8 +489,9 @@ class TFifoState : public TStateBase {
GlobalInFly->Inc();
ReplyContinue(request);
} else {
AddFinishedRequest(request->SessionId);
request->State = TRequest::EState::Canceling;
// Request was dropped due to lease expiration
PendingRequests.emplace_front(request->SessionId);
PendingRequestsCount->Inc();
}
});
DelayedRequests.pop_front();
Expand Down Expand Up @@ -533,9 +556,10 @@ class TFifoState : public TStateBase {
if (!PendingRequests.empty()) {
RunningOperation = true;
const TString& sessionId = PopPendingRequest();
ActorContext.Register(CreateDelayRequestActor(ActorContext.SelfID, PoolId, sessionId, GetRequest(sessionId)->StartTime + CancelAfter, Counters));
TRequest* request = GetRequest(sessionId);
ActorContext.Register(CreateDelayRequestActor(ActorContext.SelfID, PoolId, sessionId, request->StartTime, GetWaitDeadline(request->StartTime), LEASE_DURATION, Counters));
DelayedRequests.emplace_back(sessionId);
GetRequest(sessionId)->CleanupRequired = true;
request->CleanupRequired = true;
}
}

Expand All @@ -547,7 +571,6 @@ class TFifoState : public TStateBase {
if (!FinishedRequests.empty()) {
RunningOperation = true;
ActorContext.Register(CreateCleanupRequestsActor(ActorContext.SelfID, PoolId, FinishedRequests, Counters));

FinishedRequests.clear();
FinishingRequestsCount->Set(0);
}
Expand Down
Loading

0 comments on commit 3a45605

Please sign in to comment.