Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More stats / reset client on large request #1

Merged
merged 6 commits into from
May 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ all:main.d core_objs utilities libs_3rdparty

runtest:core_objs utilities libs_3rdparty
rm -f tmp.*.txt
make -f test/Makefile MODE=$(MODE) COMPILER=$(COMPILER) \
CHECK_MEM=$(CHECK_MEM)
@make -f test/Makefile MODE=$(MODE) COMPILER=$(COMPILER) \
CHECK_MEM=$(CHECK_MEM)

utilities:
make -f utils/Makefile MODE=$(MODE) COMPILER=$(COMPILER)
@make -f utils/Makefile MODE=$(MODE) COMPILER=$(COMPILER)

core_objs:
make -f core/Makefile MODE=$(MODE) COMPILER=$(COMPILER)
@make -f core/Makefile MODE=$(MODE) COMPILER=$(COMPILER)

libs_3rdparty:
mkdir -p $(LIBS_DIR)
make -f backtracpp/Makefile LIB_DIR=$(LIBS_DIR) REL_PATH=backtracpp
@mkdir -p $(LIBS_DIR)
@make -f backtracpp/Makefile LIB_DIR=$(LIBS_DIR) REL_PATH=backtracpp

clean:
rm -f tmp.*
Expand Down
13 changes: 1 addition & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ The argument is path of a configuration file, which should contains at least
* bind : (integer) local port to listen
* node : (address) one of active node in a cluster; format should be host:port
* thread: (integer) number of threads
* read-slave: (optional, default off) set to "1" to turn on read slave mode. A proxy in read-slave mode won't support writing commands like `SET`, `INCR`, `PUBLISH`, and it would select slave nodes for reading commands if possible.
* read-slave: (optional, default off) set to "1" to turn on read slave mode. A proxy in read-slave mode won't support writing commands like `SET`, `INCR`, `PUBLISH`, and it would select slave nodes for reading commands if possible. For more information please read [here (CN)](https://github.com/HunanTV/redis-cerberus/wiki/%E8%AF%BB%E5%86%99%E5%88%86%E7%A6%BB).

Commands in Particular
===
Expand Down Expand Up @@ -83,14 +83,3 @@ others: `PFADD`, `PFCOUNT`, `PFMERGE`,
`ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SLOWLOG`, `SYNC`, `TIME`,

For more information please read [here (CN)](https://github.com/HunanTV/redis-cerberus/wiki/Redis-%E9%9B%86%E7%BE%A4%E4%BB%A3%E7%90%86%E5%9F%BA%E6%9C%AC%E5%8E%9F%E7%90%86%E4%B8%8E%E4%BD%BF%E7%94%A8).

Readonly Commands
---

Only the following commands are allowed when a proxy runs with "read-slave" mode

DUMP EXISTS TTL PTTL TYPE GET BITCOUNT GETBIT GETRANGE STRLEN
HGET HGETALL HKEYS HVALS HLEN HEXISTS HMGET HSCAN
LINDEX LLEN LRANGE SCARD SISMEMBER SMEMBERS SSCAN
ZCARD ZSCAN ZCOUNT ZLEXCOUNT ZRANGE ZRANGEBYLEX ZREVRANGEBYLEX
ZRANGEBYSCORE ZRANK ZREVRANGE ZREVRANGEBYSCORE ZREVRANK ZSCORE
4 changes: 3 additions & 1 deletion common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include "utils/typetraits.hpp"

#define VERSION "0.6.0-2015-04-14"
#define VERSION "0.6.4-2015-04-30"

namespace cerb {

Expand All @@ -18,6 +18,8 @@ namespace cerb {
typedef Clock::time_point Time;
typedef std::chrono::duration<double> Interval;

int const CLUSTER_SLOT_COUNT = 16384;

}

#endif /* __CERBERUS_COMMON_HPP__ */
12 changes: 5 additions & 7 deletions core/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ void Client::on_events(int events)
}
} catch (BadRedisMessage& e) {
LOG(ERROR) << "Receive bad message from client " << this->fd
<< " because: " << e.what()
<< " dump buffer (before close): "
<< " because: " << e.what();
LOG(DEBUG) << "Dump buffer (before close): "
<< this->_buffer.to_string();
return this->close();
}
Expand Down Expand Up @@ -66,6 +66,9 @@ void Client::_write_response()
g->append_buffer_to(buffer_arr);
}
Buffer::writev(this->fd, buffer_arr);
for (auto const& g: this->_ready_groups) {
g->collect_stats(this->_proxy);
}
this->_ready_groups.clear();
this->_peers.clear();

