diff --git a/README_CN.md b/README_CN.md index d027571e84..da2d56ed39 100644 --- a/README_CN.md +++ b/README_CN.md @@ -2,9 +2,9 @@ ## Pika简介 [English](https://github.com/Qihoo360/pika/blob/master/README.md) -Pika 是一个以 RocksDB 为存储引擎的的大容量、高性能、多租户、数据可持久化的弹性 KV 数据存储系统,完全兼容 Redis 协议,支持其常用的数据结构,如 string/hash/list/zset/set/geo/hyperloglog/pubsub/bitmap/stream 等 [Redis 接口](https://github.com/OpenAtomFoundation/pika/wiki/pika-%E6%94%AF%E6%8C%81%E7%9A%84redis%E6%8E%A5%E5%8F%A3%E5%8F%8A%E5%85%BC%E5%AE%B9%E6%83%85%E5%86%B5)。 +Pika 是一个以 RocksDB 为存储引擎的的大容量、高性能、多租户、数据可持久化的弹性 KV 数据存储系统,完全兼容 Redis 协议,支持其常用的数据结构,如 string/hash/list/zset/set/geo/hyperloglog/pubsub/bitmap/stream 等 [Redis 接口](https://github.com/OpenAtomFoundation/pika/wiki/pika-%E6%94%AF%E6%8C%81%E7%9A%84redis%E6%8E%A5%E5%8F%A3%E5%8F%8A%E5%85%BC%E5%AE%B9%E6%83%85%E5%86%B5)。 -Redis 的内存使用了超过 16GiB 时,会面临内存容量有限、单线程阻塞、启动恢复时间长、内存硬件成本贵、缓冲区容易写满、一主多从故障时切换代价大等问题。Pika 的出现并不是为了替代 Redis, 而是 Redis 补充。Pika 力求在完全兼容Redis 协议、继承 Redis 便捷运维设计的前提下,通过持久化存储的方式解决了 Redis 一旦存储数据量巨大就会出现内存容量不足的瓶颈问题,并且可以像 Redis 一样,支持使用 slaveof 命令实现主从模式,还支持数据的全量同步和增量同步。 +Redis 的内存使用量超过一定阈值【如 16GiB 】时,会面临内存容量有限、单线程阻塞、启动恢复时间长、内存硬件成本贵、缓冲区容易写满、一主多从故障时切换代价大等问题。Pika 的出现并不是为了替代 Redis, 而是 Redis 补充。Pika 力求在完全兼容Redis 协议、继承 Redis 便捷运维设计的前提下,通过持久化存储的方式解决了 Redis 一旦存储数据量巨大就会出现内存容量不足的瓶颈问题,并且可以像 Redis 一样,支持使用 slaveof 命令实现主从模式,还支持数据的全量同步和增量同步。 还可以通过 twemproxy or [Codis](https://github.com/OpenAtomFoundation/pika/tree/unstable/cluster) 以静态数据分片方式实现 Pika 集群。 diff --git a/codis/cmd/fe/assets/dashboard-fe.js b/codis/cmd/fe/assets/dashboard-fe.js index 35aa66a691..c6e63c6c3f 100644 --- a/codis/cmd/fe/assets/dashboard-fe.js +++ b/codis/cmd/fe/assets/dashboard-fe.js @@ -456,7 +456,7 @@ function isValidInput(text) { function processGroupStats(codis_stats) { var group_array = codis_stats.group.models; var group_stats = codis_stats.group.stats; - var keys = 0, memory = 0; + var keys = 0, memory = 0, pika_memory = 0; var dbkeyRegexp = /db\d+/ for (var i = 0; i < group_array.length; i++) { var g = group_array[i]; @@ -504,7 +504,7 @@ function processGroupStats(codis_stats) { if (s.stats["used_memory"]) { var v = parseInt(s.stats["used_memory"], 10); if (j == 0) { - memory += v; + pika_memory += v; } x.memory = humanSize(v); } @@ -552,7 +552,7 @@ function processGroupStats(codis_stats) { x.server_text = x.server; } } - return {group_array: group_array, keys: keys, memory: memory}; + return {group_array: group_array, keys: keys, pika_memory: pika_memory}; } dashboard.config(['$interpolateProvider', @@ -588,8 +588,7 @@ dashboard.controller('MainCodisCtrl', ['$scope', '$http', '$uibModal', '$timeout $scope.codis_coord_addr = "NA"; $scope.codis_qps = "NA"; $scope.codis_sessions = "NA"; - $scope.redis_mem = "NA"; - $scope.redis_keys = "NA"; + $scope.pika_mem = "NA"; $scope.slots_array = []; $scope.proxy_array = []; $scope.group_array = []; @@ -662,8 +661,7 @@ dashboard.controller('MainCodisCtrl', ['$scope', '$http', '$uibModal', '$timeout $scope.codis_qps = proxy_stats.qps; $scope.codis_sessions = proxy_stats.sessions; - $scope.redis_mem = humanSize(group_stats.memory); - $scope.redis_keys = group_stats.keys.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ","); + $scope.pika_mem = humanSize(group_stats.pika_memory); $scope.slots_array = merge($scope.slots_array, codis_stats.slots); $scope.proxy_array = merge($scope.proxy_array, proxy_stats.proxy_array); $scope.group_array = merge($scope.group_array, group_stats.group_array); diff --git a/codis/cmd/fe/assets/index.html b/codis/cmd/fe/assets/index.html index 41f2a8e29f..023bf98c49 100644 --- a/codis/cmd/fe/assets/index.html +++ b/codis/cmd/fe/assets/index.html @@ -63,12 +63,8 @@

