diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index d94d81e73ec..d921c7666c1 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -1222,6 +1222,39 @@ class CommandAnalyze : public Commander { std::vector command_args_; }; +class CommandReset : public Commander { + public: + Status Execute(Server *srv, Connection *conn, std::string *output) override { + // 1. Discards the current MULTI transaction block, if one exists. + if (conn->IsFlagEnabled(Connection::kMultiExec)) { + conn->ResetMultiExec(); + } + // 2. Unwatches all keys WATCHed by the connection. + srv->ResetWatchedKeys(conn); + // 3. Disables CLIENT TRACKING, if in use. (not yet supported) + // 4. Sets the connection to READWRITE mode. + // 5. Cancels the connection's ASKING mode, if previously set. (not yet supported) + // 6. Sets CLIENT REPLY to ON. (not yet supported) + // 9. Exits MONITOR mode, when applicable. + if (conn->IsFlagEnabled(Connection::kMonitor)) { + conn->Owner()->QuitMonitorConn(conn); + } + // 10. Aborts Pub/Sub's subscription state (SUBSCRIBE and PSUBSCRIBE), when appropriate. + if (conn->SubscriptionsCount() != 0) { + conn->UnsubscribeAll(); + } + if (conn->PSubscriptionsCount() != 0) { + conn->PUnsubscribeAll(); + } + // 11. Deauthenticates the connection, requiring a call AUTH to reauthenticate when authentication is enabled. + conn->SetNamespace(kDefaultNamespace); + conn->BecomeAdmin(); + // 12. Turns off NO-EVICT / NO-TOUCH mode. (not yet supported) + *output = redis::SimpleString("RESET"); + return Status::OK(); + } +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loading", 0, 0, 0), MakeCmdAttr("ping", -1, "read-only", 0, 0, 0), MakeCmdAttr("select", 2, "read-only", 0, 0, 0), @@ -1257,6 +1290,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loadin MakeCmdAttr("slaveof", 3, "read-only exclusive no-script", 0, 0, 0), MakeCmdAttr("stats", 1, "read-only", 0, 0, 0), MakeCmdAttr("rdb", -3, "write exclusive", 0, 0, 0), - MakeCmdAttr("analyze", -1, "", 0, 0, 0), ) - + MakeCmdAttr("analyze", -1, "", 0, 0, 0), + MakeCmdAttr("reset", -1, "multi pub-sub", 0, 0, 0), ) } // namespace redis diff --git a/src/server/worker.cc b/src/server/worker.cc index 47042d030b7..1e8fed37441 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -464,6 +464,16 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) { conn->EnableFlag(redis::Connection::kMonitor); } +void Worker::QuitMonitorConn(redis::Connection *conn) { + { + std::lock_guard guard(conns_mu_); + monitor_conns_.erase(conn->GetFD()); + conns_[conn->GetFD()] = conn; + } + srv->DecrMonitorClientNum(); + conn->DisableFlag(redis::Connection::kMonitor); +} + void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { std::unique_lock lock(conns_mu_); diff --git a/src/server/worker.h b/src/server/worker.h index a9f618e57f5..b6918ba9296 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -62,6 +62,7 @@ class Worker : EventCallbackBase, EvconnlistenerBase { Status EnableWriteEvent(int fd); Status Reply(int fd, const std::string &reply); void BecomeMonitorConn(redis::Connection *conn); + void QuitMonitorConn(redis::Connection *conn); void FeedMonitorConns(redis::Connection *conn, const std::string &response); std::string GetClientsStr(); diff --git a/tests/gocase/unit/reset/reset_test.go b/tests/gocase/unit/reset/reset_test.go new file mode 100644 index 00000000000..9d16edb13a9 --- /dev/null +++ b/tests/gocase/unit/reset/reset_test.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package reset + +import ( + "context" + "fmt" + "testing" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/stretchr/testify/require" +) + +func TestReset(t *testing.T) { + srv := util.StartServer(t, map[string]string{}) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + t.Run("reset with ongoing txn", func(t *testing.T) { + require.NoError(t, rdb.Set(ctx, "x", "30", 0).Err()) + require.NoError(t, rdb.Do(ctx, "multi").Err()) + require.NoError(t, rdb.Set(ctx, "x", "40", 0).Err()) + require.NoError(t, rdb.Do(ctx, "reset").Err()) + + v1 := rdb.Do(ctx, "get", "x").Val() + require.Equal(t, "30", fmt.Sprintf("%v", v1)) + }) + + t.Run("unwatch keys", func(t *testing.T) { + require.NoError(t, rdb.Set(ctx, "x", 30, 0).Err()) + require.NoError(t, rdb.Do(ctx, "watch", "x").Err()) + require.NoError(t, rdb.Do(ctx, "multi").Err()) + require.NoError(t, rdb.Do(ctx, "ping").Err()) + require.NoError(t, rdb.Do(ctx, "reset").Err()) + + require.NoError(t, rdb.Set(ctx, "x", 40, 0).Err()) + require.NoError(t, rdb.Do(ctx, "multi").Err()) + require.NoError(t, rdb.Do(ctx, "ping").Err()) + require.Equal(t, rdb.Do(ctx, "exec").Val(), []interface{}{"PONG"}) + }) + + t.Run("unsub and punsub", func(t *testing.T) { + require.NoError(t, rdb.Do(ctx, "subscribe", "chan1").Err()) + require.NoError(t, rdb.Do(ctx, "reset").Err()) + require.Equal(t, rdb.Do(ctx, "subscribe", "chan2").Val(), []interface{}{"subscribe", "chan2", (int64)(1)}) + }) +}