diff --git a/Makefile b/Makefile index d711e56..ac0d2f4 100644 --- a/Makefile +++ b/Makefile @@ -16,18 +16,18 @@ all:main.d core_objs utilities libs_3rdparty runtest:core_objs utilities libs_3rdparty rm -f tmp.*.txt - make -f test/Makefile MODE=$(MODE) COMPILER=$(COMPILER) \ - CHECK_MEM=$(CHECK_MEM) + @make -f test/Makefile MODE=$(MODE) COMPILER=$(COMPILER) \ + CHECK_MEM=$(CHECK_MEM) utilities: - make -f utils/Makefile MODE=$(MODE) COMPILER=$(COMPILER) + @make -f utils/Makefile MODE=$(MODE) COMPILER=$(COMPILER) core_objs: - make -f core/Makefile MODE=$(MODE) COMPILER=$(COMPILER) + @make -f core/Makefile MODE=$(MODE) COMPILER=$(COMPILER) libs_3rdparty: - mkdir -p $(LIBS_DIR) - make -f backtracpp/Makefile LIB_DIR=$(LIBS_DIR) REL_PATH=backtracpp + @mkdir -p $(LIBS_DIR) + @make -f backtracpp/Makefile LIB_DIR=$(LIBS_DIR) REL_PATH=backtracpp clean: rm -f tmp.* diff --git a/README.md b/README.md index 50391a1..48930a0 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ The argument is path of a configuration file, which should contains at least * bind : (integer) local port to listen * node : (address) one of active node in a cluster; format should be host:port * thread: (integer) number of threads -* read-slave: (optional, default off) set to "1" to turn on read slave mode. A proxy in read-slave mode won't support writing commands like `SET`, `INCR`, `PUBLISH`, and it would select slave nodes for reading commands if possible. +* read-slave: (optional, default off) set to "1" to turn on read slave mode. A proxy in read-slave mode won't support writing commands like `SET`, `INCR`, `PUBLISH`, and it would select slave nodes for reading commands if possible. For more information please read [here (CN)](https://github.com/HunanTV/redis-cerberus/wiki/%E8%AF%BB%E5%86%99%E5%88%86%E7%A6%BB). Commands in Particular === @@ -83,14 +83,3 @@ others: `PFADD`, `PFCOUNT`, `PFMERGE`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SLOWLOG`, `SYNC`, `TIME`, For more information please read [here (CN)](https://github.com/HunanTV/redis-cerberus/wiki/Redis-%E9%9B%86%E7%BE%A4%E4%BB%A3%E7%90%86%E5%9F%BA%E6%9C%AC%E5%8E%9F%E7%90%86%E4%B8%8E%E4%BD%BF%E7%94%A8). - -Readonly Commands ---- - -Only the following commands are allowed when a proxy runs with "read-slave" mode - - DUMP EXISTS TTL PTTL TYPE GET BITCOUNT GETBIT GETRANGE STRLEN - HGET HGETALL HKEYS HVALS HLEN HEXISTS HMGET HSCAN - LINDEX LLEN LRANGE SCARD SISMEMBER SMEMBERS SSCAN - ZCARD ZSCAN ZCOUNT ZLEXCOUNT ZRANGE ZRANGEBYLEX ZREVRANGEBYLEX - ZRANGEBYSCORE ZRANK ZREVRANGE ZREVRANGEBYSCORE ZREVRANK ZSCORE diff --git a/common.hpp b/common.hpp index ca69b25..e6961d5 100644 --- a/common.hpp +++ b/common.hpp @@ -5,7 +5,7 @@ #include "utils/typetraits.hpp" -#define VERSION "0.6.0-2015-04-14" +#define VERSION "0.6.4-2015-04-30" namespace cerb { @@ -18,6 +18,8 @@ namespace cerb { typedef Clock::time_point Time; typedef std::chrono::duration Interval; + int const CLUSTER_SLOT_COUNT = 16384; + } #endif /* __CERBERUS_COMMON_HPP__ */ diff --git a/core/client.cpp b/core/client.cpp index 7e0f5e6..4b3e3a5 100644 --- a/core/client.cpp +++ b/core/client.cpp @@ -36,8 +36,8 @@ void Client::on_events(int events) } } catch (BadRedisMessage& e) { LOG(ERROR) << "Receive bad message from client " << this->fd - << " because: " << e.what() - << " dump buffer (before close): " + << " because: " << e.what(); + LOG(DEBUG) << "Dump buffer (before close): " << this->_buffer.to_string(); return this->close(); } @@ -66,6 +66,9 @@ void Client::_write_response() g->append_buffer_to(buffer_arr); } Buffer::writev(this->fd, buffer_arr); + for (auto const& g: this->_ready_groups) { + g->collect_stats(this->_proxy); + } this->_ready_groups.clear(); this->_peers.clear(); @@ -169,11 +172,6 @@ void Client::add_peer(Server* svr) this->_peers.insert(svr); } -void Client::stat_proccessed(Interval cmd_elapse) -{ - _proxy->stat_proccessed(cmd_elapse); -} - void Client::push_command(util::sptr g) { this->_parsed_groups.push_back(std::move(g)); diff --git a/core/client.hpp b/core/client.hpp index 3ab8425..e77dea1 100644 --- a/core/client.hpp +++ b/core/client.hpp @@ -43,7 +43,6 @@ namespace cerb { void add_peer(Server* svr); void reactivate(util::sref cmd); void push_command(util::sptr g); - void stat_proccessed(Interval cmd_elapse); }; } diff --git a/core/command.cpp b/core/command.cpp index 0153af1..023f69b 100644 --- a/core/command.cpp +++ b/core/command.cpp @@ -13,6 +13,7 @@ #include "globals.hpp" #include "utils/logging.hpp" #include "utils/random.hpp" +#include "utils/string.h" using namespace cerb; @@ -21,7 +22,7 @@ namespace { std::string const RSP_OK_STR("+OK\r\n"); Buffer RSP_OK(Buffer::from_string(RSP_OK_STR)); - Server* select_server_for(Proxy* proxy, Command* cmd, slot key_slot) + Server* select_server_for(Proxy* proxy, DataCommand* cmd, slot key_slot) { Server* svr = proxy->get_server_by_slot(key_slot); if (svr == nullptr) { @@ -34,12 +35,12 @@ namespace { } class OneSlotCommand - : public Command + : public DataCommand { slot const key_slot; public: OneSlotCommand(Buffer b, util::sref g, slot ks) - : Command(std::move(b), g, true) + : DataCommand(std::move(b), g) , key_slot(ks) { LOG(DEBUG) << "-Keyslot = " << this->key_slot; @@ -52,7 +53,7 @@ namespace { }; class MultiStepsCommand - : public Command + : public DataCommand { public: slot current_key_slot; @@ -60,7 +61,7 @@ namespace { MultiStepsCommand(util::sref group, slot s, std::function r) - : Command(group, true) + : DataCommand(group) , current_key_slot(s) , on_rsp(std::move(r)) {} @@ -70,7 +71,7 @@ namespace { return ::select_server_for(proxy, this, this->current_key_slot); } - void copy_response(Buffer rsp, bool error) + void on_remote_responsed(Buffer rsp, bool error) { on_rsp(std::move(rsp), error); } @@ -84,7 +85,7 @@ namespace { { public: DirectCommand(Buffer b, util::sref g) - : Command(std::move(b), g, false) + : Command(std::move(b), g) {} Server* select_server(Proxy*) @@ -131,13 +132,6 @@ namespace { class StatsCommandGroup : public CommandGroup { - public: - ~StatsCommandGroup() - { - if (this->complete) { - this->client->stat_proccessed(Clock::now() - this->creation); - } - } protected: explicit StatsCommandGroup(util::sref cli) : CommandGroup(cli) @@ -152,13 +146,21 @@ namespace { { return true; } + + void collect_stats(Proxy* p) const + { + p->stat_proccessed(Clock::now() - this->creation, + this->avg_commands_remote_cost()); + } + + virtual Interval avg_commands_remote_cost() const = 0; }; class SingleCommandGroup : public StatsCommandGroup { public: - util::sptr command; + util::sptr command; explicit SingleCommandGroup(util::sref cli) : StatsCommandGroup(cli) @@ -190,6 +192,11 @@ namespace { { command->select_server(proxy); } + + Interval avg_commands_remote_cost() const + { + return command->remote_cost(); + } }; class MultipleCommandsGroup @@ -197,7 +204,7 @@ namespace { { public: Buffer arr_payload; - std::vector> commands; + std::vector> commands; int awaiting_count; explicit MultipleCommandsGroup(util::sref c) @@ -205,11 +212,9 @@ namespace { , awaiting_count(0) {} - void append_command(util::sptr c) + void append_command(util::sptr c) { - if (c->need_send) { - awaiting_count += 1; - } + awaiting_count += 1; commands.push_back(std::move(c)); } @@ -229,7 +234,7 @@ namespace { void append_buffer_to(std::vector>& b) { b.push_back(util::mkref(arr_payload)); - for (auto const& c: commands) { + for (auto const& c: this->commands) { b.push_back(util::mkref(c->buffer)); } } @@ -237,11 +242,9 @@ namespace { int total_buffer_size() const { int i = arr_payload.size(); - std::for_each(commands.begin(), commands.end(), - [&](util::sptr const& command) - { - i += command->buffer.size(); - }); + for (auto const& c: this->commands) { + i += c->buffer.size(); + } return i; } @@ -251,6 +254,19 @@ namespace { c->select_server(proxy); } } + + Interval avg_commands_remote_cost() const + { + if (this->commands.empty()) { + return Interval(0); + } + return std::accumulate( + this->commands.begin(), this->commands.end(), Interval(0), + [](Interval a, util::sptr const& c) + { + return a + c->remote_cost(); + }) / this->commands.size(); + } }; class LongCommandGroup @@ -323,7 +339,7 @@ namespace { virtual util::sptr spawn_commands( util::sref c, Buffer::iterator end) = 0; - SpecialCommandParser() {} + SpecialCommandParser() = default; SpecialCommandParser(SpecialCommandParser const&) = delete; }; @@ -568,25 +584,25 @@ namespace { { if (error) { this->buffer = std::move(rsp); - return this->group->command_responsed(); + return this->responsed(); } if (rsp.same_as_string("$-1\r\n")) { this->buffer = Buffer::from_string( "-ERR no such key\r\n"); - return this->group->command_responsed(); + return this->responsed(); } this->buffer = Buffer::from_string("*3\r\n$3\r\nSET\r\n"); this->buffer.append_from(new_key.begin(), new_key.end()); this->buffer.append_from(rsp.begin(), rsp.end()); this->current_key_slot = new_key_slot; this->on_rsp = - [&](Buffer rsp, bool error) + [this](Buffer rsp, bool error) { if (error) { this->buffer = std::move(rsp); - return this->group->command_responsed(); + return this->responsed(); } - rsp_set(); + this->rsp_set(); }; this->group->client->reactivate(util::mkref(*this)); } @@ -597,10 +613,10 @@ namespace { this->buffer.append_from(old_key.begin(), old_key.end()); this->current_key_slot = old_key_slot; this->on_rsp = - [&](Buffer, bool) + [this](Buffer, bool) { this->buffer = Buffer::from_string("+OK\r\n"); - this->group->command_responsed(); + this->responsed(); }; this->group->client->reactivate(util::mkref(*this)); } @@ -995,6 +1011,14 @@ namespace { void on_arr(cerb::rint size, Buffer::iterator i) { + /* + * Redis server will reset a request of more than 1M args. + * See also + * https://github.com/antirez/redis/blob/3.0/src/networking.c#L1014 + */ + if (size > 1024 * 1024) { + throw BadRedisMessage("Request is too large"); + } if (!_nested_array_element_count.empty()) { throw BadRedisMessage("Invalid nested array as client command"); } @@ -1009,9 +1033,14 @@ namespace { } -void Command::copy_response(Buffer rsp, bool) +void Command::on_remote_responsed(Buffer rsp, bool) { this->buffer = std::move(rsp); + this->responsed(); +} + +void Command::responsed() +{ this->group->command_responsed(); } diff --git a/core/command.hpp b/core/command.hpp index ce3848a..763c133 100644 --- a/core/command.hpp +++ b/core/command.hpp @@ -18,22 +18,21 @@ namespace cerb { public: Buffer buffer; util::sref const group; - bool const need_send; - virtual ~Command() {} + virtual ~Command() = default; virtual Server* select_server(Proxy* proxy) = 0; - virtual void copy_response(Buffer rsp, bool error); + virtual void on_remote_responsed(Buffer rsp, bool error); - Command(Buffer b, util::sref g, bool s) + void responsed(); + + Command(Buffer b, util::sref g) : buffer(std::move(b)) , group(g) - , need_send(s) {} - Command(util::sref g, bool s) + explicit Command(util::sref g) : group(g) - , need_send(s) {} Command(Command const&) = delete; @@ -41,16 +40,37 @@ namespace cerb { static void allow_write_commands(); }; + class DataCommand + : public Command + { + public: + DataCommand(Buffer b, util::sref g) + : Command(std::move(b), g) + {} + + explicit DataCommand(util::sref g) + : Command(g) + {} + + Time sent_time; + Time resp_time; + + Interval remote_cost() const + { + return resp_time - sent_time; + } + }; + class CommandGroup { public: util::sref const client; - CommandGroup(util::sref cli) + explicit CommandGroup(util::sref cli) : client(cli) {} CommandGroup(CommandGroup const&) = delete; - virtual ~CommandGroup() {} + virtual ~CommandGroup() = default; virtual bool long_connection() const { @@ -63,6 +83,7 @@ namespace cerb { virtual void append_buffer_to(std::vector>& b) = 0; virtual int total_buffer_size() const = 0; virtual void command_responsed() = 0; + virtual void collect_stats(Proxy*) const {} }; void split_client_command(Buffer& buffer, util::sref cli); diff --git a/core/proxy.cpp b/core/proxy.cpp index f434747..b172c49 100644 --- a/core/proxy.cpp +++ b/core/proxy.cpp @@ -86,13 +86,13 @@ void SlotsMapUpdater::on_events(int events) try { this->_recv_rsp(); } catch (BadRedisMessage& e) { - LOG(FATAL) << "Receive bad message from server on update from " + LOG(ERROR) << "Receive bad message from server on update from " << this->fd << " because: " << e.what() - << " buffer length=" << this->_rsp.size() - << " dump buffer (before close): " + << " buffer length=" << this->_rsp.size(); + LOG(DEBUG) << "Dump buffer (before close): " << this->_rsp.to_string(); - exit(1); + _proxy->notify_slot_map_updated(); } } if (events & EPOLLOUT) { @@ -110,7 +110,10 @@ Proxy::Proxy(util::Address const& remote) : _clients_count(0) , _active_slot_updaters_count(0) , _total_cmd_elapse(0) + , _total_remote_cost(0) , _total_cmd(0) + , _last_cmd_elapse(0) + , _last_remote_cost(0) , _slot_map_expired(false) , epfd(epoll_create(MAX_EVENTS)) { @@ -168,7 +171,7 @@ void Proxy::_set_slot_map(std::vector map) { for (Server* s: _server_map.replace_map(map, this)) { LOG(DEBUG) << "Replaced server " << s; - std::vector> c(s->deliver_commands()); + std::vector> c(s->deliver_commands()); this->_retrying_commands.insert(_retrying_commands.end(), c.begin(), c.end()); this->_inactive_long_connections.insert( s->attached_long_connections.begin(), s->attached_long_connections.end()); @@ -182,8 +185,8 @@ void Proxy::_set_slot_map(std::vector map) LOG(DEBUG) << "Retry MOVED or ASK: " << _retrying_commands.size(); std::set svrs; - std::vector> retrying(std::move(_retrying_commands)); - for (util::sref cmd: retrying) { + std::vector> retrying(std::move(_retrying_commands)); + for (util::sref cmd: retrying) { Server* s = cmd->select_server(this); if (s == nullptr) { LOG(ERROR) << "Select null server after slot map updated"; @@ -245,7 +248,7 @@ bool Proxy::_should_update_slot_map() const (!_retrying_commands.empty() || _slot_map_expired); } -void Proxy::retry_move_ask_command_later(util::sref cmd) +void Proxy::retry_move_ask_command_later(util::sref cmd) { _retrying_commands.push_back(cmd); LOG(DEBUG) << "A MOVED or ASK added for later retry - " << _retrying_commands.size(); @@ -354,15 +357,18 @@ void Proxy::pop_client(Client* cli) { util::erase_if( this->_retrying_commands, - [&](util::sref cmd) + [&](util::sref cmd) { return cmd->group->client.is(cli); }); --this->_clients_count; } -void Proxy::stat_proccessed(Interval cmd_elapse) +void Proxy::stat_proccessed(Interval cmd_elapse, Interval remote_cost) { _total_cmd_elapse += cmd_elapse; ++_total_cmd; + _last_cmd_elapse = cmd_elapse; + _total_remote_cost += remote_cost; + _last_remote_cost = remote_cost; } diff --git a/core/proxy.hpp b/core/proxy.hpp index 57a0e06..425f8f0 100644 --- a/core/proxy.hpp +++ b/core/proxy.hpp @@ -63,10 +63,13 @@ namespace cerb { std::vector> _slot_updaters; std::vector> _finished_slot_updaters; int _active_slot_updaters_count; - std::vector> _retrying_commands; + std::vector> _retrying_commands; std::set _inactive_long_connections; Interval _total_cmd_elapse; + Interval _total_remote_cost; long _total_cmd; + Interval _last_cmd_elapse; + Interval _last_remote_cost; bool _slot_map_expired; bool _should_update_slot_map() const; @@ -97,6 +100,21 @@ namespace cerb { return _total_cmd_elapse; } + Interval total_remote_cost() const + { + return _total_remote_cost; + } + + Interval last_cmd_elapse() const + { + return _last_cmd_elapse; + } + + Interval last_remote_cost() const + { + return _last_remote_cost; + } + Server* random_addr() { return _server_map.random_addr(); @@ -105,11 +123,11 @@ namespace cerb { Server* get_server_by_slot(slot key_slot); void notify_slot_map_updated(); void update_slot_map(); - void retry_move_ask_command_later(util::sref cmd); + void retry_move_ask_command_later(util::sref cmd); void run(int listen_port); void accept_from(int listen_fd); void pop_client(Client* cli); - void stat_proccessed(Interval cmd_elapse); + void stat_proccessed(Interval cmd_elapse, Interval remote_cost); }; } diff --git a/core/response.cpp b/core/response.cpp index 59ce00b..e73455e 100644 --- a/core/response.cpp +++ b/core/response.cpp @@ -22,9 +22,9 @@ namespace { , error(e) {} - void rsp_to(util::sref cmd, util::sref) + void rsp_to(util::sref cmd, util::sref) { - cmd->copy_response(std::move(this->rsp), error); + cmd->on_remote_responsed(std::move(this->rsp), error); } Buffer const& dump_buffer() const @@ -38,7 +38,7 @@ namespace { { static Buffer const dump; public: - void rsp_to(util::sref cmd, util::sref p) + void rsp_to(util::sref cmd, util::sref p) { p->retry_move_ask_command_later(cmd); } diff --git a/core/response.hpp b/core/response.hpp index 9fee40c..92c5369 100644 --- a/core/response.hpp +++ b/core/response.hpp @@ -8,7 +8,7 @@ namespace cerb { - class Command; + class DataCommand; class Proxy; class Response { @@ -17,7 +17,7 @@ namespace cerb { virtual ~Response() {} Response(Response const&) = delete; - virtual void rsp_to(util::sref c, util::sref p) = 0; + virtual void rsp_to(util::sref c, util::sref p) = 0; virtual Buffer const& dump_buffer() const = 0; }; diff --git a/core/server.cpp b/core/server.cpp index 8a9b2db..688eb69 100644 --- a/core/server.cpp +++ b/core/server.cpp @@ -46,10 +46,14 @@ void Server::_send_to() std::vector> buffer_arr; this->_ready_commands = std::move(this->_commands); buffer_arr.reserve(this->_ready_commands.size()); - for (auto const& c: this->_ready_commands) { + for (util::sref c: this->_ready_commands) { buffer_arr.push_back(util::mkref(c->buffer)); } Buffer::writev(this->fd, buffer_arr); + auto now = Clock::now(); + for (util::sref c: this->_ready_commands) { + c->sent_time = now; + } struct epoll_event ev; ev.events = EPOLLIN | EPOLLET; @@ -73,27 +77,25 @@ void Server::_recv_from() if (responses.size() > this->_ready_commands.size()) { LOG(ERROR) << "+Error on split, expected size: " << this->_ready_commands.size() << " actual: " << responses.size() << " dump buffer:"; - std::for_each(responses.begin(), responses.end(), - [](util::sptr const& rsp) - { - LOG(ERROR) << "::: " << rsp->dump_buffer().to_string(); - }); + for (util::sptr const& rsp: responses) { + LOG(ERROR) << "::: " << rsp->dump_buffer().to_string(); + } LOG(ERROR) << "Rest buffer: " << this->_buffer.to_string(); LOG(FATAL) << "Exit"; exit(1); } LOG(DEBUG) << "+responses size: " << responses.size(); LOG(DEBUG) << "+rest buffer: " << this->_buffer.size() << ": " << this->_buffer.to_string(); - auto client_it = this->_ready_commands.begin(); - std::for_each(responses.begin(), responses.end(), - [&](util::sptr& rsp) - { - util::sref c = *client_it++; - if (c.not_nul()) { - rsp->rsp_to(c, util::mkref(*this->_proxy)); - } - }); - this->_ready_commands.erase(this->_ready_commands.begin(), client_it); + auto cmd_it = this->_ready_commands.begin(); + auto now = Clock::now(); + for (util::sptr& rsp: responses) { + util::sref c = *cmd_it++; + if (c.not_nul()) { + rsp->rsp_to(c, util::mkref(*this->_proxy)); + c->resp_time = now; + } + } + this->_ready_commands.erase(this->_ready_commands.begin(), cmd_it); struct epoll_event ev; ev.events = EPOLLIN | EPOLLOUT | EPOLLET; ev.data.ptr = this; @@ -102,7 +104,7 @@ void Server::_recv_from() } } -void Server::push_client_command(util::sref cmd) +void Server::push_client_command(util::sref cmd) { _commands.push_back(cmd); cmd->group->client->add_peer(this); @@ -112,24 +114,22 @@ void Server::pop_client(Client* cli) { util::erase_if( this->_commands, - [&](util::sref cmd) + [&](util::sref cmd) { return cmd->group->client.is(cli); }); - std::for_each(this->_ready_commands.begin(), this->_ready_commands.end(), - [&](util::sref& cmd) - { - if (cmd.not_nul() && cmd->group->client.is(cli)) { - cmd.reset(); - } - }); + for (util::sref& cmd: this->_ready_commands) { + if (cmd.not_nul() && cmd->group->client.is(cli)) { + cmd.reset(); + } + } } -std::vector> Server::deliver_commands() +std::vector> Server::deliver_commands() { util::erase_if( this->_ready_commands, - [](util::sref cmd) + [](util::sref cmd) { return cmd.nul(); }); @@ -165,8 +165,8 @@ std::map::iterator Server::addr_end() return servers_map.end(); } -static std::function>&)> on_server_connected( - [](int, std::vector>&) {}); +static std::function>&)> on_server_connected( + [](int, std::vector>&) {}); void Server::_reconnect(util::Address const& addr, Proxy* p) { @@ -230,9 +230,9 @@ static Buffer const READ_ONLY_CMD(Buffer::from_string("READONLY\r\n")); void Server::send_readonly_for_each_conn() { ::on_server_connected = - [](int fd, std::vector>& cmds) + [](int fd, std::vector>& cmds) { READ_ONLY_CMD.write(fd); - cmds.push_back(util::sref(nullptr)); + cmds.push_back(util::sref(nullptr)); }; } diff --git a/core/server.hpp b/core/server.hpp index 836eeda..599d2cc 100644 --- a/core/server.hpp +++ b/core/server.hpp @@ -13,7 +13,7 @@ namespace cerb { class Proxy; class Client; - class Command; + class DataCommand; class Server : public ProxyConnection @@ -21,8 +21,8 @@ namespace cerb { Proxy* _proxy; Buffer _buffer; - std::vector> _commands; - std::vector> _ready_commands; + std::vector> _commands; + std::vector> _ready_commands; void _send_to(); void _recv_from(); @@ -50,9 +50,9 @@ namespace cerb { void on_events(int events); void after_events(std::set&); - void push_client_command(util::sref cmd); + void push_client_command(util::sref cmd); void pop_client(Client* cli); - std::vector> deliver_commands(); + std::vector> deliver_commands(); void attach_long_connection(ProxyConnection* c) { diff --git a/core/slot_map.hpp b/core/slot_map.hpp index c81bf5e..a13ccde 100644 --- a/core/slot_map.hpp +++ b/core/slot_map.hpp @@ -9,8 +9,6 @@ namespace cerb { - int const CLUSTER_SLOT_COUNT = 16384; - class Server; class Proxy; diff --git a/core/stats.cpp b/core/stats.cpp index aac4ccd..bbd4ae3 100644 --- a/core/stats.cpp +++ b/core/stats.cpp @@ -1,5 +1,5 @@ -#include "core/stats.hpp" -#include "core/globals.hpp" +#include "stats.hpp" +#include "globals.hpp" #include "utils/string.h" using namespace cerb; @@ -10,24 +10,32 @@ std::string cerb::stats_all() { std::vector clients_counts; std::vector mem_buffer_allocs; - std::vector avg_cmd_elapse; + std::vector last_cmd_elapse; + std::vector last_remote_cost; long total_commands = 0; Interval total_cmd_elapse(0); + Interval total_remote_cost(0); for (auto const& thread: cerb_global::all_threads) { util::sref proxy(thread.get_proxy()); clients_counts.push_back(util::str(proxy->clients_count())); total_commands += proxy->total_cmd(); total_cmd_elapse += proxy->total_cmd_elapse(); + total_remote_cost += proxy->total_remote_cost(); mem_buffer_allocs.push_back(util::str(thread.buffer_allocated())); + last_cmd_elapse.push_back(util::str(proxy->last_cmd_elapse())); + last_remote_cost.push_back(util::str(proxy->last_remote_cost())); } return util::join("", { "version:" VERSION "\nthreads:", util::str(msize_t(cerb_global::all_threads.size())), + "\nread_slave:", ::read_slave ? "1" : "0", "\nclients_count:", util::join(",", clients_counts), "\nmem_buffer_alloc:", util::join(",", mem_buffer_allocs), "\ncompleted_commands:", util::str(total_commands), "\ntotal_process_elapse:", util::str(total_cmd_elapse), - "\nread_slave:", ::read_slave ? "1" : "0", + "\ntotal_remote_cost:", util::str(total_remote_cost), + "\nlast_command_elapse:", util::join(",", last_cmd_elapse), + "\nlast_remote_cost:", util::join(",", last_remote_cost), }); } diff --git a/misc/mf-template.mk b/misc/mf-template.mk index 0f3c0d6..cb9c7b3 100644 --- a/misc/mf-template.mk +++ b/misc/mf-template.mk @@ -26,13 +26,13 @@ COMPILE=$(CC) $(CFLAGS) $(INCLUDE) COMPILE_GENERATED=$(CC) $(INCLUDE) %.d:$(WORKDIR)/%.cpp - echo -n "$(WORKDIR)/" > $(MKTMP) - $(RESOLVE_DEP) $< >> $(MKTMP) - echo " $(COMPILE) $< -o $(WORKDIR)/$*.o" >> $(MKTMP) - make -f $(MKTMP) + @echo -n "$(WORKDIR)/" > $(MKTMP) + @$(RESOLVE_DEP) $< >> $(MKTMP) + @echo " $(COMPILE) $< -o $(WORKDIR)/$*.o" >> $(MKTMP) + @make -f $(MKTMP) %.dt:$(TESTDIR)/%.cpp - echo -n "$(TESTDIR)/" > $(MKTMP) - $(RESOLVE_DEP) $< >> $(MKTMP) - echo " $(COMPILE) $< -o $(TESTDIR)/$*.o" >> $(MKTMP) - make -f $(MKTMP) + @echo -n "$(TESTDIR)/" > $(MKTMP) + @$(RESOLVE_DEP) $< >> $(MKTMP) + @echo " $(COMPILE) $< -o $(TESTDIR)/$*.o" >> $(MKTMP) + @make -f $(MKTMP) diff --git a/test/Makefile b/test/Makefile index 5c2034f..0b5dd2d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -9,13 +9,29 @@ else VALGRIND= endif -test:$(TESTDIR)/test.out - $(VALGRIND) $(TESTDIR)/test.out +MOCK_OBJS=$(TESTDIR)/mock-stats.o + +test:util-test slot-map-test + @echo "======================" + @echo "| Test done _(:3J<)_ |" + @echo "======================" -$(TESTDIR)/test.out:message.dt response.dt buffer.dt slot_calc.dt slot_map.dt - $(LINK) $(TESTDIR)/*.o utils/*.o core/*.o \ - $(TEST_LIBS) \ +util-test:message.dt response.dt buffer.dt slot_calc.dt + $(LINK) $(TESTDIR)/message.o $(TESTDIR)/response.o $(TESTDIR)/buffer.o \ + $(TESTDIR)/slot_calc.o \ + utils/*.o core/*.o $(TEST_LIBS) \ -o $(TESTDIR)/test.out + $(VALGRIND) $(TESTDIR)/test.out + +slot-map-test:slot_map.dt mock-server.dt mock-suit + $(LINK) $(TESTDIR)/mock-server.o $(TESTDIR)/slot_map.o utils/*.o \ + core/slot_map.o core/exceptions.o core/connection.o core/fdutil.o \ + $(TEST_LIBS) $(MOCK_OBJS) \ + -o $(TESTDIR)/test-slot-map.out + $(VALGRIND) $(TESTDIR)/test-slot-map.out + +mock-suit:mock-stats.dt + @true clean: rm -f $(TESTDIR)/*.o diff --git a/test/mock-server.cpp b/test/mock-server.cpp new file mode 100644 index 0000000..b6b7dde --- /dev/null +++ b/test/mock-server.cpp @@ -0,0 +1,58 @@ +#include + +#include "core/server.hpp" + +using namespace cerb; + +namespace { + + struct ServerManager { + std::function const dtor; + + explicit ServerManager(std::function d) + : dtor(d) + {} + + ServerManager(ServerManager const&) = delete; + + ~ServerManager() + { + for (Server* s: allocated) { + dtor(s); + } + } + + std::set allocated; + std::map addr_map; + + template + Server* allocate(Constructor ctor) + { + Server* s = ctor(); + this->allocated.insert(s); + return s; + } + + template + Server* get(util::Address const& addr, Constructor ctor) + { + auto it = this->addr_map.find(addr); + if (it != this->addr_map.end()) { + return it->second; + } + return addr_map[addr] = allocate(ctor); + } + }; + +} + +Server* Server::get_server(util::Address addr, Proxy*) +{ + static ServerManager server_manager([](Server* s) { delete s; }); + Server* s = server_manager.get(addr, []() { return new Server; }); + s->addr = addr; + return s; +} + +void Server::on_events(int) {} +void Server::after_events(std::set&) {} diff --git a/test/mock-stats.cpp b/test/mock-stats.cpp new file mode 100644 index 0000000..a2bd606 --- /dev/null +++ b/test/mock-stats.cpp @@ -0,0 +1,19 @@ +#include "core/stats.hpp" + +using namespace cerb; + +std::string cerb::stats_all() +{ + return "$14\r\nMOCK STATISTIC\r\n"; +} + +BufferStatAllocator::pointer BufferStatAllocator::allocate( + size_type n, void const* hint) +{ + return BaseType::allocate(n, hint); +} + +void BufferStatAllocator::deallocate(pointer p, size_type n) +{ + BaseType::deallocate(p, n); +} diff --git a/test/slot_map.cpp b/test/slot_map.cpp index 33494eb..089e63d 100644 --- a/test/slot_map.cpp +++ b/test/slot_map.cpp @@ -1,6 +1,7 @@ #include #include "core/slot_map.hpp" +#include "core/server.hpp" TEST(SlotMap, ParseMap) { @@ -122,4 +123,269 @@ TEST(SlotMap, ParseMap) ASSERT_TRUE(nodes[2].slot_ranges.empty()); } + + { + std::vector nodes(cerb::parse_slot_map( + "69853562969c74ff387f9e491d025b2a86ac478f 192.168.1.100:7002 master - 0 0 3 connected 8192-12287\n" + "2f53d0fb4a59274e83e47b1dca02697384822ca5 192.168.1.100:7006 slave 69853562969c74ff387f9e491d025b2a86ac478f 0 0 3 connected\n" + "2560c867f9ca2ef4cc872eb85ce985373ad9e815 192.168.1.101:7003 master - 0 0 2 connected 0-4095\n" + "933970b4fd2d1ad06166ab1d893e8cac7b129ebd 192.168.1.101:7001 master - 0 0 4 connected 4096-8191\n" + "d3adf40539ad749d214609987563bf9903a57ffc 192.168.1.101:7007 slave 2560c867f9ca2ef4cc872eb85ce985373ad9e815 0 0 2 connected\n" + "6c001456aff0ae537ba242d4e86fb325c5babbea 192.168.1.100:7000 myself,master - 0 0 1 connected 12288-16383\n", + "127.0.0.1")); + ASSERT_EQ(6, nodes.size()); + ASSERT_EQ("192.168.1.100", nodes[0].addr.host); + ASSERT_EQ(7002, nodes[0].addr.port); + ASSERT_EQ("69853562969c74ff387f9e491d025b2a86ac478f", nodes[0].node_id); + ASSERT_TRUE(nodes[0].is_master()); + + ASSERT_EQ(1, nodes[0].slot_ranges.size()); + std::vector> slot_ranges( + nodes[0].slot_ranges.begin(), nodes[0].slot_ranges.end()); + ASSERT_EQ(8192, slot_ranges[0].first); + ASSERT_EQ(12287, slot_ranges[0].second); + + ASSERT_EQ("192.168.1.100", nodes[1].addr.host); + ASSERT_EQ(7006, nodes[1].addr.port); + ASSERT_EQ("2f53d0fb4a59274e83e47b1dca02697384822ca5", nodes[1].node_id); + ASSERT_FALSE(nodes[1].is_master()); + + ASSERT_TRUE(nodes[1].slot_ranges.empty()); + + ASSERT_EQ("192.168.1.101", nodes[2].addr.host); + ASSERT_EQ(7003, nodes[2].addr.port); + ASSERT_EQ("2560c867f9ca2ef4cc872eb85ce985373ad9e815", nodes[2].node_id); + ASSERT_TRUE(nodes[2].is_master()); + + ASSERT_EQ(1, nodes[2].slot_ranges.size()); + slot_ranges = std::vector>( + nodes[2].slot_ranges.begin(), nodes[2].slot_ranges.end()); + ASSERT_EQ(0, slot_ranges[0].first); + ASSERT_EQ(4095, slot_ranges[0].second); + + ASSERT_EQ("192.168.1.101", nodes[3].addr.host); + ASSERT_EQ(7001, nodes[3].addr.port); + ASSERT_EQ("933970b4fd2d1ad06166ab1d893e8cac7b129ebd", nodes[3].node_id); + ASSERT_TRUE(nodes[3].is_master()); + + ASSERT_EQ(1, nodes[3].slot_ranges.size()); + slot_ranges = std::vector>( + nodes[3].slot_ranges.begin(), nodes[3].slot_ranges.end()); + ASSERT_EQ(4096, slot_ranges[0].first); + ASSERT_EQ(8191, slot_ranges[0].second); + + ASSERT_EQ("192.168.1.101", nodes[4].addr.host); + ASSERT_EQ(7007, nodes[4].addr.port); + ASSERT_EQ("d3adf40539ad749d214609987563bf9903a57ffc", nodes[4].node_id); + ASSERT_FALSE(nodes[4].is_master()); + + ASSERT_TRUE(nodes[4].slot_ranges.empty()); + + ASSERT_EQ("192.168.1.100", nodes[5].addr.host); + ASSERT_EQ(7000, nodes[5].addr.port); + ASSERT_EQ("6c001456aff0ae537ba242d4e86fb325c5babbea", nodes[5].node_id); + ASSERT_TRUE(nodes[5].is_master()); + + ASSERT_EQ(1, nodes[5].slot_ranges.size()); + slot_ranges = std::vector>( + nodes[5].slot_ranges.begin(), nodes[5].slot_ranges.end()); + ASSERT_EQ(12288, slot_ranges[0].first); + ASSERT_EQ(16383, slot_ranges[0].second); + } +} + +TEST(SlotMap, ReplaceNodesAllMasters) +{ + cerb::SlotMap slot_map; + + std::set replaced = slot_map.replace_map(cerb::parse_slot_map( + "21952b372055dfdb5fa25b2761857831040472e1 127.0.0.1:7001 master - 0 1428573582310 1 connected 0-3883\n" + "29fa34bf473c742c91cee391a908a30eb4139292 127.0.0.1:7000 myself,master - 0 0 0 connected 3884-16383", + "127.0.0.1"), nullptr); + + ASSERT_TRUE(replaced.empty()); + + for (cerb::slot s = 0; s < 3883; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("127.0.0.1", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7001, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 3884; s < 16384; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("127.0.0.1", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7000, svr->addr.port) << " slot #" << s; + } + + std::set to_be_replaced; + to_be_replaced.insert(slot_map.get_by_slot(0)); + to_be_replaced.insert(slot_map.get_by_slot(3884)); + + replaced = slot_map.replace_map(cerb::parse_slot_map( + "69853562969c74ff387f9e491d025b2a86ac478f 192.168.1.100:7002 master - 0 0 3 connected 8192-12287\n" + "2f53d0fb4a59274e83e47b1dca02697384822ca5 192.168.1.100:7006 slave 69853562969c74ff387f9e491d025b2a86ac478f 0 0 3 connected\n" + "2560c867f9ca2ef4cc872eb85ce985373ad9e815 192.168.1.101:7003 master - 0 0 2 connected 0-4095\n" + "933970b4fd2d1ad06166ab1d893e8cac7b129ebd 192.168.1.101:7001 master - 0 0 4 connected 4096-8191\n" + "d3adf40539ad749d214609987563bf9903a57ffc 192.168.1.101:7007 slave 2560c867f9ca2ef4cc872eb85ce985373ad9e815 0 0 2 connected\n" + "6c001456aff0ae537ba242d4e86fb325c5babbea 192.168.1.100:7000 myself,master - 0 0 1 connected 12288-16383\n", + "127.0.0.1"), nullptr); + + ASSERT_EQ(to_be_replaced, replaced); + + for (cerb::slot s = 0; s < 4096; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7003, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 4096; s < 8192; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7001, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 8192; s < 12288; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7002, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 12288; s < 16384; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7000, svr->addr.port) << " slot #" << s; + } + + to_be_replaced.clear(); + to_be_replaced.insert(slot_map.get_by_slot(0)); + + replaced = slot_map.replace_map(cerb::parse_slot_map( + "69853562969c74ff387f9e491d025b2a86ac478f 192.168.1.100:7002 master - 0 0 3 connected 8192-12287\n" + "2f53d0fb4a59274e83e47b1dca02697384822ca5 192.168.1.100:7006 slave 69853562969c74ff387f9e491d025b2a86ac478f 0 0 3 connected\n" + "2560c867f9ca2ef4cc872eb85ce985373ad9e815 192.168.1.102:7000 master - 0 0 2 connected 0-3839\n" + "933970b4fd2d1ad06166ab1d893e8cac7b129ebd 192.168.1.101:7001 master - 0 0 4 connected 3840-8191\n" + "6c001456aff0ae537ba242d4e86fb325c5babbea 192.168.1.100:7000 myself,master - 0 0 1 connected 12288-16383\n", + "127.0.0.1"), nullptr); + + ASSERT_EQ(to_be_replaced, replaced); + + for (cerb::slot s = 0; s < 3840; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.102", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7000, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 3840; s < 8192; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7001, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 8192; s < 12288; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7002, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 12288; s < 16384; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7000, svr->addr.port) << " slot #" << s; + } +} + +TEST(SlotMap, ReplaceNodesAlsoSlave) +{ + cerb::SlotMap::select_slave_if_possible(); + cerb::SlotMap slot_map; + + std::set replaced = slot_map.replace_map(cerb::parse_slot_map( + "69853562969c74ff387f9e491d025b2a86ac478f 192.168.1.100:7002 master - 0 0 3 connected 8192-12287\n" + "2f53d0fb4a59274e83e47b1dca02697384822ca5 192.168.1.100:7006 slave 69853562969c74ff387f9e491d025b2a86ac478f 0 0 3 connected\n" + "2560c867f9ca2ef4cc872eb85ce985373ad9e815 192.168.1.101:7003 master - 0 0 2 connected 0-4095\n" + "933970b4fd2d1ad06166ab1d893e8cac7b129ebd 192.168.1.101:7001 master - 0 0 4 connected 4096-8191\n" + "d3adf40539ad749d214609987563bf9903a57ffc 192.168.1.101:7007 slave 2560c867f9ca2ef4cc872eb85ce985373ad9e815 0 0 2 connected\n" + "6c001456aff0ae537ba242d4e86fb325c5babbea 192.168.1.100:7000 myself,master - 0 0 1 connected 12288-16383\n", + "127.0.0.1"), nullptr); + + ASSERT_TRUE(replaced.empty()); + + for (cerb::slot s = 0; s < 4096; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7007, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 4096; s < 8192; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7001, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 8192; s < 12288; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7006, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 12288; s < 16384; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7000, svr->addr.port) << " slot #" << s; + } + + std::set to_be_replaced; + to_be_replaced.insert(slot_map.get_by_slot(0)); + + replaced = slot_map.replace_map(cerb::parse_slot_map( + "69853562969c74ff387f9e491d025b2a86ac478f 192.168.1.100:7002 master - 0 0 3 connected 8192-12287\n" + "2f53d0fb4a59274e83e47b1dca02697384822ca5 192.168.1.100:7006 slave 69853562969c74ff387f9e491d025b2a86ac478f 0 0 3 connected\n" + "2560c867f9ca2ef4cc872eb85ce985373ad9e815 192.168.1.101:7003 master - 0 0 2 connected 0-4095\n" + "933970b4fd2d1ad06166ab1d893e8cac7b129ebd 192.168.1.101:7001 master - 0 0 4 connected 4096-8191\n" + "6c001456aff0ae537ba242d4e86fb325c5babbea 192.168.1.100:7000 myself,master - 0 0 1 connected 12288-16383\n", + "127.0.0.1"), nullptr); + + ASSERT_EQ(to_be_replaced, replaced); + + for (cerb::slot s = 0; s < 4096; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7003, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 4096; s < 8192; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.101", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7001, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 8192; s < 12288; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7006, svr->addr.port) << " slot #" << s; + } + + for (cerb::slot s = 12288; s < 16384; ++s) { + cerb::Server* svr = slot_map.get_by_slot(s); + ASSERT_NE(nullptr, svr) << " slot #" << s; + ASSERT_EQ("192.168.1.100", svr->addr.host) << " slot #" << s; + ASSERT_EQ(7000, svr->addr.port) << " slot #" << s; + } }