Skip to content

Commit

Permalink
Feature: set max_wait_time when grace_stop
Browse files Browse the repository at this point in the history
  • Loading branch information
kiviyu committed Oct 11, 2023
1 parent dd6ccac commit ff8d2ed
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 14 deletions.
4 changes: 2 additions & 2 deletions docs/en/architecture_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ The request processing generally includes the following processes:

The service exit process generally includes the following processes:

1. Stop the monitoring and reading events of the server network connection;
2. Wait for all requests received by the server to be processed;
1. Wait for all requests received by the server to be processed(To avoid being unable to exit due to waiting, the default maximum waiting time is set to 5 seconds.It can be configured through server::stop_max_wait_time);
2. Stop the monitoring and reading events of the server network connection;
3. Close the server network connection;
4. Call the `Destroy` method of the `TrpcApp` business subclass to stop the dynamic resources created by the business (for example: started threads);
5. Stop the dynamic resources created by the plugins (for example: threads started inside the plugins);
Expand Down
6 changes: 1 addition & 5 deletions docs/en/framework_config_full.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ server:
admin_port: 7897
admin_ip: 0.0.0.0
admin_idle_time: 60000 #admin_idle_time
enable_self_register: false #self_register
stop_max_wait_time: 3000 #set max wait timeout(ms) when stop incase cannot stop
service:
- name: trpc.test.helloworld.Greeter
protocol: trpc
Expand All @@ -109,10 +109,6 @@ server:
send_queue_timeout: 3000 #Used in Fiber scenarios, It represents the timeout duration for the IO send queue when sending network data.
threadmodel_instance_name: default_instance
accept_thread_num: 1
service_limiter: default(100000) #Service-level flow control, format: name (limit per minute).
func_limiter: #Interface-level flow control
- name: SayHello
limiter: default(100000)
stream_max_window_size: 65535 #The default window value is 65535. 0 represents disabling flow control. Additionally, if set to a value less than 65535, it will not take effect.
stream_read_timeout: 32000 #stream_read_timeout
filter: #The filter list at the service level, only effective for the current service.
Expand Down
4 changes: 2 additions & 2 deletions docs/zh/architecture_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ Server 启动过程, 大致包括以下流程:
### 退出

