Skip to content

Commit

Permalink
refactor(conf): use DSN_DEFINE_uint32 to load uint32 type of configs (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 authored Feb 15, 2023
1 parent c37bacd commit 4c76112
Show file tree
Hide file tree
Showing 24 changed files with 197 additions and 219 deletions.
15 changes: 7 additions & 8 deletions src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ DSN_DEFINE_group_validator(min_max_level, [](std::string &message) -> bool {
}
return true;
});
DSN_DEFINE_uint32(geo_client.lib, latitude_index, 5, "latitude index in value");
DSN_DEFINE_uint32(geo_client.lib, longitude_index, 4, "longitude index in value");

struct SearchResultNearer
{
Expand Down Expand Up @@ -99,14 +101,11 @@ geo_client::geo_client(const char *config_file,
_geo_data_client = pegasus_client_factory::get_client(cluster_name, geo_app_name);
CHECK_NOTNULL(_geo_data_client, "init pegasus _geo_data_client failed");

uint32_t latitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "latitude_index", 5, "latitude index in value");

uint32_t longitude_index = (uint32_t)dsn_config_get_value_uint64(
"geo_client.lib", "longitude_index", 4, "longitude index in value");

dsn::error_s s = _codec.set_latlng_indices(latitude_index, longitude_index);
CHECK(s.is_ok(), "set_latlng_indices({}, {}) failed", latitude_index, longitude_index);
dsn::error_s s = _codec.set_latlng_indices(FLAGS_latitude_index, FLAGS_longitude_index);
CHECK(s.is_ok(),
"set_latlng_indices({}, {}) failed",
FLAGS_latitude_index,
FLAGS_longitude_index);
}

dsn::error_s geo_client::set_max_level(int level)
Expand Down
12 changes: 6 additions & 6 deletions src/meta/test/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ DEFINE_TASK_CODE(TASK_META_TEST, TASK_PRIORITY_COMMON, THREAD_POOL_META_TEST)

meta_service_test_app *g_app;

DSN_DEFINE_uint32(tools.simulator, random_seed, 0, "random seed");

// as it is not easy to clean test environment in some cases, we simply run these tests in several
// commands,
// please check the script "run.sh" to modify the GTEST_FILTER
Expand Down Expand Up @@ -80,13 +82,11 @@ TEST(meta, app_envs_basic_test) { g_app->app_envs_basic_test(); }

