Introduce reusable query buffer for client reads #337

Open
wants to merge 20 commits into base: unstable
Changes from 13 commits

63 changes: 58 additions & 5 deletions src/networking.c
@@ -27,6 +27,9 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
int postponeClientRead(client *c);
char *getClientSockname(client *c);
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */
__thread sds thread_shared_qb = NULL;
__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using shared query
* buffer due to nested command execution. */

only now (when looking at the code for the first time) i realize the buffer isn't shared. it's re-usable.
i.e. not serving multiple clients at the same time.

that said, i suppose we won't want to rename it...

Owner Author

yeah, it's more of a public query buffer.
do you mean it should be thread_querybuffer or thread_querybuffer_reusable?

in my eyes it's a bad term for this purpose. i'd think that a shared buffer is one that is used by multiple entities at the same time. i'd just replace the term "shared" with "reusable", in both variable names and comments.
but considering we might wanna cherry pick later fixes from valkey, this rename might be unproductive.
so i'd be ok with keeping the name and just editing the comment that describes it.
your call.


/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
@@ -144,7 +147,7 @@ client *createClient(connection *conn) {
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
- c->querybuf = sdsempty();
+ c->querybuf = NULL;
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@@ -1575,6 +1578,28 @@ void deauthenticateAndCloseClient(client *c) {
}
}

/* Resets the shared query buffer used by the given client.
* If any data remained in the buffer, the client will take ownership of the buffer
* and a new empty buffer will be allocated for the shared buffer. */
static void resetSharedQueryBuf(client *c) {
serverAssert(c->flags & CLIENT_SHARED_QUERYBUFFER);
if (c->querybuf != thread_shared_qb || sdslen(c->querybuf) > c->qb_pos) {

c->querybuf != thread_shared_qb how is this possible?
and if it is, do we really want to do thread_shared_qb = NULL?

Owner Author

> c->querybuf != thread_shared_qb how is this possible?

c->querybuf may be expanded in processMultibulkBuffer().
https://github.com/redis/redis/blob/60f22ca830c59a630b4156b112f5e73ce75adc64/src/networking.c#L2401

> and if it is, do we really want to do thread_shared_qb = NULL?

this means that c->querybuf has acquired ownership, the old pointer to thread_shared_qb is invalid, and we need to reset it so that it can be created again when it is used again.
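
The ownership rule described in this reply can be sketched in plain C. This is only an illustration under stated assumptions, not the PR's code: `demo_client`, `reusable_qb`, `borrow`, `grow`, and `release` are hypothetical names, `malloc`/`free` stand in for the SDS allocator, and `grow` always moves the block so that the pointer change is visible (a real allocator may or may not move it). Allocation failures are not handled.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define QB_CAP 16                 /* tiny capacity, for illustration only */

char *reusable_qb = NULL;         /* models thread_shared_qb */
int reusable_qb_used = 0;         /* models thread_shared_qb_used */

typedef struct {
    char *querybuf;               /* NULL when no buffer is attached */
    size_t len;                   /* bytes stored in querybuf */
    size_t pos;                   /* bytes already consumed by the parser */
    int borrowed;                 /* models CLIENT_SHARED_QUERYBUFFER */
} demo_client;

/* Attach the reusable buffer to the client, creating it on first use. */
void borrow(demo_client *c) {
    assert(!reusable_qb_used);    /* only one borrower at a time */
    if (!reusable_qb) reusable_qb = calloc(1, QB_CAP);
    c->querybuf = reusable_qb;
    c->len = c->pos = 0;
    c->borrowed = 1;
    reusable_qb_used = 1;
}

/* Grow the client's buffer by moving its contents to a new block.
 * After this call reusable_qb still holds the old, freed address. */
void grow(demo_client *c, size_t newcap) {
    char *bigger = malloc(newcap);
    memcpy(bigger, c->querybuf, c->len);
    free(c->querybuf);
    c->querybuf = bigger;
}

/* Give the buffer back after a read, mirroring the two branches above. */
void release(demo_client *c) {
    if (c->querybuf != reusable_qb || c->len > c->pos) {
        /* Moved or still holding data: the client keeps the block and the
         * reusable pointer is dropped so it gets recreated on next use. */
        reusable_qb = NULL;
    } else {
        /* Untouched and fully consumed: detach the client and reuse it. */
        c->querybuf = NULL;
        c->len = c->pos = 0;
    }
    c->borrowed = 0;
    reusable_qb_used = 0;
}
```

In this model a client that grew its buffer leaves `release` owning the new block, while `reusable_qb` is reset to NULL and will be allocated again by the next `borrow`.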

/* If querybuf has been reallocated or there is still data left,
* let the client take ownership of the shared buffer. */
thread_shared_qb = NULL;
} else {
/* It is safe to dereference and reuse the shared query buffer. */
c->querybuf = NULL;
c->qb_pos = 0;
sdsclear(thread_shared_qb);
}

/* Mark that the client is no longer using the shared query buffer
* and indicate that it is no longer used by any client. */
c->flags &= ~CLIENT_SHARED_QUERYBUFFER;
thread_shared_qb_used = 0;
}

void freeClient(client *c) {
listNode *ln;

@@ -1629,6 +1654,8 @@ void freeClient(client *c) {
}

/* Free the query buffer */
if (c->flags & CLIENT_SHARED_QUERYBUFFER)
resetSharedQueryBuf(c);
sdsfree(c->querybuf);
c->querybuf = NULL;

@@ -2674,6 +2701,7 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
if (!c->querybuf) c->querybuf = sdsempty();

maybe add a comment that we don't reuse the shared buffer here because we aim for the big arg optimization? or do you think it's clear from the context above?

Owner Author

done with 3ebb1b3 (#337)
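
For readers unfamiliar with the big arg path: for a large bulk argument the read length is chosen so that the read stops exactly at the end of the argument (payload plus trailing CRLF), which is why a dedicated buffer is wanted there. The sketch below models only that arithmetic. `next_read_len` and its parameters are hypothetical, the two constants are assumed to match PROTO_MBULK_BIG_ARG and PROTO_IOBUF_LEN, and the master-client adjustment from the real function is left out.

```c
#include <assert.h>
#include <stddef.h>

#define BIG_ARG_THRESHOLD (1024 * 32) /* assumed value of PROTO_MBULK_BIG_ARG */
#define IOBUF_LEN (1024 * 16)         /* assumed value of PROTO_IOBUF_LEN */

/* Bytes to request from the socket for the next read.
 * bulklen:  declared length of the argument being read, or -1 if unknown.
 * buffered: bytes already in the query buffer past the parse position. */
size_t next_read_len(long bulklen, size_t buffered) {
    assert(bulklen >= -1);
    size_t readlen = IOBUF_LEN;
    if (bulklen != -1 && bulklen >= BIG_ARG_THRESHOLD) {
        /* +2 covers the CRLF that terminates the bulk payload. */
        long remaining = (bulklen + 2) - (long)buffered;
        /* remaining can be zero in edge cases; then keep the default. */
        if (remaining > 0) readlen = (size_t)remaining;
    }
    return readlen;
}
```

With the `$1000000` argument used in the test file, a call such as `next_read_len(1000000, 0)` asks for the whole argument plus CRLF in one request, while small arguments fall back to the default I/O buffer length.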

ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;

@@ -2685,6 +2713,26 @@ void readQueryFromClient(connection *conn) {
* but doesn't need align to the next arg, we can read more data. */
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
readlen = PROTO_IOBUF_LEN;
} else if (c->querybuf == NULL) {
if (unlikely(thread_shared_qb_used)) {
/* The shared query buffer is already used by another client,
* switch to using the client's private query buffer. This only
* occurs when commands are executed nested via processEventsWhileBlocked(). */
c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(c->querybuf);
} else {
/* Create the shared query buffer if it doesn't exist. */
if (!thread_shared_qb) {
thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN);
sdsclear(thread_shared_qb);
}

/* Assign the shared query buffer to the client and mark it as in use. */
serverAssert(sdslen(thread_shared_qb) == 0);
c->querybuf = thread_shared_qb;
c->flags |= CLIENT_SHARED_QUERYBUFFER;
thread_shared_qb_used = 1;
}
}

qblen = sdslen(c->querybuf);
@@ -2708,7 +2756,7 @@ void readQueryFromClient(connection *conn) {
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
- return;
+ goto done;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
@@ -2760,6 +2808,10 @@ void readQueryFromClient(connection *conn) {
c = NULL;

done:
if (c && (c->flags & CLIENT_SHARED_QUERYBUFFER)) {
serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */
resetSharedQueryBuf(c);
}
beforeNextClient(c);
}
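
The buffer selection added to readQueryFromClient() can be summarized by a small model. This is a sketch under assumptions rather than the PR's code: `reader`, `thread_qb`, `thread_qb_used`, and `attach_query_buffer` are hypothetical names, `calloc` stands in for `sdsnewlen` followed by `sdsclear`, and the big arg branch is not modeled.

```c
#include <assert.h>
#include <stdlib.h>

#define IOBUF_LEN 16384            /* assumed to match PROTO_IOBUF_LEN */

char *thread_qb = NULL;            /* models thread_shared_qb */
int thread_qb_used = 0;            /* models thread_shared_qb_used */

typedef struct {
    char *querybuf;                /* NULL until a buffer is attached */
    int borrowed;                  /* models CLIENT_SHARED_QUERYBUFFER */
} reader;

/* Pick a buffer for a client just before reading from its socket. */
void attach_query_buffer(reader *c) {
    if (c->querybuf != NULL) return;   /* already has a private buffer */
    if (thread_qb_used) {
        /* Another client is in the middle of a read (nested execution):
         * fall back to a private buffer for this client. */
        c->querybuf = calloc(1, IOBUF_LEN);
        assert(c->querybuf != NULL);
        return;
    }
    /* Create the reusable buffer lazily, then lend it to this client. */
    if (!thread_qb) thread_qb = calloc(1, IOBUF_LEN);
    assert(thread_qb != NULL);
    c->querybuf = thread_qb;
    c->borrowed = 1;
    thread_qb_used = 1;
}
```

The in-use flag is what makes the buffer reusable rather than shared: a second client that arrives while the first is still reading never sees the same memory.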

@@ -2875,8 +2927,8 @@ sds catClientInfoString(sds s, client *client) {
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
" watch=%i", (int) listLength(client->watched_keys),
- " qbuf=%U", (unsigned long long) sdslen(client->querybuf),
- " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf),
+ " qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0,
+ " qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0,
" argv-mem=%U", (unsigned long long) client->argv_len_sum,
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums,
" rbs=%U", (unsigned long long) client->buf_usable_size,
@@ -3856,9 +3908,10 @@ size_t getClientOutputBufferMemoryUsage(client *c) {
* the client output buffer memory usage portion of the total. */
size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
size_t mem = getClientOutputBufferMemoryUsage(c);

if (output_buffer_mem_usage != NULL)
*output_buffer_mem_usage = mem;
- mem += sdsZmallocSize(c->querybuf);
+ mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
mem += zmalloc_size(c);
mem += c->buf_usable_size;
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
7 changes: 6 additions & 1 deletion src/replication.c
@@ -3,13 +3,15 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/


#include "server.h"
#include "cluster.h"
#include "bio.h"
@@ -1765,6 +1767,9 @@ void replicationCreateMasterClient(connection *conn, int dbid) {
* connection. */
server.master->flags |= CLIENT_MASTER;

/* Allocate a private query buffer for the master client instead of using the shared query buffer.
* This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */
server.master->querybuf = sdsempty();
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
29 changes: 24 additions & 5 deletions src/server.c
@@ -2,8 +2,13 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/

#include "server.h"
@@ -734,6 +739,8 @@ long long getInstantaneousMetric(int metric) {
*
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
/* If the client query buffer is NULL, it is using the shared query buffer and there is nothing to do. */
if (c->querybuf == NULL) return 0;
size_t querybuf_size = sdsalloc(c->querybuf);
time_t idletime = server.unixtime - c->lastinteraction;

@@ -743,7 +750,18 @@ int clientsCronResizeQueryBuffer(client *c) {
/* There are two conditions to resize the query buffer: */
if (idletime > 2) {
/* 1) Query is idle for a long time. */
- c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
size_t remaining = sdslen(c->querybuf) - c->qb_pos;
if (!(c->flags & CLIENT_MASTER) && !remaining) {
/* If the client is not a master and no data is pending,
* the client can safely use the shared query buffer in the next read - free the client's querybuf. */
sdsfree(c->querybuf);
/* By setting the querybuf to NULL, the client will use the shared query buffer in the next read.
* We don't move the client to the shared query buffer immediately, because if we allocated a private
* query buffer for the client, it's likely that the client will use it again soon. */
c->querybuf = NULL;
} else {
c->querybuf = sdsRemoveFreeSpace(c->querybuf, 1);
}
} else if (querybuf_size > PROTO_RESIZE_THRESHOLD && querybuf_size/2 > c->querybuf_peak) {
/* 2) Query buffer is too big for latest peak and is larger than
* resize threshold. Trim excess space but only up to a limit,
@@ -759,7 +777,7 @@ int clientsCronResizeQueryBuffer(client *c) {

/* Reset the peak again to capture the peak memory usage in the next
* cycle. */
- c->querybuf_peak = sdslen(c->querybuf);
+ c->querybuf_peak = c->querybuf ? sdslen(c->querybuf) : 0;
/* We reset to either the current used, or currently processed bulk size,
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen + 2 > c->querybuf_peak) c->querybuf_peak = c->bulklen + 2;
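
The branches visible in the clientsCronResizeQueryBuffer() hunks can be read as a decision table. The following sketch encodes only what the diff shows; `qb_action` and `cron_decide` are hypothetical names, `RESIZE_THRESHOLD` is assumed to match PROTO_RESIZE_THRESHOLD, and any precondition located in the collapsed parts of the function is not modeled.

```c
#include <assert.h>
#include <stddef.h>

#define RESIZE_THRESHOLD (1024 * 32) /* assumed value of PROTO_RESIZE_THRESHOLD */

typedef enum { QB_KEEP, QB_FREE, QB_TRIM_ALL, QB_TRIM_TO_PEAK } qb_action;

/* Decide what the cron would do with a client's private query buffer.
 * has_buf:   0 when the client has no private buffer (querybuf == NULL)
 * idle_sec:  seconds since the client's last interaction
 * is_master: nonzero for the replication master client
 * pending:   unparsed bytes left in the buffer
 * alloc:     allocated size of the buffer
 * peak:      peak usage recorded since the previous cron pass */
qb_action cron_decide(int has_buf, long idle_sec, int is_master,
                      size_t pending, size_t alloc, size_t peak) {
    assert(idle_sec >= 0);
    if (!has_buf) return QB_KEEP;        /* nothing private to resize */
    if (idle_sec > 2) {
        /* Idle, not a master, nothing pending: drop the private buffer so
         * the next read borrows the reusable one. */
        if (!is_master && pending == 0) return QB_FREE;
        return QB_TRIM_ALL;              /* remove all free space */
    }
    if (alloc > RESIZE_THRESHOLD && alloc / 2 > peak) return QB_TRIM_TO_PEAK;
    return QB_KEEP;
}
```

Writing the rules this way makes the master exception easy to see: a master client never reaches `QB_FREE`, which is consistent with the private buffer it is given in replicationCreateMasterClient().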
@@ -834,8 +852,9 @@ size_t ClientsPeakMemInput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};
size_t ClientsPeakMemOutput[CLIENTS_PEAK_MEM_USAGE_SLOTS] = {0};

int clientsCronTrackExpansiveClients(client *c, int time_idx) {
- size_t in_usage = sdsZmallocSize(c->querybuf) + c->argv_len_sum +
- (c->argv ? zmalloc_size(c->argv) : 0);
+ size_t qb_size = c->querybuf ? sdsZmallocSize(c->querybuf) : 0;
+ size_t argv_size = c->argv ? zmalloc_size(c->argv) : 0;
+ size_t in_usage = qb_size + c->argv_len_sum + argv_size;
size_t out_usage = getClientOutputBufferMemoryUsage(c);

/* Track the biggest values observed so far in this slot. */
@@ -6567,7 +6586,7 @@ void dismissMemory(void* ptr, size_t size_hint) {
void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
- dismissSds(c->querybuf);
+ if (c->querybuf) dismissSds(c->querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
1 change: 1 addition & 0 deletions src/server.h
@@ -388,6 +388,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_SHARED_QUERYBUFFER (1ULL<<51) /* The client is using the shared query buffer. */

/* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */
#define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE)
72 changes: 68 additions & 4 deletions tests/unit/querybuf.tcl
@@ -1,3 +1,15 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
proc client_idle_sec {name} {
set clients [split [r client list] "\r\n"]
set c [lsearch -inline $clients *name=$name*]
@@ -24,16 +36,45 @@ start_server {tags {"querybuf slow"}} {
# The test will run at least 2s to check if client query
# buffer will be resized when client idle 2s.
test "query buffer resized correctly" {
- set rd [redis_client]

+ set rd [redis_deferring_client]

$rd client setname test_client
$rd read

# Make sure query buff has size of 0 bytes at start as the client uses the shared qb.
assert {[client_query_buffer test_client] == 0}

# Pause cron to prevent premature shrinking (timing issue).
r debug pause-cron 1

# Send partial command to client to make sure it doesn't use the shared qb.
$rd write "*3\r\n\$3\r\nset\r\n\$2\r\na"
$rd flush
# Wait for the client to start using a private query buffer.
wait_for_condition 1000 10 {
[client_query_buffer test_client] > 0
} else {
fail "client should start using a private query buffer"
}

# send the rest of the command
$rd write "a\r\n\$1\r\nb\r\n"
$rd flush
assert_equal {OK} [$rd read]

set orig_test_client_qbuf [client_query_buffer test_client]
# Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k
# but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k
- assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf < 32768}
+ set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer.
+ assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE}
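
The `32768 + 2` bound follows from doubling a request of 16k + 1 bytes. A small sketch of that arithmetic, assuming a greedy growth policy like the one in upstream sds.c (double the requested length below a 1 MB cap, otherwise add the cap); `greedy_capacity` and `MAX_PREALLOC` are illustrative names, not part of this PR.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_PREALLOC (1024 * 1024) /* assumed to match SDS_MAX_PREALLOC */

/* Capacity a greedy grow would request for a string that must be able
 * to hold `needed` bytes. */
size_t greedy_capacity(size_t needed) {
    assert(needed > 0);
    return needed < MAX_PREALLOC ? needed * 2 : needed + MAX_PREALLOC;
}
```

Under these assumptions `greedy_capacity(16384 + 1)` evaluates to 32770, which is exactly the upper bound the test accepts.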

# Allow shrinking to occur
r debug pause-cron 0

# Check that the initial query buffer is resized after 2 sec
wait_for_condition 1000 10 {
- [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] == 0
+ [client_idle_sec test_client] >= 3 && [client_query_buffer test_client] < $orig_test_client_qbuf
} else {
fail "query buffer was not resized"
}
@@ -63,7 +104,7 @@ start_server {tags {"querybuf slow"}} {
# Write something smaller, so query buf peak can shrink
$rd set x [string repeat A 100]
set new_test_client_qbuf [client_query_buffer test_client]
- if {$new_test_client_qbuf < $orig_test_client_qbuf} { break }
+ if {$new_test_client_qbuf < $orig_test_client_qbuf && $new_test_client_qbuf > 0} { break }

was there a race condition here?

Owner Author

this is a CP mistake from the unstable Valkey branch.

if {[expr [clock milliseconds] - $t] > 1000} { break }
after 10
}
@@ -78,6 +119,11 @@ start_server {tags {"querybuf slow"}} {
$rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n"
$rd flush

after 200

we need to wait for redis to read that incomplete command? maybe we better use wait_for to avoid timing issues.

same in theory applies for the after 20 below, but i'm not sure we can detect it.
maybe by looking at the argv-mem and qbuf fields in CLIENT LIST?

Owner Author

> we need to wait for redis to read that incomplete command? maybe we better use wait_for to avoid timing issues.

fixed in e5a4a67 (#337).

> same in theory applies for the after 20 below, but i'm not sure we can detect it. maybe by looking at the argv-mem and qbuf fields in CLIENT LIST?

IIRC, the after 20 is used to verify that the next client cron doesn't shrink the client's query buffer.
in e5a4a67 (#337), we turn on cron before after, so after 120 should be more reasonable.

well, that can still have a race condition, if we want to wait for cron, we can maybe use a wait_for on some new tick metric. but we don't do that elsewhere. i suppose 120 is good.

Owner Author

yes, but in theory it's hard to run for more than 2 seconds, I'll check it from daily CI.

no need. if we'll ever see it fail we'll adjust. we have similar after 120 in other places.

# Send the start of the arg and make sure the client is not using the shared qb for it, but rather a private buf of > 1000000 bytes.
$rd write "a"
$rd flush

after 20
if {[client_query_buffer test_client] < 1000000} {
fail "query buffer should not be resized when client idle time smaller than 2s"
@@ -92,5 +138,23 @@ start_server {tags {"querybuf slow"}} {

$rd close
}
}

start_server {tags {"querybuf"}} {
test "Client executes small argv commands using shared query buffer" {
set rd [redis_deferring_client]
$rd client setname test_client
set res [r client list]

we have no guarantee that the previous command (setname) was run (missing rd read)

Owner Author

missing rd read, fixed it.


# Verify that the client does not create a private query buffer after
# executing a small parameter command.
assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res

# The client executing the command is currently using the shared query buffer,
# so the size shown is that of the shared query buffer. It will be returned
# to the shared query buffer after command execution.
assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res

$rd close
}
}