feat: support maxmemory-clients config #2513

Closed

28 commits (changes shown from 23 commits)
67db430
feat: support maxmemory-clients
AntiTopQuark Aug 29, 2024
03e44c4
update
AntiTopQuark Aug 31, 2024
1e5aba7
update
AntiTopQuark Aug 31, 2024
d388a23
update
AntiTopQuark Sep 2, 2024
5649f62
ci: add Debian environment to CI workflow (#2510)
c8ef Aug 30, 2024
bc4a195
feat(config): add `txn_context_enabled` to allow to enable the transa…
PokIsemaine Aug 31, 2024
8b41ab6
fix(test): remove meaningless `list-max-ziplist-size` (#2517)
PokIsemaine Sep 2, 2024
b2665a4
fix(replication): potential deadlock when switching master frequentl…
git-hulk Sep 2, 2024
75f6177
fix(typo): `filed -> field` (#2519)
AntiTopQuark Sep 2, 2024
a3bf6eb
feat: support maxmemory-clients
AntiTopQuark Aug 29, 2024
fac72fb
update
AntiTopQuark Sep 2, 2024
eb96c7b
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 3, 2024
2781747
fix data race error
AntiTopQuark Sep 5, 2024
a2c264a
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 5, 2024
ed6b30d
update unit case
AntiTopQuark Sep 5, 2024
63378b2
update
AntiTopQuark Sep 7, 2024
e711754
update
AntiTopQuark Sep 7, 2024
5c48461
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 7, 2024
4f19056
update
AntiTopQuark Sep 7, 2024
7aa400e
update
AntiTopQuark Sep 7, 2024
3feb149
u
AntiTopQuark Sep 8, 2024
10dffe1
update code style
AntiTopQuark Sep 8, 2024
90e79d9
update code style
AntiTopQuark Sep 8, 2024
93dc21a
update code style
AntiTopQuark Sep 8, 2024
5bfd3ce
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 8, 2024
104f354
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 8, 2024
40454db
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 12, 2024
0aa864d
Merge branch 'unstable' into limit_maxmemory_clients
AntiTopQuark Sep 16, 2024
14 changes: 14 additions & 0 deletions kvrocks.conf
@@ -349,6 +349,20 @@ json-storage-format json
# Default: no
txn-context-enabled no

# In some scenarios client connections can hog up memory, leading to OOM
# errors or data eviction. To avoid this, we can cap the accumulated memory
# used by all client connections (all pubsub and normal clients). Once we
# reach that limit, connections will be dropped by the server, freeing up
# memory. The server will attempt to drop the connections using the most
# memory first. We call this mechanism "client eviction".
#
# Client eviction is configured using the maxmemory-clients setting as follows:
# 0 - client eviction is disabled (default)
#
# A memory value can be used for the client eviction threshold,
# for example:
# maxmemory-clients 1g

################################## TLS ###################################

# By default, TLS/SSL is disabled, i.e. `tls-port` is set to 0.
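For the maxmemory-clients option documented above, here is a minimal Go sketch of driving it at runtime. It uses the same go-redis calls as the test at the bottom of this diff (CONFIG SET with a unit-suffixed value, then reading the new evicted_clients counter from INFO stats); the server address is a placeholder.

package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6666"}) // placeholder address
	defer rdb.Close()

	// Cap the accumulated memory of all client connections at 10 MiB;
	// setting the value back to "0" disables client eviction.
	if err := rdb.ConfigSet(ctx, "maxmemory-clients", "10M").Err(); err != nil {
		panic(err)
	}

	// evicted_clients in the stats section counts dropped connections.
	info, err := rdb.Info(ctx, "stats").Result()
	if err != nil {
		panic(err)
	}
	for _, line := range strings.Split(info, "\n") {
		if strings.HasPrefix(line, "evicted_clients:") {
			fmt.Println(strings.TrimSpace(line))
		}
	}
}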
3 changes: 2 additions & 1 deletion src/cluster/replication.cc
@@ -106,7 +106,8 @@ void FeedSlaveThread::loop() {
// first batch here to work around this issue instead of waiting for enough batch size.
bool is_first_repl_batch = true;
uint32_t yield_microseconds = 2 * 1000;
std::string batches_bulk;
std::string &batches_bulk = conn_->GetSlaveOutputBuffer();
batches_bulk.clear();
size_t updates_in_batches = 0;
while (!IsStopped()) {
auto curr_seq = next_repl_seq_.load();
7 changes: 7 additions & 0 deletions src/commands/commander.h
@@ -106,6 +106,13 @@ class Commander {
}

virtual ~Commander() = default;
size_t GetMemoryUsage() const {
size_t total_memory = sizeof(*this);
for (const auto &arg : args_) {
total_memory += arg.capacity();
}
return total_memory;
}

protected:
std::vector<std::string> args_;
1 change: 1 addition & 0 deletions src/config/config.cc
@@ -194,6 +194,7 @@ Config::Config() {
{"json-storage-format", false,
new EnumField<JsonStorageFormat>(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)},
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"maxmemory-clients", false, new IntWithUnitField<uint64_t>(&max_memory_clients, "0", 0, UINT64_MAX)},

/* rocksdb options */
{"rocksdb.compression", false,
2 changes: 2 additions & 0 deletions src/config/config.h
@@ -171,6 +171,8 @@ struct Config {
// Enable transactional mode in engine::Context
bool txn_context_enabled = false;

uint64_t max_memory_clients = 0;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
38 changes: 37 additions & 1 deletion src/server/redis_connection.cc
@@ -51,6 +51,8 @@ Connection::Connection(bufferevent *bev, Worker *owner)
int64_t now = util::GetTimeStamp();
create_time_ = now;
last_interaction_ = now;
output_buffer_.clear();
slave_output_buffer_.clear();
}

Connection::~Connection() {
@@ -368,8 +370,9 @@ static bool IsCmdForIndexing(const CommandAttributes *attr) {
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
GetOutputBuffer().clear();
const Config *config = srv_->GetConfig();
std::string reply;
std::string &reply = GetOutputBuffer();
std::string password = config->requirepass;

while (!to_process_cmds->empty()) {
@@ -559,4 +562,37 @@ void Connection::ResetMultiExec() {
DisableFlag(Connection::kMultiExec);
}

size_t Connection::GetConnectionMemoryUsed() const {
size_t total_memory = sizeof(*this);  // static size of the object, including all member variables
Review comment (Member): please don't include Chinese comments.
Reply (Contributor Author): done

total_memory += name_.capacity();
total_memory += ns_.capacity();
total_memory += ip_.capacity();
total_memory += announce_ip_.capacity();
total_memory += addr_.capacity();
total_memory += last_cmd_.capacity();
total_memory += output_buffer_.capacity();
total_memory += slave_output_buffer_.capacity();
total_memory += evbuffer_get_length(Output()) + evbuffer_get_length(Input());

for (const auto &channel : subscribe_channels_) {
total_memory += channel.capacity();
}
for (const auto &pattern : subscribe_patterns_) {
total_memory += pattern.capacity();
}
for (const auto &channel : subscribe_shard_channels_) {
total_memory += channel.capacity();
}
for (const auto &cmd : multi_cmds_) {
total_memory += cmd.capacity();
}

if (saved_current_command_) {
total_memory += saved_current_command_->GetMemoryUsage();
}
Review comment on lines +568 to +593 (@PragmaTwice, Member, Sep 8, 2024):
The code seems hard to maintain, and it is also not a precise estimation of memory usage.
I'm wondering if it's really necessary to limit the connection memory usage.

return total_memory;
}

} // namespace redis
8 changes: 8 additions & 0 deletions src/server/redis_connection.h
@@ -157,7 +157,9 @@ class Connection : public EvbufCallbackBase<Connection> {
void SetOwner(Worker *new_owner) { owner_ = new_owner; };
int GetFD() { return bufferevent_getfd(bev_); }
evbuffer *Input() { return bufferevent_get_input(bev_); }
evbuffer *Input() const { return bufferevent_get_input(bev_); }
evbuffer *Output() { return bufferevent_get_output(bev_); }
evbuffer *Output() const { return bufferevent_get_output(bev_); }
bufferevent *GetBufferEvent() { return bev_; }
void ExecuteCommands(std::deque<CommandTokens> *to_process_cmds);
Status ExecuteCommand(const std::string &cmd_name, const std::vector<std::string> &cmd_tokens, Commander *current_cmd,
@@ -180,6 +182,10 @@
std::set<std::string> watched_keys;
std::atomic<bool> watched_keys_modified = false;

inline std::string &GetOutputBuffer() { return output_buffer_; }
inline std::string &GetSlaveOutputBuffer() { return slave_output_buffer_; }
size_t GetConnectionMemoryUsed() const;

private:
uint64_t id_ = 0;
std::atomic<int> flags_ = 0;
@@ -213,6 +219,8 @@

bool importing_ = false;
RESP protocol_version_ = RESP::v2;
std::string output_buffer_;
std::string slave_output_buffer_;
};

} // namespace redis
1 change: 1 addition & 0 deletions src/server/server.cc
@@ -1136,6 +1136,7 @@ void Server::GetStatsInfo(std::string *info) {
string_stream << "sync_full:" << stats.fullsync_count << "\r\n";
string_stream << "sync_partial_ok:" << stats.psync_ok_count << "\r\n";
string_stream << "sync_partial_err:" << stats.psync_err_count << "\r\n";
string_stream << "evicted_clients:" << stats.stat_evictedclients << "\r\n";

auto db_stats = storage->GetDBStats();
string_stream << "keyspace_hits:" << db_stats->keyspace_hits << "\r\n";
49 changes: 46 additions & 3 deletions src/server/worker.cc
@@ -56,7 +56,7 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new())
if (!base_) throw std::runtime_error{"event base failed to be created"};

timer_.reset(NewEvent(base_, -1, EV_PERSIST));
timeval tm = {10, 0};
timeval tm = {1, 0};
evtimer_add(timer_.get(), &tm);

uint32_t ports[3] = {config->port, config->tls_port, 0};
@@ -102,8 +102,13 @@ Worker::~Worker() {

void Worker::TimerCB(int, [[maybe_unused]] int16_t events) {
auto config = srv->GetConfig();
if (config->timeout == 0) return;
KickoutIdleClients(config->timeout);
if (config->timeout != 0) {
KickoutIdleClients(config->timeout);
}

if (config->max_memory_clients > 0 && config->workers > 0) {
evictionClients(config->max_memory_clients / config->workers);
}
}

void Worker::newTCPConnection(evconnlistener *listener, evutil_socket_t fd, [[maybe_unused]] sockaddr *address,
@@ -339,6 +344,7 @@ redis::Connection *Worker::removeConnection(int fd) {
auto iter = conns_.find(fd);
if (iter != conns_.end()) {
conn = iter->second;
conn->GetOutputBuffer().clear();
conns_.erase(iter);
srv->DecrClientNum();
}
@@ -553,6 +559,43 @@ void Worker::KickoutIdleClients(int timeout) {
}
}

size_t Worker::GetConnectionsMemoryUsed() {
size_t mem = 0;
std::lock_guard<std::mutex> guard(conns_mu_);

for (auto &it : conns_) {
mem += it.second->GetConnectionMemoryUsed();
}
return mem;
}

void Worker::evictionClients(size_t max_memory) {
size_t mem = GetConnectionsMemoryUsed();
if (mem < max_memory) {
return;
}
using ConnWithMem = std::tuple<int, uint64_t, uint64_t>;
std::vector<ConnWithMem> conns;
{
std::lock_guard<std::mutex> guard(conns_mu_);
for (auto &iter : conns_) {
conns.emplace_back(iter.first, iter.second->GetID(), iter.second->GetConnectionMemoryUsed());
}

// sort connections by memory used in ascending order; the highest-usage
// connections end up at the back of the vector and are evicted first
std::sort(conns.begin(), conns.end(),
[](const ConnWithMem &a, const ConnWithMem &b) { return std::get<2>(a) < std::get<2>(b); });
}

while (mem > max_memory && conns.size() > 0) {
auto conn = conns.back();
conns.pop_back();
mem -= std::get<2>(conn);
srv->stats.IncrEvictedClients();
FreeConnectionByID(std::get<0>(conn), std::get<1>(conn));
}
}

void WorkerThread::Start() {
auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

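The eviction path above snapshots each connection's estimated memory under the connections mutex, sorts in ascending order, and frees connections from the back of the vector, so the heaviest consumers are dropped first; each worker enforces its own share of the budget (maxmemory-clients divided by the worker count) from its periodic timer. Below is a self-contained Go sketch of that ordering, with hypothetical types standing in for the (fd, id, memory) tuples.

package main

import (
	"fmt"
	"sort"
)

// connUsage is a hypothetical stand-in for the (fd, id, memory) tuples
// that Worker::evictionClients collects under the connections mutex.
type connUsage struct {
	fd  int
	id  uint64
	mem uint64
}

// evictUntilUnder mirrors the C++ loop: the ascending sort puts the largest
// consumers at the back, and we pop from the back until usage fits the budget.
func evictUntilUnder(conns []connUsage, total, budget uint64, free func(fd int, id uint64)) uint64 {
	sort.Slice(conns, func(i, j int) bool { return conns[i].mem < conns[j].mem })
	for total > budget && len(conns) > 0 {
		victim := conns[len(conns)-1]
		conns = conns[:len(conns)-1]
		total -= victim.mem
		free(victim.fd, victim.id)
	}
	return total
}

func main() {
	conns := []connUsage{
		{fd: 3, id: 1, mem: 4 << 20},
		{fd: 4, id: 2, mem: 1 << 20},
		{fd: 5, id: 3, mem: 8 << 20},
	}
	total := evictUntilUnder(conns, 13<<20, 10<<20, func(fd int, id uint64) {
		fmt.Printf("evicting fd=%d id=%d\n", fd, id) // drops the 8 MiB connection first
	})
	fmt.Printf("remaining: %d MiB\n", total>>20) // 5 MiB, already under budget
}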
3 changes: 3 additions & 0 deletions src/server/worker.h
@@ -76,13 +76,16 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {

lua_State *Lua() { return lua_; }
std::map<int, redis::Connection *> GetConnections() const { return conns_; }
std::mutex &GetConnectionsMutex() { return conns_mu_; }
size_t GetConnectionsMemoryUsed();
Server *srv;

private:
Status listenTCP(const std::string &host, uint32_t port, int backlog);
void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
redis::Connection *removeConnection(int fd);
void evictionClients(size_t max_memory);

event_base *base_;
UniqueEvent timer_;
3 changes: 3 additions & 0 deletions src/stats/stats.h
@@ -68,6 +68,7 @@ class Stats {
std::atomic<uint64_t> psync_err_count = {0};
std::atomic<uint64_t> psync_ok_count = {0};
std::map<std::string, CommandStat> commands_stats;
std::atomic<uint64_t> stat_evictedclients = {0};

Stats();
void IncrCalls(const std::string &command_name);
@@ -77,6 +78,8 @@
void IncrFullSyncCount() { fullsync_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncErrCount() { psync_err_count.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCount() { psync_ok_count.fetch_add(1, std::memory_order_relaxed); }
void IncrEvictedClients() { stat_evictedclients.fetch_add(1, std::memory_order_relaxed); }

static int64_t GetMemoryRSS();
void TrackInstantaneousMetric(int metric, uint64_t current_reading);
uint64_t GetInstantaneousMetric(int metric) const;
53 changes: 53 additions & 0 deletions tests/gocase/unit/limits/limits_test.go
@@ -21,8 +21,11 @@ package limits

import (
"context"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/apache/kvrocks/tests/gocase/util"
"github.com/redis/go-redis/v9"
@@ -94,3 +97,53 @@ func TestWriteBatchLimit(t *testing.T) {
require.NoError(t, rdb.ZAdd(ctx, key, memberScores...).Err())
})
}

func getEvictedClients(rdb *redis.Client, ctx context.Context) (int, error) {
info, err := rdb.Info(ctx, "stats").Result()
if err != nil {
return 0, err
}

lines := strings.Split(info, "\n")
for _, line := range lines {
if strings.HasPrefix(line, "evicted_clients:") {
parts := strings.Split(line, ":")
if len(parts) == 2 {
return strconv.Atoi(strings.TrimSpace(parts[1]))
}
}
}
return 0, fmt.Errorf("evicted_clients not found")
}

func TestMaxMemoryClientsLimits(t *testing.T) {
srv := util.StartServer(t, map[string]string{})
defer srv.Close()

t.Run("check if maxmemory-clients works well", func(t *testing.T) {
var clean []func()
defer func() {
for _, f := range clean {
f()
}
}()

ctx := context.Background()
rdb := srv.NewClient()
defer rdb.Close()

elem := strings.Repeat("a", 10240)
for i := 0; i < 1024; i++ {
require.NoError(t, rdb.RPush(ctx, "test_max_memory_clients", elem).Err())
}

require.NoError(t, rdb.ConfigSet(ctx, "maxmemory-clients", "10M").Err())
require.NoError(t, rdb.LRange(ctx, "test_max_memory_clients", 0, -1).Err())
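// give the worker's periodic timer (reduced from 10s to 1s in this PR) time to run the eviction check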
time.Sleep(25 * time.Second)

require.NoError(t, rdb.ConfigSet(ctx, "maxmemory-clients", "0").Err())
r, err := getEvictedClients(rdb, ctx)
require.NoError(t, err)
require.Equal(t, 1, r)
})
}