Skip to content

Commit

Permalink
feat: support maxmemory-clients
Browse files Browse the repository at this point in the history
  • Loading branch information
AntiTopQuark committed Aug 31, 2024
1 parent fa3290c commit 67db430
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 2 deletions.
14 changes: 14 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,20 @@ json-max-nesting-depth 1024
# Default: json
json-storage-format json

# In some scenarios client connections can hog up memory leading to OOM
# errors or data eviction. To avoid this we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
# reach that limit connections will be dropped by the server freeing up
# memory. The server will attempt to drop the connections using the most
# memory first. We call this mechanism "client eviction".
#
# Client eviction is configured using the maxmemory-clients setting as follows:
# 0 - client eviction is disabled (default)
#
# A memory value can be used for the client eviction threshold,
# for example:
# maxmemory-clients 1g

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ void FeedSlaveThread::loop() {
// first batch here to work around this issue instead of waiting for enough batch size.
bool is_first_repl_batch = true;
uint32_t yield_microseconds = 2 * 1000;
std::string batches_bulk;
std::string &batches_bulk = conn_->GetOutputBuffer();
batches_bulk.clear();
size_t updates_in_batches = 0;
while (!IsStopped()) {
auto curr_seq = next_repl_seq_.load();
Expand Down
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Config::Config() {
{"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)},
{"json-storage-format", false,
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"maxmemory-clients", false, new IntWithUnitField<uint64_t>(&max_memory_clients, "0", 0, UINT64_MAX)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ struct Config {
int json_max_nesting_depth = 1024;
JsonStorageFormat json_storage_format = JsonStorageFormat::JSON;

uint64_t max_memory_clients = 0;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down
36 changes: 35 additions & 1 deletion src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ Connection::Connection(bufferevent *bev, Worker *owner)
int64_t now = util::GetTimeStamp();
create_time_ = now;
last_interaction_ = now;
output_buffer_.clear();
}

Connection::~Connection() {
Expand Down Expand Up @@ -368,8 +369,9 @@ static bool IsCmdForIndexing(const CommandAttributes *attr) {
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
output_buffer_.clear();
const Config *config = srv_->GetConfig();
std::string reply;
std::string &reply = output_buffer_;
std::string password = config->requirepass;

while (!to_process_cmds->empty()) {
Expand Down Expand Up @@ -559,4 +561,36 @@ void Connection::ResetMultiExec() {
DisableFlag(Connection::kMultiExec);
}

size_t Connection::GetConnectionMemoryUsed() const {
size_t total_memory = sizeof(*this); // 包含所有成员变量的静态内存大小

total_memory += name_.capacity();
total_memory += ns_.capacity();
total_memory += ip_.capacity();
total_memory += announce_ip_.capacity();
total_memory += addr_.capacity();
total_memory += last_cmd_.capacity();
total_memory += output_buffer_.capacity();
total_memory += evbuffer_get_length(Output()) + evbuffer_get_length(Input());

for (const auto &channel : subscribe_channels_) {
total_memory += channel.capacity();
}
for (const auto &pattern : subscribe_patterns_) {
total_memory += pattern.capacity();
}
for (const auto &channel : subscribe_shard_channels_) {
total_memory += channel.capacity();
}
for (const auto &cmd : multi_cmds_) {
total_memory += cmd.capacity();
}

if (saved_current_command_) {
total_memory += saved_current_command_->GetMemoryUsage();
}

return total_memory;
}

} // namespace redis
5 changes: 5 additions & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class Connection : public EvbufCallbackBase<Connection> {
std::set<std::string> watched_keys;
std::atomic<bool> watched_keys_modified = false;

inline std::string &GetOutputBuffer() { return output_buffer_; }
size_t GetConnectionMemoryUsed() const;

private:
uint64_t id_ = 0;
std::atomic<int> flags_ = 0;
Expand Down Expand Up @@ -213,6 +216,8 @@ class Connection : public EvbufCallbackBase<Connection> {

bool importing_ = false;
RESP protocol_version_ = RESP::v2;
std::string output_buffer_;

};

} // namespace redis
44 changes: 44 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,11 @@ void Server::cron() {
cleanupExitedWorkerThreads(false);
}

// check if we need to evict connections every 10s
if (config_->max_memory_clients > 0 && counter != 0 && counter % 100 == 0) {
evictionClients();
}

CleanupExitedSlaves();
recordInstantaneousMetrics();
}
Expand Down Expand Up @@ -1136,6 +1141,7 @@ void Server::GetStatsInfo(std::string *info) {
string_stream << "sync_full:" << stats.fullsync_count << "\r\n";
string_stream << "sync_partial_ok:" << stats.psync_ok_count << "\r\n";
string_stream << "sync_partial_err:" << stats.psync_err_count << "\r\n";
string_stream << "evicted_clients:" << stats.stat_evictedclients << "\r\n";

auto db_stats = storage->GetDBStats();
string_stream << "keyspace_hits:" << db_stats->keyspace_hits << "\r\n";
Expand Down Expand Up @@ -2126,3 +2132,41 @@ AuthResult Server::AuthenticateUser(const std::string &user_password, std::strin
*ns = kDefaultNamespace;
return AuthResult::IS_ADMIN;
}

void Server::evictionClients() {
size_t mem = 0;
for (const auto &t : worker_threads_) {
mem += t->GetWorker()->GetConnectionsMemoryUsed();
}
if (mem < config_->max_memory_clients) {
return;
}

std::vector<redis::Connection *> conns;
for (const auto &t : worker_threads_) {
std::unique_lock<std::mutex> lock(t->GetWorker()->GetConnectionsMutex());
auto worker_conns = t->GetWorker()->GetConnections();
for (const auto &iter : worker_conns) {
conns.push_back(iter.second);
}
}

// sort Connections by memory used from high to low
std::sort(conns.begin(), conns.end(), [](const redis::Connection *a, const redis::Connection *b) {
size_t a_size = a ? a->GetConnectionMemoryUsed() : 0;
size_t b_size = b ? b->GetConnectionMemoryUsed() : 0;
return a_size < b_size;
});

while (mem > config_->max_memory_clients && conns.size() > 0) {
auto *conn = conns.back();
conns.pop_back();
if (conn == nullptr) {
continue;
} else {
mem -= conn->GetConnectionMemoryUsed();
stats.IncrEvictedClients();
conn->Close();
}
}
}
1 change: 1 addition & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ class Server {
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
void cleanupExitedWorkerThreads(bool force);
void evictionClients();

std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
Expand Down
11 changes: 11 additions & 0 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ redis::Connection *Worker::removeConnection(int fd) {
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
conn = iter->second;
conn->GetOutputBuffer().clear();
conns_.erase(iter);
srv->DecrClientNum();
}
Expand Down Expand Up @@ -553,6 +554,16 @@ void Worker::KickoutIdleClients(int timeout) {
}
}

size_t Worker::GetConnectionsMemoryUsed() {
size_t mem = 0;
std::lock_guard<std::mutex> guard(conns_mu_);

for (auto &it : conns_) {
mem += it.second->GetConnectionMemoryUsed();
}
return mem;
}

void WorkerThread::Start() {
auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

Expand Down
2 changes: 2 additions & 0 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {

lua_State *Lua() { return lua_; }
std::map<int, redis::Connection *> GetConnections() const { return conns_; }
std::mutex &GetConnectionsMutex() { return conns_mu_; }
size_t GetConnectionsMemoryUsed();
Server *srv;

private:
Expand Down
35 changes: 35 additions & 0 deletions tests/gocase/unit/limits/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,38 @@ func TestNetworkLimits(t *testing.T) {
require.Fail(t, "maxclients doesn't work refusing connections")
})
}


func TestMaxMemoryClientsLimits(t *testing.T) {
srv := util.StartServer(t, map[string]string{
"maxmemory-clients": "10m",
})
defer srv.Close()

t.Run("check if maxmemory-clients works well", func(t *testing.T) {
var clean []func()
defer func() {
for _, f := range clean {
f()
}
}()

ctx := context.Background()
rdbA := srv.NewClient()
defer rdbA.Close()
elem := strings.Repeat("a", 10240)
for i := 0; i < 1024; i++ {
require.NoError(t, rdbA.RPush(ctx, "test_max_memory_clients", elem).Err())
}

rdbB := srv.NewClient()
defer rdbB.Close()
require.NoError(t, rdbB.LRange(ctx, "test_max_memory_clients", 0, -1).Err())

time.Sleep(25 * time.Second)

r, err := getEvictedClients(rdbA, ctx)
require.NoError(t, err)
require.Equal(t, 1, r)
})
}

0 comments on commit 67db430

Please sign in to comment.