Expand Down Expand Up @@ -169,11 +172,6 @@ void Client::add_peer(Server* svr)
this->_peers.insert(svr);
}

void Client::stat_proccessed(Interval cmd_elapse)
{
_proxy->stat_proccessed(cmd_elapse);
}

void Client::push_command(util::sptr<CommandGroup> g)
{
this->_parsed_groups.push_back(std::move(g));
Expand Down
1 change: 0 additions & 1 deletion core/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ namespace cerb {
void add_peer(Server* svr);
void reactivate(util::sref<Command> cmd);
void push_command(util::sptr<CommandGroup> g);
void stat_proccessed(Interval cmd_elapse);
};

}
Expand Down
99 changes: 64 additions & 35 deletions core/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "globals.hpp"
#include "utils/logging.hpp"
#include "utils/random.hpp"
#include "utils/string.h"

using namespace cerb;

Expand All @@ -21,7 +22,7 @@ namespace {
std::string const RSP_OK_STR("+OK\r\n");
Buffer RSP_OK(Buffer::from_string(RSP_OK_STR));

Server* select_server_for(Proxy* proxy, Command* cmd, slot key_slot)
Server* select_server_for(Proxy* proxy, DataCommand* cmd, slot key_slot)
{
Server* svr = proxy->get_server_by_slot(key_slot);
if (svr == nullptr) {
Expand All @@ -34,12 +35,12 @@ namespace {
}

class OneSlotCommand
: public Command
: public DataCommand
{
slot const key_slot;
public:
OneSlotCommand(Buffer b, util::sref<CommandGroup> g, slot ks)
: Command(std::move(b), g, true)
: DataCommand(std::move(b), g)
, key_slot(ks)
{
LOG(DEBUG) << "-Keyslot = " << this->key_slot;
Expand All @@ -52,15 +53,15 @@ namespace {
};

class MultiStepsCommand
: public Command
: public DataCommand
{
public:
slot current_key_slot;
std::function<void(Buffer, bool)> on_rsp;

MultiStepsCommand(util::sref<CommandGroup> group, slot s,
std::function<void(Buffer, bool)> r)
: Command(group, true)
: DataCommand(group)
, current_key_slot(s)
, on_rsp(std::move(r))
{}
Expand All @@ -70,7 +71,7 @@ namespace {
return ::select_server_for(proxy, this, this->current_key_slot);
}

void copy_response(Buffer rsp, bool error)
void on_remote_responsed(Buffer rsp, bool error)
{
on_rsp(std::move(rsp), error);
}
Expand All @@ -84,7 +85,7 @@ namespace {
{
public:
DirectCommand(Buffer b, util::sref<CommandGroup> g)
: Command(std::move(b), g, false)
: Command(std::move(b), g)
{}

Server* select_server(Proxy*)
Expand Down Expand Up @@ -131,13 +132,6 @@ namespace {
class StatsCommandGroup
: public CommandGroup
{
public:
~StatsCommandGroup()
{
if (this->complete) {
this->client->stat_proccessed(Clock::now() - this->creation);
}
}
protected:
explicit StatsCommandGroup(util::sref<Client> cli)
: CommandGroup(cli)
Expand All @@ -152,13 +146,21 @@ namespace {
{
return true;
}

void collect_stats(Proxy* p) const
{
p->stat_proccessed(Clock::now() - this->creation,
this->avg_commands_remote_cost());
}

virtual Interval avg_commands_remote_cost() const = 0;
};

class SingleCommandGroup
: public StatsCommandGroup
{
public:
util::sptr<Command> command;
util::sptr<DataCommand> command;

explicit SingleCommandGroup(util::sref<Client> cli)
: StatsCommandGroup(cli)
Expand Down Expand Up @@ -190,26 +192,29 @@ namespace {
{
command->select_server(proxy);
}

Interval avg_commands_remote_cost() const
{
return command->remote_cost();
}
};

class MultipleCommandsGroup
: public StatsCommandGroup
{
public:
Buffer arr_payload;
std::vector<util::sptr<Command>> commands;
std::vector<util::sptr<DataCommand>> commands;
int awaiting_count;

explicit MultipleCommandsGroup(util::sref<Client> c)
: StatsCommandGroup(c)
, awaiting_count(0)
{}

void append_command(util::sptr<Command> c)
void append_command(util::sptr<DataCommand> c)
{
if (c->need_send) {
awaiting_count += 1;
}
awaiting_count += 1;
commands.push_back(std::move(c));
}

Expand All @@ -229,19 +234,17 @@ namespace {
void append_buffer_to(std::vector<util::sref<Buffer>>& b)
{
b.push_back(util::mkref(arr_payload));
for (auto const& c: commands) {
for (auto const& c: this->commands) {
b.push_back(util::mkref(c->buffer));
}
}

int total_buffer_size() const
{
int i = arr_payload.size();
std::for_each(commands.begin(), commands.end(),
[&](util::sptr<Command> const& command)
{
i += command->buffer.size();
});
for (auto const& c: this->commands) {
i += c->buffer.size();
}
return i;
}

Expand All @@ -251,6 +254,19 @@ namespace {
c->select_server(proxy);
}
}

Interval avg_commands_remote_cost() const
{
if (this->commands.empty()) {
return Interval(0);
}
return std::accumulate(
this->commands.begin(), this->commands.end(), Interval(0),
[](Interval a, util::sptr<DataCommand> const& c)
{
return a + c->remote_cost();
}) / this->commands.size();
}
};

class LongCommandGroup
Expand Down Expand Up @@ -323,7 +339,7 @@ namespace {
virtual util::sptr<CommandGroup> spawn_commands(
util::sref<Client> c, Buffer::iterator end) = 0;

SpecialCommandParser() {}
SpecialCommandParser() = default;
SpecialCommandParser(SpecialCommandParser const&) = delete;
};

Expand Down Expand Up @@ -568,25 +584,25 @@ namespace {
{
if (error) {
this->buffer = std::move(rsp);
return this->group->command_responsed();
return this->responsed();
}
if (rsp.same_as_string("$-1\r\n")) {
this->buffer = Buffer::from_string(
"-ERR no such key\r\n");
return this->group->command_responsed();
return this->responsed();
}
this->buffer = Buffer::from_string("*3\r\n$3\r\nSET\r\n");
this->buffer.append_from(new_key.begin(), new_key.end());
this->buffer.append_from(rsp.begin(), rsp.end());
this->current_key_slot = new_key_slot;
this->on_rsp =
[&](Buffer rsp, bool error)
[this](Buffer rsp, bool error)
{
if (error) {
this->buffer = std::move(rsp);
return this->group->command_responsed();
return this->responsed();
}
rsp_set();
this->rsp_set();
};
this->group->client->reactivate(util::mkref(*this));
}
Expand All @@ -597,10 +613,10 @@ namespace {
this->buffer.append_from(old_key.begin(), old_key.end());
this->current_key_slot = old_key_slot;
this->on_rsp =
[&](Buffer, bool)
[this](Buffer, bool)
{
this->buffer = Buffer::from_string("+OK\r\n");
this->group->command_responsed();
this->responsed();
};
this->group->client->reactivate(util::mkref(*this));
}
Expand Down Expand Up @@ -995,6 +1011,14 @@ namespace {

void on_arr(cerb::rint size, Buffer::iterator i)
{
/*
* Redis server will reset a request of more than 1M args.
* See also
* https://github.com/antirez/redis/blob/3.0/src/networking.c#L1014
*/
if (size > 1024 * 1024) {
throw BadRedisMessage("Request is too large");
}
if (!_nested_array_element_count.empty()) {
throw BadRedisMessage("Invalid nested array as client command");
}
Expand All @@ -1009,9 +1033,14 @@ namespace {

}

void Command::copy_response(Buffer rsp, bool)
void Command::on_remote_responsed(Buffer rsp, bool)
{
this->buffer = std::move(rsp);
this->responsed();
}

void Command::responsed()
{
this->group->command_responsed();
}

Expand Down
Loading