-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce reusable query buffer for client reads #337
base: unstable
Are you sure you want to change the base?
Changes from 19 commits
9ce10d3
14cf2d1
4723b01
c4b3090
908268d
ebf45de
b50f5cc
0569168
7125bc6
1c37cc5
9b8d07e
9d70fa3
0b64a50
e5a4a67
3ebb1b3
060cbb9
e472780
630c739
5ea62d0
95a5d9e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,9 @@ static void pauseClientsByClient(mstime_t end, int isPauseClientAll); | |
int postponeClientRead(client *c); | ||
char *getClientSockname(client *c); | ||
int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). */ | ||
__thread sds thread_shared_qb = NULL; | ||
__thread int thread_shared_qb_used = 0; /* Avoid multiple clients using reusable query | ||
* buffer due to nested command execution. */ | ||
|
||
/* Return the size consumed from the allocator, for the specified SDS string, | ||
* including internal fragmentation. This function is used in order to compute | ||
|
@@ -144,7 +147,7 @@ client *createClient(connection *conn) { | |
c->ref_repl_buf_node = NULL; | ||
c->ref_block_pos = 0; | ||
c->qb_pos = 0; | ||
c->querybuf = sdsempty(); | ||
c->querybuf = NULL; | ||
c->querybuf_peak = 0; | ||
c->reqtype = 0; | ||
c->argc = 0; | ||
|
@@ -1575,6 +1578,28 @@ void deauthenticateAndCloseClient(client *c) { | |
} | ||
} | ||
|
||
/* Resets the reusable query buffer used by the given client. | ||
* The client must have CLIENT_REUSABLE_QUERYBUFFER set (asserted below). | ||
* If the client's querybuf no longer points at the thread's reusable buffer, or | ||
* data remains in it beyond qb_pos, the client keeps the buffer as its own and | ||
* thread_shared_qb is set to NULL; the reusable buffer is not reallocated here. | ||
* Otherwise the client drops its reference and the reusable buffer is cleared. | ||
* In both cases the client's flag and thread_shared_qb_used are reset. */ | ||
static void resetReusableQueryBuf(client *c) { | ||
serverAssert(c->flags & CLIENT_REUSABLE_QUERYBUFFER); | ||
if (c->querybuf != thread_shared_qb || sdslen(c->querybuf) > c->qb_pos) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
this means that |
||
/* If querybuf has been reallocated or there is still data left, | ||
* let the client take ownership of the buffer: the thread forgets its | ||
* pointer so it can no longer be handed to another client. */ | ||
thread_shared_qb = NULL; | ||
} else { | ||
/* Nothing is left in the buffer: detach it from the client (drop the | ||
* client's reference) and empty it so it can be reused. */ | ||
c->querybuf = NULL; | ||
c->qb_pos = 0; | ||
sdsclear(thread_shared_qb); | ||
} | ||
|
||
/* Mark that the client is no longer using the reusable query buffer | ||
* and indicate that it is no longer used by any client. */ | ||
c->flags &= ~CLIENT_REUSABLE_QUERYBUFFER; | ||
thread_shared_qb_used = 0; | ||
} | ||
|
||
void freeClient(client *c) { | ||
listNode *ln; | ||
|
||
|
@@ -1629,6 +1654,8 @@ void freeClient(client *c) { | |
} | ||
|
||
/* Free the query buffer */ | ||
if (c->flags & CLIENT_REUSABLE_QUERYBUFFER) | ||
resetReusableQueryBuf(c); | ||
sdsfree(c->querybuf); | ||
c->querybuf = NULL; | ||
|
||
|
@@ -2674,6 +2701,11 @@ void readQueryFromClient(connection *conn) { | |
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 | ||
&& c->bulklen >= PROTO_MBULK_BIG_ARG) | ||
{ | ||
/* For big argv, the client always uses its private query buffer. | ||
* Using the reusable query buffer would eventually expand it beyond 32k, | ||
* causing the client to take ownership of the reusable query buffer. */ | ||
if (!c->querybuf) c->querybuf = sdsempty(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. maybe add a comment that we don't reuse the shared buffer here because we aim for the big arg optimization? or do you think it's clear from the context above? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. done with |
||
|
||
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos); | ||
big_arg = 1; | ||
|
||
|
@@ -2685,6 +2717,26 @@ void readQueryFromClient(connection *conn) { | |
* but doesn't need align to the next arg, we can read more data. */ | ||
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN) | ||
readlen = PROTO_IOBUF_LEN; | ||
} else if (c->querybuf == NULL) { | ||
if (unlikely(thread_shared_qb_used)) { | ||
/* The reusable query buffer is already used by another client, | ||
* switch to using the client's private query buffer. This only | ||
* occurs when commands are executed nested via processEventsWhileBlocked(). */ | ||
c->querybuf = sdsnewlen(NULL, PROTO_IOBUF_LEN); | ||
sdsclear(c->querybuf); | ||
} else { | ||
/* Create the reusable query buffer if it doesn't exist. */ | ||
if (!thread_shared_qb) { | ||
thread_shared_qb = sdsnewlen(NULL, PROTO_IOBUF_LEN); | ||
sdsclear(thread_shared_qb); | ||
} | ||
|
||
/* Assign the reusable query buffer to the client and mark it as in use. */ | ||
serverAssert(sdslen(thread_shared_qb) == 0); | ||
c->querybuf = thread_shared_qb; | ||
c->flags |= CLIENT_REUSABLE_QUERYBUFFER; | ||
thread_shared_qb_used = 1; | ||
} | ||
} | ||
|
||
qblen = sdslen(c->querybuf); | ||
|
@@ -2708,7 +2760,7 @@ void readQueryFromClient(connection *conn) { | |
nread = connRead(c->conn, c->querybuf+qblen, readlen); | ||
if (nread == -1) { | ||
if (connGetState(conn) == CONN_STATE_CONNECTED) { | ||
return; | ||
goto done; | ||
} else { | ||
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); | ||
freeClientAsync(c); | ||
|
@@ -2760,6 +2812,10 @@ void readQueryFromClient(connection *conn) { | |
c = NULL; | ||
|
||
done: | ||
if (c && (c->flags & CLIENT_REUSABLE_QUERYBUFFER)) { | ||
serverAssert(c->qb_pos == 0); /* Ensure the client's query buffer is trimmed in processInputBuffer */ | ||
resetReusableQueryBuf(c); | ||
} | ||
beforeNextClient(c); | ||
} | ||
|
||
|
@@ -2875,8 +2931,8 @@ sds catClientInfoString(sds s, client *client) { | |
" ssub=%i", (int) dictSize(client->pubsubshard_channels), | ||
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, | ||
" watch=%i", (int) listLength(client->watched_keys), | ||
" qbuf=%U", (unsigned long long) sdslen(client->querybuf), | ||
" qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), | ||
" qbuf=%U", client->querybuf ? (unsigned long long) sdslen(client->querybuf) : 0, | ||
" qbuf-free=%U", client->querybuf ? (unsigned long long) sdsavail(client->querybuf) : 0, | ||
" argv-mem=%U", (unsigned long long) client->argv_len_sum, | ||
" multi-mem=%U", (unsigned long long) client->mstate.argv_len_sums, | ||
" rbs=%U", (unsigned long long) client->buf_usable_size, | ||
|
@@ -3856,9 +3912,10 @@ size_t getClientOutputBufferMemoryUsage(client *c) { | |
* the client output buffer memory usage portion of the total. */ | ||
size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { | ||
size_t mem = getClientOutputBufferMemoryUsage(c); | ||
|
||
if (output_buffer_mem_usage != NULL) | ||
*output_buffer_mem_usage = mem; | ||
mem += sdsZmallocSize(c->querybuf); | ||
mem += c->querybuf ? sdsZmallocSize(c->querybuf) : 0; | ||
mem += zmalloc_size(c); | ||
mem += c->buf_usable_size; | ||
/* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,15 @@ | ||
# | ||
# Copyright (c) 2009-Present, Redis Ltd. | ||
# All rights reserved. | ||
# | ||
# Copyright (c) 2024-present, Valkey contributors. | ||
# All rights reserved. | ||
# | ||
# Licensed under your choice of the Redis Source Available License 2.0 | ||
# (RSALv2) or the Server Side Public License v1 (SSPLv1). | ||
# | ||
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. | ||
# | ||
proc client_idle_sec {name} { | ||
set clients [split [r client list] "\r\n"] | ||
set c [lsearch -inline $clients *name=$name*] | ||
|
@@ -24,16 +36,45 @@ start_server {tags {"querybuf slow"}} { | |
# The test will run at least 2s to check if client query | ||
# buffer will be resized when client idle 2s. | ||
test "query buffer resized correctly" { | ||
set rd [redis_client] | ||
|
||
set rd [redis_deferring_client] | ||
|
||
$rd client setname test_client | ||
$rd read | ||
|
||
# Make sure query buff has size of 0 bytes at start as the client uses the reusable qb. | ||
assert {[client_query_buffer test_client] == 0} | ||
|
||
# Pause cron to prevent premature shrinking (timing issue). | ||
r debug pause-cron 1 | ||
|
||
# Send partial command to client to make sure it doesn't use the reusable qb. | ||
$rd write "*3\r\n\$3\r\nset\r\n\$2\r\na" | ||
$rd flush | ||
# Wait for the client to start using a private query buffer. | ||
wait_for_condition 1000 10 { | ||
[client_query_buffer test_client] > 0 | ||
} else { | ||
fail "client should start using a private query buffer" | ||
} | ||
|
||
# send the rest of the command | ||
$rd write "a\r\n\$1\r\nb\r\n" | ||
$rd flush | ||
assert_equal {OK} [$rd read] | ||
|
||
set orig_test_client_qbuf [client_query_buffer test_client] | ||
# Make sure query buff has less than the peak resize threshold (PROTO_RESIZE_THRESHOLD) 32k | ||
# but at least the basic IO reading buffer size (PROTO_IOBUF_LEN) 16k | ||
assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf < 32768} | ||
set MAX_QUERY_BUFFER_SIZE [expr 32768 + 2] ; # 32k + 2, allowing for potential greedy allocation of (16k + 1) * 2 bytes for the query buffer. | ||
assert {$orig_test_client_qbuf >= 16384 && $orig_test_client_qbuf <= $MAX_QUERY_BUFFER_SIZE} | ||
|
||
# Allow shrinking to occur | ||
r debug pause-cron 0 | ||
|
||
# Check that the initial query buffer is resized after 2 sec | ||
wait_for_condition 1000 10 { | ||
[client_idle_sec test_client] >= 3 && [client_query_buffer test_client] == 0 | ||
[client_idle_sec test_client] >= 3 && [client_query_buffer test_client] < $orig_test_client_qbuf | ||
} else { | ||
fail "query buffer was not resized" | ||
} | ||
|
@@ -75,10 +116,27 @@ start_server {tags {"querybuf slow"}} { | |
test "query buffer resized correctly with fat argv" { | ||
set rd [redis_client] | ||
$rd client setname test_client | ||
|
||
# Pause cron to prevent premature shrinking (timing issue). | ||
r debug pause-cron 1 | ||
|
||
$rd write "*3\r\n\$3\r\nset\r\n\$1\r\na\r\n\$1000000\r\n" | ||
$rd flush | ||
|
||
# Wait for the client to start using a private query buffer of > 1000000 size. | ||
wait_for_condition 1000 10 { | ||
[client_query_buffer test_client] > 1000000 | ||
} else { | ||
fail "client should start using a private query buffer" | ||
} | ||
|
||
after 20 | ||
# Send the start of the arg and make sure the client is not using the reusable qb for it, but rather a private buf of > 1000000 size. | ||
$rd write "a" | ||
$rd flush | ||
|
||
r debug pause-cron 0 | ||
|
||
after 120 | ||
if {[client_query_buffer test_client] < 1000000} { | ||
fail "query buffer should not be resized when client idle time smaller than 2s" | ||
} | ||
|
@@ -92,5 +150,24 @@ start_server {tags {"querybuf slow"}} { | |
|
||
$rd close | ||
} | ||
} | ||
|
||
start_server {tags {"querybuf"}} { | ||
test "Client executes small argv commands using reusable query buffer" { | ||
set rd [redis_deferring_client] | ||
$rd client setname test_client | ||
$rd read | ||
set res [r client list] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we have no guarantee that the previous command (setname) was run (missing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. missing |
||
|
||
# Verify that the client does not create a private query buffer after | ||
# executing a small parameter command. | ||
assert_match {*name=test_client * qbuf=0 qbuf-free=0 * cmd=client|setname *} $res | ||
|
||
# The client executing the command is currently using the reusable query buffer, | ||
# so the size shown is that of the reusable query buffer. It will be returned | ||
# to the reusable query buffer after command execution. | ||
assert_match {*qbuf=26 qbuf-free=* cmd=client|list *} $res | ||
|
||
$rd close | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you renamed everything, but didn't rename the variable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙃 I only updated the name in the comments and in the
resetReusableQueryBuf()
method (which I added), so it looks like almost everything has changed. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, it looked like you changed a lot of lines for that (which could cause a lot of conflicts).
i thought i was suggesting just editing one comment.
but i didn't look at the original code. do what you think is best.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i actually reworked a pretty big chunk of code, the code is now much more efficient and readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, so feel free to rename the variable too