Skip to content

Commit

Permalink
Add support of READONLY and READWRITE (#2173)
Browse files Browse the repository at this point in the history
Co-authored-by: 纪华裕 <[email protected]>
  • Loading branch information
LiuYuHui and jihuayu authored Mar 17, 2024
1 parent 787555e commit 20e01c4
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
3 changes: 2 additions & 1 deletion src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,8 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons
}

if (myself_ && myself_->role == kClusterSlave && !(attributes->flags & redis::kCmdWrite) &&
nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot]) {
nodes_.find(myself_->master_id) != nodes_.end() && nodes_[myself_->master_id] == slots_nodes_[slot] &&
conn->IsFlagEnabled(redis::Connection::KReadOnly)) {
return Status::OK(); // My master is serving this slot
}

Expand Down
24 changes: 22 additions & 2 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "cluster/sync_migrate_context.h"
#include "commander.h"
#include "error_constants.h"
#include "status.h"

namespace redis {

Expand Down Expand Up @@ -292,8 +293,27 @@ static uint64_t GenerateClusterFlag(const std::vector<std::string> &args) {
return 0;
}

class CommandReadOnly : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
*output = redis::SimpleString("OK");
conn->EnableFlag(redis::Connection::KReadOnly);
return Status::OK();
}
};

class CommandReadWrite : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
*output = redis::SimpleString("OK");
conn->DisableFlag(redis::Connection::KReadOnly);
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandCluster>("cluster", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster no-script", 0, 0, 0,
GenerateClusterFlag), )
MakeCmdAttr<CommandClusterX>("clusterx", -2, "cluster no-script", 0, 0, 0, GenerateClusterFlag),
MakeCmdAttr<CommandReadOnly>("readonly", 1, "cluster no-multi", 0, 0, 0),
MakeCmdAttr<CommandReadWrite>("readwrite", 1, "cluster no-multi", 0, 0, 0), )

} // namespace redis
1 change: 1 addition & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class Connection : public EvbufCallbackBase<Connection> {
kCloseAfterReply = 1 << 6,
kCloseAsync = 1 << 7,
kMultiExec = 1 << 8,
KReadOnly = 1 << 9,
};

explicit Connection(bufferevent *bev, Worker *owner);
Expand Down
19 changes: 19 additions & 0 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ func TestClusterMultiple(t *testing.T) {
require.ErrorContains(t, rdb[3].Set(ctx, util.SlotTable[16383], 16383, 0).Err(), "MOVED")
// request a read-only command to node3 that serve slot 16383, that's ok
util.WaitForOffsetSync(t, rdb[2], rdb[3])
//the default option is READWRITE, which will redirect both read and write to master
require.ErrorContains(t, rdb[3].Get(ctx, util.SlotTable[16383]).Err(), "MOVED")

require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())

require.Equal(t, "16383", rdb[3].Get(ctx, util.SlotTable[16383]).Val())
})

Expand Down Expand Up @@ -369,4 +374,18 @@ func TestClusterMultiple(t *testing.T) {
require.ErrorContains(t, rdb[1].Do(ctx, "EXEC").Err(), "EXECABORT")
require.Equal(t, "no-multi", rdb[1].Get(ctx, util.SlotTable[0]).Val())
})

t.Run("requests on cluster are ok when enable readonly", func(t *testing.T) {

require.NoError(t, rdb[3].Do(ctx, "READONLY").Err())
require.NoError(t, rdb[2].Set(ctx, util.SlotTable[8192], 8192, 0).Err())
util.WaitForOffsetSync(t, rdb[2], rdb[3])
// request node3 that serves slot 8192, that's ok
require.Equal(t, "8192", rdb[3].Get(ctx, util.SlotTable[8192]).Val())

require.NoError(t, rdb[3].Do(ctx, "READWRITE").Err())

// when enable READWRITE, request node3 that serves slot 8192, that's not ok
util.ErrorRegexp(t, rdb[3].Get(ctx, util.SlotTable[8192]).Err(), fmt.Sprintf("MOVED 8192.*%d.*", srv[2].Port()))
})
}

0 comments on commit 20e01c4

Please sign in to comment.