Overview

[[codis_sessions]] - Redis Memory - [[redis_mem]] - - - Redis Keys - [[redis_keys]] + Pika Memory + [[pika_mem]] Slot Nums diff --git a/codis/config/proxy.toml b/codis/config/proxy.toml index 1c430855fb..5f46885413 100644 --- a/codis/config/proxy.toml +++ b/codis/config/proxy.toml @@ -128,3 +128,4 @@ metrics_report_statsd_prefix = "" # Maximum delay statistical time interval.(This value must be greater than 0.) max_delay_refresh_time_interval = "15s" + diff --git a/include/pika_admin.h b/include/pika_admin.h index a61c77cebe..a3e6f3217a 100644 --- a/include/pika_admin.h +++ b/include/pika_admin.h @@ -342,7 +342,7 @@ class ConfigCmd : public Cmd { std::vector config_args_v_; void DoInitial() override; void ConfigGet(std::string& ret); - void ConfigSet(std::string& ret, std::shared_ptr db); + void ConfigSet(std::shared_ptr db); void ConfigRewrite(std::string& ret); void ConfigResetstat(std::string& ret); void ConfigRewriteReplicationID(std::string& ret); diff --git a/include/pika_command.h b/include/pika_command.h index d6bd22be93..4b52eda9e5 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -441,7 +441,7 @@ class CmdRes { void AppendStringVector(const std::vector& strArray) { if (strArray.empty()) { - AppendArrayLen(-1); + AppendArrayLen(0); return; } AppendArrayLen(strArray.size()); diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 83d8f53204..9124968934 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1519,7 +1519,7 @@ void ConfigCmd::Do() { if (strcasecmp(config_args_v_[0].data(), "get") == 0) { ConfigGet(config_ret); } else if (strcasecmp(config_args_v_[0].data(), "set") == 0) { - ConfigSet(config_ret, db_); + ConfigSet(db_); } else if (strcasecmp(config_args_v_[0].data(), "rewrite") == 0) { ConfigRewrite(config_ret); } else if (strcasecmp(config_args_v_[0].data(), "resetstat") == 0) { @@ -2134,124 +2134,126 @@ void ConfigCmd::ConfigGet(std::string& ret) { } // Remember to sync change PikaConf::ConfigRewrite(); -void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { +void ConfigCmd::ConfigSet(std::shared_ptr db) { std::string set_item = config_args_v_[1]; if (set_item == "*") { - ret = "*29\r\n"; - EncodeString(&ret, "timeout"); - EncodeString(&ret, "requirepass"); - EncodeString(&ret, "masterauth"); - EncodeString(&ret, "slotmigrate"); - EncodeString(&ret, "userpass"); - EncodeString(&ret, "userblacklist"); - EncodeString(&ret, "dump-prefix"); - EncodeString(&ret, "maxclients"); - EncodeString(&ret, "dump-expire"); - EncodeString(&ret, "expire-logs-days"); - EncodeString(&ret, "expire-logs-nums"); - EncodeString(&ret, "root-connection-num"); - EncodeString(&ret, "slowlog-write-errorlog"); - EncodeString(&ret, "slowlog-log-slower-than"); - EncodeString(&ret, "slowlog-max-len"); - EncodeString(&ret, "write-binlog"); - EncodeString(&ret, "max-cache-statistic-keys"); - EncodeString(&ret, "small-compaction-threshold"); - EncodeString(&ret, "small-compaction-duration-threshold"); - EncodeString(&ret, "max-client-response-size"); - EncodeString(&ret, "db-sync-speed"); - EncodeString(&ret, "compact-cron"); - EncodeString(&ret, "compact-interval"); - EncodeString(&ret, "disable_auto_compactions"); - EncodeString(&ret, "slave-priority"); - EncodeString(&ret, "sync-window-size"); - EncodeString(&ret, "slow-cmd-list"); - // Options for storage engine - // MutableDBOptions - EncodeString(&ret, "max-cache-files"); - EncodeString(&ret, "max-background-compactions"); - EncodeString(&ret, "max-background-jobs"); - // MutableColumnFamilyOptions - EncodeString(&ret, "write-buffer-size"); - EncodeString(&ret, "max-write-buffer-num"); - EncodeString(&ret, "arena-block-size"); - EncodeString(&ret, "throttle-bytes-per-second"); - EncodeString(&ret, "max-rsync-parallel-num"); - EncodeString(&ret, "cache-num"); - EncodeString(&ret, "cache-model"); - EncodeString(&ret, "cache-type"); - EncodeString(&ret, "zset-cache-start-direction"); - EncodeString(&ret, "zset-cache-field-num-per-key"); - EncodeString(&ret, "cache-maxmemory"); - EncodeString(&ret, "cache-maxmemory-policy"); - EncodeString(&ret, "cache-maxmemory-samples"); - EncodeString(&ret, "cache-lfu-decay-time"); + std::vector replyVt({ + "timeout", + "requirepass", + "masterauth", + "slotmigrate", + "userpass", + "userblacklist", + "dump-prefix", + "maxclients", + "dump-expire", + "expire-logs-days", + "expire-logs-nums", + "root-connection-num", + "slowlog-write-errorlog", + "slowlog-log-slower-than", + "slowlog-max-len", + "write-binlog", + "max-cache-statistic-keys", + "small-compaction-threshold", + "small-compaction-duration-threshold", + "max-client-response-size", + "db-sync-speed", + "compact-cron", + "compact-interval", + "disable_auto_compactions", + "slave-priority", + "sync-window-size", + "slow-cmd-list", + // Options for storage engine + // MutableDBOptions + "max-cache-files", + "max-background-compactions", + "max-background-jobs", + // MutableColumnFamilyOptions + "write-buffer-size", + "max-write-buffer-num", + "arena-block-size", + "throttle-bytes-per-second", + "max-rsync-parallel-num", + "cache-num", + "cache-model", + "cache-type", + "zset-cache-start-direction", + "zset-cache-field-num-per-key", + "cache-maxmemory", + "cache-maxmemory-policy", + "cache-maxmemory-samples", + "cache-lfu-decay-time", + }); + res_.AppendStringVector(replyVt); return; } long int ival = 0; std::string value = config_args_v_[2]; if (set_item == "timeout") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'timeout'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'timeout'\r\n"); return; } g_pika_conf->SetTimeout(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "requirepass") { g_pika_conf->SetRequirePass(value); g_pika_server->Acl()->UpdateDefaultUserPassword(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "masterauth") { g_pika_conf->SetMasterAuth(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slotmigrate") { g_pika_conf->SetSlotMigrate(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "dump-prefix") { g_pika_conf->SetBgsavePrefix(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "maxclients") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'maxclients'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'maxclients'\r\n"); return; } g_pika_conf->SetMaxConnection(static_cast(ival)); g_pika_server->SetDispatchQueueLimit(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "dump-expire") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'dump-expire'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'dump-expire'\r\n"); return; } g_pika_conf->SetExpireDumpDays(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slave-priority") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slave-priority'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slave-priority'\r\n"); return; } g_pika_conf->SetSlavePriority(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "expire-logs-days") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-days'\r\n"; + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-days'\r\n"); return; } g_pika_conf->SetExpireLogsDays(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "expire-logs-nums") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n"); return; } g_pika_conf->SetExpireLogsNums(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "root-connection-num") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'root-connection-num'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'root-connection-num'\r\n"); return; } g_pika_conf->SetRootConnectionNum(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-write-errorlog") { bool is_write_errorlog; if (value == "yes") { @@ -2259,92 +2261,92 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { } else if (value == "no") { is_write_errorlog = false; } else { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-write-errorlog'\r\n"; + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-write-errorlog'\r\n"); return; } g_pika_conf->SetSlowlogWriteErrorlog(is_write_errorlog); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-log-slower-than") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n"); return; } g_pika_conf->SetSlowlogSlowerThan(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slowlog-max-len") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-max-len'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-max-len'\r\n"); return; } g_pika_conf->SetSlowlogMaxLen(static_cast(ival)); g_pika_server->SlowlogTrim(); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-cache-statistic-keys") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-statistic-keys'\r\n"); return; } g_pika_conf->SetMaxCacheStatisticKeys(static_cast(ival)); g_pika_server->DBSetMaxCacheStatisticKeys(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "small-compaction-threshold") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-threshold'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-threshold'\r\n"); return; } g_pika_conf->SetSmallCompactionThreshold(static_cast(ival)); g_pika_server->DBSetSmallCompactionThreshold(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw( "+OK\r\n"); } else if (set_item == "small-compaction-duration-threshold") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-duration-threshold'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'small-compaction-duration-threshold'\r\n"); return; } g_pika_conf->SetSmallCompactionDurationThreshold(static_cast(ival)); g_pika_server->DBSetSmallCompactionDurationThreshold(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "disable_auto_compactions") { if (value != "true" && value != "false") { - ret = "-ERR invalid disable_auto_compactions (true or false)\r\n"; + res_.AppendStringRaw("-ERR invalid disable_auto_compactions (true or false)\r\n"); return; } std::unordered_map options_map{{"disable_auto_compactions", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); if (!s.ok()) { - ret = "-ERR Set storage::OptionType::kColumnFamily disable_auto_compactions wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw("-ERR Set storage::OptionType::kColumnFamily disable_auto_compactions wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetDisableAutoCompaction(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-client-response-size") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-client-response-size'\r\n"); return; } g_pika_conf->SetMaxClientResponseSize(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "write-binlog") { int role = g_pika_server->role(); if (role == PIKA_ROLE_SLAVE) { - ret = "-ERR need to close master-slave mode first\r\n"; + res_.AppendStringRaw("-ERR need to close master-slave mode first\r\n"); return; } else if (value != "yes" && value != "no") { - ret = "-ERR invalid write-binlog (yes or no)\r\n"; + res_.AppendStringRaw("-ERR invalid write-binlog (yes or no)\r\n"); return; } else { g_pika_conf->SetWriteBinlog(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } } else if (set_item == "db-sync-speed") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'db-sync-speed(MB)'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'db-sync-speed(MB)'\r\n"); return; } if (ival < 0 || ival > 1024) { ival = 1024; } g_pika_conf->SetDbSyncSpeed(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "compact-cron") { bool invalid = false; if (!value.empty()) { @@ -2379,11 +2381,11 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { } } if (invalid) { - ret = "-ERR invalid compact-cron\r\n"; + res_.AppendStringRaw("-ERR invalid compact-cron\r\n"); return; } else { g_pika_conf->SetCompactCron(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } } else if (set_item == "compact-interval") { bool invalid = false; @@ -2401,121 +2403,121 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { } } if (invalid) { - ret = "-ERR invalid compact-interval\r\n"; + res_.AppendStringRaw("-ERR invalid compact-interval\r\n"); return; } else { g_pika_conf->SetCompactInterval(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } } else if (set_item == "sync-window-size") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"); return; } if (ival <= 0 || ival > kBinlogReadWinMaxSize) { - ret = "-ERR Argument exceed range \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"; + res_.AppendStringRaw("-ERR Argument exceed range \'" + value + "\' for CONFIG SET 'sync-window-size'\r\n"); return; } g_pika_conf->SetSyncWindowSize(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "slow-cmd-list") { g_pika_conf->SetSlowCmd(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-cache-files") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n"); return; } std::unordered_map options_map{{"max_open_files", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kDB, options_map); if (!s.ok()) { - ret = "-ERR Set max-cache-files wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw("-ERR Set max-cache-files wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetMaxCacheFiles(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-background-compactions") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-compactions'\r\n"; + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-compactions'\r\n"); return; } std::unordered_map options_map{{"max_background_compactions", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kDB, options_map); if (!s.ok()) { - ret = "-ERR Set max-background-compactions wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw("-ERR Set max-background-compactions wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetMaxBackgroudCompactions(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-background-jobs") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-jobs'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-jobs'\r\n"); return; } std::unordered_map options_map{{"max_background_jobs", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kDB, options_map); if (!s.ok()) { - ret = "-ERR Set max-background-jobs wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw("-ERR Set max-background-jobs wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetMaxBackgroudJobs(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "write-buffer-size") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'write-buffer-size'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'write-buffer-size'\r\n"); return; } std::unordered_map options_map{{"write_buffer_size", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); if (!s.ok()) { - ret = "-ERR Set write-buffer-size wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw("-ERR Set write-buffer-size wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetWriteBufferSize(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-write-buffer-num") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-write-buffer-number'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-write-buffer-number'\r\n"); return; } std::unordered_map options_map{{"max_write_buffer_number", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); if (!s.ok()) { - ret = "-ERR Set max-write-buffer-number wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw("-ERR Set max-write-buffer-number wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetMaxWriteBufferNumber(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "arena-block-size") { if (pstd::string2int(value.data(), value.size(), &ival) == 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'arena-block-size'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'arena-block-size'\r\n"); return; } std::unordered_map options_map{{"arena_block_size", value}}; storage::Status s = g_pika_server->RewriteStorageOptions(storage::OptionType::kColumnFamily, options_map); if (!s.ok()) { - ret = "-ERR Set arena-block-size wrong: " + s.ToString() + "\r\n"; + res_.AppendStringRaw( "-ERR Set arena-block-size wrong: " + s.ToString() + "\r\n"); return; } g_pika_conf->SetArenaBlockSize(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "throttle-bytes-per-second") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n"); return; } g_pika_conf->SetThrottleBytesPerSecond(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-rsync-parallel-num") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival > kMaxRsyncParallelNum) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-rsync-parallel-num'\r\n"; + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-rsync-parallel-num'\r\n"); return; } g_pika_conf->SetMaxRsyncParallelNum(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "cache-num") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-num'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-num'\r\n"); return; } @@ -2524,20 +2526,20 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { g_pika_conf->SetCacheNum(cache_num); g_pika_server->ResetCacheAsync(cache_num, db); } - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "cache-model") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-model'\r\n"; + res_.AppendStringRaw( "-ERR Invalid argument " + value + " for CONFIG SET 'cache-model'\r\n"); return; } if (PIKA_CACHE_NONE > ival || PIKA_CACHE_READ < ival) { - ret = "-ERR Invalid cache model\r\n"; + res_.AppendStringRaw("-ERR Invalid cache model\r\n"); } else { g_pika_conf->SetCacheModel(ival); if (PIKA_CACHE_NONE == ival) { g_pika_server->ClearCacheDbAsync(db); } - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } } else if (set_item == "cache-type") { pstd::StringToLower(value); @@ -2548,19 +2550,19 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { pstd::StringSplit(type_str, COMMA, types); for (auto& type : types) { if (available_types.find(type) == available_types.end()) { - ret = "-ERR Invalid cache type: " + type + "\r\n"; + res_.AppendStringRaw("-ERR Invalid cache type: " + type + "\r\n"); return; } } g_pika_conf->SetCacheType(value); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "zset-cache-start-direction") { if (!pstd::string2int(value.data(), value.size(), &ival)) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'zset-cache-start-direction'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'zset-cache-start-direction'\r\n"); return; } if (ival != CACHE_START_FROM_BEGIN && ival != CACHE_START_FROM_END) { - ret = "-ERR Invalid zset-cache-start-direction\r\n"; + res_.AppendStringRaw("-ERR Invalid zset-cache-start-direction\r\n"); return; } auto origin_start_pos = g_pika_conf->zset_cache_start_pos(); @@ -2568,69 +2570,69 @@ void ConfigCmd::ConfigSet(std::string& ret, std::shared_ptr db) { g_pika_conf->SetCacheStartPos(ival); g_pika_server->OnCacheStartPosChanged(ival, db); } - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "zset-cache-field-num-per-key") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'zset-cache-field-num-per-key'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'zset-cache-field-num-per-key'\r\n"); return; } g_pika_conf->SetCacheItemsPerKey(ival); g_pika_server->ResetCacheConfig(db); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "cache-maxmemory") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-maxmemory'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-maxmemory'\r\n"); return; } int64_t cache_maxmemory = (PIKA_CACHE_SIZE_MIN > ival) ? PIKA_CACHE_SIZE_DEFAULT : ival; g_pika_conf->SetCacheMaxmemory(cache_maxmemory); g_pika_server->ResetCacheConfig(db); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "cache-maxmemory-policy") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-maxmemory-policy'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-maxmemory-policy'\r\n"); return; } int cache_maxmemory_policy_ = (ival < 0|| ival > 5) ? 3 : ival; // default allkeys-lru g_pika_conf->SetCacheMaxmemoryPolicy(cache_maxmemory_policy_); g_pika_server->ResetCacheConfig(db); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "cache-maxmemory-samples") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-maxmemory-samples'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-maxmemory-samples'\r\n"); return; } int cache_maxmemory_samples = (ival > 1) ? 5 : ival; g_pika_conf->SetCacheMaxmemorySamples(cache_maxmemory_samples); g_pika_server->ResetCacheConfig(db); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "cache-lfu-decay-time") { if (!pstd::string2int(value.data(), value.size(), &ival) || ival < 0) { - ret = "-ERR Invalid argument " + value + " for CONFIG SET 'cache-lfu-decay-time'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument " + value + " for CONFIG SET 'cache-lfu-decay-time'\r\n"); return; } int cache_lfu_decay_time = (ival < 0) ? 1 : ival; g_pika_conf->SetCacheLFUDecayTime(cache_lfu_decay_time); g_pika_server->ResetCacheConfig(db); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "acl-pubsub-default") { std::string v(value); pstd::StringToLower(v); if (v != "allchannels" && v != "resetchannels") { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'acl-pubsub-default'\r\n"; + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'acl-pubsub-default'\r\n"); return; } g_pika_conf->SetAclPubsubDefault(v); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "acllog-max-len") { if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < 0) { - ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'acllog-max-len'\r\n"; + res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'acllog-max-len'\r\n"); return; } g_pika_conf->SetAclLogMaxLen(static_cast(ival)); - ret = "+OK\r\n"; + res_.AppendStringRaw("+OK\r\n"); } else { - ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n"; + res_.AppendStringRaw("-ERR Unsupported CONFIG parameter: " + set_item + "\r\n"); } } diff --git a/src/storage/src/custom_comparator.h b/src/storage/src/custom_comparator.h index 07648ef8d2..f0ea9dc045 100644 --- a/src/storage/src/custom_comparator.h +++ b/src/storage/src/custom_comparator.h @@ -99,15 +99,23 @@ class ZSetsScoreKeyComparatorImpl : public rocksdb::Comparator { auto b_size = static_cast(b.size()); int32_t key_a_len = DecodeFixed32(ptr_a); int32_t key_b_len = DecodeFixed32(ptr_b); - rocksdb::Slice key_a_prefix(ptr_a, key_a_len + 2 * sizeof(int32_t)); - rocksdb::Slice key_b_prefix(ptr_b, key_b_len + 2 * sizeof(int32_t)); - ptr_a += key_a_len + 2 * sizeof(int32_t); - ptr_b += key_b_len + 2 * sizeof(int32_t); + rocksdb::Slice key_a_prefix(ptr_a, key_a_len + sizeof(int32_t)); + rocksdb::Slice key_b_prefix(ptr_b, key_b_len + sizeof(int32_t)); + ptr_a += key_a_len + sizeof(int32_t); + ptr_b += key_b_len + sizeof(int32_t); int ret = key_a_prefix.compare(key_b_prefix); if (ret) { return ret; } + int32_t version_a = DecodeFixed32(ptr_a); + int32_t version_b = DecodeFixed32(ptr_b); + if (version_a != version_b) { + return version_a < version_b ? -1 : 1; + } + ptr_a += sizeof(int32_t); + ptr_b += sizeof(int32_t); + uint64_t a_i = DecodeFixed64(ptr_a); uint64_t b_i = DecodeFixed64(ptr_b); const void* ptr_a_score = reinterpret_cast(&a_i); diff --git a/src/storage/tests/custom_comparator_test.cc b/src/storage/tests/custom_comparator_test.cc index 9661d2619b..05b472e73e 100644 --- a/src/storage/tests/custom_comparator_test.cc +++ b/src/storage/tests/custom_comparator_test.cc @@ -143,6 +143,13 @@ TEST(ZSetScoreKeyComparator, FindShortestSeparatorTest) { // printf("**********************************************************************\n"); ASSERT_TRUE(impl.Compare(change_start_9, start_9) >= 0); ASSERT_TRUE(impl.Compare(change_start_9, limit_9) < 0); + + // ***************** Group 10 Test ***************** + ZSetsScoreKey zsets_score_key_start_10("Axlgrep", 1557212502, 3.1415, "abc"); + ZSetsScoreKey zsets_score_key_limit_10("Axlgrep", 1557212752, 3.1415, "abc"); + std::string start_10 = zsets_score_key_start_10.Encode().ToString(); + std::string limit_10 = zsets_score_key_limit_10.Encode().ToString(); + ASSERT_TRUE(impl.Compare(start_10, limit_10) < 0); } int main(int argc, char** argv) {