Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce reusable query buffer for client reads #337

Open
wants to merge 20 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 62 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
char *getClientSockname(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
__thread sds thread_reusable_qb = NULL;
__thread int thread_reusable_qb_used = 0; /* Avoid multiple clients using reusable query
* buffer due to nested command execution. */

/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
Expand Down Expand Up @@ -144,7 +147,7 @@ client *createClient(connection *conn) {
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->querybuf = NULL;
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
Expand Down Expand Up @@ -1575,6 +1578,28 @@ void deauthenticateAndCloseClient(client *c) {
}
}

/* Detach the client `c` from the thread-local reusable query buffer.
 *
 * The client must currently carry CLIENT_REUSABLE_QUERYBUFFER (asserted).
 *
 * Two outcomes are possible:
 *  - The client's querybuf is still the reusable buffer and nothing past
 *    qb_pos is left in it: the client's querybuf pointer and qb_pos are
 *    reset, and the reusable buffer is cleared so it can be handed out again.
 *  - The client's querybuf pointer no longer equals the reusable buffer, or
 *    unread data remains: the client keeps its querybuf and the thread-local
 *    pointer is set to NULL.
 *
 * In both cases the client flag is dropped and the "in use" marker is reset. */
static void resetReusableQueryBuf(client *c) {
    serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER);

    /* sdslen() is only evaluated when the client still points at the
     * reusable buffer, exactly as in the short-circuit form. */
    int can_reclaim = (c->querybuf == thread_reusable_qb) &&
                      (sdslen(c->querybuf) <= c->qb_pos);

    if (can_reclaim) {
        /* Nothing pending: drop the client's reference and empty the
         * reusable buffer for the next reader. */
        c->querybuf = NULL;
        c->qb_pos = 0;
        sdsclear(thread_reusable_qb);
    } else {
        /* Buffer was reallocated or still holds data: the client keeps it,
         * so forget the thread-local reference. */
        thread_reusable_qb = NULL;
    }

    /* The reusable buffer is no longer held by any client, and this client
     * is no longer marked as using it. */
    thread_reusable_qb_used = 0;
    c->flags &= ~CLIENT_REUSABLE_QUERYBUFFER;
}