dsn::error_code meta_service_test_app::start(const std::vector<std::string> &args)
{
uint32_t seed =
(uint32_t)dsn_config_get_value_uint64("tools.simulator", "random_seed", 0, "random seed");
if (seed == 0) {
seed = time(0);
LOG_ERROR("initial seed: {}", seed);
if (FLAGS_random_seed == 0) {
FLAGS_random_seed = static_cast<uint32_t>(time(nullptr));
LOG_INFO("initial seed: {}", FLAGS_random_seed);
}
srand(seed);
srand(FLAGS_random_seed);

int argc = args.size();
char *argv[20];
Expand Down
4 changes: 0 additions & 4 deletions src/runtime/rpc/asio_net_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ error_code asio_network_provider::start(rpc_channel channel, int port, bool clie
if (_acceptor != nullptr)
return ERR_SERVICE_ALREADY_RUNNING;

// get connection threshold from config, default value 0 means no threshold
_cfg_conn_threshold_per_ip = (uint32_t)dsn_config_get_value_uint64(
"network", "conn_threshold_per_ip", 0, "max connection count to each server per ip");

for (int i = 0; i < FLAGS_io_service_worker_count; i++) {
_workers.push_back(std::make_shared<std::thread>([this, i]() {
task::set_tls_dsn_context(node(), nullptr);
Expand Down
12 changes: 8 additions & 4 deletions src/runtime/rpc/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
#include "utils/strings.h"

namespace dsn {
DSN_DEFINE_uint32(network,
conn_threshold_per_ip,
0,
"max connection count to each server per ip, 0 means no limit");

/*static*/ join_point<void, rpc_session *>
rpc_session::on_rpc_session_connected("rpc.session.connected");
/*static*/ join_point<void, rpc_session *>
Expand Down Expand Up @@ -582,7 +587,6 @@ uint32_t network::get_local_ipv4()
connection_oriented_network::connection_oriented_network(rpc_engine *srv, network *inner_provider)
: network(srv, inner_provider)
{
_cfg_conn_threshold_per_ip = 0;
_client_session_count.init_global_counter("server",
"network",
"client_session_count",
Expand Down Expand Up @@ -744,7 +748,7 @@ void connection_oriented_network::on_server_session_disconnected(rpc_session_ptr

bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_address ep)
{
if (_cfg_conn_threshold_per_ip <= 0) {
if (FLAGS_conn_threshold_per_ip <= 0) {
LOG_DEBUG("new client from {} is connecting to server {}, no connection threshold",
ep.ipv4_str(),
address());
Expand All @@ -760,7 +764,7 @@ bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_ad
ip_conn_count = it->second;
}
}
if (ip_conn_count >= _cfg_conn_threshold_per_ip) {
if (ip_conn_count >= FLAGS_conn_threshold_per_ip) {
exceeded = true;
}

Expand All @@ -769,7 +773,7 @@ bool connection_oriented_network::check_if_conn_threshold_exceeded(::dsn::rpc_ad
ep.ipv4_str(),
address(),
ip_conn_count,
_cfg_conn_threshold_per_ip);
FLAGS_conn_threshold_per_ip);

return exceeded;
}
Expand Down
1 change: 0 additions & 1 deletion src/runtime/rpc/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ class connection_oriented_network : public network
ip_connection_count _ip_conn_count; // from_ip => connection count
utils::rw_lock_nr _servers_lock;

uint32_t _cfg_conn_threshold_per_ip;
perf_counter_wrapper _client_session_count;
};

Expand Down
25 changes: 9 additions & 16 deletions src/runtime/rpc/network.sim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,17 @@
#include "utils/rand.h"
#include "runtime/node_scoper.h"
#include "network.sim.h"
#include "utils/flags.h"

namespace dsn {
namespace tools {

DSN_DEFINE_uint32(tools.simulator, min_message_delay_microseconds, 1, "min message delay (us)");
DSN_DEFINE_uint32(tools.simulator,
max_message_delay_microseconds,
100000,
"max message delay (us)");

// switch[channel][header_format]
// multiple machines connect to the same switch
// 10 should be >= than rpc_channel::max_value() + 1
Expand Down Expand Up @@ -153,20 +160,6 @@ sim_network_provider::sim_network_provider(rpc_engine *rpc, network *inner_provi
: connection_oriented_network(rpc, inner_provider)
{
_address.assign_ipv4("localhost", 1);

_min_message_delay_microseconds = 1;
_max_message_delay_microseconds = 100000;

_min_message_delay_microseconds =
(uint32_t)dsn_config_get_value_uint64("tools.simulator",
"min_message_delay_microseconds",
_min_message_delay_microseconds,
"min message delay (us)");
_max_message_delay_microseconds =
(uint32_t)dsn_config_get_value_uint64("tools.simulator",
"max_message_delay_microseconds",
_max_message_delay_microseconds,
"max message delay (us)");
}

error_code sim_network_provider::start(rpc_channel channel, int port, bool client_only)
Expand Down Expand Up @@ -194,8 +187,8 @@ error_code sim_network_provider::start(rpc_channel channel, int port, bool clien

uint32_t sim_network_provider::net_delay_milliseconds() const
{
return static_cast<uint32_t>(
rand::next_u32(_min_message_delay_microseconds, _max_message_delay_microseconds)) /
return rand::next_u32(FLAGS_min_message_delay_microseconds,
FLAGS_max_message_delay_microseconds) /
1000;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/runtime/rpc/network.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,6 @@ class sim_network_provider : public connection_oriented_network

private:
::dsn::rpc_address _address;
uint32_t _min_message_delay_microseconds;
uint32_t _max_message_delay_microseconds;
};

//------------- inline implementations -------------
Expand Down
4 changes: 3 additions & 1 deletion src/runtime/rpc/rpc_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
#include "runtime/rpc/serialization.h"
#include "utils/rand.h"
#include <set>
#include "utils/flags.h"

namespace dsn {
DSN_DECLARE_uint32(local_hash);

DEFINE_TASK_CODE(LPC_RPC_TIMEOUT, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)

Expand Down Expand Up @@ -735,7 +737,7 @@ void rpc_engine::reply(message_ex *response, error_code err)
sizeof(response->header->server.error_name) - 1);
response->header->server.error_name[sizeof(response->header->server.error_name) - 1] = '\0';
response->header->server.error_code.local_code = err;
response->header->server.error_code.local_hash = message_ex::s_local_hash;
response->header->server.error_code.local_hash = FLAGS_local_hash;

// response rpc code may be TASK_CODE_INVALID when request rpc code is not exist
auto sp = response->local_rpc_code == TASK_CODE_INVALID
Expand Down
23 changes: 15 additions & 8 deletions src/runtime/rpc/rpc_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@
#include <cctype>

#include "runtime/task/task_engine.h"
#include "utils/flags.h"

using namespace dsn::utils;

namespace dsn {
// init common for all per-node providers
DSN_DEFINE_uint32(core,
local_hash,
0,
"a same hash value from two processes indicate the rpc codes are registered in "
"the same order, and therefore the mapping between rpc code string and integer "
"is the same, which we leverage for fast rpc handler lookup optimization");

std::atomic<uint64_t> message_ex::_id(0);
uint32_t message_ex::s_local_hash = 0;

message_ex::message_ex()
: header(nullptr),
Expand Down Expand Up @@ -82,11 +89,11 @@ error_code message_ex::error()
{
dsn::error_code code;
auto binary_hash = header->server.error_code.local_hash;
if (binary_hash != 0 && binary_hash == ::dsn::message_ex::s_local_hash) {
if (binary_hash != 0 && binary_hash == FLAGS_local_hash) {
code = dsn::error_code(header->server.error_code.local_code);
} else {
code = error_code::try_get(header->server.error_name, dsn::ERR_UNKNOWN);
header->server.error_code.local_hash = ::dsn::message_ex::s_local_hash;
header->server.error_code.local_hash = FLAGS_local_hash;
header->server.error_code.local_code = code;
}
return code;
Expand All @@ -99,11 +106,11 @@ task_code message_ex::rpc_code()
}

auto binary_hash = header->rpc_code.local_hash;
if (binary_hash != 0 && binary_hash == ::dsn::message_ex::s_local_hash) {
if (binary_hash != 0 && binary_hash == FLAGS_local_hash) {
local_rpc_code = dsn::task_code(header->rpc_code.local_code);
} else {
local_rpc_code = dsn::task_code::try_get(header->rpc_name, ::dsn::TASK_CODE_INVALID);
header->rpc_code.local_hash = ::dsn::message_ex::s_local_hash;
header->rpc_code.local_hash = FLAGS_local_hash;
header->rpc_code.local_code = local_rpc_code.code();
}

Expand Down Expand Up @@ -307,7 +314,7 @@ message_ex *message_ex::create_request(dsn::task_code rpc_code,
strncpy(hdr.rpc_name, sp->name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = (uint32_t)rpc_code;
hdr.rpc_code.local_hash = s_local_hash;
hdr.rpc_code.local_hash = FLAGS_local_hash;

hdr.id = new_id();

Expand Down Expand Up @@ -348,7 +355,7 @@ message_ex *message_ex::create_response()
strncpy(hdr.rpc_name, response_sp->name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = msg->local_rpc_code;
hdr.rpc_code.local_hash = s_local_hash;
hdr.rpc_code.local_hash = FLAGS_local_hash;

// join point
request_sp->on_rpc_create_response.execute(this, msg);
Expand All @@ -359,7 +366,7 @@ message_ex *message_ex::create_response()
strncpy(hdr.rpc_name, ack_rpc_name.c_str(), sizeof(hdr.rpc_name) - 1);
hdr.rpc_name[sizeof(hdr.rpc_name) - 1] = '\0';
hdr.rpc_code.local_code = TASK_CODE_INVALID;
hdr.rpc_code.local_hash = s_local_hash;
hdr.rpc_code.local_hash = FLAGS_local_hash;
}

return msg;
Expand Down
3 changes: 0 additions & 3 deletions src/runtime/rpc/rpc_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,6 @@ class message_ex : public ref_counter, public extensible_object<message_ex, 4>
int _rw_offset; // current buffer offset
bool _rw_committed; // mark if it is in middle state of reading/writing
bool _is_read; // is for read(recv) or write(send)

public:
static uint32_t s_local_hash; // used by fast_rpc_name
};
typedef dsn::ref_ptr<message_ex> message_ptr;

Expand Down
16 changes: 1 addition & 15 deletions src/runtime/service_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,21 +194,7 @@ service_engine::service_engine()

service_engine::~service_engine() { _nodes_by_app_id.clear(); }

void service_engine::init_before_toollets(const service_spec &spec)
{
_spec = spec;

// init common for all per-node providers
message_ex::s_local_hash =
(uint32_t)dsn_config_get_value_uint64("core",
"local_hash",
0,
"a same hash value from two processes indicate the "
"rpc code are registered in the same order, "
"and therefore the mapping between rpc code string "
"and integer is the same, which we leverage "
"for fast rpc handler lookup optimization");
}
void service_engine::init_before_toollets(const service_spec &spec) { _spec = spec; }

void service_engine::init_after_toollets()
{
Expand Down
Loading

0 comments on commit 4c76112

Please sign in to comment.