服务退出过程, 大致包括以下流程:
1. 停止服务端网络连接的监听和读事件;
2. 等待服务端接收的请求都处理完成;
1. 等待服务端接收的请求都处理完成(为避免因等待无法退出,默认最多等待5秒钟,可通过yaml中的server::max_wait_time配置);
2. 停止服务端网络连接的监听和读事件;
3. 关闭服务端网络连接;
4. 调用 `TrpcApp` 业务子类的 `Destroy` 方法, 停止业务创建的动态资源(比如: 起的线程);
5. 停止插件创建的动态资源(比如: 插件内部起的线程);
Expand Down
6 changes: 1 addition & 5 deletions docs/zh/framework_config_full.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ server:
admin_port: 7897 #admin监听端口
admin_ip: ${trpc_admin_ip} #admin监听ip
admin_idle_time: 60000 #admin空闲连接清理时间,框架默认60s,如果业务注册了处理时间超过60s的逻辑,适当调大此值以得到响应
enable_self_register: false #开启框架的自注册(默认为false,不开启)
stop_max_wait_time: 3000 #设置max wait timeout(ms) 避免无法正常退出
service: #业务服务提供的service,可以有多个
- name: trpc.test.helloworld.Greeter #service名称,需要按照这里的格式填写,第一个字段默认为trpc,第二、三个字段为上边的app和server配置,第四个字段为用户定义的service_name
protocol: trpc #应用层协议:trpc http等
Expand All @@ -109,10 +109,6 @@ server:
send_queue_timeout: 3000 #Fiber场景下使用,表示发送网络数据时io发送队列的超时时间
threadmodel_instance_name: default_instance #使用的线程模型实例名,为global->threadmodel->instance_name内容
accept_thread_num: 1 #绑定端口的线程个数,如果大于1,需要指定编译选项.
service_limiter: default(100000) #service级别流量控制,格式为: name(每分钟限制个数)
func_limiter: #接口级别流量控制
- name: SayHello #接口名称
limiter: default(100000) #接口级别流量控制
stream_max_window_size: 65535 #默认窗口值为65535,0代表关闭流控,除此之外,如果设置小于65535将不会生效
stream_read_timeout: 32000 #从流上读取消息超时,单位:毫秒,默认为32000ms
filter: #service级别的filter列表,只针对当前service生效
Expand Down
1 change: 1 addition & 0 deletions trpc/common/config/server_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void ServerConfig::Display() const {
TRPC_LOG_DEBUG("registry_name:" << registry_name);
TRPC_LOG_DEBUG("enable_server_stats:" << enable_server_stats);
TRPC_LOG_DEBUG("server_stats_interval:" << server_stats_interval);
TRPC_LOG_DEBUG("stop_max_wait_time:" << stop_max_wait_time);

for (const auto& i : services_config) {
i.Display();
Expand Down
3 changes: 3 additions & 0 deletions trpc/common/config/server_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ struct ServerConfig {
/// @brief Filters used by all services
std::vector<std::string> filters;

/// @brief set max wait timeout(ms) when stop incase cannot stop
uint32_t stop_max_wait_time{5000};

void Display() const;
};

Expand Down
5 changes: 5 additions & 0 deletions trpc/common/config/server_conf_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ struct convert<trpc::ServerConfig> {
node["server_stats_interval"] = server_config.server_stats_interval;
node["filter"] = server_config.filters;
node["service"] = server_config.services_config;
node["stop_max_wait_time"] = server_config.stop_max_wait_time;

return node;
}
Expand Down Expand Up @@ -236,6 +237,10 @@ struct convert<trpc::ServerConfig> {
}
}

if (node["stop_max_wait_time"]) {
server_config.stop_max_wait_time = node["stop_max_wait_time"].as<uint32_t>();
}

return true;
}
};
Expand Down
2 changes: 2 additions & 0 deletions trpc/common/config/server_conf_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ TEST(ServerConfigTest, server_config_test) {
server_config.enable_server_stats = false;
server_config.server_stats_interval = 60000;
server_config.filters = {"tpstelemetry"};
server_config.stop_max_wait_time = 1000;

ServiceConfig service_config;
service_config.service_name = "trpc.test.helloworld.Greeter";
Expand Down Expand Up @@ -82,6 +83,7 @@ TEST(ServerConfigTest, server_config_test) {
ASSERT_EQ(server_config.enable_server_stats, tmp.enable_server_stats);
ASSERT_EQ(server_config.server_stats_interval, tmp.server_stats_interval);
ASSERT_EQ(server_config.filters[0], tmp.filters[0]);
ASSERT_EQ(server_config.stop_max_wait_time, tmp.stop_max_wait_time);

ASSERT_EQ(server_config.services_config.front().service_name, tmp.services_config.front().service_name);
ASSERT_EQ(server_config.services_config.front().network, tmp.services_config.front().network);
Expand Down
1 change: 1 addition & 0 deletions trpc/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ cc_library(
"//trpc/runtime/common/heartbeat:heartbeat_report",
"//trpc/runtime/common/stats:frame_stats",
"//trpc/runtime/threadmodel:thread_model",
"//trpc/util/chrono:time",
"//trpc/util/log:logging",
"//trpc/util/string:string_util",
],
Expand Down
7 changes: 7 additions & 0 deletions trpc/server/trpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "trpc/runtime/common/stats/frame_stats.h"
#include "trpc/runtime/runtime.h"
#include "trpc/runtime/threadmodel/thread_model.h"
#include "trpc/util/chrono/time.h"
#include "trpc/util/log/logging.h"
#include "trpc/util/string/string_util.h"

Expand Down Expand Up @@ -211,6 +212,7 @@ void TrpcServer::Stop() {
}
}

uint64_t begin_stop_time = trpc::time::GetMilliSeconds();
while (FrameStats::GetInstance()->GetServerStats().GetReqConcurrency() > 0) {
TRPC_LOG_DEBUG("current ReqConcurrency:" << FrameStats::GetInstance()->GetServerStats().GetReqConcurrency()
<< ",mso need wait until request be all handled over");
Expand All @@ -219,6 +221,11 @@ void TrpcServer::Stop() {
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

// check if wait more than max_wait_time,just break and continue stop
if (trpc::time::GetMilliSeconds() - begin_stop_time > server_config_.stop_max_wait_time) {
break;
}
}

TRPC_LOG_DEBUG(" and current ReqConcurrency:" << FrameStats::GetInstance()->GetServerStats().GetReqConcurrency());
Expand Down

0 comments on commit ff8d2ed

Please sign in to comment.