Skip to content

Commit

Permalink
Support cancel after in rate limiter (ydb-platform#9348)
Browse files Browse the repository at this point in the history
(cherry picked from commit a430757)
  • Loading branch information
UgnineSirdis committed Sep 19, 2024
1 parent 590ac64 commit 56c048e
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 20 deletions.
4 changes: 3 additions & 1 deletion ydb/core/grpc_services/grpc_request_check_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ class TGrpcRequestCheckActor
SetTokenAndDie();
break;
case Ydb::StatusIds::TIMEOUT:
case Ydb::StatusIds::CANCELLED:
Counters_->IncDatabaseRateLimitedCounter();
LOG_INFO(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Throughput limit exceeded");
ReplyOverloadedAndDie(MakeIssue(NKikimrIssues::TIssuesIds::YDB_RESOURCE_USAGE_LIMITED, "Throughput limit exceeded"));
Expand All @@ -331,7 +332,8 @@ class TGrpcRequestCheckActor
}
};

req.mutable_operation_params()->mutable_operation_timeout()->set_nanos(200000000); // same as cloud-go serverless proxy
req.mutable_operation_params()->mutable_operation_timeout()->set_seconds(10);
req.mutable_operation_params()->mutable_cancel_after()->set_nanos(200000000); // same as cloud-go serverless proxy

NKikimr::NRpcService::RateLimiterAcquireUseSameMailbox(
std::move(req),
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/grpc_services/local_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ TActorId RateLimiterAcquireUseSameMailbox(
onSuccess();
break;
case Ydb::StatusIds::TIMEOUT:
case Ydb::StatusIds::CANCELLED:
onTimeout();
break;
default:
Expand All @@ -32,7 +33,8 @@ TActorId RateLimiterAcquireUseSameMailbox(
};

Ydb::RateLimiter::AcquireResourceRequest request;
SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after());
request.set_coordination_node_path(fullPath.CoordinationNode);
request.set_resource_path(fullPath.ResourcePath);
request.set_required(required);
Expand Down Expand Up @@ -72,6 +74,7 @@ TActorId RateLimiterAcquireUseSameMailbox(
onSuccess();
break;
case Ydb::StatusIds::TIMEOUT:
case Ydb::StatusIds::CANCELLED:
onTimeout();
break;
default:
Expand All @@ -82,7 +85,8 @@ TActorId RateLimiterAcquireUseSameMailbox(

const auto& rlPath = maybeRlPath.GetRef();
Ydb::RateLimiter::AcquireResourceRequest request;
SetDuration(duration, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration * 10, *request.mutable_operation_params()->mutable_operation_timeout());
SetDuration(duration, *request.mutable_operation_params()->mutable_cancel_after());
request.set_coordination_node_path(rlPath.CoordinationNode);
request.set_resource_path(rlPath.ResourcePath);
request.set_required(required);
Expand Down
32 changes: 27 additions & 5 deletions ydb/core/grpc_services/rpc_rate_limiter_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,18 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi
SendRequest();
}

// Operation timeout handler: notifies the quoter service that this RPC timed out for the
// requested coordination node / resource, then hands over to TBase::OnOperationTimeout.
// When "cancel after" is not set this path is always racy: the quoter service can spend
// the resource and say "OK" while we reply with TIMEOUT here.
void OnOperationTimeout(const TActorContext& ctx) {
    auto* timeoutNotice = new TEvQuota::TEvRpcTimeout(
        GetProtoRequest()->coordination_node_path(),
        GetProtoRequest()->resource_path());
    Send(MakeQuoterServiceID(), timeoutNotice, 0, 0);
    TBase::OnOperationTimeout(ctx);
}

// Intentionally a no-op: the quoter service itself replies once the "cancel after" time has passed.
void OnCancelOperation([[maybe_unused]] const TActorContext& ctx) {
}

STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvQuota::TEvClearance, Handle);
Expand Down Expand Up @@ -637,22 +644,37 @@ class TAcquireRateLimiterResourceRPC : public TRateLimiterRequest<TAcquireRateLi
true));
}

// Picks the status code used for a quoter deadline:
// CANCELLED when "cancel after" is set (non-zero) and shorter than the operation timeout,
// TIMEOUT in every other case.
StatusIds::StatusCode QuoterDeadlineStatusCode() {
    const TDuration cancelAfter = GetCancelAfter();
    // Short-circuit keeps GetOperationTimeout() from being queried when "cancel after" is unset.
    const bool cancelAfterComesFirst = cancelAfter && cancelAfter < GetOperationTimeout();
    return cancelAfterComesFirst ? StatusIds::CANCELLED : StatusIds::TIMEOUT;
}

