Commit
[Tencent Rhino-Bird Program] Implement overload protection plugin - sliding time window #144 (#179)
2549141519 authored Sep 18, 2024
1 parent 39f193c commit 05baf7e
Showing 10 changed files with 702 additions and 0 deletions.
83 changes: 83 additions & 0 deletions docs/zh/window_limit_overload_control.md
@@ -0,0 +1,83 @@
# Usage example

This is a flow-control-based overload protection filter. **It can currently be applied only on the server side; the client side is not yet supported.** To use it, register the filter in your code, enable the compile option, and add the configuration described below.

## Compile option

Add the following line to your `.bazelrc` file:

```sh
build --define trpc_include_overload_control=true
```
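
For a one-off build, the same flag can also be passed directly on the command line; the target path below is only a placeholder for your own server target:

```sh
bazel build --define trpc_include_overload_control=true //your/server:target
```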

## Configuration file

The server-side flow control configuration is as follows (for a detailed configuration, see [flow_test.yaml](../../trpc/overload_control/flow_control/flow_test.yaml)):
```yaml
# Plugin configuration.
server:
service:
- name: trpc.test.helloworld.Greeter
filter:
- window_limiter
plugins:
overload_control:
window_limiter:
- service_name: trpc.test.helloworld.Greeter
is_report: false
service_limiter: smooth(5)
window_size: 100
func_limiter:
- name: SayHello
limiter: default(5)
window_size: 100
```

The key configuration items are:

- window_limiter: name of the flow-control overload protection filter
- service_name: name of the service the flow control applies to
- service_limiter: service-level flow control algorithm for the service named `service_name`; the `smooth(5)` configured above means:
  - `smooth`: the sliding window flow control algorithm is selected (see the sketch after this list); `default` (equivalent to `seconds`) would select the fixed window algorithm instead
  - `5`: the maximum number of requests per second, i.e. the maximum QPS is 5
- window_size: number of sampling windows the sliding window is divided into
- func_limiter: interface-level flow control strategy for the service named `service_name`; a service may expose several interfaces, and each interface may use a different flow control algorithm:
  - `name`: interface name, e.g. `SayHello`
  - `limiter`: flow control algorithm for the interface; above, `SayHello` is configured with the fixed window (`default`) algorithm and a maximum QPS of 5
- is_report: whether to report monitoring data to the metrics plugin; the default is not to report. **Note: this option only takes effect together with a metrics plugin (for example, with plugins->metrics->prometheus configured, data is reported to Prometheus); without a metrics plugin the option has no effect.** The monitoring data per flow control algorithm is as follows:
  - seconds:
    - `SecondsLimiter`: name of the flow control algorithm, used to check that the configured algorithm matches the one actually executed
    - `/{callee_name}/{method}`: monitoring name format, composed of the callee service (callee_name) and the method name (method), e.g. `/trpc.test.helloworld.Greeter/SayHello`
    - `current_qps`: current concurrent QPS value
    - `max_qps`: configured maximum QPS, used to check that the configured limit matches the one actually enforced
    - `window_size`: number of sampling windows; users normally do not need to care about it
    - `Pass`: pass status of a single request: 0 means rejected, 1 means passed
    - `Limited`: rejection status of a single request: 1 means rejected, 0 means passed. It is the inverse of the `Pass` attribute above
  - smooth:
    - `SmoothLimiter`: name of the flow control algorithm, used to check that the configured algorithm matches the one actually executed
    - `/{callee_name}/{method}`: monitoring name format, composed of the callee service (callee_name) and the method name (method), e.g. `/trpc.test.helloworld.Greeter/SayHello`
    - `active_sum`: total number of requests in all hit time slices, i.e. the QPS at this moment excluding the current request; if it exceeds the maximum QPS, `hit_num` below is 0
    - `hit_num`: QPS of the currently hit time slice after adding this request
    - `max_qps`: configured maximum QPS, used to check that the configured limit matches the one actually enforced
    - `window_size`: number of sampling windows; users normally do not need to care about it
    - `Pass`: pass status of a single request: 0 means rejected, 1 means passed
    - `Limited`: rejection status of a single request: 1 means rejected, 0 means passed. It is the inverse of the `Pass` attribute above
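
To make the sliding-window (`smooth`) semantics concrete, below is a minimal illustrative sketch; it is not the framework's `SmoothLimiter` implementation, and the class and member names are hypothetical. One second is divided into `window_size` slices, and a request is admitted only while the sum of requests counted over the last second stays below the configured maximum QPS:

```cpp
// Illustrative sketch only: a simplified sliding-window request counter.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <numeric>
#include <vector>

class SlidingWindowSketch {
 public:
  SlidingWindowSketch(int64_t max_qps, int64_t window_size)
      : max_qps_(max_qps),
        slices_(static_cast<size_t>(window_size), 0),
        slice_width_ms_(std::max<int64_t>(1, 1000 / window_size)) {}

  // Returns true if the request is admitted, false if it must be rejected.
  bool TryAcquire() {
    std::lock_guard<std::mutex> lock(mutex_);
    const int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                               std::chrono::steady_clock::now().time_since_epoch())
                               .count();
    const int64_t slice_index = now_ms / slice_width_ms_;
    Advance(slice_index);
    // Requests already counted in the slices covering the last second.
    const int64_t active_sum = std::accumulate(slices_.begin(), slices_.end(), int64_t{0});
    if (active_sum >= max_qps_) {
      return false;  // Limit reached within the sliding one-second window.
    }
    ++slices_[static_cast<size_t>(slice_index % static_cast<int64_t>(slices_.size()))];
    return true;
  }

 private:
  // Zero out slices that fell out of the one-second window since the last call.
  void Advance(int64_t slice_index) {
    if (last_slice_ < 0) {
      last_slice_ = slice_index;
      return;
    }
    const int64_t steps =
        std::min<int64_t>(slice_index - last_slice_, static_cast<int64_t>(slices_.size()));
    for (int64_t i = 1; i <= steps; ++i) {
      slices_[static_cast<size_t>((last_slice_ + i) % static_cast<int64_t>(slices_.size()))] = 0;
    }
    last_slice_ = slice_index;
  }

  const int64_t max_qps_;            // Maximum requests allowed per second.
  std::vector<int64_t> slices_;      // Per-slice request counters.
  const int64_t slice_width_ms_;     // Width of one slice in milliseconds.
  int64_t last_slice_ = -1;          // Index of the most recently used slice.
  std::mutex mutex_;
};
```

Under this sketch, a configuration such as `smooth(5)` with `window_size: 100` would correspond to `SlidingWindowSketch(5, 100)`.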

## Enabling flow control in code

In addition to the configuration above, the filter must also be registered in code. Business users register the service-level and interface-level flow controllers as follows:

```cpp
// After including the filter header
class HelloWorldServer : public ::trpc::TrpcApp {
public:
// ...
int RegisterPlugins() {
// register server-side filter
auto server_filter = std::make_shared<trpc::overload_control::WindowLimiterOverloadControlFilter>();
trpc::TrpcPlugin::GetInstance()->RegisterServerFilter(server_filter);
return 0;
}

};
```
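
Once the filter is registered and listed under the service's `filter` configuration, requests that exceed the configured limits are rejected with `TRPC_SERVER_OVERLOAD_ERR` (see `WindowLimiterOverloadController::BeforeSchedule` below); with `is_report` enabled, the monitoring attributes described above are reported to the configured metrics plugin.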
3 changes: 3 additions & 0 deletions trpc/overload_control/overload_control_defs.h
@@ -79,6 +79,9 @@ constexpr char kOverloadctrlThrottler[] = "overloadctrl_throttler";
/// @brief Key for request priority in trpc framework.
constexpr char kTransinfoKeyTrpcPriority[] = "trpc-priority";

/// @brief Window limiter control name.
constexpr char kWindowLimiterControlName[] = "window_limiter";

/// @brief Name of overload protection limiter based on token bucket.
constexpr char kTokenBucketLimiterName[] = "token_bucket_limiter";

79 changes: 79 additions & 0 deletions trpc/overload_control/window_limiter_control/BUILD
@@ -0,0 +1,79 @@
licenses(["notice"])

package(default_visibility = ["//visibility:public"])

cc_library(
name = "window_limiter_overload_controller_filter",
srcs = ["window_limiter_overload_controller_filter.cc"],
hdrs = ["window_limiter_overload_controller_filter.h"],
defines = [] +
select({
"//trpc:trpc_include_overload_control": ["TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL"],
"//conditions:default": [],
}),
visibility = [
"//visibility:public",
],
deps = [
":window_limiter_overload_controller",
"//trpc/filter:filter",
"//trpc/overload_control/flow_control:flow_controller_conf",
"//trpc/server:server_context",
"//trpc/util:likely",
],
)

cc_library(
name = "window_limiter_overload_controller",
srcs = ["window_limiter_overload_controller.cc"],
hdrs = ["window_limiter_overload_controller.h"],
defines = [] +
select({
"//trpc:trpc_include_overload_control": ["TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL"],
"//conditions:default": [],
}),
visibility = [
"//visibility:public",
],
deps = [
"//trpc/overload_control:overload_control_defs",
"//trpc/overload_control:server_overload_controller",
"//trpc/overload_control/common:report",
"//trpc/overload_control/flow_control:flow_controller",
"//trpc/overload_control/flow_control:flow_controller_conf",
"//trpc/overload_control/flow_control:flow_controller_generator",
"//trpc/overload_control/flow_control:smooth_limiter",
"//trpc/common/config:trpc_config",
"//trpc/log:trpc_log",
],
)

cc_test(
name = "window_limiter_overload_controller_filter_test",
srcs = ["window_limiter_overload_controller_filter_test.cc"],
data = [
":filter_test.yaml",
],
deps = [
":window_limiter_overload_controller",
":window_limiter_overload_controller_filter",
"//trpc/codec/testing:protocol_testing",
"//trpc/common:trpc_plugin",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "window_limiter_overload_controller_test",
srcs = ["window_limiter_overload_controller_test.cc"],
data = [
":filter_test.yaml",
],
deps = [
":window_limiter_overload_controller",
":window_limiter_overload_controller_filter",
"//trpc/codec/testing:protocol_testing",
"//trpc/common:trpc_plugin",
"@com_google_googletest//:gtest_main",
],
)
39 changes: 39 additions & 0 deletions trpc/overload_control/window_limiter_control/filter_test.yaml
@@ -0,0 +1,39 @@
global:
threadmodel:
fiber: # Use Fiber(m:n coroutine) threadmodel
- instance_name: fiber_instance # Need to be unique if you config multiple fiber threadmodel instances
# Fiber worker thread num
# If not specified, will use number of cores on the machine.
      # In a NUMA architecture, the workers will be automatically grouped (8-15 cores per group),
      # while in a UMA architecture, they will not be grouped.
concurrency_hint: 8

server:
app: test
server: helloworld
admin_port: 8888 # Start server with admin service which can manage service
admin_ip: 0.0.0.0
service:
- name: trpc.test.helloworld.Greeter
      protocol: trpc # Application layer protocol, e.g. trpc/http/...
      network: tcp # Network type, supports two types: tcp/udp
ip: 0.0.0.0 # Service bind ip
      port: 12345 # Service bind port
      filter:
        - window_limiter

# Plugin configuration.
plugins:
overload_control:
window_limiter:
      - service_name: trpc.test.helloworld.Greeter # Service name.
        is_report: true # Whether to report monitoring data.
        service_limiter: default(10) # Service-level flow control limiter, standard format: name(maximum limit per second); empty means no limit.
window_size: 9
func_limiter: # Interface-level flow control.
- name: SayHello # Method name
limiter: smooth(5) # Interface-level flow control limiter, standard format: name (maximum limit per second), empty for no limit.
window_size: 9
- name: SayHelloAgain # Method name
limiter: smooth(5) # Interface-level flow control limiter, standard format: name (maximum limit per second), empty for no limit.
window_size: 9
131 changes: 131 additions & 0 deletions trpc/overload_control/window_limiter_control/window_limiter_overload_controller.cc
@@ -0,0 +1,131 @@
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2024 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//

#ifdef TRPC_BUILD_INCLUDE_OVERLOAD_CONTROL

#include "trpc/overload_control/window_limiter_control/window_limiter_overload_controller.h"

#include <chrono>
#include <cmath>
#include <cstdint>

#include "trpc/common/config/trpc_config.h"
#include "trpc/overload_control/common/report.h"
#include "trpc/overload_control/flow_control/flow_controller_generator.h"

namespace trpc::overload_control {

// Loads the window limiter configuration under plugins->overload_control->window_limiter.
void LoadWindowLimitControlConf(std::vector<FlowControlLimiterConf>& flow_control_confs) {
  YAML::Node flow_control_nodes;
  if (ConfigHelper::GetInstance()->GetConfig({"plugins", kOverloadCtrConfField, kWindowLimiterControlName},
                                             flow_control_nodes)) {
    for (const auto& node : flow_control_nodes) {
      auto flow_control_conf = node.as<FlowControlLimiterConf>();
      flow_control_confs.emplace_back(std::move(flow_control_conf));
    }
  }
}

bool WindowLimiterOverloadController::Init() {
std::vector<FlowControlLimiterConf> flow_control_confs;
LoadWindowLimitControlConf(flow_control_confs);
for (const auto& flow_conf : flow_control_confs) {
if (!flow_conf.service_limiter.empty()) {
FlowControllerPtr service_controller =
CreateFlowController(flow_conf.service_limiter, flow_conf.is_report, flow_conf.window_size);
if (service_controller) {
RegisterLimiter(flow_conf.service_name, service_controller);
} else {
TRPC_FMT_ERROR("create service window limiter control fail|service_name: {}, |service_limiter: {}",
flow_conf.service_name, flow_conf.service_limiter);
}
}

for (const auto& func_conf : flow_conf.func_limiters) {
if (!func_conf.limiter.empty()) {
std::string service_func_name = fmt::format("/{}/{}", flow_conf.service_name, func_conf.name);
FlowControllerPtr func_controller =
CreateFlowController(func_conf.limiter, flow_conf.is_report, func_conf.window_size);
if (func_controller) {
RegisterLimiter(service_func_name, func_controller);
} else {
TRPC_FMT_ERROR("create func window limiter control fail|service_name:{}|func_name:{}|limiter:{}",
flow_conf.service_name, func_conf.name, func_conf.limiter);
}
}
}
}
  return true;
}

bool WindowLimiterOverloadController::BeforeSchedule(const ServerContextPtr& context) {
auto service_controller = GetLimiter(context->GetCalleeName());
// func flow controller
auto func_controller = GetLimiter(context->GetFuncName());
if (!service_controller && !func_controller) {
return true;
}

// flow control strategy
if (service_controller && service_controller->CheckLimit(context)) {
context->SetStatus(
Status(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR, 0, "rejected by server window limiter overload control"));
TRPC_FMT_ERROR_EVERY_SECOND("rejected by server window limiter overload , service name: {}",
context->GetCalleeName());
return false;
}
if (func_controller && func_controller->CheckLimit(context)) {
context->SetStatus(
Status(TrpcRetCode::TRPC_SERVER_OVERLOAD_ERR, 0, "rejected by server window limiter overload control"));
TRPC_FMT_ERROR_EVERY_SECOND("rejected by server window limiter overload , service name: {}, func name: {}",
context->GetCalleeName(), context->GetFuncName());
return false;
}
return true;
}

void WindowLimiterOverloadController::Destroy() {
  for (auto& window_limiters_iter : window_limiters_) {
    window_limiters_iter.second.reset();
  }
  window_limiters_.clear();
}

void WindowLimiterOverloadController::Stop() {
  // Nothing to do; the timer thread stops automatically.
}

WindowLimiterOverloadController::WindowLimiterOverloadController() {}

void WindowLimiterOverloadController::RegisterLimiter(const std::string& name, FlowControllerPtr limiter) {
if (window_limiters_.count(name) == 0) {
window_limiters_[name] = limiter;
}
}

FlowControllerPtr WindowLimiterOverloadController::GetLimiter(const std::string& name) {
FlowControllerPtr ret = nullptr;
auto iter = window_limiters_.find(name);
if (iter != window_limiters_.end()) {
ret = iter->second;
}
return ret;
}

// The destructor does nothing. Stop() must be called before Destroy() so that the timed task is joined and
// invalidated.
WindowLimiterOverloadController::~WindowLimiterOverloadController() {}

} // namespace trpc::overload_control
#endif