void freeClient(client *c) {
listNode *ln;

Expand Down Expand Up @@ -1629,6 +1654,8 @@ void freeClient(client *c) {
}

/* Free the query buffer */
if (c->flags & CLIENT_REUSABLE_QUERYBUFFER)
resetReusableQueryBuf(c);
sdsfree(c->querybuf);
c->querybuf = NULL;

Expand Down Expand Up @@ -2674,6 +2701,11 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
/* For big argv, the client always uses its private query buffer.
* Using the reusable query buffer would eventually expand it beyond 32k,
* causing the client to take ownership of the reusable query buffer. */
if (!c->querybuf) c->querybuf = sdsempty();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment that we don't reuse the shared buffer here because we aim for the big arg optimization? Or do you think it's clear from the context above?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done with 3ebb1b3 (#337)


ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;

Expand All @@ -2685,6 +2717,26 @@ void readQueryFromClient(connection *conn) {
* but doesn't need align to the next arg, we can read more data. */
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
readlen = PROTO_IOBUF_LEN;
} else if (c->querybuf == NULL) {
if (unlikely(thread_reusable_qb_used)) {
/* The reusable query buffer is already used by another client,
* switch to using the client's private query buffer. This only
* occurs when commands are executed nested via processEventsWhileBlocked(). */
c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(c->querybuf);
} else {
/* Create the reusable query buffer if it doesn't exist. */
if (!thread_reusable_qb) {
thread_reusable_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(thread_reusable_qb);
}

/* Assign the reusable query buffer to the client and mark it as in use. */
serverAssert(sdslen(thread_reusable_qb) == 0);
c->querybuf = thread_reusable_qb;
c->flags |= CLIENT_REUSABLE_QUERYBUFFER;
thread_reusable_qb_used = 1;
}
}

qblen = sdslen(c->querybuf);
Expand All @@ -2708,7 +2760,7 @@ void readQueryFromClient(connection *conn) {
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
goto done;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
Expand Down Expand Up @@ -2760,6 +2812,10 @@ void readQueryFromClient(connection *conn) {
c = NULL;

done:
if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) {
serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */
resetReusableQueryBuf(c);
}
beforeNextClient(c);
}

Expand Down Expand Up @@ -2875,8 +2931,8 @@ sds catClientInfoString(sds s, client *client) {
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" watch=%i", (int) listLength(client->watched_keys),
" qbuf=%U", (unsigned long long) sdslen(client->querybuf),
" qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf),
" qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0,
" qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0,
" argv-mem=%U", (unsigned long long) client->argv_len_sum,
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums,
" rbs=%U", (unsigned long long) client->buf_usable_size,
Expand Down Expand Up @@ -3856,9 +3912,10 @@ size_t getClientOutputBufferMemoryUsage(client *c) {
* the client output buffer memory usage portion of the total. */
size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
size_t mem = getClientOutputBufferMemoryUsage(c);

if (output_buffer_mem_usage != NULL)
*output_buffer_mem_usage = mem;
mem += sdsZmallocSize(c->querybuf);
mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
mem += zmalloc_size(c);
mem += c->buf_usable_size;
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
Expand Down
7 changes: 6 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/


#include "server.h"
#include "cluster.h"
#include "bio.h"
Expand Down Expand Up @@ -1765,6 +1767,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) {
* connection. */
server.master->flags |= CLIENT_MASTER;

/* Allocate a private query buffer for the master client instead of using the reusable query buffer.
* This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */
server.master->querybuf = sdsempty();
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
Expand Down
29 changes: 24 additions & 5 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/

#include "server.h"
Expand Down Expand Up @@ -734,6 +739,8 @@ long long getInstantaneousMetric(int metric) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
/* If the client query buffer is NULL, it is using the reusable query buffer and there is nothing to do. */
if (c->querybuf == NULL) return 0;
size_t querybuf_size = sdsalloc(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;

Expand All @@ -743,7 +750,18 @@ int clientsCronResizeQueryBuffer(client *c) {
/* There are two conditions to resize the query buffer: */
if (idletime > 2) {
/* 1) Query is idle for a long time. */
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
size_t remaining = sdslen(c->querybuf) - c->qb_pos;
if (!(c->flags & CLIENT_MASTER) && !remaining) {
/* If the client is not a master and no data is pending,
* the client can safely use the reusable query buffer in the next read, so free the client's querybuf. */
sdsfree(c->querybuf);
/* By setting the querybuf to NULL, the client will use the reusable query buffer in the next read.
* We don't move the client to the reusable query buffer immediately, because if we allocated a private
* query buffer for the client, it's likely that the client will use it again soon. */
c->querybuf = NULL;
} else {
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
}
} else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) {
/* 2) Query buffer is too big for latest peak and is larger than
* resize threshold. Trim excess space but only up to a limit,
Expand All @@ -759,7 +777,7 @@ int clientsCronResizeQueryBuffer(client *c) {

/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
c->querybuf_peak = sdslen(c->querybuf);
c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0;
/* We reset to either the current used, or currently processed bulk size,
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2;
Expand Down Expand Up @@ -834,8 +852,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum +
(c->argv ? zmalloc_size(c->argv) : 0);
size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
size_t in_usage = qb_size + c->argv_len_sum + argv_size;
size_t out_usage = getClientOutputBufferMemoryUsage(c);

/* Track the biggest values observed so far in this slot. */
Expand Down Expand Up @@ -6567,7 +6586,7 @@ void dismissMemory(void* ptr, size_t size_hint) {
void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
if (c->querybuf) dismissSds(c->querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_REUSABLE_QUERYBUFFER (1ULL<<51) /* The client is using the reusable query buffer. */

/* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */
#define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE)
Expand Down
85 changes: 81 additions & 4 deletions tests/unit/querybuf.tcl
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
proc client_idle_sec {name} {
set clients [split [r client list] "\r\n"]
set c [lsearch -inline $clients *name=$name*]
Expand All @@ -24,16 +36,45 @@ start_server {tags {"querybuf slow"}} {
# The test will run at least 2s to check if client query
# buffer will be resized when client idle 2s.
test "query buffer resized correctly" {
set rd [redis_client]

set rd [redis_deferring_client]

$rd client setname test_client
$rd read

# Make sure query buff has size of 0 bytes at start as the client uses the reusable qb.
assert {[client_query_buffer test_client] == 0}

# Pause cron to prevent premature shrinking (timing issue).
r debug pause-cron 1

# Send partial command to client to make sure it doesn't use the reusable qb.
$rd write "*3\r\n\$3\r\nset\r\n\$2\r\na"
$rd flush
# Wait for the client to start using a private query buffer.
wait_for_condition 1000 10 {
[client_query_buffer test_client] > 0
} else {
fail "client should start using a private query buffer"
}

# send the rest of the command
$rd write "a\r\n\$1\r\nb\r\n"
$rd flush
assert_equal {OK} [$rd read]

set orig_test_client_qbuf [client_query_buffer test_client]
# Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k
# but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k
assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf < 32768}
set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer.
assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE}

# Allow shrinking to occur
r debug pause-cron 0

# Check that the initial query buffer is resized after 2 sec
wait_for_condition 1000 10 {
[client_idle_sec test_client] >= 3 && [client_query_buffer test_client] == 0
[client_idle_sec test_client] >= 3 && [client_query_buffer test_client] < $orig_test_client_qbuf
} else {
fail "query buffer was not resized"
}
Expand Down Expand Up @@ -75,10 +116,27 @@ start_server {tags {"querybuf slow"}} {
test "query buffer resized correctly with fat argv" {
set rd [redis_client]
$rd client setname test_client

# Pause cron to prevent premature shrinking (timing issue).
r debug pause-cron 1

$rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n"
$rd flush

# Wait for the client to start using a private query buffer of > 1000000 size.
wait_for_condition 1000 10 {
[client_query_buffer test_client] > 1000000
} else {
fail "client should start using a private query buffer"
}

after 20
# Send the start of the arg and make sure the client is not using the reusable qb for it, but rather a private buf of > 1000000 size.
$rd write "a"
$rd flush

r debug pause-cron 0

after 120
if {[client_query_buffer test_client] < 1000000} {
fail "query buffer should not be resized when client idle time smaller than 2s"
}
Expand All @@ -92,5 +150,24 @@ start_server {tags {"querybuf slow"}} {

$rd close
}
}

start_server {tags {"querybuf"}} {
test "Client executes small argv commands using reusable query buffer" {
set rd [redis_deferring_client]
$rd client setname test_client
$rd read
set res [r client list]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have no guarantee that the previous command (setname) was run (missing rd read)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing rd read, fixed it.


# Verify that the client does not create a private query buffer after
# executing a small parameter command.
assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res

# The client executing the command is currently using the reusable query buffer,
# so the size shown is that of the reusable query buffer. It will be returned
# to the reusable query buffer after command execution.
assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res

$rd close
}
}
Loading