// Sends a quota request for a single resource leaf to the quoter service.
// The deadline passed with the request starts as the operation timeout and is
// shortened to "cancel after" when that value is set and is the smaller of the two.
void SendLeaf(const TEvQuota::TResourceLeaf& leaf) {
TDuration deadline = GetOperationTimeout();
// CancelAfter tells the quoter service the maximum time we are willing to wait.
// Once that time passes, the quoter service sends EResult::Deadline,
// which means that the system lacks the resource.
if (const TDuration cancelAfter = GetCancelAfter(); cancelAfter && cancelAfter < deadline) {
deadline = cancelAfter;
}

// NOTE(review): the two `new TEvQuota::TEvRequest` lines below look like the removed and the added
// side of a diff hunk; presumably only the variant passing `deadline` belongs here — confirm.
Send(MakeQuoterServiceID(),
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, GetOperationTimeout()), 0, 0);
new TEvQuota::TEvRequest(TEvQuota::EResourceOperator::And, { leaf }, deadline), 0, 0);
}

void Handle(TEvQuota::TEvClearance::TPtr& ev) {
switch (ev->Get()->Result) {
case TEvQuota::TEvClearance::EResult::Success:
Reply(StatusIds::SUCCESS, TActivationContext::AsActorContext());
break;
break;
case TEvQuota::TEvClearance::EResult::UnknownResource:
Reply(StatusIds::BAD_REQUEST, TActivationContext::AsActorContext());
break;
break;
case TEvQuota::TEvClearance::EResult::Deadline:
Reply(StatusIds::TIMEOUT, TActivationContext::AsActorContext());
break;
Reply(QuoterDeadlineStatusCode(), TActivationContext::AsActorContext());
break;
default:
Reply(StatusIds::INTERNAL_ERROR, TActivationContext::AsActorContext());
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/api/protos/ydb_rate_limiter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,11 @@ message DescribeResourceResult {
//

message AcquireResourceRequest {
// If cancel_after is set greater than zero and less than operation_timeout
// and resource is not ready after cancel_after time,
// the result code of this operation will be CANCELLED and resource will not be spent.
// It is recommended to specify both operation_timeout and cancel_after.
// cancel_after should be non-zero and less than operation_timeout.
Ydb.Operations.OperationParams operation_params = 1;

// Path of a coordination node.
Expand Down
5 changes: 5 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_rate_limiter/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ class TRateLimiterClient {
TAsyncDescribeResourceResult DescribeResource(const TString& coordinationNodePath, const TString& resourcePath, const TDescribeResourceSettings& = {});

// Acquire a resource's units inside a coordination node.
// If CancelAfter is set greater than zero and less than OperationTimeout
// and resource is not ready after CancelAfter time,
// the result code of this operation will be CANCELLED and resource will not be spent.
// It is recommended to specify both OperationTimeout and CancelAfter.
// CancelAfter should be less than OperationTimeout.
TAsyncStatus AcquireResource(const TString& coordinationNodePath, const TString& resourcePath, const TAcquireResourceSettings& = {});

private:
Expand Down
49 changes: 37 additions & 12 deletions ydb/services/rate_limiter/rate_limiter_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ class TTestSetupAcquireActor : public TTestSetup {
request.set_resource_path(ResourcePath);

SetDuration(Settings.OperationTimeout_, *request.mutable_operation_params()->mutable_operation_timeout());
if (Settings.CancelAfter_) {
SetDuration(Settings.CancelAfter_, *request.mutable_operation_params()->mutable_cancel_after());
}

if (Settings.IsUsedAmount_) {
request.set_used(Settings.Amount_.GetRef());
Expand Down Expand Up @@ -317,50 +320,72 @@ Y_UNIT_TEST_SUITE(TGRpcRateLimiterTest) {
return std::make_unique<TTestSetup>();
}

// Scenario helper: creates resource "res" (1 unit per second, burst size coefficient 42),
// acquires 10000 units in one request, then three times checks that acquiring 1 more unit
// fails while reporting 1 already-used unit succeeds.
// useActorApi is forwarded to MakeTestSetup. useCancelAfter selects how the 200 ms wait limit is
// expressed: via CancelAfter (expected failure status CANCELLED, operation timeout of 1 hour)
// or via OperationTimeout alone (expected failure status TIMEOUT).
// NOTE(review): this span appears to interleave removed and added diff lines
// (two signatures, duplicated CheckAcquireResource calls) — confirm which side is current.
void AcquireResourceManyRequired(bool useActorApi) {
void AcquireResourceManyRequired(bool useActorApi, bool useCancelAfter) {
const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200);
const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set

using NYdb::NRateLimiter::TAcquireResourceSettings;

auto setup = MakeTestSetup(useActorApi);

ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));

// A single large acquire of 10000 units is expected to succeed.
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);

// Afterwards: an acquire of 1 unit is expected to fail, a "used" report of 1 unit is expected to succeed.
for (int i = 0; i < 3; ++i) {
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
}
}

// Scenario helper: creates resource "res" (1 unit per second, burst size coefficient 42),
// reports 10000 units as already used (IsUsedAmount(true)), then three times checks that
// acquiring 1 unit fails while reporting 1 more used unit succeeds.
// useActorApi is forwarded to MakeTestSetup. useCancelAfter selects how the 200 ms wait limit is
// expressed: via CancelAfter (expected failure status CANCELLED, operation timeout of 1 hour)
// or via OperationTimeout alone (expected failure status TIMEOUT).
// NOTE(review): this span appears to interleave removed and added diff lines
// (two signatures, duplicated CheckAcquireResource calls) — confirm which side is current.
void AcquireResourceManyUsed(bool useActorApi) {
void AcquireResourceManyUsed(bool useActorApi, bool useCancelAfter) {
const TDuration operationTimeout = useCancelAfter ? TDuration::Hours(1) : TDuration::MilliSeconds(200);
const TDuration cancelAfter = useCancelAfter ? TDuration::MilliSeconds(200) : TDuration::Zero(); // 0 means that parameter is not set

using NYdb::NRateLimiter::TAcquireResourceSettings;

auto setup = MakeTestSetup(useActorApi);
ASSERT_STATUS_SUCCESS(setup->RateLimiterClient.CreateResource(TTestSetup::CoordinationNodePath, "res",
TCreateResourceSettings().MaxUnitsPerSecond(1).MaxBurstSizeCoefficient(42)));

// Reporting 10000 used units in one request is expected to succeed.
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(10000).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
// Afterwards: an acquire of 1 unit is expected to fail, a "used" report of 1 unit is expected to succeed.
for (int i = 0; i < 3; ++i) {
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(TDuration::MilliSeconds(200)), NYdb::EStatus::SUCCESS);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), useCancelAfter ? NYdb::EStatus::CANCELLED : NYdb::EStatus::TIMEOUT);
setup->CheckAcquireResource(TTestSetup::CoordinationNodePath, "res", TAcquireResourceSettings().Amount(1).IsUsedAmount(true).OperationTimeout(operationTimeout).CancelAfter(cancelAfter), NYdb::EStatus::SUCCESS);
}
}

// "Many required" scenario with useActorApi = false and useCancelAfter = false.
// NOTE(review): the one-argument call looks like the removed side of a diff hunk — confirm.
Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApi) {
AcquireResourceManyRequired(false);
AcquireResourceManyRequired(false, false);
}

// "Many required" scenario with useActorApi = true and useCancelAfter = false.
// NOTE(review): the one-argument call looks like the removed side of a diff hunk — confirm.
Y_UNIT_TEST(AcquireResourceManyRequiredActorApi) {
AcquireResourceManyRequired(true);
AcquireResourceManyRequired(true, false);
}

// "Many required" scenario without the actor API, with the wait limit given via CancelAfter.
Y_UNIT_TEST(AcquireResourceManyRequiredGrpcApiWithCancelAfter) {
    constexpr bool viaActorApi = false;
    constexpr bool withCancelAfter = true;
    AcquireResourceManyRequired(viaActorApi, withCancelAfter);
}

// "Many required" scenario through the actor API, with the wait limit given via CancelAfter.
Y_UNIT_TEST(AcquireResourceManyRequiredActorApiWithCancelAfter) {
    constexpr bool viaActorApi = true;
    constexpr bool withCancelAfter = true;
    AcquireResourceManyRequired(viaActorApi, withCancelAfter);
}

// "Many used" scenario with useActorApi = false and useCancelAfter = false.
// NOTE(review): the one-argument call looks like the removed side of a diff hunk — confirm.
Y_UNIT_TEST(AcquireResourceManyUsedGrpcApi) {
AcquireResourceManyUsed(false);
AcquireResourceManyUsed(false, false);
}

// "Many used" scenario with useActorApi = true and useCancelAfter = false.
// NOTE(review): the one-argument call looks like the removed side of a diff hunk — confirm.
Y_UNIT_TEST(AcquireResourceManyUsedActorApi) {
AcquireResourceManyUsed(true);
AcquireResourceManyUsed(true, false);
}

// "Many used" scenario without the actor API, with the wait limit given via CancelAfter.
Y_UNIT_TEST(AcquireResourceManyUsedGrpcApiWithCancelAfter) {
    constexpr bool viaActorApi = false;
    constexpr bool withCancelAfter = true;
    AcquireResourceManyUsed(viaActorApi, withCancelAfter);
}

// "Many used" scenario through the actor API, with the wait limit given via CancelAfter.
Y_UNIT_TEST(AcquireResourceManyUsedActorApiWithCancelAfter) {
    constexpr bool viaActorApi = true;
    constexpr bool withCancelAfter = true;
    AcquireResourceManyUsed(viaActorApi, withCancelAfter);
}
}

Expand Down

0 comments on commit 56c048e

Please sign in to comment.